b4b5b834c5a194db4b888659b951250a5a64aefb
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - collect traffic data for anonymity levels > 1
28  * - implement transmission restrictions for anonymity level > 1
29  * - more statistics
30  */
31 #include "platform.h"
32 #include <float.h>
33 #include "gnunet_constants.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_dht_service.h"
36 #include "gnunet_datastore_service.h"
37 #include "gnunet_load_lib.h"
38 #include "gnunet_peer_lib.h"
39 #include "gnunet_protocols.h"
40 #include "gnunet_signatures.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_transport_service.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet-service-fs_indexing.h"
45 #include "fs.h"
46
47 #define DEBUG_FS GNUNET_NO
48
49 /**
50  * Should we introduce random latency in processing?  Required for proper
51  * implementation of GAP, but can be disabled for performance evaluation of
52  * the basic routing algorithm.
53  *
54  * Note that with delays enabled, performance can be significantly lower
55  * (several orders of magnitude in 2-peer test runs); if you want to
56  * measure throughput of other components, set this to NO.  Also, you
57  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
58  * a rather wasteful mode of operation (that might still get the highest
59  * throughput overall).
60  *
61  * Performance measurements (for 50 MB file, 2 peers):
62  *
63  * - Without delays: 3300 kb/s
64  * - With    delays:  101 kb/s
65  */
66 #define SUPPORT_DELAYS GNUNET_NO
67
68 /**
69  * Size for the hash map for DHT requests from the FS
70  * service.  Should be about the number of concurrent
71  * DHT requests we plan to make.
72  */
73 #define FS_DHT_HT_SIZE 1024
74
75 /**
76  * At what frequency should our datastore load decrease
77  * automatically (since if we don't use it, clearly the
78  * load must be going down).
79  */
80 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
81
82 /**
83  * How often do we flush trust values to disk?
84  */
85 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
86
87 /**
88  * How quickly do we age cover traffic?  At the given 
89  * time interval, remaining cover traffic counters are
90  * decremented by 1/16th.
91  */
92 #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
93
94 /**
95  * How often do we at most PUT content into the DHT?
96  */
97 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
98
99 /**
100  * Inverse of the probability that we will submit the same query
101  * to the same peer again.  If the same peer already got the query
102  * repeatedly recently, the probability is multiplied by the inverse
103  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
104  * plus MAX_CORK_DELAY (so roughly every 3.5s).
105  *
106  * Note that this factor is a key influence to performance in small
107  * networks (especially test networks of 2 peers) because if there is
108  * only a single peer with the data, this value will determine how
109  * soon we might re-try.  For example, a value of 3 can result in 
110  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
111  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
112  * rather inefficient in larger networks, hence picking 1 is in 
113  * general not the best choice.
114  *
115  * Performance measurements (for 50 MB file, 2 peers, no delays):
116  *
117  * - 1: 3300 kb/s (consistently)
118  * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
119  * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
120  *
121  * Note that this does NOT mean that the value should be 1 since
122  * a 2-peer network is far from representative here (and this fails
123  * to take into consideration bandwidth wasted by repeatedly 
124  * sending queries to peers that don't have the content).  Also,
125  * it is expected that higher values lead to more inconsistent
126  * measurements since this only affects lost messages towards the
127  * end of the download.
128  *
129  * Finally, we should probably consider changing this and making
130  * it dependent on the number of connected peers or a related
131  * metric (bad magic constants...).
132  */
133 #define RETRY_PROBABILITY_INV 1
134
135 /**
136  * What is the maximum delay for a P2P FS message (in our interaction
137  * with core)?  FS-internal delays are another story.  The value is
138  * chosen based on the 32k block size.  Given that peers typcially
139  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
140  * transmit one message even to the lowest-bandwidth peers.
141  */
142 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
143
144 /**
145  * Maximum number of requests (from other peers, overall) that we're
146  * willing to have pending at any given point in time.  Can be changed
147  * via the configuration file (32k is just the default).
148  */
149 static unsigned long long max_pending_requests = (32 * 1024);
150
151
152 /**
153  * Information we keep for each pending reply.  The
154  * actual message follows at the end of this struct.
155  */
156 struct PendingMessage;
157
158 /**
159  * Function called upon completion of a transmission.
160  *
161  * @param cls closure
162  * @param pid ID of receiving peer, 0 on transmission error
163  */
164 typedef void (*TransmissionContinuation)(void * cls, 
165                                          GNUNET_PEER_Id tpid);
166
167
168 /**
169  * Information we keep for each pending message (GET/PUT).  The
170  * actual message follows at the end of this struct.
171  */
172 struct PendingMessage
173 {
174   /**
175    * This is a doubly-linked list of messages to the same peer.
176    */
177   struct PendingMessage *next;
178
179   /**
180    * This is a doubly-linked list of messages to the same peer.
181    */
182   struct PendingMessage *prev;
183
184   /**
185    * Entry in pending message list for this pending message.
186    */ 
187   struct PendingMessageList *pml;  
188
189   /**
190    * Function to call immediately once we have transmitted this
191    * message.
192    */
193   TransmissionContinuation cont;
194
195   /**
196    * Closure for cont.
197    */
198   void *cont_cls;
199
200   /**
201    * Do not transmit this pending message until this deadline.
202    */
203   struct GNUNET_TIME_Absolute delay_until;
204
205   /**
206    * Size of the reply; actual reply message follows
207    * at the end of this struct.
208    */
209   size_t msize;
210   
211   /**
212    * How important is this message for us?
213    */
214   uint32_t priority;
215  
216 };
217
218
219 /**
220  * Information about a peer that we are connected to.
221  * We track data that is useful for determining which
222  * peers should receive our requests.  We also keep
223  * a list of messages to transmit to this peer.
224  */
225 struct ConnectedPeer
226 {
227
228   /**
229    * List of the last clients for which this peer successfully
230    * answered a query.
231    */
232   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
233
234   /**
235    * List of the last PIDs for which
236    * this peer successfully answered a query;
237    * We use 0 to indicate no successful reply.
238    */
239   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
240
241   /**
242    * Average delay between sending the peer a request and
243    * getting a reply (only calculated over the requests for
244    * which we actually got a reply).   Calculated
245    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
246    */ 
247   struct GNUNET_TIME_Relative avg_delay;
248
249   /**
250    * Point in time until which this peer does not want us to migrate content
251    * to it.
252    */
253   struct GNUNET_TIME_Absolute migration_blocked;
254
255   /**
256    * Time until when we blocked this peer from migrating
257    * data to us.
258    */
259   struct GNUNET_TIME_Absolute last_migration_block;
260
261   /**
262    * Transmission times for the last MAX_QUEUE_PER_PEER
263    * requests for this peer.  Used as a ring buffer, current
264    * offset is stored in 'last_request_times_off'.  If the
265    * oldest entry is more recent than the 'avg_delay', we should
266    * not send any more requests right now.
267    */
268   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
269
270   /**
271    * Handle for an active request for transmission to this
272    * peer, or NULL.
273    */
274   struct GNUNET_CORE_TransmitHandle *cth;
275
276   /**
277    * Messages (replies, queries, content migration) we would like to
278    * send to this peer in the near future.  Sorted by priority, head.
279    */
280   struct PendingMessage *pending_messages_head;
281
282   /**
283    * Messages (replies, queries, content migration) we would like to
284    * send to this peer in the near future.  Sorted by priority, tail.
285    */
286   struct PendingMessage *pending_messages_tail;
287
288   /**
289    * How long does it typically take for us to transmit a message
290    * to this peer?  (delay between the request being issued and
291    * the callback being invoked).
292    */
293   struct GNUNET_LOAD_Value *transmission_delay;
294
295   /**
296    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
297    */
298   struct GNUNET_CORE_InformationRequestContext *irc;
299
300   /**
301    * Request for which 'irc' is currently active (or NULL).
302    */
303   struct PendingRequest *pr;
304
305   /**
306    * Time when the last transmission request was issued.
307    */
308   struct GNUNET_TIME_Absolute last_transmission_request_start;
309
310   /**
311    * ID of delay task for scheduling transmission.
312    */
313   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
314
315   /**
316    * Average priority of successful replies.  Calculated
317    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
318    */
319   double avg_priority;
320
321   /**
322    * Increase in traffic preference still to be submitted
323    * to the core service for this peer.
324    */
325   uint64_t inc_preference;
326
327   /**
328    * Trust rating for this peer
329    */
330   uint32_t trust;
331
332   /**
333    * Trust rating for this peer on disk.
334    */
335   uint32_t disk_trust;
336
337   /**
338    * The peer's identity.
339    */
340   GNUNET_PEER_Id pid;  
341
342   /**
343    * Size of the linked list of 'pending_messages'.
344    */
345   unsigned int pending_requests;
346
347   /**
348    * Which offset in "last_p2p_replies" will be updated next?
349    * (we go round-robin).
350    */
351   unsigned int last_p2p_replies_woff;
352
353   /**
354    * Which offset in "last_client_replies" will be updated next?
355    * (we go round-robin).
356    */
357   unsigned int last_client_replies_woff;
358
359   /**
360    * Current offset into 'last_request_times' ring buffer.
361    */
362   unsigned int last_request_times_off;
363
364 };
365
366
367 /**
368  * Information we keep for each pending request.  We should try to
369  * keep this struct as small as possible since its memory consumption
370  * is key to how many requests we can have pending at once.
371  */
372 struct PendingRequest;
373
374
375 /**
376  * Doubly-linked list of requests we are performing
377  * on behalf of the same client.
378  */
379 struct ClientRequestList
380 {
381
382   /**
383    * This is a doubly-linked list.
384    */
385   struct ClientRequestList *next;
386
387   /**
388    * This is a doubly-linked list.
389    */
390   struct ClientRequestList *prev;
391
392   /**
393    * Request this entry represents.
394    */
395   struct PendingRequest *req;
396
397   /**
398    * Client list this request belongs to.
399    */
400   struct ClientList *client_list;
401
402 };
403
404
405 /**
406  * Replies to be transmitted to the client.  The actual
407  * response message is allocated after this struct.
408  */
409 struct ClientResponseMessage
410 {
411   /**
412    * This is a doubly-linked list.
413    */
414   struct ClientResponseMessage *next;
415
416   /**
417    * This is a doubly-linked list.
418    */
419   struct ClientResponseMessage *prev;
420
421   /**
422    * Client list entry this response belongs to.
423    */
424   struct ClientList *client_list;
425
426   /**
427    * Number of bytes in the response.
428    */
429   size_t msize;
430 };
431
432
433 /**
434  * Linked list of clients we are performing requests
435  * for right now.
436  */
437 struct ClientList
438 {
439   /**
440    * This is a linked list.
441    */
442   struct ClientList *next;
443
444   /**
445    * ID of a client making a request, NULL if this entry is for a
446    * peer.
447    */
448   struct GNUNET_SERVER_Client *client;
449
450   /**
451    * Head of list of requests performed on behalf
452    * of this client right now.
453    */
454   struct ClientRequestList *rl_head;
455
456   /**
457    * Tail of list of requests performed on behalf
458    * of this client right now.
459    */
460   struct ClientRequestList *rl_tail;
461
462   /**
463    * Head of linked list of responses.
464    */
465   struct ClientResponseMessage *res_head;
466
467   /**
468    * Tail of linked list of responses.
469    */
470   struct ClientResponseMessage *res_tail;
471
472   /**
473    * Context for sending replies.
474    */
475   struct GNUNET_CONNECTION_TransmitHandle *th;
476
477 };
478
479
480 /**
481  * Information about a peer that we have forwarded this
482  * request to already.  
483  */
484 struct UsedTargetEntry
485 {
486   /**
487    * What was the last time we have transmitted this request to this
488    * peer?
489    */
490   struct GNUNET_TIME_Absolute last_request_time;
491
492   /**
493    * How often have we transmitted this request to this peer?
494    */
495   unsigned int num_requests;
496
497   /**
498    * PID of the target peer.
499    */
500   GNUNET_PEER_Id pid;
501
502 };
503
504
505 /**
506  * Doubly-linked list of messages we are performing
507  * due to a pending request.
508  */
509 struct PendingMessageList
510 {
511
512   /**
513    * This is a doubly-linked list of messages on behalf of the same request.
514    */
515   struct PendingMessageList *next;
516
517   /**
518    * This is a doubly-linked list of messages on behalf of the same request.
519    */
520   struct PendingMessageList *prev;
521
522   /**
523    * Message this entry represents.
524    */
525   struct PendingMessage *pm;
526
527   /**
528    * Request this entry belongs to.
529    */
530   struct PendingRequest *req;
531
532   /**
533    * Peer this message is targeted for.
534    */
535   struct ConnectedPeer *target;
536
537 };
538
539
540 /**
541  * Information we keep for each pending request.  We should try to
542  * keep this struct as small as possible since its memory consumption
543  * is key to how many requests we can have pending at once.
544  */
545 struct PendingRequest
546 {
547
548   /**
549    * If this request was made by a client, this is our entry in the
550    * client request list; otherwise NULL.
551    */
552   struct ClientRequestList *client_request_list;
553
554   /**
555    * Entry of peer responsible for this entry (if this request
556    * was made by a peer).
557    */
558   struct ConnectedPeer *cp;
559
560   /**
561    * If this is a namespace query, pointer to the hash of the public
562    * key of the namespace; otherwise NULL.  Pointer will be to the 
563    * end of this struct (so no need to free it).
564    */
565   const GNUNET_HashCode *namespace;
566
567   /**
568    * Bloomfilter we use to filter out replies that we don't care about
569    * (anymore).  NULL as long as we are interested in all replies.
570    */
571   struct GNUNET_CONTAINER_BloomFilter *bf;
572
573   /**
574    * Reference to DHT get operation for this request (or NULL).
575    */
576   struct GNUNET_DHT_GetHandle *dht_get;
577
578   /**
579    * Context of our GNUNET_CORE_peer_change_preference call.
580    */
581   struct ConnectedPeer *pirc;
582
583   /**
584    * Hash code of all replies that we have seen so far (only valid
585    * if client is not NULL since we only track replies like this for
586    * our own clients).
587    */
588   GNUNET_HashCode *replies_seen;
589
590   /**
591    * Node in the heap representing this entry; NULL
592    * if we have no heap node.
593    */
594   struct GNUNET_CONTAINER_HeapNode *hnode;
595
596   /**
597    * Head of list of messages being performed on behalf of this
598    * request.
599    */
600   struct PendingMessageList *pending_head;
601
602   /**
603    * Tail of list of messages being performed on behalf of this
604    * request.
605    */
606   struct PendingMessageList *pending_tail;
607
608   /**
609    * When did we first see this request (form this peer), or, if our
610    * client is initiating, when did we last initiate a search?
611    */
612   struct GNUNET_TIME_Absolute start_time;
613
614   /**
615    * The query that this request is for.
616    */
617   GNUNET_HashCode query;
618
619   /**
620    * The task responsible for transmitting queries
621    * for this request.
622    */
623   GNUNET_SCHEDULER_TaskIdentifier task;
624
625   /**
626    * (Interned) Peer identifier that identifies a preferred target
627    * for requests.
628    */
629   GNUNET_PEER_Id target_pid;
630
631   /**
632    * (Interned) Peer identifiers of peers that have already
633    * received our query for this content.
634    */
635   struct UsedTargetEntry *used_targets;
636   
637   /**
638    * Our entry in the queue (non-NULL while we wait for our
639    * turn to interact with the local database).
640    */
641   struct GNUNET_DATASTORE_QueueEntry *qe;
642
643   /**
644    * Size of the 'bf' (in bytes).
645    */
646   size_t bf_size;
647
648   /**
649    * Desired anonymity level; only valid for requests from a local client.
650    */
651   uint32_t anonymity_level;
652
653   /**
654    * How many entries in "used_targets" are actually valid?
655    */
656   unsigned int used_targets_off;
657
658   /**
659    * How long is the "used_targets" array?
660    */
661   unsigned int used_targets_size;
662
663   /**
664    * Number of results found for this request.
665    */
666   unsigned int results_found;
667
668   /**
669    * How many entries in "replies_seen" are actually valid?
670    */
671   unsigned int replies_seen_off;
672
673   /**
674    * How long is the "replies_seen" array?
675    */
676   unsigned int replies_seen_size;
677   
678   /**
679    * Priority with which this request was made.  If one of our clients
680    * made the request, then this is the current priority that we are
681    * using when initiating the request.  This value is used when
682    * we decide to reward other peers with trust for providing a reply.
683    */
684   uint32_t priority;
685
686   /**
687    * Priority points left for us to spend when forwarding this request
688    * to other peers.
689    */
690   uint32_t remaining_priority;
691
692   /**
693    * Number to mingle hashes for bloom-filter tests with.
694    */
695   int32_t mingle;
696
697   /**
698    * TTL with which we saw this request (or, if we initiated, TTL that
699    * we used for the request).
700    */
701   int32_t ttl;
702   
703   /**
704    * Type of the content that this request is for.
705    */
706   enum GNUNET_BLOCK_Type type;
707
708   /**
709    * Remove this request after transmission of the current response.
710    */
711   int8_t do_remove;
712
713   /**
714    * GNUNET_YES if we should not forward this request to other peers.
715    */
716   int8_t local_only;
717
718   /**
719    * GNUNET_YES if we should not forward this request to other peers.
720    */
721   int8_t forward_only;
722
723 };
724
725
726 /**
727  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
728  */
729 struct MigrationReadyBlock
730 {
731
732   /**
733    * This is a doubly-linked list.
734    */
735   struct MigrationReadyBlock *next;
736
737   /**
738    * This is a doubly-linked list.
739    */
740   struct MigrationReadyBlock *prev;
741
742   /**
743    * Query for the block.
744    */
745   GNUNET_HashCode query;
746
747   /**
748    * When does this block expire? 
749    */
750   struct GNUNET_TIME_Absolute expiration;
751
752   /**
753    * Peers we would consider forwarding this
754    * block to.  Zero for empty entries.
755    */
756   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
757
758   /**
759    * Size of the block.
760    */
761   size_t size;
762
763   /**
764    *  Number of targets already used.
765    */
766   unsigned int used_targets;
767
768   /**
769    * Type of the block.
770    */
771   enum GNUNET_BLOCK_Type type;
772 };
773
774 /**
775  * Identity of this peer.
776  */
777 static struct GNUNET_PeerIdentity my_id;
778
779 /**
780  * Our connection to the datastore.
781  */
782 static struct GNUNET_DATASTORE_Handle *dsh;
783
784 /**
785  * Our block context.
786  */
787 static struct GNUNET_BLOCK_Context *block_ctx;
788
789 /**
790  * Our block configuration.
791  */
792 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
793
794 /**
795  * Our configuration.
796  */
797 static const struct GNUNET_CONFIGURATION_Handle *cfg;
798
799 /**
800  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
801  */
802 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
803
804 /**
805  * Map of peer identifiers to "struct PendingRequest" (for that peer).
806  */
807 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
808
809 /**
810  * Map of query identifiers to "struct PendingRequest" (for that query).
811  */
812 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
813
814 /**
815  * Heap with the request that will expire next at the top.  Contains
816  * pointers of type "struct PendingRequest*"; these will *also* be
817  * aliased from the "requests_by_peer" data structures and the
818  * "requests_by_query" table.  Note that requests from our clients
819  * don't expire and are thus NOT in the "requests_by_expiration"
820  * (or the "requests_by_peer" tables).
821  */
822 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
823
824 /**
825  * Handle for reporting statistics.
826  */
827 static struct GNUNET_STATISTICS_Handle *stats;
828
829 /**
830  * Linked list of clients we are currently processing requests for.
831  */
832 static struct ClientList *client_list;
833
834 /**
835  * Pointer to handle to the core service (points to NULL until we've
836  * connected to it).
837  */
838 static struct GNUNET_CORE_Handle *core;
839
840 /**
841  * Head of linked list of blocks that can be migrated.
842  */
843 static struct MigrationReadyBlock *mig_head;
844
845 /**
846  * Tail of linked list of blocks that can be migrated.
847  */
848 static struct MigrationReadyBlock *mig_tail;
849
850 /**
851  * Request to datastore for migration (or NULL).
852  */
853 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
854
855 /**
856  * Request to datastore for DHT PUTs (or NULL).
857  */
858 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
859
860 /**
861  * Type we will request for the next DHT PUT round from the datastore.
862  */
863 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
864
865 /**
866  * Where do we store trust information?
867  */
868 static char *trustDirectory;
869
870 /**
871  * ID of task that collects blocks for migration.
872  */
873 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
874
875 /**
876  * ID of task that collects blocks for DHT PUTs.
877  */
878 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
879
880 /**
881  * What is the maximum frequency at which we are allowed to
882  * poll the datastore for migration content?
883  */
884 static struct GNUNET_TIME_Relative min_migration_delay;
885
886 /**
887  * Handle for DHT operations.
888  */
889 static struct GNUNET_DHT_Handle *dht_handle;
890
891 /**
892  * Size of the doubly-linked list of migration blocks.
893  */
894 static unsigned int mig_size;
895
896 /**
897  * Are we allowed to migrate content to this peer.
898  */
899 static int active_to_migration;
900
901 /**
902  * Are we allowed to push out content from this peer.
903  */
904 static int active_from_migration;
905
906 /**
907  * How many entires with zero anonymity do we currently estimate
908  * to have in the database?
909  */
910 static unsigned int zero_anonymity_count_estimate;
911
912 /**
913  * Typical priorities we're seeing from other peers right now.  Since
914  * most priorities will be zero, this value is the weighted average of
915  * non-zero priorities seen "recently".  In order to ensure that new
916  * values do not dramatically change the ratio, values are first
917  * "capped" to a reasonable range (+N of the current value) and then
918  * averaged into the existing value by a ratio of 1:N.  Hence
919  * receiving the largest possible priority can still only raise our
920  * "current_priorities" by at most 1.
921  */
922 static double current_priorities;
923
924 /**
925  * Datastore 'GET' load tracking.
926  */
927 static struct GNUNET_LOAD_Value *datastore_get_load;
928
929 /**
930  * Datastore 'PUT' load tracking.
931  */
932 static struct GNUNET_LOAD_Value *datastore_put_load;
933
934 /**
935  * How long do requests typically stay in the routing table?
936  */
937 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
938
939 /**
940  * How many query messages have we received 'recently' that 
941  * have not yet been claimed as cover traffic?
942  */
943 static unsigned int cover_query_count;
944
945 /**
946  * How many content messages have we received 'recently' that 
947  * have not yet been claimed as cover traffic?
948  */
949 static unsigned int cover_content_count;
950
951 /**
952  * ID of our task that we use to age the cover counters.
953  */
954 static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
955
956 static void
957 age_cover_counters (void *cls,
958                     const struct GNUNET_SCHEDULER_TaskContext *tc)
959 {
960   cover_content_count = (cover_content_count * 15) / 16;
961   cover_query_count = (cover_query_count * 15) / 16;
962   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
963                                                  &age_cover_counters,
964                                                  NULL);
965 }
966
967 /**
968  * We've just now completed a datastore request.  Update our
969  * datastore load calculations.
970  *
971  * @param start time when the datastore request was issued
972  */
973 static void
974 update_datastore_delays (struct GNUNET_TIME_Absolute start)
975 {
976   struct GNUNET_TIME_Relative delay;
977
978   delay = GNUNET_TIME_absolute_get_duration (start);
979   GNUNET_LOAD_update (datastore_get_load,
980                       delay.rel_value);
981 }
982
983
984 /**
985  * Get the filename under which we would store the GNUNET_HELLO_Message
986  * for the given host and protocol.
987  * @return filename of the form DIRECTORY/HOSTID
988  */
989 static char *
990 get_trust_filename (const struct GNUNET_PeerIdentity *id)
991 {
992   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
993   char *fn;
994
995   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
996   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
997   return fn;
998 }
999
1000
1001
1002 /**
1003  * Transmit messages by copying it to the target buffer
1004  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1005  * for writing in the meantime.  In that case, do nothing
1006  * (the disconnect or shutdown handler will take care of the rest).
1007  * If we were able to transmit messages and there are still more
1008  * pending, ask core again for further calls to this function.
1009  *
1010  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1011  * @param size number of bytes available in buf
1012  * @param buf where the callee should write the message
1013  * @return number of bytes written to buf
1014  */
1015 static size_t
1016 transmit_to_peer (void *cls,
1017                   size_t size, void *buf);
1018
1019
1020 /* ******************* clean up functions ************************ */
1021
1022 /**
1023  * Delete the given migration block.
1024  *
1025  * @param mb block to delete
1026  */
1027 static void
1028 delete_migration_block (struct MigrationReadyBlock *mb)
1029 {
1030   GNUNET_CONTAINER_DLL_remove (mig_head,
1031                                mig_tail,
1032                                mb);
1033   GNUNET_PEER_decrement_rcs (mb->target_list,
1034                              MIGRATION_LIST_SIZE);
1035   mig_size--;
1036   GNUNET_free (mb);
1037 }
1038
1039
1040 /**
1041  * Compare the distance of two peers to a key.
1042  *
1043  * @param key key
1044  * @param p1 first peer
1045  * @param p2 second peer
1046  * @return GNUNET_YES if P1 is closer to key than P2
1047  */
1048 static int
1049 is_closer (const GNUNET_HashCode *key,
1050            const struct GNUNET_PeerIdentity *p1,
1051            const struct GNUNET_PeerIdentity *p2)
1052 {
1053   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1054                                     &p2->hashPubKey,
1055                                     key);
1056 }
1057
1058
1059 /**
1060  * Consider migrating content to a given peer.
1061  *
1062  * @param cls 'struct MigrationReadyBlock*' to select
1063  *            targets for (or NULL for none)
1064  * @param key ID of the peer 
1065  * @param value 'struct ConnectedPeer' of the peer
1066  * @return GNUNET_YES (always continue iteration)
1067  */
1068 static int
1069 consider_migration (void *cls,
1070                     const GNUNET_HashCode *key,
1071                     void *value)
1072 {
1073   struct MigrationReadyBlock *mb = cls;
1074   struct ConnectedPeer *cp = value;
1075   struct MigrationReadyBlock *pos;
1076   struct GNUNET_PeerIdentity cppid;
1077   struct GNUNET_PeerIdentity otherpid;
1078   struct GNUNET_PeerIdentity worstpid;
1079   size_t msize;
1080   unsigned int i;
1081   unsigned int repl;
1082   
1083   /* consider 'cp' as a migration target for mb */
1084   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1085     return GNUNET_YES; /* peer has requested no migration! */
1086   if (mb != NULL)
1087     {
1088       GNUNET_PEER_resolve (cp->pid,
1089                            &cppid);
1090       repl = MIGRATION_LIST_SIZE;
1091       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1092         {
1093           if (mb->target_list[i] == 0)
1094             {
1095               mb->target_list[i] = cp->pid;
1096               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1097               repl = MIGRATION_LIST_SIZE;
1098               break;
1099             }
1100           GNUNET_PEER_resolve (mb->target_list[i],
1101                                &otherpid);
1102           if ( (repl == MIGRATION_LIST_SIZE) &&
1103                is_closer (&mb->query,
1104                           &cppid,
1105                           &otherpid)) 
1106             {
1107               repl = i;
1108               worstpid = otherpid;
1109             }
1110           else if ( (repl != MIGRATION_LIST_SIZE) &&
1111                     (is_closer (&mb->query,
1112                                 &worstpid,
1113                                 &otherpid) ) )
1114             {
1115               repl = i;
1116               worstpid = otherpid;
1117             }       
1118         }
1119       if (repl != MIGRATION_LIST_SIZE) 
1120         {
1121           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1122           mb->target_list[repl] = cp->pid;
1123           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1124         }
1125     }
1126
1127   /* consider scheduling transmission to cp for content migration */
1128   if (cp->cth != NULL)        
1129     return GNUNET_YES; 
1130   msize = 0;
1131   pos = mig_head;
1132   while (pos != NULL)
1133     {
1134       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1135         {
1136           if (cp->pid == pos->target_list[i])
1137             {
1138               if (msize == 0)
1139                 msize = pos->size;
1140               else
1141                 msize = GNUNET_MIN (msize,
1142                                     pos->size);
1143               break;
1144             }
1145         }
1146       pos = pos->next;
1147     }
1148   if (msize == 0)
1149     return GNUNET_YES; /* no content available */
1150 #if DEBUG_FS
1151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1152               "Trying to migrate at least %u bytes to peer `%s'\n",
1153               msize,
1154               GNUNET_h2s (key));
1155 #endif
1156   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1157     {
1158       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1159       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1160     }
1161   cp->cth 
1162     = GNUNET_CORE_notify_transmit_ready (core,
1163                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1164                                          (const struct GNUNET_PeerIdentity*) key,
1165                                          msize + sizeof (struct PutMessage),
1166                                          &transmit_to_peer,
1167                                          cp);
1168   return GNUNET_YES;
1169 }
1170
1171
1172 /**
1173  * Task that is run periodically to obtain blocks for content
1174  * migration
1175  * 
1176  * @param cls unused
1177  * @param tc scheduler context (also unused)
1178  */
1179 static void
1180 gather_migration_blocks (void *cls,
1181                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1182
1183
1184
1185
1186 /**
1187  * Task that is run periodically to obtain blocks for DHT PUTs.
1188  * 
1189  * @param cls type of blocks to gather
1190  * @param tc scheduler context (unused)
1191  */
1192 static void
1193 gather_dht_put_blocks (void *cls,
1194                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1195
1196
1197 /**
1198  * If the migration task is not currently running, consider
1199  * (re)scheduling it with the appropriate delay.
1200  */
1201 static void
1202 consider_migration_gathering ()
1203 {
1204   struct GNUNET_TIME_Relative delay;
1205
1206   if (dsh == NULL)
1207     return;
1208   if (mig_qe != NULL)
1209     return;
1210   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1211     return;
1212   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1213                                          mig_size);
1214   delay = GNUNET_TIME_relative_divide (delay,
1215                                        MAX_MIGRATION_QUEUE);
1216   delay = GNUNET_TIME_relative_max (delay,
1217                                     min_migration_delay);
1218   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1219                                            &gather_migration_blocks,
1220                                            NULL);
1221 }
1222
1223
1224 /**
1225  * If the DHT PUT gathering task is not currently running, consider
1226  * (re)scheduling it with the appropriate delay.
1227  */
1228 static void
1229 consider_dht_put_gathering (void *cls)
1230 {
1231   struct GNUNET_TIME_Relative delay;
1232
1233   if (dsh == NULL)
1234     return;
1235   if (dht_qe != NULL)
1236     return;
1237   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1238     return;
1239   if (zero_anonymity_count_estimate > 0)
1240     {
1241       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1242                                            zero_anonymity_count_estimate);
1243       delay = GNUNET_TIME_relative_min (delay,
1244                                         MAX_DHT_PUT_FREQ);
1245     }
1246   else
1247     {
1248       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1249          (hopefully) appear */
1250       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1251     }
1252   dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1253                                            &gather_dht_put_blocks,
1254                                            cls);
1255 }
1256
1257
1258 /**
1259  * Process content offered for migration.
1260  *
1261  * @param cls closure
1262  * @param key key for the content
1263  * @param size number of bytes in data
1264  * @param data content stored
1265  * @param type type of the content
1266  * @param priority priority of the content
1267  * @param anonymity anonymity-level for the content
1268  * @param expiration expiration time for the content
1269  * @param uid unique identifier for the datum;
1270  *        maybe 0 if no unique identifier is available
1271  */
1272 static void
1273 process_migration_content (void *cls,
1274                            const GNUNET_HashCode * key,
1275                            size_t size,
1276                            const void *data,
1277                            enum GNUNET_BLOCK_Type type,
1278                            uint32_t priority,
1279                            uint32_t anonymity,
1280                            struct GNUNET_TIME_Absolute
1281                            expiration, uint64_t uid)
1282 {
1283   struct MigrationReadyBlock *mb;
1284   
1285   if (key == NULL)
1286     {
1287       mig_qe = NULL;
1288       if (mig_size < MAX_MIGRATION_QUEUE)  
1289         consider_migration_gathering ();
1290       return;
1291     }
1292   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1293     {
1294       if (GNUNET_OK !=
1295           GNUNET_FS_handle_on_demand_block (key, size, data,
1296                                             type, priority, anonymity,
1297                                             expiration, uid, 
1298                                             &process_migration_content,
1299                                             NULL))
1300         {
1301           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1302         }
1303       return;
1304     }
1305 #if DEBUG_FS
1306   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1307               "Retrieved block `%s' of type %u for migration\n",
1308               GNUNET_h2s (key),
1309               type);
1310 #endif
1311   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1312   mb->query = *key;
1313   mb->expiration = expiration;
1314   mb->size = size;
1315   mb->type = type;
1316   memcpy (&mb[1], data, size);
1317   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1318                                      mig_tail,
1319                                      mig_tail,
1320                                      mb);
1321   mig_size++;
1322   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1323                                          &consider_migration,
1324                                          mb);
1325   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1326 }
1327
1328
1329 /**
1330  * Function called upon completion of the DHT PUT operation.
1331  */
1332 static void
1333 dht_put_continuation (void *cls,
1334                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1335 {
1336   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1337 }
1338
1339
1340 /**
1341  * Store content in DHT.
1342  *
1343  * @param cls closure
1344  * @param key key for the content
1345  * @param size number of bytes in data
1346  * @param data content stored
1347  * @param type type of the content
1348  * @param priority priority of the content
1349  * @param anonymity anonymity-level for the content
1350  * @param expiration expiration time for the content
1351  * @param uid unique identifier for the datum;
1352  *        maybe 0 if no unique identifier is available
1353  */
1354 static void
1355 process_dht_put_content (void *cls,
1356                          const GNUNET_HashCode * key,
1357                          size_t size,
1358                          const void *data,
1359                          enum GNUNET_BLOCK_Type type,
1360                          uint32_t priority,
1361                          uint32_t anonymity,
1362                          struct GNUNET_TIME_Absolute
1363                          expiration, uint64_t uid)
1364
1365   static unsigned int counter;
1366   static GNUNET_HashCode last_vhash;
1367   static GNUNET_HashCode vhash;
1368
1369   if (key == NULL)
1370     {
1371       dht_qe = NULL;
1372       consider_dht_put_gathering (cls);
1373       return;
1374     }
1375   /* slightly funky code to estimate the total number of values with zero
1376      anonymity from the maximum observed length of a monotonically increasing 
1377      sequence of hashes over the contents */
1378   GNUNET_CRYPTO_hash (data, size, &vhash);
1379   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1380     {
1381       if (zero_anonymity_count_estimate > 0)
1382         zero_anonymity_count_estimate /= 2;
1383       counter = 0;
1384     }
1385   last_vhash = vhash;
1386   if (counter < 31)
1387     counter++;
1388   if (zero_anonymity_count_estimate < (1 << counter))
1389     zero_anonymity_count_estimate = (1 << counter);
1390 #if DEBUG_FS
1391   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392               "Retrieved block `%s' of type %u for DHT PUT\n",
1393               GNUNET_h2s (key),
1394               type);
1395 #endif
1396   GNUNET_DHT_put (dht_handle,
1397                   key,
1398                   DEFAULT_PUT_REPLICATION,
1399                   GNUNET_DHT_RO_NONE,
1400                   type,
1401                   size,
1402                   data,
1403                   expiration,
1404                   GNUNET_TIME_UNIT_FOREVER_REL,
1405                   &dht_put_continuation,
1406                   cls);
1407 }
1408
1409
1410 /**
1411  * Task that is run periodically to obtain blocks for content
1412  * migration
1413  * 
1414  * @param cls unused
1415  * @param tc scheduler context (also unused)
1416  */
1417 static void
1418 gather_migration_blocks (void *cls,
1419                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1420 {
1421   mig_task = GNUNET_SCHEDULER_NO_TASK;
1422   if (dsh != NULL)
1423     {
1424       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1425                                             GNUNET_TIME_UNIT_FOREVER_REL,
1426                                             &process_migration_content, NULL);
1427       GNUNET_assert (mig_qe != NULL);
1428     }
1429 }
1430
1431
1432 /**
1433  * Task that is run periodically to obtain blocks for DHT PUTs.
1434  * 
1435  * @param cls type of blocks to gather
1436  * @param tc scheduler context (unused)
1437  */
1438 static void
1439 gather_dht_put_blocks (void *cls,
1440                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1441 {
1442   dht_task = GNUNET_SCHEDULER_NO_TASK;
1443   if (dsh != NULL)
1444     {
1445       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1446         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1447       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1448                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1449                                                     dht_put_type++,
1450                                                     &process_dht_put_content, NULL);
1451       GNUNET_assert (dht_qe != NULL);
1452     }
1453 }
1454
1455
1456 /**
1457  * We're done with a particular message list entry.
1458  * Free all associated resources.
1459  * 
1460  * @param pml entry to destroy
1461  */
1462 static void
1463 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1464 {
1465   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1466                                pml->req->pending_tail,
1467                                pml);
1468   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1469                                pml->target->pending_messages_tail,
1470                                pml->pm);
1471   pml->target->pending_requests--;
1472   GNUNET_free (pml->pm);
1473   GNUNET_free (pml);
1474 }
1475
1476
1477 /**
1478  * Destroy the given pending message (and call the respective
1479  * continuation).
1480  *
1481  * @param pm message to destroy
1482  * @param tpid id of peer that the message was delivered to, or 0 for none
1483  */
1484 static void
1485 destroy_pending_message (struct PendingMessage *pm,
1486                          GNUNET_PEER_Id tpid)
1487 {
1488   struct PendingMessageList *pml = pm->pml;
1489   TransmissionContinuation cont;
1490   void *cont_cls;
1491
1492   cont = pm->cont;
1493   cont_cls = pm->cont_cls;
1494   if (pml != NULL)
1495     {
1496       GNUNET_assert (pml->pm == pm);
1497       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1498       destroy_pending_message_list_entry (pml);
1499     }
1500   else
1501     {
1502       GNUNET_free (pm);
1503     }
1504   if (cont != NULL)
1505     cont (cont_cls, tpid);  
1506 }
1507
1508
1509 /**
1510  * We're done processing a particular request.
1511  * Free all associated resources.
1512  *
1513  * @param pr request to destroy
1514  */
1515 static void
1516 destroy_pending_request (struct PendingRequest *pr)
1517 {
1518   struct GNUNET_PeerIdentity pid;
1519   unsigned int i;
1520
1521   if (pr->hnode != NULL)
1522     {
1523       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1524                                          pr->hnode);
1525       pr->hnode = NULL;
1526     }
1527   if (NULL == pr->client_request_list)
1528     {
1529       GNUNET_STATISTICS_update (stats,
1530                                 gettext_noop ("# P2P searches active"),
1531                                 -1,
1532                                 GNUNET_NO);
1533     }
1534   else
1535     {
1536       GNUNET_STATISTICS_update (stats,
1537                                 gettext_noop ("# client searches active"),
1538                                 -1,
1539                                 GNUNET_NO);
1540     }
1541   if (GNUNET_YES == 
1542       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1543                                             &pr->query,
1544                                             pr))
1545     {
1546       GNUNET_LOAD_update (rt_entry_lifetime,
1547                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1548     }
1549   if (pr->qe != NULL)
1550      {
1551       GNUNET_DATASTORE_cancel (pr->qe);
1552       pr->qe = NULL;
1553     }
1554   if (pr->dht_get != NULL)
1555     {
1556       GNUNET_DHT_get_stop (pr->dht_get);
1557       pr->dht_get = NULL;
1558     }
1559   if (pr->client_request_list != NULL)
1560     {
1561       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1562                                    pr->client_request_list->client_list->rl_tail,
1563                                    pr->client_request_list);
1564       GNUNET_free (pr->client_request_list);
1565       pr->client_request_list = NULL;
1566     }
1567   if (pr->cp != NULL)
1568     {
1569       GNUNET_PEER_resolve (pr->cp->pid,
1570                            &pid);
1571       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1572                                                    &pid.hashPubKey,
1573                                                    pr);
1574       pr->cp = NULL;
1575     }
1576   if (pr->bf != NULL)
1577     {
1578       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1579       pr->bf = NULL;
1580     }
1581   if (pr->pirc != NULL)
1582     {
1583       GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1584       pr->pirc->irc = NULL;
1585       pr->pirc = NULL;
1586     }
1587   if (pr->replies_seen != NULL)
1588     {
1589       GNUNET_free (pr->replies_seen);
1590       pr->replies_seen = NULL;
1591     }
1592   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1593     {
1594       GNUNET_SCHEDULER_cancel (pr->task);
1595       pr->task = GNUNET_SCHEDULER_NO_TASK;
1596     }
1597   while (NULL != pr->pending_head)    
1598     destroy_pending_message_list_entry (pr->pending_head);
1599   GNUNET_PEER_change_rc (pr->target_pid, -1);
1600   if (pr->used_targets != NULL)
1601     {
1602       for (i=0;i<pr->used_targets_off;i++)
1603         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1604       GNUNET_free (pr->used_targets);
1605       pr->used_targets_off = 0;
1606       pr->used_targets_size = 0;
1607       pr->used_targets = NULL;
1608     }
1609   GNUNET_free (pr);
1610 }
1611
1612
1613 /**
1614  * Find latency information in 'atsi'.
1615  *
1616  * @param atsi performance data
1617  * @return connection latency
1618  */
1619 static struct GNUNET_TIME_Relative
1620 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1621 {
1622   if (atsi == NULL)
1623     return GNUNET_TIME_UNIT_SECONDS;
1624   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
1625           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
1626     atsi++;
1627   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
1628     {
1629       GNUNET_break (0);
1630       /* how can we not have latency data? */
1631       return GNUNET_TIME_UNIT_SECONDS;
1632     }
1633   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1634                                         ntohl (atsi->value));
1635 }
1636
1637
1638 /**
1639  * Method called whenever a given peer connects.
1640  *
1641  * @param cls closure, not used
1642  * @param peer peer identity this notification is about
1643  * @param atsi performance information
1644  */
1645 static void 
1646 peer_connect_handler (void *cls,
1647                       const struct
1648                       GNUNET_PeerIdentity * peer,
1649                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1650 {
1651   struct ConnectedPeer *cp;
1652   struct MigrationReadyBlock *pos;
1653   char *fn;
1654   uint32_t trust;
1655   struct GNUNET_TIME_Relative latency;
1656
1657   if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1658     return;
1659   latency = get_latency (atsi);
1660   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1661                                           &peer->hashPubKey);
1662   if (NULL != cp)
1663     {
1664       GNUNET_break (0);
1665       return;
1666     }
1667   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1668   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1669   cp->pid = GNUNET_PEER_intern (peer);
1670
1671   fn = get_trust_filename (peer);
1672   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1673       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1674     cp->disk_trust = cp->trust = ntohl (trust);
1675   GNUNET_free (fn);
1676
1677   GNUNET_break (GNUNET_OK ==
1678                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1679                                                    &peer->hashPubKey,
1680                                                    cp,
1681                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1682
1683   pos = mig_head;
1684   while (NULL != pos)
1685     {
1686       (void) consider_migration (pos, &peer->hashPubKey, cp);
1687       pos = pos->next;
1688     }
1689 }
1690
1691
1692 /**
1693  * Method called whenever a given peer has a status change.
1694  *
1695  * @param cls closure
1696  * @param peer peer identity this notification is about
1697  * @param bandwidth_in available amount of inbound bandwidth
1698  * @param bandwidth_out available amount of outbound bandwidth
1699  * @param timeout absolute time when this peer will time out
1700  *        unless we see some further activity from it
1701  * @param atsi status information
1702  */
1703 static void
1704 peer_status_handler (void *cls,
1705                      const struct
1706                      GNUNET_PeerIdentity * peer,
1707                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1708                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1709                      struct GNUNET_TIME_Absolute timeout,
1710                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1711 {
1712   struct ConnectedPeer *cp;
1713   struct GNUNET_TIME_Relative latency;
1714
1715   latency = get_latency (atsi);
1716   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1717                                           &peer->hashPubKey);
1718   if (cp == NULL)
1719     {
1720       GNUNET_break (0);
1721       return;
1722     }
1723   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1724                                  latency);  
1725 }
1726
1727
1728
1729 /**
1730  * Increase the host credit by a value.
1731  *
1732  * @param host which peer to change the trust value on
1733  * @param value is the int value by which the
1734  *  host credit is to be increased or decreased
1735  * @returns the actual change in trust (positive or negative)
1736  */
1737 static int
1738 change_host_trust (struct ConnectedPeer *host, int value)
1739 {
1740   if (value == 0)
1741     return 0;
1742   GNUNET_assert (host != NULL);
1743   if (value > 0)
1744     {
1745       if (host->trust + value < host->trust)
1746         {
1747           value = UINT32_MAX - host->trust;
1748           host->trust = UINT32_MAX;
1749         }
1750       else
1751         host->trust += value;
1752     }
1753   else
1754     {
1755       if (host->trust < -value)
1756         {
1757           value = -host->trust;
1758           host->trust = 0;
1759         }
1760       else
1761         host->trust += value;
1762     }
1763   return value;
1764 }
1765
1766
1767 /**
1768  * Write host-trust information to a file - flush the buffer entry!
1769  */
1770 static int
1771 flush_trust (void *cls,
1772              const GNUNET_HashCode *key,
1773              void *value)
1774 {
1775   struct ConnectedPeer *host = value;
1776   char *fn;
1777   uint32_t trust;
1778   struct GNUNET_PeerIdentity pid;
1779
1780   if (host->trust == host->disk_trust)
1781     return GNUNET_OK;                     /* unchanged */
1782   GNUNET_PEER_resolve (host->pid,
1783                        &pid);
1784   fn = get_trust_filename (&pid);
1785   if (host->trust == 0)
1786     {
1787       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1788         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1789                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1790     }
1791   else
1792     {
1793       trust = htonl (host->trust);
1794       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1795                                                     sizeof(uint32_t),
1796                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1797                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1798         host->disk_trust = host->trust;
1799     }
1800   GNUNET_free (fn);
1801   return GNUNET_OK;
1802 }
1803
1804 /**
1805  * Call this method periodically to scan data/hosts for new hosts.
1806  */
1807 static void
1808 cron_flush_trust (void *cls,
1809                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1810 {
1811
1812   if (NULL == connected_peers)
1813     return;
1814   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1815                                          &flush_trust,
1816                                          NULL);
1817   if (NULL == tc)
1818     return;
1819   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1820     return;
1821   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1822 }
1823
1824
1825 /**
1826  * Free (each) request made by the peer.
1827  *
1828  * @param cls closure, points to peer that the request belongs to
1829  * @param key current key code
1830  * @param value value in the hash map
1831  * @return GNUNET_YES (we should continue to iterate)
1832  */
1833 static int
1834 destroy_request (void *cls,
1835                  const GNUNET_HashCode * key,
1836                  void *value)
1837 {
1838   const struct GNUNET_PeerIdentity * peer = cls;
1839   struct PendingRequest *pr = value;
1840   
1841   GNUNET_break (GNUNET_YES ==
1842                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1843                                                       &peer->hashPubKey,
1844                                                       pr));
1845   destroy_pending_request (pr);
1846   return GNUNET_YES;
1847 }
1848
1849
1850 /**
1851  * Method called whenever a peer disconnects.
1852  *
1853  * @param cls closure, not used
1854  * @param peer peer identity this notification is about
1855  */
1856 static void
1857 peer_disconnect_handler (void *cls,
1858                          const struct
1859                          GNUNET_PeerIdentity * peer)
1860 {
1861   struct ConnectedPeer *cp;
1862   struct PendingMessage *pm;
1863   unsigned int i;
1864   struct MigrationReadyBlock *pos;
1865   struct MigrationReadyBlock *next;
1866
1867   if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1868     return;
1869   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1870                                               &peer->hashPubKey,
1871                                               &destroy_request,
1872                                               (void*) peer);
1873   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1874                                           &peer->hashPubKey);
1875   if (cp == NULL)
1876     return;
1877   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1878     {
1879       if (NULL != cp->last_client_replies[i])
1880         {
1881           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1882           cp->last_client_replies[i] = NULL;
1883         }
1884     }
1885   GNUNET_break (GNUNET_YES ==
1886                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1887                                                       &peer->hashPubKey,
1888                                                       cp));
1889   if (cp->irc != NULL)
1890     {
1891       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1892       cp->irc = NULL;
1893       cp->pr->pirc = NULL;
1894       cp->pr = NULL;
1895     }
1896
1897   /* remove this peer from migration considerations; schedule
1898      alternatives */
1899   next = mig_head;
1900   while (NULL != (pos = next))
1901     {
1902       next = pos->next;
1903       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1904         {
1905           if (pos->target_list[i] == cp->pid)
1906             {
1907               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1908               pos->target_list[i] = 0;
1909             }
1910          }
1911       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1912         {
1913           delete_migration_block (pos);
1914           consider_migration_gathering ();
1915           continue;
1916         }
1917       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1918                                              &consider_migration,
1919                                              pos);
1920     }
1921   GNUNET_PEER_change_rc (cp->pid, -1);
1922   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1923   if (NULL != cp->cth)
1924     {
1925       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1926       cp->cth = NULL;
1927     }
1928   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1929     {
1930       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1931       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1932     }
1933   while (NULL != (pm = cp->pending_messages_head))
1934     destroy_pending_message (pm, 0 /* delivery failed */);
1935   GNUNET_LOAD_value_free (cp->transmission_delay);
1936   GNUNET_break (0 == cp->pending_requests);
1937   GNUNET_free (cp);
1938 }
1939
1940
1941 /**
1942  * Iterator over hash map entries that removes all occurences
1943  * of the given 'client' from the 'last_client_replies' of the
1944  * given connected peer.
1945  *
1946  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1947  * @param key current key code (unused)
1948  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1949  * @return GNUNET_YES (we should continue to iterate)
1950  */
1951 static int
1952 remove_client_from_last_client_replies (void *cls,
1953                                         const GNUNET_HashCode * key,
1954                                         void *value)
1955 {
1956   struct GNUNET_SERVER_Client *client = cls;
1957   struct ConnectedPeer *cp = value;
1958   unsigned int i;
1959
1960   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1961     {
1962       if (cp->last_client_replies[i] == client)
1963         {
1964           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1965           cp->last_client_replies[i] = NULL;
1966         }
1967     }  
1968   return GNUNET_YES;
1969 }
1970
1971
1972 /**
1973  * A client disconnected.  Remove all of its pending queries.
1974  *
1975  * @param cls closure, NULL
1976  * @param client identification of the client
1977  */
1978 static void
1979 handle_client_disconnect (void *cls,
1980                           struct GNUNET_SERVER_Client
1981                           * client)
1982 {
1983   struct ClientList *pos;
1984   struct ClientList *prev;
1985   struct ClientRequestList *rcl;
1986   struct ClientResponseMessage *creply;
1987
1988   if (client == NULL)
1989     return;
1990   prev = NULL;
1991   pos = client_list;
1992   while ( (NULL != pos) &&
1993           (pos->client != client) )
1994     {
1995       prev = pos;
1996       pos = pos->next;
1997     }
1998   if (pos == NULL)
1999     return; /* no requests pending for this client */
2000   while (NULL != (rcl = pos->rl_head))
2001     {
2002       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2003                   "Destroying pending request `%s' on disconnect\n",
2004                   GNUNET_h2s (&rcl->req->query));
2005       destroy_pending_request (rcl->req);
2006     }
2007   if (prev == NULL)
2008     client_list = pos->next;
2009   else
2010     prev->next = pos->next;
2011   if (pos->th != NULL)
2012     {
2013       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
2014       pos->th = NULL;
2015     }
2016   while (NULL != (creply = pos->res_head))
2017     {
2018       GNUNET_CONTAINER_DLL_remove (pos->res_head,
2019                                    pos->res_tail,
2020                                    creply);
2021       GNUNET_free (creply);
2022     }    
2023   GNUNET_SERVER_client_drop (pos->client);
2024   GNUNET_free (pos);
2025   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2026                                          &remove_client_from_last_client_replies,
2027                                          client);
2028 }
2029
2030
2031 /**
2032  * Iterator to free peer entries.
2033  *
2034  * @param cls closure, unused
2035  * @param key current key code
2036  * @param value value in the hash map (peer entry)
2037  * @return GNUNET_YES (we should continue to iterate)
2038  */
2039 static int 
2040 clean_peer (void *cls,
2041             const GNUNET_HashCode * key,
2042             void *value)
2043 {
2044   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
2045   return GNUNET_YES;
2046 }
2047
2048
2049 /**
2050  * Task run during shutdown.
2051  *
2052  * @param cls unused
2053  * @param tc unused
2054  */
2055 static void
2056 shutdown_task (void *cls,
2057                const struct GNUNET_SCHEDULER_TaskContext *tc)
2058 {
2059   if (mig_qe != NULL)
2060     {
2061       GNUNET_DATASTORE_cancel (mig_qe);
2062       mig_qe = NULL;
2063     }
2064   if (dht_qe != NULL)
2065     {
2066       GNUNET_DATASTORE_cancel (dht_qe);
2067       dht_qe = NULL;
2068     }
2069   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2070     {
2071       GNUNET_SCHEDULER_cancel (mig_task);
2072       mig_task = GNUNET_SCHEDULER_NO_TASK;
2073     }
2074   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2075     {
2076       GNUNET_SCHEDULER_cancel (dht_task);
2077       dht_task = GNUNET_SCHEDULER_NO_TASK;
2078     }
2079   while (client_list != NULL)
2080     handle_client_disconnect (NULL,
2081                               client_list->client);
2082   cron_flush_trust (NULL, NULL);
2083   GNUNET_assert (NULL != core);
2084   GNUNET_CORE_disconnect (core);
2085   core = NULL;
2086   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2087                                          &clean_peer,
2088                                          NULL);
2089   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2090   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2091   requests_by_expiration_heap = 0;
2092   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2093   connected_peers = NULL;
2094   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2095   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2096   query_request_map = NULL;
2097   GNUNET_LOAD_value_free (rt_entry_lifetime);
2098   rt_entry_lifetime = NULL;
2099   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2100   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2101   peer_request_map = NULL;
2102   if (stats != NULL)
2103     {
2104       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2105       stats = NULL;
2106     }
2107   if (dsh != NULL)
2108     {
2109       GNUNET_DATASTORE_disconnect (dsh,
2110                                    GNUNET_NO);
2111       dsh = NULL;
2112     }
2113   while (mig_head != NULL)
2114     delete_migration_block (mig_head);
2115   GNUNET_assert (0 == mig_size);
2116   GNUNET_DHT_disconnect (dht_handle);
2117   dht_handle = NULL;
2118   GNUNET_LOAD_value_free (datastore_get_load);
2119   datastore_get_load = NULL;
2120   GNUNET_LOAD_value_free (datastore_put_load);
2121   datastore_put_load = NULL;
2122   GNUNET_BLOCK_context_destroy (block_ctx);
2123   block_ctx = NULL;
2124   GNUNET_CONFIGURATION_destroy (block_cfg);
2125   block_cfg = NULL;
2126   cfg = NULL;  
2127   GNUNET_free_non_null (trustDirectory);
2128   trustDirectory = NULL;
2129   GNUNET_SCHEDULER_cancel (cover_age_task);
2130   cover_age_task = GNUNET_SCHEDULER_NO_TASK;
2131 }
2132
2133
2134 /* ******************* Utility functions  ******************** */
2135
2136
2137 /**
2138  * We've had to delay a request for transmission to core, but now
2139  * we should be ready.  Run it.
2140  *
2141  * @param cls the 'struct ConnectedPeer' for which a request was delayed
2142  * @param tc task context (unused)
2143  */
2144 static void
2145 delayed_transmission_request (void *cls,
2146                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2147 {
2148   struct ConnectedPeer *cp = cls;
2149   struct GNUNET_PeerIdentity pid;
2150   struct PendingMessage *pm;
2151
2152   pm = cp->pending_messages_head;
2153   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2154   GNUNET_assert (cp->cth == NULL);
2155   if (pm == NULL)
2156     return;
2157   GNUNET_PEER_resolve (cp->pid,
2158                        &pid);
2159   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2160   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2161                                                pm->priority,
2162                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2163                                                &pid,
2164                                                pm->msize,
2165                                                &transmit_to_peer,
2166                                                cp);
2167 }
2168
2169
2170 /**
2171  * Transmit messages by copying it to the target buffer
2172  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2173  * for writing in the meantime.  In that case, do nothing
2174  * (the disconnect or shutdown handler will take care of the rest).
2175  * If we were able to transmit messages and there are still more
2176  * pending, ask core again for further calls to this function.
2177  *
2178  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2179  * @param size number of bytes available in buf
2180  * @param buf where the callee should write the message
2181  * @return number of bytes written to buf
2182  */
2183 static size_t
2184 transmit_to_peer (void *cls,
2185                   size_t size, void *buf)
2186 {
2187   struct ConnectedPeer *cp = cls;
2188   char *cbuf = buf;
2189   struct PendingMessage *pm;
2190   struct PendingMessage *next_pm;
2191   struct GNUNET_TIME_Absolute now;
2192   struct GNUNET_TIME_Relative min_delay;
2193   struct MigrationReadyBlock *mb;
2194   struct MigrationReadyBlock *next;
2195   struct PutMessage migm;
2196   size_t msize;
2197   unsigned int i;
2198   struct GNUNET_PeerIdentity pid;
2199  
2200   cp->cth = NULL;
2201   if (NULL == buf)
2202     {
2203 #if DEBUG_FS
2204       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2205                   "Dropping message, core too busy.\n");
2206 #endif
2207       GNUNET_LOAD_update (cp->transmission_delay,
2208                           UINT64_MAX);
2209       
2210       if (NULL != (pm = cp->pending_messages_head))
2211         {
2212           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2213                                        cp->pending_messages_tail,
2214                                        pm);
2215           cp->pending_requests--;    
2216           destroy_pending_message (pm, 0);
2217         }
2218       if (NULL != (pm = cp->pending_messages_head))
2219         {
2220           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2221           min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2222           cp->delayed_transmission_request_task
2223             = GNUNET_SCHEDULER_add_delayed (min_delay,
2224                                             &delayed_transmission_request,
2225                                             cp);
2226         }
2227       return 0;
2228     }  
2229   GNUNET_LOAD_update (cp->transmission_delay,
2230                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2231   now = GNUNET_TIME_absolute_get ();
2232   msize = 0;
2233   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2234   next_pm = cp->pending_messages_head;
2235   while ( (NULL != (pm = next_pm) ) &&
2236           (pm->msize <= size) )
2237     {
2238       next_pm = pm->next;
2239       if (pm->delay_until.abs_value > now.abs_value)
2240         {
2241           min_delay = GNUNET_TIME_relative_min (min_delay,
2242                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2243           continue;
2244         }
2245       memcpy (&cbuf[msize], &pm[1], pm->msize);
2246       msize += pm->msize;
2247       size -= pm->msize;
2248       if (NULL == pm->pml)
2249         {
2250           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2251                                        cp->pending_messages_tail,
2252                                        pm);
2253           cp->pending_requests--;
2254         }
2255       destroy_pending_message (pm, cp->pid);
2256     }
2257   if (pm != NULL)
2258     min_delay = GNUNET_TIME_UNIT_ZERO;
2259   if (NULL != cp->pending_messages_head)
2260     {     
2261       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2262       cp->delayed_transmission_request_task
2263         = GNUNET_SCHEDULER_add_delayed (min_delay,
2264                                         &delayed_transmission_request,
2265                                         cp);
2266     }
2267   if (pm == NULL)
2268     {      
2269       GNUNET_PEER_resolve (cp->pid,
2270                            &pid);
2271       next = mig_head;
2272       while (NULL != (mb = next))
2273         {
2274           next = mb->next;
2275           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2276             {
2277               if ( (cp->pid == mb->target_list[i]) &&
2278                    (mb->size + sizeof (migm) <= size) )
2279                 {
2280                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2281                   mb->target_list[i] = 0;
2282                   mb->used_targets++;
2283                   memset (&migm, 0, sizeof (migm));
2284                   migm.header.size = htons (sizeof (migm) + mb->size);
2285                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2286                   migm.type = htonl (mb->type);
2287                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2288                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2289                   msize += sizeof (migm);
2290                   size -= sizeof (migm);
2291                   memcpy (&cbuf[msize], &mb[1], mb->size);
2292                   msize += mb->size;
2293                   size -= mb->size;
2294 #if DEBUG_FS
2295                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2296                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2297                               GNUNET_h2s (&mb->query),
2298                               (unsigned int) mb->size,
2299                               GNUNET_i2s (&pid));
2300 #endif    
2301                   break;
2302                 }
2303               else
2304                 {
2305 #if DEBUG_FS
2306                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2307                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2308                               GNUNET_h2s (&mb->query),
2309                               (unsigned int) mb->size,
2310                               GNUNET_i2s (&pid));
2311 #endif    
2312                 }
2313             }
2314           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2315                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2316             {
2317               delete_migration_block (mb);
2318               consider_migration_gathering ();
2319             }
2320         }
2321       consider_migration (NULL, 
2322                           &pid.hashPubKey,
2323                           cp);
2324     }
2325 #if DEBUG_FS
2326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2327               "Transmitting %u bytes to peer with PID %u\n",
2328               (unsigned int) msize,
2329               (unsigned int) cp->pid);
2330 #endif
2331   return msize;
2332 }
2333
2334
2335 /**
2336  * Add a message to the set of pending messages for the given peer.
2337  *
2338  * @param cp peer to send message to
2339  * @param pm message to queue
2340  * @param pr request on which behalf this message is being queued
2341  */
2342 static void
2343 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2344                                   struct PendingMessage *pm,
2345                                   struct PendingRequest *pr)
2346 {
2347   struct PendingMessage *pos;
2348   struct PendingMessageList *pml;
2349   struct GNUNET_PeerIdentity pid;
2350
2351   GNUNET_assert (pm->next == NULL);
2352   GNUNET_assert (pm->pml == NULL);    
2353   if (pr != NULL)
2354     {
2355       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2356       pml->req = pr;
2357       pml->target = cp;
2358       pml->pm = pm;
2359       pm->pml = pml;  
2360       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2361                                    pr->pending_tail,
2362                                    pml);
2363     }
2364   pos = cp->pending_messages_head;
2365   while ( (pos != NULL) &&
2366           (pm->priority < pos->priority) )
2367     pos = pos->next;    
2368   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2369                                      cp->pending_messages_tail,
2370                                      pos,
2371                                      pm);
2372   cp->pending_requests++;
2373   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2374     {
2375       GNUNET_STATISTICS_update (stats,
2376                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2377                                 1,
2378                                 GNUNET_NO);
2379       destroy_pending_message (cp->pending_messages_tail, 0);  
2380     }
2381   GNUNET_PEER_resolve (cp->pid, &pid);
2382   if (NULL != cp->cth)
2383     {
2384       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2385       cp->cth = NULL;
2386     }
2387   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2388     {
2389       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2390       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2391     }
2392   /* need to schedule transmission */
2393   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2394   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2395                                                cp->pending_messages_head->priority,
2396                                                MAX_TRANSMIT_DELAY,
2397                                                &pid,
2398                                                cp->pending_messages_head->msize,
2399                                                &transmit_to_peer,
2400                                                cp);
2401   if (cp->cth == NULL)
2402     {
2403 #if DEBUG_FS
2404       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2405                   "Failed to schedule transmission with core!\n");
2406 #endif
2407       GNUNET_STATISTICS_update (stats,
2408                                 gettext_noop ("# CORE transmission failures"),
2409                                 1,
2410                                 GNUNET_NO);
2411     }
2412 }
2413
2414
2415 /**
2416  * Test if the DATABASE (GET) load on this peer is too high
2417  * to even consider processing the query at
2418  * all.  
2419  * 
2420  * @return GNUNET_YES if the load is too high to do anything (load high)
2421  *         GNUNET_NO to process normally (load normal)
2422  *         GNUNET_SYSERR to process for free (load low)
2423  */
2424 static int
2425 test_get_load_too_high (uint32_t priority)
2426 {
2427   double ld;
2428
2429   ld = GNUNET_LOAD_get_load (datastore_get_load);
2430   if (ld < 1)
2431     return GNUNET_SYSERR;    
2432   if (ld <= priority)    
2433     return GNUNET_NO;    
2434   return GNUNET_YES;
2435 }
2436
2437
2438
2439
2440 /**
2441  * Test if the DATABASE (PUT) load on this peer is too high
2442  * to even consider processing the query at
2443  * all.  
2444  * 
2445  * @return GNUNET_YES if the load is too high to do anything (load high)
2446  *         GNUNET_NO to process normally (load normal or low)
2447  */
2448 static int
2449 test_put_load_too_high (uint32_t priority)
2450 {
2451   double ld;
2452
2453   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2454     return GNUNET_NO; /* very fast */
2455   ld = GNUNET_LOAD_get_load (datastore_put_load);
2456   if (ld < 2.0 * (1 + priority))
2457     return GNUNET_NO;
2458   GNUNET_STATISTICS_update (stats,
2459                             gettext_noop ("# storage requests dropped due to high load"),
2460                             1,
2461                             GNUNET_NO);
2462   return GNUNET_YES;
2463 }
2464
2465
2466 /* ******************* Pending Request Refresh Task ******************** */
2467
2468
2469
2470 /**
2471  * We use a random delay to make the timing of requests less
2472  * predictable.  This function returns such a random delay.  We add a base
2473  * delay of MAX_CORK_DELAY (1s).
2474  *
2475  * FIXME: make schedule dependent on the specifics of the request?
2476  * Or bandwidth and number of connected peers and load?
2477  *
2478  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2479  */
2480 static struct GNUNET_TIME_Relative
2481 get_processing_delay ()
2482 {
2483   return 
2484     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2485                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2486                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2487                                                                                        TTL_DECREMENT)));
2488 }
2489
2490
2491 /**
2492  * We're processing a GET request from another peer and have decided
2493  * to forward it to other peers.  This function is called periodically
2494  * and should forward the request to other peers until we have all
2495  * possible replies.  If we have transmitted the *only* reply to
2496  * the initiator we should destroy the pending request.  If we have
2497  * many replies in the queue to the initiator, we should delay sending
2498  * out more queries until the reply queue has shrunk some.
2499  *
2500  * @param cls our "struct ProcessGetContext *"
2501  * @param tc unused
2502  */
2503 static void
2504 forward_request_task (void *cls,
2505                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2506
2507
2508 /**
2509  * Function called after we either failed or succeeded
2510  * at transmitting a query to a peer.  
2511  *
2512  * @param cls the requests "struct PendingRequest*"
2513  * @param tpid ID of receiving peer, 0 on transmission error
2514  */
2515 static void
2516 transmit_query_continuation (void *cls,
2517                              GNUNET_PEER_Id tpid)
2518 {
2519   struct PendingRequest *pr = cls;
2520   unsigned int i;
2521
2522   GNUNET_STATISTICS_update (stats,
2523                             gettext_noop ("# queries scheduled for forwarding"),
2524                             -1,
2525                             GNUNET_NO);
2526   if (tpid == 0)   
2527     {
2528 #if DEBUG_FS
2529       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2530                   "Transmission of request failed, will try again later.\n");
2531 #endif
2532       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2533         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2534                                                  &forward_request_task,
2535                                                  pr); 
2536       return;    
2537     }
2538 #if DEBUG_FS
2539   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2540               "Transmitted query `%s'\n",
2541               GNUNET_h2s (&pr->query));
2542 #endif
2543   GNUNET_STATISTICS_update (stats,
2544                             gettext_noop ("# queries forwarded"),
2545                             1,
2546                             GNUNET_NO);
2547   for (i=0;i<pr->used_targets_off;i++)
2548     if (pr->used_targets[i].pid == tpid)
2549       break; /* found match! */    
2550   if (i == pr->used_targets_off)
2551     {
2552       /* need to create new entry */
2553       if (pr->used_targets_off == pr->used_targets_size)
2554         GNUNET_array_grow (pr->used_targets,
2555                            pr->used_targets_size,
2556                            pr->used_targets_size * 2 + 2);
2557       GNUNET_PEER_change_rc (tpid, 1);
2558       pr->used_targets[pr->used_targets_off].pid = tpid;
2559       pr->used_targets[pr->used_targets_off].num_requests = 0;
2560       i = pr->used_targets_off++;
2561     }
2562   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2563   pr->used_targets[i].num_requests++;
2564   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2565     pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2566                                              &forward_request_task,
2567                                              pr);
2568 }
2569
2570
2571 /**
2572  * How many bytes should a bloomfilter be if we have already seen
2573  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2574  * of bits set per entry.  Furthermore, we should not re-size the
2575  * filter too often (to keep it cheap).
2576  *
2577  * Since other peers will also add entries but not resize the filter,
2578  * we should generally pick a slightly larger size than what the
2579  * strict math would suggest.
2580  *
2581  * @return must be a power of two and smaller or equal to 2^15.
2582  */
2583 static size_t
2584 compute_bloomfilter_size (unsigned int entry_count)
2585 {
2586   size_t size;
2587   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2588   uint16_t max = 1 << 15;
2589
2590   if (entry_count > max)
2591     return max;
2592   size = 8;
2593   while ((size < max) && (size < ideal))
2594     size *= 2;
2595   if (size > max)
2596     return max;
2597   return size;
2598 }
2599
2600
2601 /**
2602  * Recalculate our bloom filter for filtering replies.  This function
2603  * will create a new bloom filter from scratch, so it should only be
2604  * called if we have no bloomfilter at all (and hence can create a
2605  * fresh one of minimal size without problems) OR if our peer is the
2606  * initiator (in which case we may resize to larger than mimimum size).
2607  *
2608  * @param pr request for which the BF is to be recomputed
2609  */
2610 static void
2611 refresh_bloomfilter (struct PendingRequest *pr)
2612 {
2613   unsigned int i;
2614   size_t nsize;
2615   GNUNET_HashCode mhash;
2616
2617   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2618   if (nsize == pr->bf_size)
2619     return; /* size not changed */
2620   if (pr->bf != NULL)
2621     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2622   pr->bf_size = nsize;
2623   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2624   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2625                                               pr->bf_size,
2626                                               BLOOMFILTER_K);
2627   for (i=0;i<pr->replies_seen_off;i++)
2628     {
2629       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2630                                 pr->mingle,
2631                                 &mhash);
2632       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2633     }
2634 }
2635
2636
2637 /**
2638  * Function called after we've tried to reserve a certain amount of
2639  * bandwidth for a reply.  Check if we succeeded and if so send our
2640  * query.
2641  *
2642  * @param cls the requests "struct PendingRequest*"
2643  * @param peer identifies the peer
2644  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2645  * @param amount set to the amount that was actually reserved or unreserved
2646  * @param preference current traffic preference for the given peer
2647  */
2648 static void
2649 target_reservation_cb (void *cls,
2650                        const struct
2651                        GNUNET_PeerIdentity * peer,
2652                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2653                        int amount,
2654                        uint64_t preference)
2655 {
2656   struct PendingRequest *pr = cls;
2657   struct ConnectedPeer *cp;
2658   struct PendingMessage *pm;
2659   struct GetMessage *gm;
2660   GNUNET_HashCode *ext;
2661   char *bfdata;
2662   size_t msize;
2663   unsigned int k;
2664   int no_route;
2665   uint32_t bm;
2666   unsigned int i;
2667
2668   /* (3) transmit, update ttl/priority */
2669   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2670                                           &peer->hashPubKey);
2671   if (cp == NULL)
2672     {
2673       /* Peer must have just left */
2674 #if DEBUG_FS
2675       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2676                   "Selected peer disconnected!\n");
2677 #endif
2678       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2679         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2680                                                  &forward_request_task,
2681                                                  pr);
2682       return;
2683     }
2684   cp->irc = NULL;
2685   pr->pirc = NULL;
2686   if (peer == NULL)
2687     {
2688       /* error in communication with core, try again later */
2689       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2690         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2691                                                  &forward_request_task,
2692                                                  pr);
2693       return;
2694     }
2695   no_route = GNUNET_NO;
2696   if (amount == 0)
2697     {
2698       if (pr->cp == NULL)
2699         {
2700 #if DEBUG_FS > 1
2701           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2702                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2703                       amount,
2704                       DBLOCK_SIZE);
2705 #endif
2706           GNUNET_STATISTICS_update (stats,
2707                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2708                                     1,
2709                                     GNUNET_NO);
2710           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2711             pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2712                                                      &forward_request_task,
2713                                                      pr);
2714           return;  /* this target round failed */
2715         }
2716       no_route = GNUNET_YES;
2717     }
2718   
2719   GNUNET_STATISTICS_update (stats,
2720                             gettext_noop ("# queries scheduled for forwarding"),
2721                             1,
2722                             GNUNET_NO);
2723   for (i=0;i<pr->used_targets_off;i++)
2724     if (pr->used_targets[i].pid == cp->pid) 
2725       {
2726         GNUNET_STATISTICS_update (stats,
2727                                   gettext_noop ("# queries retransmitted to same target"),
2728                                   1,
2729                                   GNUNET_NO);
2730         break;
2731       } 
2732
2733   /* build message and insert message into priority queue */
2734 #if DEBUG_FS
2735   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2736               "Forwarding request `%s' to `%4s'!\n",
2737               GNUNET_h2s (&pr->query),
2738               GNUNET_i2s (peer));
2739 #endif
2740   k = 0;
2741   bm = 0;
2742   if (GNUNET_YES == no_route)
2743     {
2744       bm |= GET_MESSAGE_BIT_RETURN_TO;
2745       k++;      
2746     }
2747   if (pr->namespace != NULL)
2748     {
2749       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2750       k++;
2751     }
2752   if (pr->target_pid != 0)
2753     {
2754       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2755       k++;
2756     }
2757   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2758   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2759   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2760   pm->msize = msize;
2761   gm = (struct GetMessage*) &pm[1];
2762   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2763   gm->header.size = htons (msize);
2764   gm->type = htonl (pr->type);
2765   pr->remaining_priority /= 2;
2766   gm->priority = htonl (pr->remaining_priority);
2767   gm->ttl = htonl (pr->ttl);
2768   gm->filter_mutator = htonl(pr->mingle); 
2769   gm->hash_bitmap = htonl (bm);
2770   gm->query = pr->query;
2771   ext = (GNUNET_HashCode*) &gm[1];
2772   k = 0;
2773   if (GNUNET_YES == no_route)
2774     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2775   if (pr->namespace != NULL)
2776     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2777   if (pr->target_pid != 0)
2778     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2779   bfdata = (char *) &ext[k];
2780   if (pr->bf != NULL)
2781     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2782                                                bfdata,
2783                                                pr->bf_size);
2784   pm->cont = &transmit_query_continuation;
2785   pm->cont_cls = pr;
2786   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2787   add_to_pending_messages_for_peer (cp, pm, pr);
2788 }
2789
2790
2791 /**
2792  * Closure used for "target_peer_select_cb".
2793  */
2794 struct PeerSelectionContext 
2795 {
2796   /**
2797    * The request for which we are selecting
2798    * peers.
2799    */
2800   struct PendingRequest *pr;
2801
2802   /**
2803    * Current "prime" target.
2804    */
2805   struct GNUNET_PeerIdentity target;
2806
2807   /**
2808    * How much do we like this target?
2809    */
2810   double target_score;
2811
2812   /**
2813    * Does it make sense to we re-try quickly again?
2814    */
2815   int fast_retry;
2816
2817 };
2818
2819
2820 /**
2821  * Function called for each connected peer to determine
2822  * which one(s) would make good targets for forwarding.
2823  *
2824  * @param cls closure (struct PeerSelectionContext)
2825  * @param key current key code (peer identity)
2826  * @param value value in the hash map (struct ConnectedPeer)
2827  * @return GNUNET_YES if we should continue to
2828  *         iterate,
2829  *         GNUNET_NO if not.
2830  */
2831 static int
2832 target_peer_select_cb (void *cls,
2833                        const GNUNET_HashCode * key,
2834                        void *value)
2835 {
2836   struct PeerSelectionContext *psc = cls;
2837   struct ConnectedPeer *cp = value;
2838   struct PendingRequest *pr = psc->pr;
2839   struct GNUNET_TIME_Relative delay;
2840   double score;
2841   unsigned int i;
2842   unsigned int pc;
2843
2844   /* 1) check that this peer is not the initiator */
2845   if (cp == pr->cp)     
2846     {
2847 #if DEBUG_FS
2848       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2849                   "Skipping initiator in forwarding selection\n");
2850 #endif
2851       return GNUNET_YES; /* skip */        
2852     }
2853   if (cp->irc != NULL)
2854     {
2855       psc->fast_retry = GNUNET_YES;
2856       return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2857     }
2858
2859   /* 2) check if we have already (recently) forwarded to this peer */
2860   /* 2a) this particular request */
2861   pc = 0;
2862   for (i=0;i<pr->used_targets_off;i++)
2863     if (pr->used_targets[i].pid == cp->pid) 
2864       {
2865         pc = pr->used_targets[i].num_requests;
2866         GNUNET_assert (pc > 0);
2867         /* FIXME: make re-enabling a peer independent of how often
2868            this function is called??? */
2869         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2870                                            RETRY_PROBABILITY_INV * pc))
2871           {
2872 #if DEBUG_FS
2873             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2874                         "NOT re-trying query that was previously transmitted %u times\n",
2875                         (unsigned int) pc);
2876 #endif
2877             return GNUNET_YES; /* skip */
2878           }
2879         break;
2880       }
2881 #if DEBUG_FS
2882   if (0 < pc)
2883     {
2884       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2885                   "Re-trying query that was previously transmitted %u times to this peer\n",
2886                   (unsigned int) pc);
2887     }
2888 #endif
2889   /* 2b) many other requests to this peer */
2890   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2891   if (delay.rel_value <= cp->avg_delay.rel_value)
2892     {
2893 #if DEBUG_FS
2894       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2895                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2896                   MAX_QUEUE_PER_PEER,
2897                   cp->avg_delay.rel_value);
2898 #endif
2899       return GNUNET_YES; /* skip */      
2900     }
2901
2902   /* 3) calculate how much we'd like to forward to this peer,
2903      starting with a random value that is strong enough
2904      to at least give any peer a chance sometimes 
2905      (compared to the other factors that come later) */
2906   /* 3a) count successful (recent) routes from cp for same source */
2907   if (pr->cp != NULL)
2908     {
2909       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2910                                         P2P_SUCCESS_LIST_SIZE);
2911       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2912         if (cp->last_p2p_replies[i] == pr->cp->pid)
2913           score += 1.0; /* likely successful based on hot path */
2914     }
2915   else
2916     {
2917       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2918                                         CS2P_SUCCESS_LIST_SIZE);
2919       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2920         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2921           score += 1.0; /* likely successful based on hot path */
2922     }
2923   /* 3b) include latency */
2924   if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2925     score += 1.0; /* likely fast based on latency */
2926   /* 3c) include priorities */
2927   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2928     score += 1.0; /* likely successful based on priorities */
2929   /* 3d) penalize for queue size */  
2930   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2931   /* 3e) include peer proximity */
2932   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2933                                                     &pr->query)) / (double) UINT32_MAX);
2934   /* 4) super-bonus for being the known target */
2935   if (pr->target_pid == cp->pid)
2936     score += 100.0;
2937   /* store best-fit in closure */
2938   score++; /* avoid zero */
2939   if (score > psc->target_score)
2940     {
2941       psc->target_score = score;
2942       psc->target.hashPubKey = *key; 
2943     }
2944 #if DEBUG_FS
2945   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2946               "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2947               GNUNET_h2s (key),
2948               score,
2949               psc->target_score);
2950 #endif
2951   return GNUNET_YES;
2952 }
2953   
2954
2955 /**
2956  * The priority level imposes a bound on the maximum
2957  * value for the ttl that can be requested.
2958  *
2959  * @param ttl_in requested ttl
2960  * @param prio given priority
2961  * @return ttl_in if ttl_in is below the limit,
2962  *         otherwise the ttl-limit for the given priority
2963  */
2964 static int32_t
2965 bound_ttl (int32_t ttl_in, uint32_t prio)
2966 {
2967   unsigned long long allowed;
2968
2969   if (ttl_in <= 0)
2970     return ttl_in;
2971   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2972   if (ttl_in > allowed)      
2973     {
2974       if (allowed >= (1 << 30))
2975         return 1 << 30;
2976       return allowed;
2977     }
2978   return ttl_in;
2979 }
2980
2981
2982 /**
2983  * Iterator called on each result obtained for a DHT
2984  * operation that expects a reply
2985  *
2986  * @param cls closure
2987  * @param exp when will this value expire
2988  * @param key key of the result
2989  * @param get_path NULL-terminated array of pointers
2990  *                 to the peers on reverse GET path (or NULL if not recorded)
2991  * @param put_path NULL-terminated array of pointers
2992  *                 to the peers on the PUT path (or NULL if not recorded)
2993  * @param type type of the result
2994  * @param size number of bytes in data
2995  * @param data pointer to the result data
2996  */
2997 static void
2998 process_dht_reply (void *cls,
2999                    struct GNUNET_TIME_Absolute exp,
3000                    const GNUNET_HashCode * key,
3001                    const struct GNUNET_PeerIdentity * const *get_path,
3002                    const struct GNUNET_PeerIdentity * const *put_path,
3003                    enum GNUNET_BLOCK_Type type,
3004                    size_t size,
3005                    const void *data);
3006
3007
3008 /**
3009  * We're processing a GET request and have decided
3010  * to forward it to other peers.  This function is called periodically
3011  * and should forward the request to other peers until we have all
3012  * possible replies.  If we have transmitted the *only* reply to
3013  * the initiator we should destroy the pending request.  If we have
3014  * many replies in the queue to the initiator, we should delay sending
3015  * out more queries until the reply queue has shrunk some.
3016  *
3017  * @param cls our "struct ProcessGetContext *"
3018  * @param tc unused
3019  */
3020 static void
3021 forward_request_task (void *cls,
3022                      const struct GNUNET_SCHEDULER_TaskContext *tc)
3023 {
3024   struct PendingRequest *pr = cls;
3025   struct PeerSelectionContext psc;
3026   struct ConnectedPeer *cp; 
3027   struct GNUNET_TIME_Relative delay;
3028
3029   pr->task = GNUNET_SCHEDULER_NO_TASK;
3030   if (pr->pirc != NULL)
3031     {
3032 #if DEBUG_FS
3033       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3034                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
3035                   GNUNET_h2s (&pr->query));
3036 #endif
3037       return; /* already pending */
3038     }
3039   if (GNUNET_YES == pr->local_only)
3040     return; /* configured to not do P2P search */
3041   /* (0) try DHT */
3042   if ( (0 == pr->anonymity_level) &&
3043        (GNUNET_YES != pr->forward_only) &&
3044        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
3045        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
3046     {
3047       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
3048                                           GNUNET_TIME_UNIT_FOREVER_REL,
3049                                           pr->type,
3050                                           &pr->query,
3051                                           DEFAULT_GET_REPLICATION,
3052                                           GNUNET_DHT_RO_NONE,
3053                                           pr->bf,
3054                                           pr->mingle,
3055                                           pr->namespace,
3056                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3057                                           &process_dht_reply,
3058                                           pr);
3059     }
3060
3061   if ( (pr->anonymity_level > 1) &&
3062        (cover_query_count < pr->anonymity_level - 1) )
3063     {
3064       delay = get_processing_delay ();
3065 #if DEBUG_FS 
3066       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3067                   "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n",
3068                   GNUNET_h2s (&pr->query),
3069                   delay.rel_value);
3070 #endif
3071       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3072                                                &forward_request_task,
3073                                                pr);
3074       return;
3075     }
3076   /* consume cover traffic */
3077   if (pr->anonymity_level > 1) 
3078     cover_query_count -= pr->anonymity_level - 1;
3079
3080   /* (1) select target */
3081   psc.pr = pr;
3082   psc.target_score = -DBL_MAX;
3083   psc.fast_retry = GNUNET_NO;
3084   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3085                                          &target_peer_select_cb,
3086                                          &psc);  
3087   if (psc.target_score == -DBL_MAX)
3088     {
3089       if (psc.fast_retry == GNUNET_YES)
3090         delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3091       else
3092         delay = get_processing_delay ();
3093 #if DEBUG_FS 
3094       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3095                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3096                   GNUNET_h2s (&pr->query),
3097                   delay.rel_value);
3098 #endif
3099       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3100                                                &forward_request_task,
3101                                                pr);
3102       return; /* nobody selected */
3103     }
3104   /* (3) update TTL/priority */
3105   if (pr->client_request_list != NULL)
3106     {
3107       /* FIXME: use better algorithm!? */
3108       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3109                                          4))
3110         pr->priority++;
3111       /* bound priority we use by priorities we see from other peers
3112          rounded up (must round up so that we can see non-zero
3113          priorities, but round up as little as possible to make it
3114          plausible that we forwarded another peers request) */
3115       if (pr->priority > current_priorities + 1.0)
3116         pr->priority = (uint32_t) current_priorities + 1.0;
3117       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3118                            pr->priority);
3119 #if DEBUG_FS
3120       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3121                   "Trying query `%s' with priority %u and TTL %d.\n",
3122                   GNUNET_h2s (&pr->query),
3123                   pr->priority,
3124                   pr->ttl);
3125 #endif
3126     }
3127
3128   /* (3) reserve reply bandwidth */
3129   if (GNUNET_NO == pr->forward_only)
3130     {
3131       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3132                                               &psc.target.hashPubKey);
3133       GNUNET_assert (NULL != cp);
3134       GNUNET_assert (cp->irc == NULL);
3135       pr->pirc = cp;
3136       cp->pr = pr;
3137       cp->irc = GNUNET_CORE_peer_change_preference (core,
3138                                                     &psc.target,
3139                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
3140                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3141                                                     DBLOCK_SIZE * 2, 
3142                                                     cp->inc_preference,
3143                                                     &target_reservation_cb,
3144                                                     pr);
3145       GNUNET_assert (cp->irc != NULL);
3146       cp->inc_preference = 0;
3147     }
3148   else
3149     {
3150       /* force forwarding */
3151       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3152       target_reservation_cb (pr, &psc.target,
3153                              zerobw, 0, 0.0);
3154     }
3155 }
3156
3157
3158 /* **************************** P2P PUT Handling ************************ */
3159
3160
3161 /**
3162  * Function called after we either failed or succeeded
3163  * at transmitting a reply to a peer.  
3164  *
3165  * @param cls the requests "struct PendingRequest*"
3166  * @param tpid ID of receiving peer, 0 on transmission error
3167  */
3168 static void
3169 transmit_reply_continuation (void *cls,
3170                              GNUNET_PEER_Id tpid)
3171 {
3172   struct PendingRequest *pr = cls;
3173   
3174   switch (pr->type)
3175     {
3176     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3177     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3178       /* only one reply expected, done with the request! */
3179       destroy_pending_request (pr);
3180       break;
3181     case GNUNET_BLOCK_TYPE_ANY:
3182     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3183     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3184       break;
3185     default:
3186       GNUNET_break (0);
3187       break;
3188     }
3189 }
3190
3191
3192 /**
3193  * Transmit the given message by copying it to the target buffer
3194  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3195  * for writing in the meantime.  In that case, do nothing
3196  * (the disconnect or shutdown handler will take care of the rest).
3197  * If we were able to transmit messages and there are still more
3198  * pending, ask core again for further calls to this function.
3199  *
3200  * @param cls closure, pointer to the 'struct ClientList*'
3201  * @param size number of bytes available in buf
3202  * @param buf where the callee should write the message
3203  * @return number of bytes written to buf
3204  */
3205 static size_t
3206 transmit_to_client (void *cls,
3207                   size_t size, void *buf)
3208 {
3209   struct ClientList *cl = cls;
3210   char *cbuf = buf;
3211   struct ClientResponseMessage *creply;
3212   size_t msize;
3213   
3214   cl->th = NULL;
3215   if (NULL == buf)
3216     {
3217 #if DEBUG_FS
3218       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3219                   "Not sending reply, client communication problem.\n");
3220 #endif
3221       return 0;
3222     }
3223   msize = 0;
3224   while ( (NULL != (creply = cl->res_head) ) &&
3225           (creply->msize <= size) )
3226     {
3227       memcpy (&cbuf[msize], &creply[1], creply->msize);
3228       msize += creply->msize;
3229       size -= creply->msize;
3230       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3231                                    cl->res_tail,
3232                                    creply);
3233       GNUNET_free (creply);
3234     }
3235   if (NULL != creply)
3236     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3237                                                   creply->msize,
3238                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3239                                                   &transmit_to_client,
3240                                                   cl);
3241 #if DEBUG_FS
3242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3243               "Transmitted %u bytes to client\n",
3244               (unsigned int) msize);
3245 #endif
3246   return msize;
3247 }
3248
3249
3250 /**
3251  * Closure for "process_reply" function.
3252  */
3253 struct ProcessReplyClosure
3254 {
3255   /**
3256    * The data for the reply.
3257    */
3258   const void *data;
3259
3260   /**
3261    * Who gave us this reply? NULL for local host (or DHT)
3262    */
3263   struct ConnectedPeer *sender;
3264
3265   /**
3266    * When the reply expires.
3267    */
3268   struct GNUNET_TIME_Absolute expiration;
3269
3270   /**
3271    * Size of data.
3272    */
3273   size_t size;
3274
3275   /**
3276    * Type of the block.
3277    */
3278   enum GNUNET_BLOCK_Type type;
3279
3280   /**
3281    * How much was this reply worth to us?
3282    */
3283   uint32_t priority;
3284
3285   /**
3286    * Anonymity requirements for this reply.
3287    */
3288   uint32_t anonymity_level;
3289
3290   /**
3291    * Evaluation result (returned).
3292    */
3293   enum GNUNET_BLOCK_EvaluationResult eval;
3294
3295   /**
3296    * Did we finish processing the associated request?
3297    */ 
3298   int finished;
3299
3300   /**
3301    * Did we find a matching request?
3302    */
3303   int request_found;
3304 };
3305
3306
3307 /**
3308  * We have received a reply; handle it!
3309  *
3310  * @param cls response (struct ProcessReplyClosure)
3311  * @param key our query
3312  * @param value value in the hash map (info about the query)
3313  * @return GNUNET_YES (we should continue to iterate)
3314  */
3315 static int
3316 process_reply (void *cls,
3317                const GNUNET_HashCode * key,
3318                void *value)
3319 {
3320   struct ProcessReplyClosure *prq = cls;
3321   struct PendingRequest *pr = value;
3322   struct PendingMessage *reply;
3323   struct ClientResponseMessage *creply;
3324   struct ClientList *cl;
3325   struct PutMessage *pm;
3326   struct ConnectedPeer *cp;
3327   struct GNUNET_TIME_Relative cur_delay;
3328 #if SUPPORT_DELAYS  
3329 struct GNUNET_TIME_Relative art_delay;
3330 #endif
3331   size_t msize;
3332   unsigned int i;
3333
3334   if (NULL == pr->client_request_list)
3335     {
3336       /* reply will go over the network, check for cover traffic */
3337       if ( (prq->anonymity_level >  1) &&
3338            (cover_content_count < prq->anonymity_level - 1) )
3339         {
3340           /* insufficient cover traffic, skip */
3341           GNUNET_STATISTICS_update (stats,
3342                                     gettext_noop ("# replies suppressed due to lack of cover traffic"),
3343                                     1,
3344                                     GNUNET_NO);
3345           return GNUNET_YES;
3346         }       
3347       if (prq->anonymity_level >  1) 
3348         cover_content_count -= prq->anonymity_level - 1;
3349     }
3350 #if DEBUG_FS
3351   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3352               "Matched result (type %u) for query `%s' with pending request\n",
3353               (unsigned int) prq->type,
3354               GNUNET_h2s (key));
3355 #endif  
3356   GNUNET_STATISTICS_update (stats,
3357                             gettext_noop ("# replies received and matched"),
3358                             1,
3359                             GNUNET_NO);
3360   if (prq->sender != NULL)
3361     {
3362       for (i=0;i<pr->used_targets_off;i++)
3363         if (pr->used_targets[i].pid == prq->sender->pid)
3364           break;
3365       if (i < pr->used_targets_off)
3366         {
3367           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3368           prq->sender->avg_delay.rel_value
3369             = (prq->sender->avg_delay.rel_value * 
3370                (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
3371           prq->sender->avg_priority
3372             = (prq->sender->avg_priority * 
3373                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3374         }
3375       if (pr->cp != NULL)
3376         {
3377           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3378                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3379                                  -1);
3380           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3381           prq->sender->last_p2p_replies
3382             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3383             = pr->cp->pid;
3384         }
3385       else
3386         {
3387           if (NULL != prq->sender->last_client_replies
3388               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3389             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3390                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3391           prq->sender->last_client_replies
3392             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3393             = pr->client_request_list->client_list->client;
3394           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3395         }
3396     }
3397   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3398                                      prq->type,
3399                                      key,
3400                                      &pr->bf,
3401                                      pr->mingle,
3402                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3403                                      prq->data,
3404                                      prq->size);
3405   switch (prq->eval)
3406     {
3407     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3408       break;
3409     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3410       while (NULL != pr->pending_head)
3411         destroy_pending_message_list_entry (pr->pending_head);
3412       if (pr->qe != NULL)
3413         {
3414           if (pr->client_request_list != NULL)
3415             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3416                                         GNUNET_YES);
3417           GNUNET_DATASTORE_cancel (pr->qe);
3418           pr->qe = NULL;
3419         }
3420       pr->do_remove = GNUNET_YES;
3421       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3422         {
3423           GNUNET_SCHEDULER_cancel (pr->task);
3424           pr->task = GNUNET_SCHEDULER_NO_TASK;
3425         }
3426       GNUNET_break (GNUNET_YES ==
3427                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3428                                                           key,
3429                                                           pr));
3430       GNUNET_LOAD_update (rt_entry_lifetime,
3431                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3432       break;
3433     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3434       GNUNET_STATISTICS_update (stats,
3435                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3436                                 1,
3437                                 GNUNET_NO);
3438 #if DEBUG_FS
3439 /*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3440                   "Duplicate response `%s', discarding.\n",
3441                   GNUNET_h2s (&mhash));*/
3442 #endif
3443       return GNUNET_YES; /* duplicate */
3444     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3445       return GNUNET_YES; /* wrong namespace */  
3446     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3447       GNUNET_break (0);
3448       return GNUNET_YES;
3449     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3450       GNUNET_break (0);
3451       return GNUNET_YES;
3452     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3453       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3454                   _("Unsupported block type %u\n"),
3455                   prq->type);
3456       return GNUNET_NO;
3457     }
3458   if (pr->client_request_list != NULL)
3459     {
3460       if (pr->replies_seen_size == pr->replies_seen_off)
3461         GNUNET_array_grow (pr->replies_seen,
3462                            pr->replies_seen_size,
3463                            pr->replies_seen_size * 2 + 4);      
3464       GNUNET_CRYPTO_hash (prq->data,
3465                           prq->size,
3466                           &pr->replies_seen[pr->replies_seen_off++]);         
3467       refresh_bloomfilter (pr);
3468     }
3469   if (NULL == prq->sender)
3470     {
3471 #if DEBUG_FS
3472       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3473                   "Found result for query `%s' in local datastore\n",
3474                   GNUNET_h2s (key));
3475 #endif
3476       GNUNET_STATISTICS_update (stats,
3477                                 gettext_noop ("# results found locally"),
3478                                 1,
3479                                 GNUNET_NO);      
3480     }
3481   prq->priority += pr->remaining_priority;
3482   pr->remaining_priority = 0;
3483   pr->results_found++;
3484   prq->request_found = GNUNET_YES;
3485   if (NULL != pr->client_request_list)
3486     {
3487       GNUNET_STATISTICS_update (stats,
3488                                 gettext_noop ("# replies received for local clients"),
3489                                 1,
3490                                 GNUNET_NO);
3491       cl = pr->client_request_list->client_list;
3492       msize = sizeof (struct PutMessage) + prq->size;
3493       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3494       creply->msize = msize;
3495       creply->client_list = cl;
3496       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3497                                          cl->res_tail,
3498                                          cl->res_tail,
3499                                          creply);      
3500       pm = (struct PutMessage*) &creply[1];
3501       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3502       pm->header.size = htons (msize);
3503       pm->type = htonl (prq->type);
3504       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3505       memcpy (&pm[1], prq->data, prq->size);      
3506       if (NULL == cl->th)
3507         {
3508 #if DEBUG_FS
3509           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3510                       "Transmitting result for query `%s' to client\n",
3511                       GNUNET_h2s (key));
3512 #endif  
3513           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3514                                                         msize,
3515                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3516                                                         &transmit_to_client,
3517                                                         cl);
3518         }
3519       GNUNET_break (cl->th != NULL);
3520       if (pr->do_remove)                
3521         {
3522           prq->finished = GNUNET_YES;
3523           destroy_pending_request (pr);         
3524         }
3525     }
3526   else
3527     {
3528       cp = pr->cp;
3529 #if DEBUG_FS
3530       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3531                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3532                   GNUNET_h2s (key),
3533                   (unsigned int) cp->pid);
3534 #endif  
3535       GNUNET_STATISTICS_update (stats,
3536                                 gettext_noop ("# replies received for other peers"),
3537                                 1,
3538                                 GNUNET_NO);
3539       msize = sizeof (struct PutMessage) + prq->size;
3540       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3541       reply->cont = &transmit_reply_continuation;
3542       reply->cont_cls = pr;
3543 #if SUPPORT_DELAYS
3544       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3545                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3546                                                                            TTL_DECREMENT));
3547       reply->delay_until 
3548         = GNUNET_TIME_relative_to_absolute (art_delay);
3549       GNUNET_STATISTICS_update (stats,
3550                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3551                                 art_delay.abs_value,
3552                                 GNUNET_NO);
3553 #endif
3554       reply->msize = msize;
3555       reply->priority = UINT32_MAX; /* send replies first! */
3556       pm = (struct PutMessage*) &reply[1];
3557       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3558       pm->header.size = htons (msize);
3559       pm->type = htonl (prq->type);
3560       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3561       memcpy (&pm[1], prq->data, prq->size);
3562       add_to_pending_messages_for_peer (cp, reply, pr);
3563     }
3564   return GNUNET_YES;
3565 }
3566
3567
3568 /**
3569  * Iterator called on each result obtained for a DHT
3570  * operation that expects a reply
3571  *
3572  * @param cls closure
3573  * @param exp when will this value expire
3574  * @param key key of the result
3575  * @param get_path NULL-terminated array of pointers
3576  *                 to the peers on reverse GET path (or NULL if not recorded)
3577  * @param put_path NULL-terminated array of pointers
3578  *                 to the peers on the PUT path (or NULL if not recorded)
3579  * @param type type of the result
3580  * @param size number of bytes in data
3581  * @param data pointer to the result data
3582  */
3583 static void
3584 process_dht_reply (void *cls,
3585                    struct GNUNET_TIME_Absolute exp,
3586                    const GNUNET_HashCode * key,
3587                    const struct GNUNET_PeerIdentity * const *get_path,
3588                    const struct GNUNET_PeerIdentity * const *put_path,
3589                    enum GNUNET_BLOCK_Type type,
3590                    size_t size,
3591                    const void *data)
3592 {
3593   struct PendingRequest *pr = cls;
3594   struct ProcessReplyClosure prq;
3595
3596   memset (&prq, 0, sizeof (prq));
3597   prq.data = data;
3598   prq.expiration = exp;
3599   prq.size = size;  
3600   prq.type = type;
3601   process_reply (&prq, key, pr);
3602 }
3603
3604
3605
3606 /**
3607  * Continuation called to notify client about result of the
3608  * operation.
3609  *
3610  * @param cls closure
3611  * @param success GNUNET_SYSERR on failure
3612  * @param msg NULL on success, otherwise an error message
3613  */
3614 static void 
3615 put_migration_continuation (void *cls,
3616                             int success,
3617                             const char *msg)
3618 {
3619   struct GNUNET_TIME_Absolute *start = cls;
3620   struct GNUNET_TIME_Relative delay;
3621   
3622   delay = GNUNET_TIME_absolute_get_duration (*start);
3623   GNUNET_free (start);
3624   GNUNET_LOAD_update (datastore_put_load,
3625                       delay.rel_value);
3626   if (GNUNET_OK == success)
3627     return;
3628   GNUNET_STATISTICS_update (stats,
3629                             gettext_noop ("# datastore 'put' failures"),
3630                             1,
3631                             GNUNET_NO);
3632 }
3633
3634
3635 /**
3636  * Handle P2P "PUT" message.
3637  *
3638  * @param cls closure, always NULL
3639  * @param other the other peer involved (sender or receiver, NULL
3640  *        for loopback messages where we are both sender and receiver)
3641  * @param message the actual message
3642  * @param atsi performance information
3643  * @return GNUNET_OK to keep the connection open,
3644  *         GNUNET_SYSERR to close it (signal serious error)
3645  */
3646 static int
3647 handle_p2p_put (void *cls,
3648                 const struct GNUNET_PeerIdentity *other,
3649                 const struct GNUNET_MessageHeader *message,
3650                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3651 {
3652   const struct PutMessage *put;
3653   uint16_t msize;
3654   size_t dsize;
3655   enum GNUNET_BLOCK_Type type;
3656   struct GNUNET_TIME_Absolute expiration;
3657   GNUNET_HashCode query;
3658   struct ProcessReplyClosure prq;
3659   struct GNUNET_TIME_Absolute *start;
3660   struct GNUNET_TIME_Relative block_time;  
3661   double putl;
3662   struct ConnectedPeer *cp; 
3663   struct PendingMessage *pm;
3664   struct MigrationStopMessage *msm;
3665
3666   msize = ntohs (message->size);
3667   if (msize < sizeof (struct PutMessage))
3668     {
3669       GNUNET_break_op(0);
3670       return GNUNET_SYSERR;
3671     }
3672   put = (const struct PutMessage*) message;
3673   dsize = msize - sizeof (struct PutMessage);
3674   type = ntohl (put->type);
3675   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3676
3677   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3678     return GNUNET_SYSERR;
3679   if (GNUNET_OK !=
3680       GNUNET_BLOCK_get_key (block_ctx,
3681                             type,
3682                             &put[1],
3683                             dsize,
3684                             &query))
3685     {
3686       GNUNET_break_op (0);
3687       return GNUNET_SYSERR;
3688     }
3689   cover_content_count++;
3690 #if DEBUG_FS
3691   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3692               "Received result for query `%s' from peer `%4s'\n",
3693               GNUNET_h2s (&query),
3694               GNUNET_i2s (other));
3695 #endif
3696   GNUNET_STATISTICS_update (stats,
3697                             gettext_noop ("# replies received (overall)"),
3698                             1,
3699                             GNUNET_NO);
3700   /* now, lookup 'query' */
3701   prq.data = (const void*) &put[1];
3702   if (other != NULL)
3703     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3704                                                     &other->hashPubKey);
3705   else
3706     prq.sender = NULL;
3707   prq.size = dsize;
3708   prq.type = type;
3709   prq.expiration = expiration;
3710   prq.priority = 0;
3711   prq.anonymity_level = 1;
3712   prq.finished = GNUNET_NO;
3713   prq.request_found = GNUNET_NO;
3714   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3715                                               &query,
3716                                               &process_reply,
3717                                               &prq);
3718   if (prq.sender != NULL)
3719     {
3720       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3721       change_host_trust (prq.sender, prq.priority);
3722     }
3723   if ( (GNUNET_YES == active_to_migration) &&
3724        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3725     {      
3726 #if DEBUG_FS
3727       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3728                   "Replicating result for query `%s' with priority %u\n",
3729                   GNUNET_h2s (&query),
3730                   prq.priority);
3731 #endif
3732       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3733       *start = GNUNET_TIME_absolute_get ();
3734       GNUNET_DATASTORE_put (dsh,
3735                             0, &query, dsize, &put[1],
3736                             type, prq.priority, 1 /* anonymity */, 
3737                             expiration, 
3738                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3739                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3740                             &put_migration_continuation, 
3741                             start);
3742     }
3743   putl = GNUNET_LOAD_get_load (datastore_put_load);
3744   if ( (NULL != (cp = prq.sender)) &&
3745        (GNUNET_NO == prq.request_found) &&
3746        ( (GNUNET_YES != active_to_migration) ||
3747          (putl > 2.5 * (1 + prq.priority)) ) ) 
3748     {
3749       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3750         return GNUNET_OK; /* already blocked */
3751       /* We're too busy; send MigrationStop message! */
3752       if (GNUNET_YES != active_to_migration) 
3753         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3754       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3755                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3756                                                                                    (unsigned int) (60000 * putl * putl)));
3757       
3758       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3759       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3760                           sizeof (struct MigrationStopMessage));
3761       pm->msize = sizeof (struct MigrationStopMessage);
3762       pm->priority = UINT32_MAX;
3763       msm = (struct MigrationStopMessage*) &pm[1];
3764       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3765       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3766       msm->duration = GNUNET_TIME_relative_hton (block_time);
3767       add_to_pending_messages_for_peer (cp,
3768                                         pm,
3769                                         NULL);
3770     }
3771   return GNUNET_OK;
3772 }
3773
3774
3775 /**
3776  * Handle P2P "MIGRATION_STOP" message.
3777  *
3778  * @param cls closure, always NULL
3779  * @param other the other peer involved (sender or receiver, NULL
3780  *        for loopback messages where we are both sender and receiver)
3781  * @param message the actual message
3782  * @param atsi performance information
3783  * @return GNUNET_OK to keep the connection open,
3784  *         GNUNET_SYSERR to close it (signal serious error)
3785  */
3786 static int
3787 handle_p2p_migration_stop (void *cls,
3788                            const struct GNUNET_PeerIdentity *other,
3789                            const struct GNUNET_MessageHeader *message,
3790                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3791 {
3792   struct ConnectedPeer *cp; 
3793   const struct MigrationStopMessage *msm;
3794
3795   msm = (const struct MigrationStopMessage*) message;
3796   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3797                                           &other->hashPubKey);
3798   if (cp == NULL)
3799     {
3800       GNUNET_break (0);
3801       return GNUNET_OK;
3802     }
3803   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3804   return GNUNET_OK;
3805 }
3806
3807
3808
3809 /* **************************** P2P GET Handling ************************ */
3810
3811
3812 /**
3813  * Closure for 'check_duplicate_request_{peer,client}'.
3814  */
3815 struct CheckDuplicateRequestClosure
3816 {
3817   /**
3818    * The new request we should check if it already exists.
3819    */
3820   const struct PendingRequest *pr;
3821
3822   /**
3823    * Existing request found by the checker, NULL if none.
3824    */
3825   struct PendingRequest *have;
3826 };
3827
3828
3829 /**
3830  * Iterator over entries in the 'query_request_map' that
3831  * tries to see if we have the same request pending from
3832  * the same client already.
3833  *
3834  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3835  * @param key current key code (query, ignored, must match)
3836  * @param value value in the hash map (a 'struct PendingRequest' 
3837  *              that already exists)
3838  * @return GNUNET_YES if we should continue to
3839  *         iterate (no match yet)
3840  *         GNUNET_NO if not (match found).
3841  */
3842 static int
3843 check_duplicate_request_client (void *cls,
3844                                 const GNUNET_HashCode * key,
3845                                 void *value)
3846 {
3847   struct CheckDuplicateRequestClosure *cdc = cls;
3848   struct PendingRequest *have = value;
3849
3850   if (have->client_request_list == NULL)
3851     return GNUNET_YES;
3852   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3853        (cdc->pr != have) )
3854     {
3855       cdc->have = have;
3856       return GNUNET_NO;
3857     }
3858   return GNUNET_YES;
3859 }
3860
3861
3862 /**
3863  * We're processing (local) results for a search request
3864  * from another peer.  Pass applicable results to the
3865  * peer and if we are done either clean up (operation
3866  * complete) or forward to other peers (more results possible).
3867  *
3868  * @param cls our closure (struct LocalGetContext)
3869  * @param key key for the content
3870  * @param size number of bytes in data
3871  * @param data content stored
3872  * @param type type of the content
3873  * @param priority priority of the content
3874  * @param anonymity anonymity-level for the content
3875  * @param expiration expiration time for the content
3876  * @param uid unique identifier for the datum;
3877  *        maybe 0 if no unique identifier is available
3878  */
3879 static void
3880 process_local_reply (void *cls,
3881                      const GNUNET_HashCode * key,
3882                      size_t size,
3883                      const void *data,
3884                      enum GNUNET_BLOCK_Type type,
3885                      uint32_t priority,
3886                      uint32_t anonymity,
3887                      struct GNUNET_TIME_Absolute
3888                      expiration, 
3889                      uint64_t uid)
3890 {
3891   struct PendingRequest *pr = cls;
3892   struct ProcessReplyClosure prq;
3893   struct CheckDuplicateRequestClosure cdrc;
3894   GNUNET_HashCode query;
3895   unsigned int old_rf;
3896   
3897   if (NULL == key)
3898     {
3899 #if DEBUG_FS > 1
3900       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3901                   "Done processing local replies, forwarding request to other peers.\n");
3902 #endif
3903       pr->qe = NULL;
3904       if (pr->client_request_list != NULL)
3905         {
3906           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3907                                       GNUNET_YES);
3908           /* Figure out if this is a duplicate request and possibly
3909              merge 'struct PendingRequest' entries */
3910           cdrc.have = NULL;
3911           cdrc.pr = pr;
3912           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3913                                                       &pr->query,
3914                                                       &check_duplicate_request_client,
3915                                                       &cdrc);
3916           if (cdrc.have != NULL)
3917             {
3918 #if DEBUG_FS
3919               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3920                           "Received request for block `%s' twice from client, will only request once.\n",
3921                           GNUNET_h2s (&pr->query));
3922 #endif
3923               
3924               destroy_pending_request (pr);
3925               return;
3926             }
3927         }
3928       if (pr->local_only == GNUNET_YES)
3929         {
3930           destroy_pending_request (pr);
3931           return;
3932         }
3933       /* no more results */
3934       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3935         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3936                                              pr);      
3937       return;
3938     }
3939 #if DEBUG_FS
3940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3941               "New local response to `%s' of type %u.\n",
3942               GNUNET_h2s (key),
3943               type);
3944 #endif
3945   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3946     {
3947 #if DEBUG_FS
3948       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3949                   "Found ONDEMAND block, performing on-demand encoding\n");
3950 #endif
3951       GNUNET_STATISTICS_update (stats,
3952                                 gettext_noop ("# on-demand blocks matched requests"),
3953                                 1,
3954                                 GNUNET_NO);
3955       if (GNUNET_OK != 
3956           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3957                                             anonymity, expiration, uid, 
3958                                             &process_local_reply,
3959                                             pr))
3960       if (pr->qe != NULL)
3961         {
3962           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3963         }
3964       return;
3965     }
3966   old_rf = pr->results_found;
3967   memset (&prq, 0, sizeof (prq));
3968   prq.data = data;
3969   prq.expiration = expiration;
3970   prq.size = size;  
3971   if (GNUNET_OK != 
3972       GNUNET_BLOCK_get_key (block_ctx,
3973                             type,
3974                             data,
3975                             size,
3976                             &query))
3977     {
3978       GNUNET_break (0);
3979       GNUNET_DATASTORE_remove (dsh,
3980                                key,
3981                                size, data,
3982                                -1, -1, 
3983                                GNUNET_TIME_UNIT_FOREVER_REL,
3984                                NULL, NULL);
3985       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3986       return;
3987     }
3988   prq.type = type;
3989   prq.priority = priority;  
3990   prq.finished = GNUNET_NO;
3991   prq.request_found = GNUNET_NO;
3992   prq.anonymity_level = anonymity;
3993   if ( (old_rf == 0) &&
3994        (pr->results_found == 0) )
3995     update_datastore_delays (pr->start_time);
3996   process_reply (&prq, key, pr);
3997   if (prq.finished == GNUNET_YES)
3998     return;
3999   if (pr->qe == NULL)
4000     return; /* done here */
4001   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
4002     {
4003       pr->local_only = GNUNET_YES; /* do not forward */
4004       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
4005       return;
4006     }
4007   if ( (pr->client_request_list == NULL) &&
4008        ( (GNUNET_YES == test_get_load_too_high (0)) ||
4009          (pr->results_found > 5 + 2 * pr->priority) ) )
4010     {
4011 #if DEBUG_FS > 2
4012       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4013                   "Load too high, done with request\n");
4014 #endif
4015       GNUNET_STATISTICS_update (stats,
4016                                 gettext_noop ("# processing result set cut short due to load"),
4017                                 1,
4018                                 GNUNET_NO);
4019       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
4020       return;
4021     }
4022   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
4023 }
4024
4025
4026 /**
4027  * We've received a request with the specified priority.  Bound it
4028  * according to how much we trust the given peer.
4029  * 
4030  * @param prio_in requested priority
4031  * @param cp the peer making the request
4032  * @return effective priority
4033  */
4034 static int32_t
4035 bound_priority (uint32_t prio_in,
4036                 struct ConnectedPeer *cp)
4037 {
4038 #define N ((double)128.0)
4039   uint32_t ret;
4040   double rret;
4041   int ld;
4042
4043   ld = test_get_load_too_high (0);
4044   if (ld == GNUNET_SYSERR)
4045     {
4046       GNUNET_STATISTICS_update (stats,
4047                                 gettext_noop ("# requests done for free (low load)"),
4048                                 1,
4049                                 GNUNET_NO);
4050       return 0; /* excess resources */
4051     }
4052   if (prio_in > INT32_MAX)
4053     prio_in = INT32_MAX;
4054   ret = - change_host_trust (cp, - (int) prio_in);
4055   if (ret > 0)
4056     {
4057       if (ret > current_priorities + N)
4058         rret = current_priorities + N;
4059       else
4060         rret = ret;
4061       current_priorities 
4062         = (current_priorities * (N-1) + rret)/N;
4063     }
4064   if ( (ld == GNUNET_YES) && (ret > 0) )
4065     {
4066       /* try with charging */
4067       ld = test_get_load_too_high (ret);
4068     }
4069   if (ld == GNUNET_YES)
4070     {
4071       GNUNET_STATISTICS_update (stats,
4072                                 gettext_noop ("# request dropped, priority insufficient"),
4073                                 1,
4074                                 GNUNET_NO);
4075       /* undo charge */
4076       change_host_trust (cp, (int) ret);
4077       return -1; /* not enough resources */
4078     }
4079   else
4080     {
4081       GNUNET_STATISTICS_update (stats,
4082                                 gettext_noop ("# requests done for a price (normal load)"),
4083                                 1,
4084                                 GNUNET_NO);
4085     }
4086 #undef N
4087   return ret;
4088 }
4089
4090
4091 /**
4092  * Iterator over entries in the 'query_request_map' that
4093  * tries to see if we have the same request pending from
4094  * the same peer already.
4095  *
4096  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
4097  * @param key current key code (query, ignored, must match)
4098  * @param value value in the hash map (a 'struct PendingRequest' 
4099  *              that already exists)
4100  * @return GNUNET_YES if we should continue to
4101  *         iterate (no match yet)
4102  *         GNUNET_NO if not (match found).
4103  */
4104 static int
4105 check_duplicate_request_peer (void *cls,
4106                               const GNUNET_HashCode * key,
4107                               void *value)
4108 {
4109   struct CheckDuplicateRequestClosure *cdc = cls;
4110   struct PendingRequest *have = value;
4111
4112   if (cdc->pr->target_pid == have->target_pid)
4113     {
4114       cdc->have = have;
4115       return GNUNET_NO;
4116     }
4117   return GNUNET_YES;
4118 }
4119
4120
4121 /**
4122  * Handle P2P "GET" request.
4123  *
4124  * @param cls closure, always NULL
4125  * @param other the other peer involved (sender or receiver, NULL
4126  *        for loopback messages where we are both sender and receiver)
4127  * @param message the actual message
4128  * @param atsi performance information
4129  * @return GNUNET_OK to keep the connection open,
4130  *         GNUNET_SYSERR to close it (signal serious error)
4131  */
4132 static int
4133 handle_p2p_get (void *cls,
4134                 const struct GNUNET_PeerIdentity *other,
4135                 const struct GNUNET_MessageHeader *message,
4136                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4137 {
4138   struct PendingRequest *pr;
4139   struct ConnectedPeer *cp;
4140   struct ConnectedPeer *cps;
4141   struct CheckDuplicateRequestClosure cdc;
4142   struct GNUNET_TIME_Relative timeout;
4143   uint16_t msize;
4144   const struct GetMessage *gm;
4145   unsigned int bits;
4146   const GNUNET_HashCode *opt;
4147   uint32_t bm;
4148   size_t bfsize;
4149   uint32_t ttl_decrement;
4150   int32_t priority;
4151   enum GNUNET_BLOCK_Type type;
4152   int have_ns;
4153
4154   msize = ntohs(message->size);
4155   if (msize < sizeof (struct GetMessage))
4156     {
4157       GNUNET_break_op (0);
4158       return GNUNET_SYSERR;
4159     }
4160   gm = (const struct GetMessage*) message;
4161 #if DEBUG_FS
4162   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4163               "Received request for `%s'\n",
4164               GNUNET_h2s (&gm->query));
4165 #endif
4166   type = ntohl (gm->type);
4167   bm = ntohl (gm->hash_bitmap);
4168   bits = 0;
4169   while (bm > 0)
4170     {
4171       if (1 == (bm & 1))
4172         bits++;
4173       bm >>= 1;
4174     }
4175   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4176     {
4177       GNUNET_break_op (0);
4178       return GNUNET_SYSERR;
4179     }  
4180   opt = (const GNUNET_HashCode*) &gm[1];
4181   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4182   /* bfsize must be power of 2, check! */
4183   if (0 != ( (bfsize - 1) & bfsize))
4184     {
4185       GNUNET_break_op (0);
4186       return GNUNET_SYSERR;
4187     }
4188   cover_query_count++;
4189   bm = ntohl (gm->hash_bitmap);
4190   bits = 0;
4191   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4192                                            &other->hashPubKey);
4193   if (NULL == cps)
4194     {
4195       /* peer must have just disconnected */
4196       GNUNET_STATISTICS_update (stats,
4197                                 gettext_noop ("# requests dropped due to initiator not being connected"),
4198                                 1,
4199                                 GNUNET_NO);
4200       return GNUNET_SYSERR;
4201     }
4202   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4203     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4204                                             &opt[bits++]);
4205   else
4206     cp = cps;
4207   if (cp == NULL)
4208     {
4209 #if DEBUG_FS
4210       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4211         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4212                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4213                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4214       
4215       else
4216         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4217                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4218                     GNUNET_i2s (other));
4219 #endif
4220       GNUNET_STATISTICS_update (stats,
4221                                 gettext_noop ("# requests dropped due to missing reverse route"),
4222                                 1,
4223                                 GNUNET_NO);
4224      /* FIXME: try connect? */
4225       return GNUNET_OK;
4226     }
4227   /* note that we can really only check load here since otherwise
4228      peers could find out that we are overloaded by not being
4229      disconnected after sending us a malformed query... */
4230   priority = bound_priority (ntohl (gm->priority), cps);
4231   if (priority < 0)
4232     {
4233 #if DEBUG_FS
4234       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4235                   "Dropping query from `%s', this peer is too busy.\n",
4236                   GNUNET_i2s (other));
4237 #endif
4238       return GNUNET_OK;
4239     }
4240 #if DEBUG_FS 
4241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4242               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4243               GNUNET_h2s (&gm->query),
4244               (unsigned int) type,
4245               GNUNET_i2s (other),
4246               (unsigned int) bm);
4247 #endif
4248   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4249   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4250                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4251   if (have_ns)
4252     {
4253       pr->namespace = (GNUNET_HashCode*) &pr[1];
4254       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4255     }
4256   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4257        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4258         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4259     {
4260       /* don't have BW to send to peer, or would likely take longer than we have for it,
4261          so at best indirect the query */
4262       priority = 0;
4263       pr->forward_only = GNUNET_YES;
4264     }
4265   pr->type = type;
4266   pr->mingle = ntohl (gm->filter_mutator);
4267   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4268     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4269   pr->anonymity_level = 1;
4270   pr->priority = (uint32_t) priority;
4271   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4272   pr->query = gm->query;
4273   /* decrement ttl (always) */
4274   ttl_decrement = 2 * TTL_DECREMENT +
4275     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4276                               TTL_DECREMENT);
4277   if ( (pr->ttl < 0) &&
4278        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4279     {
4280 #if DEBUG_FS
4281       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4282                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4283                   GNUNET_i2s (other),
4284                   pr->ttl,
4285                   ttl_decrement);
4286 #endif
4287       GNUNET_STATISTICS_update (stats,
4288                                 gettext_noop ("# requests dropped due TTL underflow"),
4289                                 1,
4290                                 GNUNET_NO);
4291       /* integer underflow => drop (should be very rare)! */      
4292       GNUNET_free (pr);
4293       return GNUNET_OK;
4294     } 
4295   pr->ttl -= ttl_decrement;
4296   pr->start_time = GNUNET_TIME_absolute_get ();
4297
4298   /* get bloom filter */
4299   if (bfsize > 0)
4300     {
4301       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4302                                                   bfsize,
4303                                                   BLOOMFILTER_K);
4304       pr->bf_size = bfsize;
4305     }
4306   cdc.have = NULL;
4307   cdc.pr = pr;
4308   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4309                                               &gm->query,
4310                                               &check_duplicate_request_peer,
4311                                               &cdc);
4312   if (cdc.have != NULL)
4313     {
4314       if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4315           pr->start_time.abs_value + pr->ttl)
4316         {
4317           /* existing request has higher TTL, drop new one! */
4318           cdc.have->priority += pr->priority;
4319           destroy_pending_request (pr);
4320 #if DEBUG_FS
4321           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4322                       "Have existing request with higher TTL, dropping new request.\n",
4323                       GNUNET_i2s (other));
4324 #endif
4325           GNUNET_STATISTICS_update (stats,
4326                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4327                                     1,
4328                                     GNUNET_NO);
4329           return GNUNET_OK;
4330         }
4331       else
4332         {
4333           /* existing request has lower TTL, drop old one! */
4334           pr->priority += cdc.have->priority;
4335           /* Possible optimization: if we have applicable pending
4336              replies in 'cdc.have', we might want to move those over
4337              (this is a really rare special-case, so it is not clear
4338              that this would be worth it) */
4339           destroy_pending_request (cdc.have);
4340           /* keep processing 'pr'! */
4341         }
4342     }
4343
4344   pr->cp = cp;
4345   GNUNET_break (GNUNET_OK ==
4346                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4347                                                    &gm->query,
4348                                                    pr,
4349                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4350   GNUNET_break (GNUNET_OK ==
4351                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4352                                                    &other->hashPubKey,
4353                                                    pr,
4354                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4355   
4356   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4357                                             pr,
4358                                             pr->start_time.abs_value + pr->ttl);
4359
4360   GNUNET_STATISTICS_update (stats,
4361                             gettext_noop ("# P2P searches received"),
4362                             1,
4363                             GNUNET_NO);
4364   GNUNET_STATISTICS_update (stats,
4365                             gettext_noop ("# P2P searches active"),
4366                             1,
4367                             GNUNET_NO);
4368
4369   /* calculate change in traffic preference */
4370   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4371   /* process locally */
4372   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4373     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4374   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4375                                            (pr->priority + 1)); 
4376   if (GNUNET_YES != pr->forward_only)
4377     {
4378 #if DEBUG_FS
4379       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4380                   "Handing request for `%s' to datastore\n",
4381                   GNUNET_h2s (&gm->query));
4382 #endif
4383       pr->qe = GNUNET_DATASTORE_get (dsh,
4384                                      &gm->query,
4385                                      type,                             
4386                                      pr->priority + 1,
4387                                      MAX_DATASTORE_QUEUE,                                
4388                                      timeout,
4389                                      &process_local_reply,
4390                                      pr);
4391       if (NULL == pr->qe)
4392         {
4393           GNUNET_STATISTICS_update (stats,
4394                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4395                                     1,
4396                                     GNUNET_NO);
4397         }
4398     }
4399   else
4400     {
4401       GNUNET_STATISTICS_update (stats,
4402                                 gettext_noop ("# requests forwarded due to high load"),
4403                                 1,
4404                                 GNUNET_NO);
4405     }
4406
4407   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4408   switch (pr->type)
4409     {
4410     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4411     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4412       /* only one result, wait for datastore */
4413       if (GNUNET_YES != pr->forward_only)
4414         {
4415           GNUNET_STATISTICS_update (stats,
4416                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4417                                     1,
4418                                     GNUNET_NO);
4419           break;
4420         }
4421     default:
4422       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4423         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4424                                              pr);
4425     }
4426
4427   /* make sure we don't track too many requests */
4428   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4429     {
4430       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4431       GNUNET_assert (pr != NULL);
4432       destroy_pending_request (pr);
4433     }
4434   return GNUNET_OK;
4435 }
4436
4437
4438 /* **************************** CS GET Handling ************************ */
4439
4440
4441 /**
4442  * Handle START_SEARCH-message (search request from client).
4443  *
4444  * @param cls closure
4445  * @param client identification of the client
4446  * @param message the actual message
4447  */
4448 static void
4449 handle_start_search (void *cls,
4450                      struct GNUNET_SERVER_Client *client,
4451                      const struct GNUNET_MessageHeader *message)
4452 {
4453   static GNUNET_HashCode all_zeros;
4454   const struct SearchMessage *sm;
4455   struct ClientList *cl;
4456   struct ClientRequestList *crl;
4457   struct PendingRequest *pr;
4458   uint16_t msize;
4459   unsigned int sc;
4460   enum GNUNET_BLOCK_Type type;
4461
4462   msize = ntohs (message->size);
4463   if ( (msize < sizeof (struct SearchMessage)) ||
4464        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4465     {
4466       GNUNET_break (0);
4467       GNUNET_SERVER_receive_done (client,
4468                                   GNUNET_SYSERR);
4469       return;
4470     }
4471   GNUNET_STATISTICS_update (stats,
4472                             gettext_noop ("# client searches received"),
4473                             1,
4474                             GNUNET_NO);
4475   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4476   sm = (const struct SearchMessage*) message;
4477   type = ntohl (sm->type);
4478 #if DEBUG_FS
4479   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4480               "Received request for `%s' of type %u from local client\n",
4481               GNUNET_h2s (&sm->query),
4482               (unsigned int) type);
4483 #endif
4484   cl = client_list;
4485   while ( (cl != NULL) &&
4486           (cl->client != client) )
4487     cl = cl->next;
4488   if (cl == NULL)
4489     {
4490       cl = GNUNET_malloc (sizeof (struct ClientList));
4491       cl->client = client;
4492       GNUNET_SERVER_client_keep (client);
4493       cl->next = client_list;
4494       client_list = cl;
4495     }
4496   /* detect duplicate KBLOCK requests */
4497   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4498        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4499        (type == GNUNET_BLOCK_TYPE_ANY) )
4500     {
4501       crl = cl->rl_head;
4502       while ( (crl != NULL) &&
4503               ( (0 != memcmp (&crl->req->query,
4504                               &sm->query,
4505                               sizeof (GNUNET_HashCode))) ||
4506                 (crl->req->type != type) ) )
4507         crl = crl->next;
4508       if (crl != NULL)  
4509         { 
4510 #if DEBUG_FS
4511           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4512                       "Have existing request, merging content-seen lists.\n");
4513 #endif
4514           pr = crl->req;
4515           /* Duplicate request (used to send long list of
4516              known/blocked results); merge 'pr->replies_seen'
4517              and update bloom filter */
4518           GNUNET_array_grow (pr->replies_seen,
4519                              pr->replies_seen_size,
4520                              pr->replies_seen_off + sc);
4521           memcpy (&pr->replies_seen[pr->replies_seen_off],
4522                   &sm[1],
4523                   sc * sizeof (GNUNET_HashCode));
4524           pr->replies_seen_off += sc;
4525           refresh_bloomfilter (pr);
4526           GNUNET_STATISTICS_update (stats,
4527                                     gettext_noop ("# client searches updated (merged content seen list)"),
4528                                     1,
4529                                     GNUNET_NO);
4530           GNUNET_SERVER_receive_done (client,
4531                                       GNUNET_OK);
4532           return;
4533         }
4534     }
4535   GNUNET_STATISTICS_update (stats,
4536                             gettext_noop ("# client searches active"),
4537                             1,
4538                             GNUNET_NO);
4539   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4540                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4541   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4542   memset (crl, 0, sizeof (struct ClientRequestList));
4543   crl->client_list = cl;
4544   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4545                                cl->rl_tail,
4546                                crl);  
4547   crl->req = pr;
4548   pr->type = type;
4549   pr->client_request_list = crl;
4550   GNUNET_array_grow (pr->replies_seen,
4551                      pr->replies_seen_size,
4552                      sc);
4553   memcpy (pr->replies_seen,
4554           &sm[1],
4555           sc * sizeof (GNUNET_HashCode));
4556   pr->replies_seen_off = sc;
4557   pr->anonymity_level = ntohl (sm->anonymity_level); 
4558   pr->start_time = GNUNET_TIME_absolute_get ();
4559   refresh_bloomfilter (pr);
4560   pr->query = sm->query;
4561   if (0 == (1 & ntohl (sm->options)))
4562     pr->local_only = GNUNET_NO;
4563   else
4564     pr->local_only = GNUNET_YES;
4565   switch (type)
4566     {
4567     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4568     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4569       if (0 != memcmp (&sm->target,
4570                        &all_zeros,
4571                        sizeof (GNUNET_HashCode)))
4572         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4573       break;
4574     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4575       pr->namespace = (GNUNET_HashCode*) &pr[1];
4576       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4577       break;
4578     default:
4579       break;
4580     }
4581   GNUNET_break (GNUNET_OK ==
4582                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4583                                                    &sm->query,
4584                                                    pr,
4585                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4586   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4587     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4588   pr->qe = GNUNET_DATASTORE_get (dsh,
4589                                  &sm->query,
4590                                  type,
4591                                  -3, -1,
4592                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4593                                  &process_local_reply,
4594                                  pr);
4595 }
4596
4597
4598 /* **************************** Startup ************************ */
4599
4600
4601
4602 /**
4603  * Function called after GNUNET_CORE_connect has succeeded
4604  * (or failed for good).  Note that the private key of the
4605  * peer is intentionally not exposed here; if you need it,
4606  * your process should try to read the private key file
4607  * directly (which should work if you are authorized...).
4608  *
4609  * @param cls closure
4610  * @param server handle to the server, NULL if we failed
4611  * @param my_identity ID of this peer, NULL if we failed
4612  * @param publicKey public key of this peer, NULL if we failed
4613  */
4614 static void
4615 peer_init_handler (void *cls,
4616                    struct GNUNET_CORE_Handle * server,
4617                    const struct GNUNET_PeerIdentity *
4618                    my_identity,
4619                    const struct
4620                    GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
4621                    publicKey)
4622 {
4623   my_id = *my_identity;
4624 }
4625
4626
4627
4628
4629 /**
4630  * Process fs requests.
4631  *
4632  * @param server the initialized server
4633  * @param c configuration to use
4634  */
4635 static int
4636 main_init (struct GNUNET_SERVER_Handle *server,
4637            const struct GNUNET_CONFIGURATION_Handle *c)
4638 {
4639   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4640     {
4641       { &handle_p2p_get, 
4642         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4643       { &handle_p2p_put, 
4644         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4645       { &handle_p2p_migration_stop, 
4646         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4647         sizeof (struct MigrationStopMessage) },
4648       { NULL, 0, 0 }
4649     };
4650   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4651     {&GNUNET_FS_handle_index_start, NULL, 
4652      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4653     {&GNUNET_FS_handle_index_list_get, NULL, 
4654      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4655     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4656      sizeof (struct UnindexMessage) },
4657     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4658      0 },
4659     {NULL, NULL, 0, 0}
4660   };
4661   unsigned long long enc = 128;
4662
4663   cfg = c;
4664   stats = GNUNET_STATISTICS_create ("fs", cfg);
4665   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4666   if ( (GNUNET_OK !=
4667         GNUNET_CONFIGURATION_get_value_number (cfg,
4668                                                "fs",
4669                                                "MAX_PENDING_REQUESTS",
4670                                                &max_pending_requests)) ||
4671        (GNUNET_OK !=
4672         GNUNET_CONFIGURATION_get_value_number (cfg,
4673                                                "fs",
4674                                                "EXPECTED_NEIGHBOUR_COUNT",
4675                                                &enc)) ||
4676        (GNUNET_OK != 
4677         GNUNET_CONFIGURATION_get_value_time (cfg,
4678                                              "fs",
4679                                              "MIN_MIGRATION_DELAY",
4680                                              &min_migration_delay)) )
4681     {
4682       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4683                   _("Configuration fails to specify certain parameters, assuming default values."));
4684     }
4685   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4686   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4687   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4688   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4689   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4690   core = GNUNET_CORE_connect (cfg,
4691                               1, /* larger? */
4692                               NULL,
4693                               &peer_init_handler,
4694                               &peer_connect_handler,
4695                               &peer_disconnect_handler,
4696                               &peer_status_handler,
4697                               NULL, GNUNET_NO,
4698                               NULL, GNUNET_NO,
4699                               p2p_handlers);
4700   if (NULL == core)
4701     {
4702       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4703                   _("Failed to connect to `%s' service.\n"),
4704                   "core");
4705       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4706       connected_peers = NULL;
4707       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4708       query_request_map = NULL;
4709       GNUNET_LOAD_value_free (rt_entry_lifetime);
4710       rt_entry_lifetime = NULL;
4711       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4712       requests_by_expiration_heap = NULL;
4713       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4714       peer_request_map = NULL;
4715       if (dsh != NULL)
4716         {
4717           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4718           dsh = NULL;
4719         }
4720       return GNUNET_SYSERR;
4721     }
4722   if (active_from_migration) 
4723     {
4724       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4725                   _("Content migration is enabled, will start to gather data\n"));
4726       consider_migration_gathering ();
4727     }
4728   consider_dht_put_gathering (NULL);
4729   GNUNET_SERVER_disconnect_notify (server, 
4730                                    &handle_client_disconnect,
4731                                    NULL);
4732   GNUNET_assert (GNUNET_OK ==
4733                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4734                                                           "fs",
4735                                                           "TRUST",
4736                                                           &trustDirectory));
4737   GNUNET_DISK_directory_create (trustDirectory);
4738   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4739                                       &cron_flush_trust, NULL);
4740
4741
4742   GNUNET_SERVER_add_handlers (server, handlers);
4743   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
4744                                                  &age_cover_counters,
4745                                                  NULL);
4746   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4747                                 &shutdown_task,
4748                                 NULL);
4749   return GNUNET_OK;
4750 }
4751
4752
4753 /**
4754  * Process fs requests.
4755  *
4756  * @param cls closure
4757  * @param server the initialized server
4758  * @param cfg configuration to use
4759  */
4760 static void
4761 run (void *cls,
4762      struct GNUNET_SERVER_Handle *server,
4763      const struct GNUNET_CONFIGURATION_Handle *cfg)
4764 {
4765   active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4766                                                               "FS",
4767                                                               "CONTENT_CACHING");
4768   active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4769                                                                 "FS",
4770                                                                 "CONTENT_PUSHING");
4771   dsh = GNUNET_DATASTORE_connect (cfg);
4772   if (dsh == NULL)
4773     {
4774       GNUNET_SCHEDULER_shutdown ();
4775       return;
4776     }
4777   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4778   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4779   block_cfg = GNUNET_CONFIGURATION_create ();
4780   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4781                                          "block",
4782                                          "PLUGINS",
4783                                          "fs");
4784   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4785   GNUNET_assert (NULL != block_ctx);
4786   dht_handle = GNUNET_DHT_connect (cfg,
4787                                    FS_DHT_HT_SIZE);
4788   if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
4789        (GNUNET_OK != main_init (server, cfg)) )
4790     {    
4791       GNUNET_SCHEDULER_shutdown ();
4792       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4793       dsh = NULL;
4794       GNUNET_DHT_disconnect (dht_handle);
4795       dht_handle = NULL;
4796       GNUNET_BLOCK_context_destroy (block_ctx);
4797       block_ctx = NULL;
4798       GNUNET_CONFIGURATION_destroy (block_cfg);
4799       block_cfg = NULL;
4800       GNUNET_LOAD_value_free (datastore_get_load);
4801       datastore_get_load = NULL;
4802       GNUNET_LOAD_value_free (datastore_put_load);
4803       datastore_put_load = NULL;
4804       return;   
4805     }
4806 }
4807
4808
4809 /**
4810  * The main function for the fs service.
4811  *
4812  * @param argc number of arguments from the command line
4813  * @param argv command line arguments
4814  * @return 0 ok, 1 on error
4815  */
4816 int
4817 main (int argc, char *const *argv)
4818 {
4819   return (GNUNET_OK ==
4820           GNUNET_SERVICE_run (argc,
4821                               argv,
4822                               "fs",
4823                               GNUNET_SERVICE_OPTION_NONE,
4824                               &run, NULL)) ? 0 : 1;
4825 }
4826
4827 /* end of gnunet-service-fs.c */