fixing block reconstruction start/shutdown code
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - collect traffic data for anonymity levels > 1
28  * - implement transmission restrictions for anonymity level > 1
29  * - more statistics
30  */
31 #include "platform.h"
32 #include <float.h>
33 #include "gnunet_constants.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_dht_service.h"
36 #include "gnunet_datastore_service.h"
37 #include "gnunet_load_lib.h"
38 #include "gnunet_peer_lib.h"
39 #include "gnunet_protocols.h"
40 #include "gnunet_signatures.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_transport_service.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet-service-fs_indexing.h"
45 #include "fs.h"
46
47 #define DEBUG_FS GNUNET_NO
48
49 /**
50  * Should we introduce random latency in processing?  Required for proper
51  * implementation of GAP, but can be disabled for performance evaluation of
52  * the basic routing algorithm.
53  *
54  * Note that with delays enabled, performance can be significantly lower
55  * (several orders of magnitude in 2-peer test runs); if you want to
56  * measure throughput of other components, set this to NO.  Also, you
57  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
58  * a rather wasteful mode of operation (that might still get the highest
59  * throughput overall).
60  *
61  * Performance measurements (for 50 MB file, 2 peers):
62  *
63  * - Without delays: 3300 kb/s
64  * - With    delays:  101 kb/s
65  */
66 #define SUPPORT_DELAYS GNUNET_NO
67
68 /**
69  * Size for the hash map for DHT requests from the FS
70  * service.  Should be about the number of concurrent
71  * DHT requests we plan to make.
72  */
73 #define FS_DHT_HT_SIZE 1024
74
75 /**
76  * At what frequency should our datastore load decrease
77  * automatically (since if we don't use it, clearly the
78  * load must be going down).
79  */
80 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
81
82 /**
83  * How often do we flush trust values to disk?
84  */
85 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
86
87 /**
88  * How quickly do we age cover traffic?  At the given 
89  * time interval, remaining cover traffic counters are
90  * decremented by 1/16th.
91  */
92 #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
93
94 /**
95  * How often do we at most PUT content into the DHT?
96  */
97 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
98
99 /**
100  * Inverse of the probability that we will submit the same query
101  * to the same peer again.  If the same peer already got the query
102  * repeatedly recently, the probability is multiplied by the inverse
103  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
104  * plus MAX_CORK_DELAY (so roughly every 3.5s).
105  *
106  * Note that this factor is a key influence to performance in small
107  * networks (especially test networks of 2 peers) because if there is
108  * only a single peer with the data, this value will determine how
109  * soon we might re-try.  For example, a value of 3 can result in 
110  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
111  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
112  * rather inefficient in larger networks, hence picking 1 is in 
113  * general not the best choice.
114  *
115  * Performance measurements (for 50 MB file, 2 peers, no delays):
116  *
117  * - 1: 3300 kb/s (consistently)
118  * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
119  * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
120  *
121  * Note that this does NOT mean that the value should be 1 since
122  * a 2-peer network is far from representative here (and this fails
123  * to take into consideration bandwidth wasted by repeatedly 
124  * sending queries to peers that don't have the content).  Also,
125  * it is expected that higher values lead to more inconsistent
126  * measurements since this only affects lost messages towards the
127  * end of the download.
128  *
129  * Finally, we should probably consider changing this and making
130  * it dependent on the number of connected peers or a related
131  * metric (bad magic constants...).
132  */
133 #define RETRY_PROBABILITY_INV 1
134
135 /**
136  * What is the maximum delay for a P2P FS message (in our interaction
137  * with core)?  FS-internal delays are another story.  The value is
138  * chosen based on the 32k block size.  Given that peers typcially
139  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
140  * transmit one message even to the lowest-bandwidth peers.
141  */
142 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
143
144 /**
145  * Maximum number of requests (from other peers, overall) that we're
146  * willing to have pending at any given point in time.  Can be changed
147  * via the configuration file (32k is just the default).
148  */
149 static unsigned long long max_pending_requests = (32 * 1024);
150
151
152 /**
153  * Information we keep for each pending reply.  The
154  * actual message follows at the end of this struct.
155  */
156 struct PendingMessage;
157
158 /**
159  * Function called upon completion of a transmission.
160  *
161  * @param cls closure
162  * @param pid ID of receiving peer, 0 on transmission error
163  */
164 typedef void (*TransmissionContinuation)(void * cls, 
165                                          GNUNET_PEER_Id tpid);
166
167
168 /**
169  * Information we keep for each pending message (GET/PUT).  The
170  * actual message follows at the end of this struct.
171  */
172 struct PendingMessage
173 {
174   /**
175    * This is a doubly-linked list of messages to the same peer.
176    */
177   struct PendingMessage *next;
178
179   /**
180    * This is a doubly-linked list of messages to the same peer.
181    */
182   struct PendingMessage *prev;
183
184   /**
185    * Entry in pending message list for this pending message.
186    */ 
187   struct PendingMessageList *pml;  
188
189   /**
190    * Function to call immediately once we have transmitted this
191    * message.
192    */
193   TransmissionContinuation cont;
194
195   /**
196    * Closure for cont.
197    */
198   void *cont_cls;
199
200   /**
201    * Do not transmit this pending message until this deadline.
202    */
203   struct GNUNET_TIME_Absolute delay_until;
204
205   /**
206    * Size of the reply; actual reply message follows
207    * at the end of this struct.
208    */
209   size_t msize;
210   
211   /**
212    * How important is this message for us?
213    */
214   uint32_t priority;
215  
216 };
217
218
219 /**
220  * Information about a peer that we are connected to.
221  * We track data that is useful for determining which
222  * peers should receive our requests.  We also keep
223  * a list of messages to transmit to this peer.
224  */
225 struct ConnectedPeer
226 {
227
228   /**
229    * List of the last clients for which this peer successfully
230    * answered a query.
231    */
232   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
233
234   /**
235    * List of the last PIDs for which
236    * this peer successfully answered a query;
237    * We use 0 to indicate no successful reply.
238    */
239   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
240
241   /**
242    * Average delay between sending the peer a request and
243    * getting a reply (only calculated over the requests for
244    * which we actually got a reply).   Calculated
245    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
246    */ 
247   struct GNUNET_TIME_Relative avg_delay;
248
249   /**
250    * Point in time until which this peer does not want us to migrate content
251    * to it.
252    */
253   struct GNUNET_TIME_Absolute migration_blocked;
254
255   /**
256    * Time until when we blocked this peer from migrating
257    * data to us.
258    */
259   struct GNUNET_TIME_Absolute last_migration_block;
260
261   /**
262    * Transmission times for the last MAX_QUEUE_PER_PEER
263    * requests for this peer.  Used as a ring buffer, current
264    * offset is stored in 'last_request_times_off'.  If the
265    * oldest entry is more recent than the 'avg_delay', we should
266    * not send any more requests right now.
267    */
268   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
269
270   /**
271    * Handle for an active request for transmission to this
272    * peer, or NULL.
273    */
274   struct GNUNET_CORE_TransmitHandle *cth;
275
276   /**
277    * Messages (replies, queries, content migration) we would like to
278    * send to this peer in the near future.  Sorted by priority, head.
279    */
280   struct PendingMessage *pending_messages_head;
281
282   /**
283    * Messages (replies, queries, content migration) we would like to
284    * send to this peer in the near future.  Sorted by priority, tail.
285    */
286   struct PendingMessage *pending_messages_tail;
287
288   /**
289    * How long does it typically take for us to transmit a message
290    * to this peer?  (delay between the request being issued and
291    * the callback being invoked).
292    */
293   struct GNUNET_LOAD_Value *transmission_delay;
294
295   /**
296    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
297    */
298   struct GNUNET_CORE_InformationRequestContext *irc;
299
300   /**
301    * Request for which 'irc' is currently active (or NULL).
302    */
303   struct PendingRequest *pr;
304
305   /**
306    * Time when the last transmission request was issued.
307    */
308   struct GNUNET_TIME_Absolute last_transmission_request_start;
309
310   /**
311    * ID of delay task for scheduling transmission.
312    */
313   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
314
315   /**
316    * Average priority of successful replies.  Calculated
317    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
318    */
319   double avg_priority;
320
321   /**
322    * Increase in traffic preference still to be submitted
323    * to the core service for this peer.
324    */
325   uint64_t inc_preference;
326
327   /**
328    * Trust rating for this peer
329    */
330   uint32_t trust;
331
332   /**
333    * Trust rating for this peer on disk.
334    */
335   uint32_t disk_trust;
336
337   /**
338    * The peer's identity.
339    */
340   GNUNET_PEER_Id pid;  
341
342   /**
343    * Size of the linked list of 'pending_messages'.
344    */
345   unsigned int pending_requests;
346
347   /**
348    * Which offset in "last_p2p_replies" will be updated next?
349    * (we go round-robin).
350    */
351   unsigned int last_p2p_replies_woff;
352
353   /**
354    * Which offset in "last_client_replies" will be updated next?
355    * (we go round-robin).
356    */
357   unsigned int last_client_replies_woff;
358
359   /**
360    * Current offset into 'last_request_times' ring buffer.
361    */
362   unsigned int last_request_times_off;
363
364 };
365
366
367 /**
368  * Information we keep for each pending request.  We should try to
369  * keep this struct as small as possible since its memory consumption
370  * is key to how many requests we can have pending at once.
371  */
372 struct PendingRequest;
373
374
375 /**
376  * Doubly-linked list of requests we are performing
377  * on behalf of the same client.
378  */
379 struct ClientRequestList
380 {
381
382   /**
383    * This is a doubly-linked list.
384    */
385   struct ClientRequestList *next;
386
387   /**
388    * This is a doubly-linked list.
389    */
390   struct ClientRequestList *prev;
391
392   /**
393    * Request this entry represents.
394    */
395   struct PendingRequest *req;
396
397   /**
398    * Client list this request belongs to.
399    */
400   struct ClientList *client_list;
401
402 };
403
404
405 /**
406  * Replies to be transmitted to the client.  The actual
407  * response message is allocated after this struct.
408  */
409 struct ClientResponseMessage
410 {
411   /**
412    * This is a doubly-linked list.
413    */
414   struct ClientResponseMessage *next;
415
416   /**
417    * This is a doubly-linked list.
418    */
419   struct ClientResponseMessage *prev;
420
421   /**
422    * Client list entry this response belongs to.
423    */
424   struct ClientList *client_list;
425
426   /**
427    * Number of bytes in the response.
428    */
429   size_t msize;
430 };
431
432
433 /**
434  * Linked list of clients we are performing requests
435  * for right now.
436  */
437 struct ClientList
438 {
439   /**
440    * This is a linked list.
441    */
442   struct ClientList *next;
443
444   /**
445    * ID of a client making a request, NULL if this entry is for a
446    * peer.
447    */
448   struct GNUNET_SERVER_Client *client;
449
450   /**
451    * Head of list of requests performed on behalf
452    * of this client right now.
453    */
454   struct ClientRequestList *rl_head;
455
456   /**
457    * Tail of list of requests performed on behalf
458    * of this client right now.
459    */
460   struct ClientRequestList *rl_tail;
461
462   /**
463    * Head of linked list of responses.
464    */
465   struct ClientResponseMessage *res_head;
466
467   /**
468    * Tail of linked list of responses.
469    */
470   struct ClientResponseMessage *res_tail;
471
472   /**
473    * Context for sending replies.
474    */
475   struct GNUNET_CONNECTION_TransmitHandle *th;
476
477 };
478
479
480 /**
481  * Information about a peer that we have forwarded this
482  * request to already.  
483  */
484 struct UsedTargetEntry
485 {
486   /**
487    * What was the last time we have transmitted this request to this
488    * peer?
489    */
490   struct GNUNET_TIME_Absolute last_request_time;
491
492   /**
493    * How often have we transmitted this request to this peer?
494    */
495   unsigned int num_requests;
496
497   /**
498    * PID of the target peer.
499    */
500   GNUNET_PEER_Id pid;
501
502 };
503
504
505 /**
506  * Doubly-linked list of messages we are performing
507  * due to a pending request.
508  */
509 struct PendingMessageList
510 {
511
512   /**
513    * This is a doubly-linked list of messages on behalf of the same request.
514    */
515   struct PendingMessageList *next;
516
517   /**
518    * This is a doubly-linked list of messages on behalf of the same request.
519    */
520   struct PendingMessageList *prev;
521
522   /**
523    * Message this entry represents.
524    */
525   struct PendingMessage *pm;
526
527   /**
528    * Request this entry belongs to.
529    */
530   struct PendingRequest *req;
531
532   /**
533    * Peer this message is targeted for.
534    */
535   struct ConnectedPeer *target;
536
537 };
538
539
540 /**
541  * Information we keep for each pending request.  We should try to
542  * keep this struct as small as possible since its memory consumption
543  * is key to how many requests we can have pending at once.
544  */
545 struct PendingRequest
546 {
547
548   /**
549    * If this request was made by a client, this is our entry in the
550    * client request list; otherwise NULL.
551    */
552   struct ClientRequestList *client_request_list;
553
554   /**
555    * Entry of peer responsible for this entry (if this request
556    * was made by a peer).
557    */
558   struct ConnectedPeer *cp;
559
560   /**
561    * If this is a namespace query, pointer to the hash of the public
562    * key of the namespace; otherwise NULL.  Pointer will be to the 
563    * end of this struct (so no need to free it).
564    */
565   const GNUNET_HashCode *namespace;
566
567   /**
568    * Bloomfilter we use to filter out replies that we don't care about
569    * (anymore).  NULL as long as we are interested in all replies.
570    */
571   struct GNUNET_CONTAINER_BloomFilter *bf;
572
573   /**
574    * Reference to DHT get operation for this request (or NULL).
575    */
576   struct GNUNET_DHT_GetHandle *dht_get;
577
578   /**
579    * Context of our GNUNET_CORE_peer_change_preference call.
580    */
581   struct ConnectedPeer *pirc;
582
583   /**
584    * Hash code of all replies that we have seen so far (only valid
585    * if client is not NULL since we only track replies like this for
586    * our own clients).
587    */
588   GNUNET_HashCode *replies_seen;
589
590   /**
591    * Node in the heap representing this entry; NULL
592    * if we have no heap node.
593    */
594   struct GNUNET_CONTAINER_HeapNode *hnode;
595
596   /**
597    * Head of list of messages being performed on behalf of this
598    * request.
599    */
600   struct PendingMessageList *pending_head;
601
602   /**
603    * Tail of list of messages being performed on behalf of this
604    * request.
605    */
606   struct PendingMessageList *pending_tail;
607
608   /**
609    * When did we first see this request (form this peer), or, if our
610    * client is initiating, when did we last initiate a search?
611    */
612   struct GNUNET_TIME_Absolute start_time;
613
614   /**
615    * The query that this request is for.
616    */
617   GNUNET_HashCode query;
618
619   /**
620    * The task responsible for transmitting queries
621    * for this request.
622    */
623   GNUNET_SCHEDULER_TaskIdentifier task;
624
625   /**
626    * (Interned) Peer identifier that identifies a preferred target
627    * for requests.
628    */
629   GNUNET_PEER_Id target_pid;
630
631   /**
632    * (Interned) Peer identifiers of peers that have already
633    * received our query for this content.
634    */
635   struct UsedTargetEntry *used_targets;
636   
637   /**
638    * Our entry in the queue (non-NULL while we wait for our
639    * turn to interact with the local database).
640    */
641   struct GNUNET_DATASTORE_QueueEntry *qe;
642
643   /**
644    * Size of the 'bf' (in bytes).
645    */
646   size_t bf_size;
647
648   /**
649    * Desired anonymity level; only valid for requests from a local client.
650    */
651   uint32_t anonymity_level;
652
653   /**
654    * How many entries in "used_targets" are actually valid?
655    */
656   unsigned int used_targets_off;
657
658   /**
659    * How long is the "used_targets" array?
660    */
661   unsigned int used_targets_size;
662
663   /**
664    * Number of results found for this request.
665    */
666   unsigned int results_found;
667
668   /**
669    * How many entries in "replies_seen" are actually valid?
670    */
671   unsigned int replies_seen_off;
672
673   /**
674    * How long is the "replies_seen" array?
675    */
676   unsigned int replies_seen_size;
677   
678   /**
679    * Priority with which this request was made.  If one of our clients
680    * made the request, then this is the current priority that we are
681    * using when initiating the request.  This value is used when
682    * we decide to reward other peers with trust for providing a reply.
683    */
684   uint32_t priority;
685
686   /**
687    * Priority points left for us to spend when forwarding this request
688    * to other peers.
689    */
690   uint32_t remaining_priority;
691
692   /**
693    * Number to mingle hashes for bloom-filter tests with.
694    */
695   int32_t mingle;
696
697   /**
698    * TTL with which we saw this request (or, if we initiated, TTL that
699    * we used for the request).
700    */
701   int32_t ttl;
702   
703   /**
704    * Type of the content that this request is for.
705    */
706   enum GNUNET_BLOCK_Type type;
707
708   /**
709    * Remove this request after transmission of the current response.
710    */
711   int8_t do_remove;
712
713   /**
714    * GNUNET_YES if we should not forward this request to other peers.
715    */
716   int8_t local_only;
717
718   /**
719    * GNUNET_YES if we should not forward this request to other peers.
720    */
721   int8_t forward_only;
722
723 };
724
725
726 /**
727  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
728  */
729 struct MigrationReadyBlock
730 {
731
732   /**
733    * This is a doubly-linked list.
734    */
735   struct MigrationReadyBlock *next;
736
737   /**
738    * This is a doubly-linked list.
739    */
740   struct MigrationReadyBlock *prev;
741
742   /**
743    * Query for the block.
744    */
745   GNUNET_HashCode query;
746
747   /**
748    * When does this block expire? 
749    */
750   struct GNUNET_TIME_Absolute expiration;
751
752   /**
753    * Peers we would consider forwarding this
754    * block to.  Zero for empty entries.
755    */
756   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
757
758   /**
759    * Size of the block.
760    */
761   size_t size;
762
763   /**
764    *  Number of targets already used.
765    */
766   unsigned int used_targets;
767
768   /**
769    * Type of the block.
770    */
771   enum GNUNET_BLOCK_Type type;
772 };
773
774
775 /**
776  * Our connection to the datastore.
777  */
778 static struct GNUNET_DATASTORE_Handle *dsh;
779
780 /**
781  * Our block context.
782  */
783 static struct GNUNET_BLOCK_Context *block_ctx;
784
785 /**
786  * Our block configuration.
787  */
788 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
789
790 /**
791  * Our configuration.
792  */
793 static const struct GNUNET_CONFIGURATION_Handle *cfg;
794
795 /**
796  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
797  */
798 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
799
800 /**
801  * Map of peer identifiers to "struct PendingRequest" (for that peer).
802  */
803 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
804
805 /**
806  * Map of query identifiers to "struct PendingRequest" (for that query).
807  */
808 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
809
810 /**
811  * Heap with the request that will expire next at the top.  Contains
812  * pointers of type "struct PendingRequest*"; these will *also* be
813  * aliased from the "requests_by_peer" data structures and the
814  * "requests_by_query" table.  Note that requests from our clients
815  * don't expire and are thus NOT in the "requests_by_expiration"
816  * (or the "requests_by_peer" tables).
817  */
818 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
819
820 /**
821  * Handle for reporting statistics.
822  */
823 static struct GNUNET_STATISTICS_Handle *stats;
824
825 /**
826  * Linked list of clients we are currently processing requests for.
827  */
828 static struct ClientList *client_list;
829
830 /**
831  * Pointer to handle to the core service (points to NULL until we've
832  * connected to it).
833  */
834 static struct GNUNET_CORE_Handle *core;
835
836 /**
837  * Head of linked list of blocks that can be migrated.
838  */
839 static struct MigrationReadyBlock *mig_head;
840
841 /**
842  * Tail of linked list of blocks that can be migrated.
843  */
844 static struct MigrationReadyBlock *mig_tail;
845
846 /**
847  * Request to datastore for migration (or NULL).
848  */
849 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
850
851 /**
852  * Request to datastore for DHT PUTs (or NULL).
853  */
854 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
855
856 /**
857  * Type we will request for the next DHT PUT round from the datastore.
858  */
859 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
860
861 /**
862  * Where do we store trust information?
863  */
864 static char *trustDirectory;
865
866 /**
867  * ID of task that collects blocks for migration.
868  */
869 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
870
871 /**
872  * ID of task that collects blocks for DHT PUTs.
873  */
874 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
875
876 /**
877  * What is the maximum frequency at which we are allowed to
878  * poll the datastore for migration content?
879  */
880 static struct GNUNET_TIME_Relative min_migration_delay;
881
882 /**
883  * Handle for DHT operations.
884  */
885 static struct GNUNET_DHT_Handle *dht_handle;
886
887 /**
888  * Size of the doubly-linked list of migration blocks.
889  */
890 static unsigned int mig_size;
891
892 /**
893  * Are we allowed to migrate content to this peer.
894  */
895 static int active_to_migration;
896
897 /**
898  * Are we allowed to push out content from this peer.
899  */
900 static int active_from_migration;
901
902 /**
903  * How many entires with zero anonymity do we currently estimate
904  * to have in the database?
905  */
906 static unsigned int zero_anonymity_count_estimate;
907
908 /**
909  * Typical priorities we're seeing from other peers right now.  Since
910  * most priorities will be zero, this value is the weighted average of
911  * non-zero priorities seen "recently".  In order to ensure that new
912  * values do not dramatically change the ratio, values are first
913  * "capped" to a reasonable range (+N of the current value) and then
914  * averaged into the existing value by a ratio of 1:N.  Hence
915  * receiving the largest possible priority can still only raise our
916  * "current_priorities" by at most 1.
917  */
918 static double current_priorities;
919
920 /**
921  * Datastore 'GET' load tracking.
922  */
923 static struct GNUNET_LOAD_Value *datastore_get_load;
924
925 /**
926  * Datastore 'PUT' load tracking.
927  */
928 static struct GNUNET_LOAD_Value *datastore_put_load;
929
930 /**
931  * How long do requests typically stay in the routing table?
932  */
933 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
934
935 /**
936  * How many query messages have we received 'recently' that 
937  * have not yet been claimed as cover traffic?
938  */
939 static unsigned int cover_query_count;
940
941 /**
942  * How many content messages have we received 'recently' that 
943  * have not yet been claimed as cover traffic?
944  */
945 static unsigned int cover_content_count;
946
947 /**
948  * ID of our task that we use to age the cover counters.
949  */
950 static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
951
952 static void
953 age_cover_counters (void *cls,
954                     const struct GNUNET_SCHEDULER_TaskContext *tc)
955 {
956   cover_content_count = (cover_content_count * 15) / 16;
957   cover_query_count = (cover_query_count * 15) / 16;
958   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
959                                                  &age_cover_counters,
960                                                  NULL);
961 }
962
963 /**
964  * We've just now completed a datastore request.  Update our
965  * datastore load calculations.
966  *
967  * @param start time when the datastore request was issued
968  */
969 static void
970 update_datastore_delays (struct GNUNET_TIME_Absolute start)
971 {
972   struct GNUNET_TIME_Relative delay;
973
974   delay = GNUNET_TIME_absolute_get_duration (start);
975   GNUNET_LOAD_update (datastore_get_load,
976                       delay.rel_value);
977 }
978
979
980 /**
981  * Get the filename under which we would store the GNUNET_HELLO_Message
982  * for the given host and protocol.
983  * @return filename of the form DIRECTORY/HOSTID
984  */
985 static char *
986 get_trust_filename (const struct GNUNET_PeerIdentity *id)
987 {
988   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
989   char *fn;
990
991   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
992   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
993   return fn;
994 }
995
996
997
998 /**
999  * Transmit messages by copying it to the target buffer
1000  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1001  * for writing in the meantime.  In that case, do nothing
1002  * (the disconnect or shutdown handler will take care of the rest).
1003  * If we were able to transmit messages and there are still more
1004  * pending, ask core again for further calls to this function.
1005  *
1006  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1007  * @param size number of bytes available in buf
1008  * @param buf where the callee should write the message
1009  * @return number of bytes written to buf
1010  */
1011 static size_t
1012 transmit_to_peer (void *cls,
1013                   size_t size, void *buf);
1014
1015
1016 /* ******************* clean up functions ************************ */
1017
1018 /**
1019  * Delete the given migration block.
1020  *
1021  * @param mb block to delete
1022  */
1023 static void
1024 delete_migration_block (struct MigrationReadyBlock *mb)
1025 {
1026   GNUNET_CONTAINER_DLL_remove (mig_head,
1027                                mig_tail,
1028                                mb);
1029   GNUNET_PEER_decrement_rcs (mb->target_list,
1030                              MIGRATION_LIST_SIZE);
1031   mig_size--;
1032   GNUNET_free (mb);
1033 }
1034
1035
1036 /**
1037  * Compare the distance of two peers to a key.
1038  *
1039  * @param key key
1040  * @param p1 first peer
1041  * @param p2 second peer
1042  * @return GNUNET_YES if P1 is closer to key than P2
1043  */
1044 static int
1045 is_closer (const GNUNET_HashCode *key,
1046            const struct GNUNET_PeerIdentity *p1,
1047            const struct GNUNET_PeerIdentity *p2)
1048 {
1049   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1050                                     &p2->hashPubKey,
1051                                     key);
1052 }
1053
1054
1055 /**
1056  * Consider migrating content to a given peer.
1057  *
1058  * @param cls 'struct MigrationReadyBlock*' to select
1059  *            targets for (or NULL for none)
1060  * @param key ID of the peer 
1061  * @param value 'struct ConnectedPeer' of the peer
1062  * @return GNUNET_YES (always continue iteration)
1063  */
1064 static int
1065 consider_migration (void *cls,
1066                     const GNUNET_HashCode *key,
1067                     void *value)
1068 {
1069   struct MigrationReadyBlock *mb = cls;
1070   struct ConnectedPeer *cp = value;
1071   struct MigrationReadyBlock *pos;
1072   struct GNUNET_PeerIdentity cppid;
1073   struct GNUNET_PeerIdentity otherpid;
1074   struct GNUNET_PeerIdentity worstpid;
1075   size_t msize;
1076   unsigned int i;
1077   unsigned int repl;
1078   
1079   /* consider 'cp' as a migration target for mb */
1080   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1081     return GNUNET_YES; /* peer has requested no migration! */
1082   if (mb != NULL)
1083     {
1084       GNUNET_PEER_resolve (cp->pid,
1085                            &cppid);
1086       repl = MIGRATION_LIST_SIZE;
1087       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1088         {
1089           if (mb->target_list[i] == 0)
1090             {
1091               mb->target_list[i] = cp->pid;
1092               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1093               repl = MIGRATION_LIST_SIZE;
1094               break;
1095             }
1096           GNUNET_PEER_resolve (mb->target_list[i],
1097                                &otherpid);
1098           if ( (repl == MIGRATION_LIST_SIZE) &&
1099                is_closer (&mb->query,
1100                           &cppid,
1101                           &otherpid)) 
1102             {
1103               repl = i;
1104               worstpid = otherpid;
1105             }
1106           else if ( (repl != MIGRATION_LIST_SIZE) &&
1107                     (is_closer (&mb->query,
1108                                 &worstpid,
1109                                 &otherpid) ) )
1110             {
1111               repl = i;
1112               worstpid = otherpid;
1113             }       
1114         }
1115       if (repl != MIGRATION_LIST_SIZE) 
1116         {
1117           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1118           mb->target_list[repl] = cp->pid;
1119           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1120         }
1121     }
1122
1123   /* consider scheduling transmission to cp for content migration */
1124   if (cp->cth != NULL)        
1125     return GNUNET_YES; 
1126   msize = 0;
1127   pos = mig_head;
1128   while (pos != NULL)
1129     {
1130       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1131         {
1132           if (cp->pid == pos->target_list[i])
1133             {
1134               if (msize == 0)
1135                 msize = pos->size;
1136               else
1137                 msize = GNUNET_MIN (msize,
1138                                     pos->size);
1139               break;
1140             }
1141         }
1142       pos = pos->next;
1143     }
1144   if (msize == 0)
1145     return GNUNET_YES; /* no content available */
1146 #if DEBUG_FS
1147   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1148               "Trying to migrate at least %u bytes to peer `%s'\n",
1149               msize,
1150               GNUNET_h2s (key));
1151 #endif
1152   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1153     {
1154       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1155       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1156     }
1157   cp->cth 
1158     = GNUNET_CORE_notify_transmit_ready (core,
1159                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1160                                          (const struct GNUNET_PeerIdentity*) key,
1161                                          msize + sizeof (struct PutMessage),
1162                                          &transmit_to_peer,
1163                                          cp);
1164   return GNUNET_YES;
1165 }
1166
1167
1168 /**
1169  * Task that is run periodically to obtain blocks for content
1170  * migration
1171  * 
1172  * @param cls unused
1173  * @param tc scheduler context (also unused)
1174  */
1175 static void
1176 gather_migration_blocks (void *cls,
1177                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1178
1179
1180
1181
1182 /**
1183  * Task that is run periodically to obtain blocks for DHT PUTs.
1184  * 
1185  * @param cls type of blocks to gather
1186  * @param tc scheduler context (unused)
1187  */
1188 static void
1189 gather_dht_put_blocks (void *cls,
1190                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1191
1192
1193 /**
1194  * If the migration task is not currently running, consider
1195  * (re)scheduling it with the appropriate delay.
1196  */
1197 static void
1198 consider_migration_gathering ()
1199 {
1200   struct GNUNET_TIME_Relative delay;
1201
1202   if (dsh == NULL)
1203     return;
1204   if (mig_qe != NULL)
1205     return;
1206   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1207     return;
1208   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1209                                          mig_size);
1210   delay = GNUNET_TIME_relative_divide (delay,
1211                                        MAX_MIGRATION_QUEUE);
1212   delay = GNUNET_TIME_relative_max (delay,
1213                                     min_migration_delay);
1214   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1215                                            &gather_migration_blocks,
1216                                            NULL);
1217 }
1218
1219
1220 /**
1221  * If the DHT PUT gathering task is not currently running, consider
1222  * (re)scheduling it with the appropriate delay.
1223  */
1224 static void
1225 consider_dht_put_gathering (void *cls)
1226 {
1227   struct GNUNET_TIME_Relative delay;
1228
1229   if (dsh == NULL)
1230     return;
1231   if (dht_qe != NULL)
1232     return;
1233   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1234     return;
1235   if (zero_anonymity_count_estimate > 0)
1236     {
1237       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1238                                            zero_anonymity_count_estimate);
1239       delay = GNUNET_TIME_relative_min (delay,
1240                                         MAX_DHT_PUT_FREQ);
1241     }
1242   else
1243     {
1244       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1245          (hopefully) appear */
1246       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1247     }
1248   dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1249                                            &gather_dht_put_blocks,
1250                                            cls);
1251 }
1252
1253
1254 /**
1255  * Process content offered for migration.
1256  *
1257  * @param cls closure
1258  * @param key key for the content
1259  * @param size number of bytes in data
1260  * @param data content stored
1261  * @param type type of the content
1262  * @param priority priority of the content
1263  * @param anonymity anonymity-level for the content
1264  * @param expiration expiration time for the content
1265  * @param uid unique identifier for the datum;
1266  *        maybe 0 if no unique identifier is available
1267  */
1268 static void
1269 process_migration_content (void *cls,
1270                            const GNUNET_HashCode * key,
1271                            size_t size,
1272                            const void *data,
1273                            enum GNUNET_BLOCK_Type type,
1274                            uint32_t priority,
1275                            uint32_t anonymity,
1276                            struct GNUNET_TIME_Absolute
1277                            expiration, uint64_t uid)
1278 {
1279   struct MigrationReadyBlock *mb;
1280   
1281   if (key == NULL)
1282     {
1283       mig_qe = NULL;
1284       if (mig_size < MAX_MIGRATION_QUEUE)  
1285         consider_migration_gathering ();
1286       return;
1287     }
1288   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1289     {
1290       if (GNUNET_OK !=
1291           GNUNET_FS_handle_on_demand_block (key, size, data,
1292                                             type, priority, anonymity,
1293                                             expiration, uid, 
1294                                             &process_migration_content,
1295                                             NULL))
1296         {
1297           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1298         }
1299       return;
1300     }
1301 #if DEBUG_FS
1302   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303               "Retrieved block `%s' of type %u for migration\n",
1304               GNUNET_h2s (key),
1305               type);
1306 #endif
1307   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1308   mb->query = *key;
1309   mb->expiration = expiration;
1310   mb->size = size;
1311   mb->type = type;
1312   memcpy (&mb[1], data, size);
1313   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1314                                      mig_tail,
1315                                      mig_tail,
1316                                      mb);
1317   mig_size++;
1318   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1319                                          &consider_migration,
1320                                          mb);
1321   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1322 }
1323
1324
1325 /**
1326  * Function called upon completion of the DHT PUT operation.
1327  */
1328 static void
1329 dht_put_continuation (void *cls,
1330                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1331 {
1332   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1333 }
1334
1335
1336 /**
1337  * Store content in DHT.
1338  *
1339  * @param cls closure
1340  * @param key key for the content
1341  * @param size number of bytes in data
1342  * @param data content stored
1343  * @param type type of the content
1344  * @param priority priority of the content
1345  * @param anonymity anonymity-level for the content
1346  * @param expiration expiration time for the content
1347  * @param uid unique identifier for the datum;
1348  *        maybe 0 if no unique identifier is available
1349  */
1350 static void
1351 process_dht_put_content (void *cls,
1352                          const GNUNET_HashCode * key,
1353                          size_t size,
1354                          const void *data,
1355                          enum GNUNET_BLOCK_Type type,
1356                          uint32_t priority,
1357                          uint32_t anonymity,
1358                          struct GNUNET_TIME_Absolute
1359                          expiration, uint64_t uid)
1360
1361   static unsigned int counter;
1362   static GNUNET_HashCode last_vhash;
1363   static GNUNET_HashCode vhash;
1364
1365   if (key == NULL)
1366     {
1367       dht_qe = NULL;
1368       consider_dht_put_gathering (cls);
1369       return;
1370     }
1371   /* slightly funky code to estimate the total number of values with zero
1372      anonymity from the maximum observed length of a monotonically increasing 
1373      sequence of hashes over the contents */
1374   GNUNET_CRYPTO_hash (data, size, &vhash);
1375   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1376     {
1377       if (zero_anonymity_count_estimate > 0)
1378         zero_anonymity_count_estimate /= 2;
1379       counter = 0;
1380     }
1381   last_vhash = vhash;
1382   if (counter < 31)
1383     counter++;
1384   if (zero_anonymity_count_estimate < (1 << counter))
1385     zero_anonymity_count_estimate = (1 << counter);
1386 #if DEBUG_FS
1387   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1388               "Retrieved block `%s' of type %u for DHT PUT\n",
1389               GNUNET_h2s (key),
1390               type);
1391 #endif
1392   GNUNET_DHT_put (dht_handle,
1393                   key,
1394                   DEFAULT_PUT_REPLICATION,
1395                   GNUNET_DHT_RO_NONE,
1396                   type,
1397                   size,
1398                   data,
1399                   expiration,
1400                   GNUNET_TIME_UNIT_FOREVER_REL,
1401                   &dht_put_continuation,
1402                   cls);
1403 }
1404
1405
1406 /**
1407  * Task that is run periodically to obtain blocks for content
1408  * migration
1409  * 
1410  * @param cls unused
1411  * @param tc scheduler context (also unused)
1412  */
1413 static void
1414 gather_migration_blocks (void *cls,
1415                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1416 {
1417   mig_task = GNUNET_SCHEDULER_NO_TASK;
1418   if (dsh != NULL)
1419     {
1420       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1421                                             GNUNET_TIME_UNIT_FOREVER_REL,
1422                                             &process_migration_content, NULL);
1423       GNUNET_assert (mig_qe != NULL);
1424     }
1425 }
1426
1427
1428 /**
1429  * Task that is run periodically to obtain blocks for DHT PUTs.
1430  * 
1431  * @param cls type of blocks to gather
1432  * @param tc scheduler context (unused)
1433  */
1434 static void
1435 gather_dht_put_blocks (void *cls,
1436                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1437 {
1438   dht_task = GNUNET_SCHEDULER_NO_TASK;
1439   if (dsh != NULL)
1440     {
1441       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1442         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1443       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1444                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1445                                                     dht_put_type++,
1446                                                     &process_dht_put_content, NULL);
1447       GNUNET_assert (dht_qe != NULL);
1448     }
1449 }
1450
1451
1452 /**
1453  * We're done with a particular message list entry.
1454  * Free all associated resources.
1455  * 
1456  * @param pml entry to destroy
1457  */
1458 static void
1459 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1460 {
1461   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1462                                pml->req->pending_tail,
1463                                pml);
1464   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1465                                pml->target->pending_messages_tail,
1466                                pml->pm);
1467   pml->target->pending_requests--;
1468   GNUNET_free (pml->pm);
1469   GNUNET_free (pml);
1470 }
1471
1472
1473 /**
1474  * Destroy the given pending message (and call the respective
1475  * continuation).
1476  *
1477  * @param pm message to destroy
1478  * @param tpid id of peer that the message was delivered to, or 0 for none
1479  */
1480 static void
1481 destroy_pending_message (struct PendingMessage *pm,
1482                          GNUNET_PEER_Id tpid)
1483 {
1484   struct PendingMessageList *pml = pm->pml;
1485   TransmissionContinuation cont;
1486   void *cont_cls;
1487
1488   cont = pm->cont;
1489   cont_cls = pm->cont_cls;
1490   if (pml != NULL)
1491     {
1492       GNUNET_assert (pml->pm == pm);
1493       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1494       destroy_pending_message_list_entry (pml);
1495     }
1496   else
1497     {
1498       GNUNET_free (pm);
1499     }
1500   if (cont != NULL)
1501     cont (cont_cls, tpid);  
1502 }
1503
1504
1505 /**
1506  * We're done processing a particular request.
1507  * Free all associated resources.
1508  *
1509  * @param pr request to destroy
1510  */
1511 static void
1512 destroy_pending_request (struct PendingRequest *pr)
1513 {
1514   struct GNUNET_PeerIdentity pid;
1515   unsigned int i;
1516
1517   if (pr->hnode != NULL)
1518     {
1519       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1520                                          pr->hnode);
1521       pr->hnode = NULL;
1522     }
1523   if (NULL == pr->client_request_list)
1524     {
1525       GNUNET_STATISTICS_update (stats,
1526                                 gettext_noop ("# P2P searches active"),
1527                                 -1,
1528                                 GNUNET_NO);
1529     }
1530   else
1531     {
1532       GNUNET_STATISTICS_update (stats,
1533                                 gettext_noop ("# client searches active"),
1534                                 -1,
1535                                 GNUNET_NO);
1536     }
1537   if (GNUNET_YES == 
1538       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1539                                             &pr->query,
1540                                             pr))
1541     {
1542       GNUNET_LOAD_update (rt_entry_lifetime,
1543                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1544     }
1545   if (pr->qe != NULL)
1546      {
1547       GNUNET_DATASTORE_cancel (pr->qe);
1548       pr->qe = NULL;
1549     }
1550   if (pr->dht_get != NULL)
1551     {
1552       GNUNET_DHT_get_stop (pr->dht_get);
1553       pr->dht_get = NULL;
1554     }
1555   if (pr->client_request_list != NULL)
1556     {
1557       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1558                                    pr->client_request_list->client_list->rl_tail,
1559                                    pr->client_request_list);
1560       GNUNET_free (pr->client_request_list);
1561       pr->client_request_list = NULL;
1562     }
1563   if (pr->cp != NULL)
1564     {
1565       GNUNET_PEER_resolve (pr->cp->pid,
1566                            &pid);
1567       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1568                                                    &pid.hashPubKey,
1569                                                    pr);
1570       pr->cp = NULL;
1571     }
1572   if (pr->bf != NULL)
1573     {
1574       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1575       pr->bf = NULL;
1576     }
1577   if (pr->pirc != NULL)
1578     {
1579       GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1580       pr->pirc->irc = NULL;
1581       pr->pirc = NULL;
1582     }
1583   if (pr->replies_seen != NULL)
1584     {
1585       GNUNET_free (pr->replies_seen);
1586       pr->replies_seen = NULL;
1587     }
1588   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1589     {
1590       GNUNET_SCHEDULER_cancel (pr->task);
1591       pr->task = GNUNET_SCHEDULER_NO_TASK;
1592     }
1593   while (NULL != pr->pending_head)    
1594     destroy_pending_message_list_entry (pr->pending_head);
1595   GNUNET_PEER_change_rc (pr->target_pid, -1);
1596   if (pr->used_targets != NULL)
1597     {
1598       for (i=0;i<pr->used_targets_off;i++)
1599         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1600       GNUNET_free (pr->used_targets);
1601       pr->used_targets_off = 0;
1602       pr->used_targets_size = 0;
1603       pr->used_targets = NULL;
1604     }
1605   GNUNET_free (pr);
1606 }
1607
1608
1609 /**
1610  * Find latency information in 'atsi'.
1611  *
1612  * @param atsi performance data
1613  * @return connection latency
1614  */
1615 static struct GNUNET_TIME_Relative
1616 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1617 {
1618   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
1619           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
1620     atsi++;
1621   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
1622     {
1623       GNUNET_break (0);
1624       /* how can we not have latency data? */
1625       return GNUNET_TIME_UNIT_SECONDS;
1626     }
1627   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1628                                         ntohl (atsi->value));
1629 }
1630
1631
1632 /**
1633  * Method called whenever a given peer connects.
1634  *
1635  * @param cls closure, not used
1636  * @param peer peer identity this notification is about
1637  * @param atsi performance information
1638  */
1639 static void 
1640 peer_connect_handler (void *cls,
1641                       const struct
1642                       GNUNET_PeerIdentity * peer,
1643                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1644 {
1645   struct ConnectedPeer *cp;
1646   struct MigrationReadyBlock *pos;
1647   char *fn;
1648   uint32_t trust;
1649   struct GNUNET_TIME_Relative latency;
1650
1651   latency = get_latency (atsi);
1652   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1653                                           &peer->hashPubKey);
1654   if (NULL != cp)
1655     {
1656       GNUNET_break (0);
1657       return;
1658     }
1659   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1660   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1661   cp->pid = GNUNET_PEER_intern (peer);
1662
1663   fn = get_trust_filename (peer);
1664   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1665       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1666     cp->disk_trust = cp->trust = ntohl (trust);
1667   GNUNET_free (fn);
1668
1669   GNUNET_break (GNUNET_OK ==
1670                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1671                                                    &peer->hashPubKey,
1672                                                    cp,
1673                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1674
1675   pos = mig_head;
1676   while (NULL != pos)
1677     {
1678       (void) consider_migration (pos, &peer->hashPubKey, cp);
1679       pos = pos->next;
1680     }
1681 }
1682
1683
1684 /**
1685  * Method called whenever a given peer has a status change.
1686  *
1687  * @param cls closure
1688  * @param peer peer identity this notification is about
1689  * @param bandwidth_in available amount of inbound bandwidth
1690  * @param bandwidth_out available amount of outbound bandwidth
1691  * @param timeout absolute time when this peer will time out
1692  *        unless we see some further activity from it
1693  * @param atsi status information
1694  */
1695 static void
1696 peer_status_handler (void *cls,
1697                      const struct
1698                      GNUNET_PeerIdentity * peer,
1699                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1700                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1701                      struct GNUNET_TIME_Absolute timeout,
1702                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1703 {
1704   struct ConnectedPeer *cp;
1705   struct GNUNET_TIME_Relative latency;
1706
1707   latency = get_latency (atsi);
1708   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1709                                           &peer->hashPubKey);
1710   if (cp == NULL)
1711     {
1712       GNUNET_break (0);
1713       return;
1714     }
1715   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1716                                  latency);  
1717 }
1718
1719
1720
1721 /**
1722  * Increase the host credit by a value.
1723  *
1724  * @param host which peer to change the trust value on
1725  * @param value is the int value by which the
1726  *  host credit is to be increased or decreased
1727  * @returns the actual change in trust (positive or negative)
1728  */
1729 static int
1730 change_host_trust (struct ConnectedPeer *host, int value)
1731 {
1732   if (value == 0)
1733     return 0;
1734   GNUNET_assert (host != NULL);
1735   if (value > 0)
1736     {
1737       if (host->trust + value < host->trust)
1738         {
1739           value = UINT32_MAX - host->trust;
1740           host->trust = UINT32_MAX;
1741         }
1742       else
1743         host->trust += value;
1744     }
1745   else
1746     {
1747       if (host->trust < -value)
1748         {
1749           value = -host->trust;
1750           host->trust = 0;
1751         }
1752       else
1753         host->trust += value;
1754     }
1755   return value;
1756 }
1757
1758
1759 /**
1760  * Write host-trust information to a file - flush the buffer entry!
1761  */
1762 static int
1763 flush_trust (void *cls,
1764              const GNUNET_HashCode *key,
1765              void *value)
1766 {
1767   struct ConnectedPeer *host = value;
1768   char *fn;
1769   uint32_t trust;
1770   struct GNUNET_PeerIdentity pid;
1771
1772   if (host->trust == host->disk_trust)
1773     return GNUNET_OK;                     /* unchanged */
1774   GNUNET_PEER_resolve (host->pid,
1775                        &pid);
1776   fn = get_trust_filename (&pid);
1777   if (host->trust == 0)
1778     {
1779       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1780         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1781                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1782     }
1783   else
1784     {
1785       trust = htonl (host->trust);
1786       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1787                                                     sizeof(uint32_t),
1788                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1789                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1790         host->disk_trust = host->trust;
1791     }
1792   GNUNET_free (fn);
1793   return GNUNET_OK;
1794 }
1795
1796 /**
1797  * Call this method periodically to scan data/hosts for new hosts.
1798  */
1799 static void
1800 cron_flush_trust (void *cls,
1801                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1802 {
1803
1804   if (NULL == connected_peers)
1805     return;
1806   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1807                                          &flush_trust,
1808                                          NULL);
1809   if (NULL == tc)
1810     return;
1811   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1812     return;
1813   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1814 }
1815
1816
1817 /**
1818  * Free (each) request made by the peer.
1819  *
1820  * @param cls closure, points to peer that the request belongs to
1821  * @param key current key code
1822  * @param value value in the hash map
1823  * @return GNUNET_YES (we should continue to iterate)
1824  */
1825 static int
1826 destroy_request (void *cls,
1827                  const GNUNET_HashCode * key,
1828                  void *value)
1829 {
1830   const struct GNUNET_PeerIdentity * peer = cls;
1831   struct PendingRequest *pr = value;
1832   
1833   GNUNET_break (GNUNET_YES ==
1834                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1835                                                       &peer->hashPubKey,
1836                                                       pr));
1837   destroy_pending_request (pr);
1838   return GNUNET_YES;
1839 }
1840
1841
1842 /**
1843  * Method called whenever a peer disconnects.
1844  *
1845  * @param cls closure, not used
1846  * @param peer peer identity this notification is about
1847  */
1848 static void
1849 peer_disconnect_handler (void *cls,
1850                          const struct
1851                          GNUNET_PeerIdentity * peer)
1852 {
1853   struct ConnectedPeer *cp;
1854   struct PendingMessage *pm;
1855   unsigned int i;
1856   struct MigrationReadyBlock *pos;
1857   struct MigrationReadyBlock *next;
1858
1859   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1860                                               &peer->hashPubKey,
1861                                               &destroy_request,
1862                                               (void*) peer);
1863   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1864                                           &peer->hashPubKey);
1865   if (cp == NULL)
1866     return;
1867   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1868     {
1869       if (NULL != cp->last_client_replies[i])
1870         {
1871           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1872           cp->last_client_replies[i] = NULL;
1873         }
1874     }
1875   GNUNET_break (GNUNET_YES ==
1876                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1877                                                       &peer->hashPubKey,
1878                                                       cp));
1879   if (cp->irc != NULL)
1880     {
1881       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1882       cp->irc = NULL;
1883       cp->pr->pirc = NULL;
1884       cp->pr = NULL;
1885     }
1886
1887   /* remove this peer from migration considerations; schedule
1888      alternatives */
1889   next = mig_head;
1890   while (NULL != (pos = next))
1891     {
1892       next = pos->next;
1893       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1894         {
1895           if (pos->target_list[i] == cp->pid)
1896             {
1897               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1898               pos->target_list[i] = 0;
1899             }
1900          }
1901       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1902         {
1903           delete_migration_block (pos);
1904           consider_migration_gathering ();
1905           continue;
1906         }
1907       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1908                                              &consider_migration,
1909                                              pos);
1910     }
1911   GNUNET_PEER_change_rc (cp->pid, -1);
1912   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1913   if (NULL != cp->cth)
1914     {
1915       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1916       cp->cth = NULL;
1917     }
1918   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1919     {
1920       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1921       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1922     }
1923   while (NULL != (pm = cp->pending_messages_head))
1924     destroy_pending_message (pm, 0 /* delivery failed */);
1925   GNUNET_LOAD_value_free (cp->transmission_delay);
1926   GNUNET_break (0 == cp->pending_requests);
1927   GNUNET_free (cp);
1928 }
1929
1930
1931 /**
1932  * Iterator over hash map entries that removes all occurences
1933  * of the given 'client' from the 'last_client_replies' of the
1934  * given connected peer.
1935  *
1936  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1937  * @param key current key code (unused)
1938  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1939  * @return GNUNET_YES (we should continue to iterate)
1940  */
1941 static int
1942 remove_client_from_last_client_replies (void *cls,
1943                                         const GNUNET_HashCode * key,
1944                                         void *value)
1945 {
1946   struct GNUNET_SERVER_Client *client = cls;
1947   struct ConnectedPeer *cp = value;
1948   unsigned int i;
1949
1950   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1951     {
1952       if (cp->last_client_replies[i] == client)
1953         {
1954           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1955           cp->last_client_replies[i] = NULL;
1956         }
1957     }  
1958   return GNUNET_YES;
1959 }
1960
1961
1962 /**
1963  * A client disconnected.  Remove all of its pending queries.
1964  *
1965  * @param cls closure, NULL
1966  * @param client identification of the client
1967  */
1968 static void
1969 handle_client_disconnect (void *cls,
1970                           struct GNUNET_SERVER_Client
1971                           * client)
1972 {
1973   struct ClientList *pos;
1974   struct ClientList *prev;
1975   struct ClientRequestList *rcl;
1976   struct ClientResponseMessage *creply;
1977
1978   if (client == NULL)
1979     return;
1980   prev = NULL;
1981   pos = client_list;
1982   while ( (NULL != pos) &&
1983           (pos->client != client) )
1984     {
1985       prev = pos;
1986       pos = pos->next;
1987     }
1988   if (pos == NULL)
1989     return; /* no requests pending for this client */
1990   while (NULL != (rcl = pos->rl_head))
1991     {
1992       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1993                   "Destroying pending request `%s' on disconnect\n",
1994                   GNUNET_h2s (&rcl->req->query));
1995       destroy_pending_request (rcl->req);
1996     }
1997   if (prev == NULL)
1998     client_list = pos->next;
1999   else
2000     prev->next = pos->next;
2001   if (pos->th != NULL)
2002     {
2003       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
2004       pos->th = NULL;
2005     }
2006   while (NULL != (creply = pos->res_head))
2007     {
2008       GNUNET_CONTAINER_DLL_remove (pos->res_head,
2009                                    pos->res_tail,
2010                                    creply);
2011       GNUNET_free (creply);
2012     }    
2013   GNUNET_SERVER_client_drop (pos->client);
2014   GNUNET_free (pos);
2015   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2016                                          &remove_client_from_last_client_replies,
2017                                          client);
2018 }
2019
2020
2021 /**
2022  * Iterator to free peer entries.
2023  *
2024  * @param cls closure, unused
2025  * @param key current key code
2026  * @param value value in the hash map (peer entry)
2027  * @return GNUNET_YES (we should continue to iterate)
2028  */
2029 static int 
2030 clean_peer (void *cls,
2031             const GNUNET_HashCode * key,
2032             void *value)
2033 {
2034   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
2035   return GNUNET_YES;
2036 }
2037
2038
2039 /**
2040  * Task run during shutdown.
2041  *
2042  * @param cls unused
2043  * @param tc unused
2044  */
2045 static void
2046 shutdown_task (void *cls,
2047                const struct GNUNET_SCHEDULER_TaskContext *tc)
2048 {
2049   if (mig_qe != NULL)
2050     {
2051       GNUNET_DATASTORE_cancel (mig_qe);
2052       mig_qe = NULL;
2053     }
2054   if (dht_qe != NULL)
2055     {
2056       GNUNET_DATASTORE_cancel (dht_qe);
2057       dht_qe = NULL;
2058     }
2059   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2060     {
2061       GNUNET_SCHEDULER_cancel (mig_task);
2062       mig_task = GNUNET_SCHEDULER_NO_TASK;
2063     }
2064   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2065     {
2066       GNUNET_SCHEDULER_cancel (dht_task);
2067       dht_task = GNUNET_SCHEDULER_NO_TASK;
2068     }
2069   while (client_list != NULL)
2070     handle_client_disconnect (NULL,
2071                               client_list->client);
2072   cron_flush_trust (NULL, NULL);
2073   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2074                                          &clean_peer,
2075                                          NULL);
2076   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2077   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2078   requests_by_expiration_heap = 0;
2079   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2080   connected_peers = NULL;
2081   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2082   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2083   query_request_map = NULL;
2084   GNUNET_LOAD_value_free (rt_entry_lifetime);
2085   rt_entry_lifetime = NULL;
2086   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2087   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2088   peer_request_map = NULL;
2089   GNUNET_assert (NULL != core);
2090   GNUNET_CORE_disconnect (core);
2091   core = NULL;
2092   if (stats != NULL)
2093     {
2094       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2095       stats = NULL;
2096     }
2097   if (dsh != NULL)
2098     {
2099       GNUNET_DATASTORE_disconnect (dsh,
2100                                    GNUNET_NO);
2101       dsh = NULL;
2102     }
2103   while (mig_head != NULL)
2104     delete_migration_block (mig_head);
2105   GNUNET_assert (0 == mig_size);
2106   GNUNET_DHT_disconnect (dht_handle);
2107   dht_handle = NULL;
2108   GNUNET_LOAD_value_free (datastore_get_load);
2109   datastore_get_load = NULL;
2110   GNUNET_LOAD_value_free (datastore_put_load);
2111   datastore_put_load = NULL;
2112   GNUNET_BLOCK_context_destroy (block_ctx);
2113   block_ctx = NULL;
2114   GNUNET_CONFIGURATION_destroy (block_cfg);
2115   block_cfg = NULL;
2116   cfg = NULL;  
2117   GNUNET_free_non_null (trustDirectory);
2118   trustDirectory = NULL;
2119   GNUNET_SCHEDULER_cancel (cover_age_task);
2120   cover_age_task = GNUNET_SCHEDULER_NO_TASK;
2121 }
2122
2123
2124 /* ******************* Utility functions  ******************** */
2125
2126
2127 /**
2128  * We've had to delay a request for transmission to core, but now
2129  * we should be ready.  Run it.
2130  *
2131  * @param cls the 'struct ConnectedPeer' for which a request was delayed
2132  * @param tc task context (unused)
2133  */
2134 static void
2135 delayed_transmission_request (void *cls,
2136                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2137 {
2138   struct ConnectedPeer *cp = cls;
2139   struct GNUNET_PeerIdentity pid;
2140   struct PendingMessage *pm;
2141
2142   pm = cp->pending_messages_head;
2143   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2144   GNUNET_assert (cp->cth == NULL);
2145   if (pm == NULL)
2146     return;
2147   GNUNET_PEER_resolve (cp->pid,
2148                        &pid);
2149   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2150   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2151                                                pm->priority,
2152                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2153                                                &pid,
2154                                                pm->msize,
2155                                                &transmit_to_peer,
2156                                                cp);
2157 }
2158
2159
2160 /**
2161  * Transmit messages by copying it to the target buffer
2162  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2163  * for writing in the meantime.  In that case, do nothing
2164  * (the disconnect or shutdown handler will take care of the rest).
2165  * If we were able to transmit messages and there are still more
2166  * pending, ask core again for further calls to this function.
2167  *
2168  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2169  * @param size number of bytes available in buf
2170  * @param buf where the callee should write the message
2171  * @return number of bytes written to buf
2172  */
2173 static size_t
2174 transmit_to_peer (void *cls,
2175                   size_t size, void *buf)
2176 {
2177   struct ConnectedPeer *cp = cls;
2178   char *cbuf = buf;
2179   struct PendingMessage *pm;
2180   struct PendingMessage *next_pm;
2181   struct GNUNET_TIME_Absolute now;
2182   struct GNUNET_TIME_Relative min_delay;
2183   struct MigrationReadyBlock *mb;
2184   struct MigrationReadyBlock *next;
2185   struct PutMessage migm;
2186   size_t msize;
2187   unsigned int i;
2188   struct GNUNET_PeerIdentity pid;
2189  
2190   cp->cth = NULL;
2191   if (NULL == buf)
2192     {
2193 #if DEBUG_FS
2194       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2195                   "Dropping message, core too busy.\n");
2196 #endif
2197       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2198                   "Dropping message, core too busy.\n");
2199       GNUNET_LOAD_update (cp->transmission_delay,
2200                           UINT64_MAX);
2201       
2202       if (NULL != (pm = cp->pending_messages_head))
2203         {
2204           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2205                                        cp->pending_messages_tail,
2206                                        pm);
2207           cp->pending_requests--;    
2208           destroy_pending_message (pm, 0);
2209         }
2210       if (NULL != (pm = cp->pending_messages_head))
2211         {
2212           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2213           min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2214           cp->delayed_transmission_request_task
2215             = GNUNET_SCHEDULER_add_delayed (min_delay,
2216                                             &delayed_transmission_request,
2217                                             cp);
2218         }
2219       return 0;
2220     }  
2221   GNUNET_LOAD_update (cp->transmission_delay,
2222                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2223   now = GNUNET_TIME_absolute_get ();
2224   msize = 0;
2225   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2226   next_pm = cp->pending_messages_head;
2227   while ( (NULL != (pm = next_pm) ) &&
2228           (pm->msize <= size) )
2229     {
2230       next_pm = pm->next;
2231       if (pm->delay_until.abs_value > now.abs_value)
2232         {
2233           min_delay = GNUNET_TIME_relative_min (min_delay,
2234                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2235           continue;
2236         }
2237       memcpy (&cbuf[msize], &pm[1], pm->msize);
2238       msize += pm->msize;
2239       size -= pm->msize;
2240       if (NULL == pm->pml)
2241         {
2242           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2243                                        cp->pending_messages_tail,
2244                                        pm);
2245           cp->pending_requests--;
2246         }
2247       destroy_pending_message (pm, cp->pid);
2248     }
2249   if (pm != NULL)
2250     min_delay = GNUNET_TIME_UNIT_ZERO;
2251   if (NULL != cp->pending_messages_head)
2252     {     
2253       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2254       cp->delayed_transmission_request_task
2255         = GNUNET_SCHEDULER_add_delayed (min_delay,
2256                                         &delayed_transmission_request,
2257                                         cp);
2258     }
2259   if (pm == NULL)
2260     {      
2261       GNUNET_PEER_resolve (cp->pid,
2262                            &pid);
2263       next = mig_head;
2264       while (NULL != (mb = next))
2265         {
2266           next = mb->next;
2267           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2268             {
2269               if ( (cp->pid == mb->target_list[i]) &&
2270                    (mb->size + sizeof (migm) <= size) )
2271                 {
2272                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2273                   mb->target_list[i] = 0;
2274                   mb->used_targets++;
2275                   memset (&migm, 0, sizeof (migm));
2276                   migm.header.size = htons (sizeof (migm) + mb->size);
2277                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2278                   migm.type = htonl (mb->type);
2279                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2280                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2281                   msize += sizeof (migm);
2282                   size -= sizeof (migm);
2283                   memcpy (&cbuf[msize], &mb[1], mb->size);
2284                   msize += mb->size;
2285                   size -= mb->size;
2286 #if DEBUG_FS
2287                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2288                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2289                               GNUNET_h2s (&mb->query),
2290                               (unsigned int) mb->size,
2291                               GNUNET_i2s (&pid));
2292 #endif    
2293                   break;
2294                 }
2295               else
2296                 {
2297 #if DEBUG_FS
2298                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2299                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2300                               GNUNET_h2s (&mb->query),
2301                               (unsigned int) mb->size,
2302                               GNUNET_i2s (&pid));
2303 #endif    
2304                 }
2305             }
2306           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2307                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2308             {
2309               delete_migration_block (mb);
2310               consider_migration_gathering ();
2311             }
2312         }
2313       consider_migration (NULL, 
2314                           &pid.hashPubKey,
2315                           cp);
2316     }
2317 #if DEBUG_FS
2318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2319               "Transmitting %u bytes to peer with PID %u\n",
2320               (unsigned int) msize,
2321               (unsigned int) cp->pid);
2322 #endif
2323   return msize;
2324 }
2325
2326
2327 /**
2328  * Add a message to the set of pending messages for the given peer.
2329  *
2330  * @param cp peer to send message to
2331  * @param pm message to queue
2332  * @param pr request on which behalf this message is being queued
2333  */
2334 static void
2335 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2336                                   struct PendingMessage *pm,
2337                                   struct PendingRequest *pr)
2338 {
2339   struct PendingMessage *pos;
2340   struct PendingMessageList *pml;
2341   struct GNUNET_PeerIdentity pid;
2342
2343   GNUNET_assert (pm->next == NULL);
2344   GNUNET_assert (pm->pml == NULL);    
2345   if (pr != NULL)
2346     {
2347       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2348       pml->req = pr;
2349       pml->target = cp;
2350       pml->pm = pm;
2351       pm->pml = pml;  
2352       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2353                                    pr->pending_tail,
2354                                    pml);
2355     }
2356   pos = cp->pending_messages_head;
2357   while ( (pos != NULL) &&
2358           (pm->priority < pos->priority) )
2359     pos = pos->next;    
2360   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2361                                      cp->pending_messages_tail,
2362                                      pos,
2363                                      pm);
2364   cp->pending_requests++;
2365   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2366     {
2367       GNUNET_STATISTICS_update (stats,
2368                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2369                                 1,
2370                                 GNUNET_NO);
2371       destroy_pending_message (cp->pending_messages_tail, 0);  
2372     }
2373   GNUNET_PEER_resolve (cp->pid, &pid);
2374   if (NULL != cp->cth)
2375     {
2376       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2377       cp->cth = NULL;
2378     }
2379   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2380     {
2381       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2382       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2383     }
2384   /* need to schedule transmission */
2385   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2386   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2387                                                cp->pending_messages_head->priority,
2388                                                MAX_TRANSMIT_DELAY,
2389                                                &pid,
2390                                                cp->pending_messages_head->msize,
2391                                                &transmit_to_peer,
2392                                                cp);
2393   if (cp->cth == NULL)
2394     {
2395 #if DEBUG_FS
2396       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2397                   "Failed to schedule transmission with core!\n");
2398 #endif
2399       GNUNET_STATISTICS_update (stats,
2400                                 gettext_noop ("# CORE transmission failures"),
2401                                 1,
2402                                 GNUNET_NO);
2403     }
2404 }
2405
2406
2407 /**
2408  * Test if the DATABASE (GET) load on this peer is too high
2409  * to even consider processing the query at
2410  * all.  
2411  * 
2412  * @return GNUNET_YES if the load is too high to do anything (load high)
2413  *         GNUNET_NO to process normally (load normal)
2414  *         GNUNET_SYSERR to process for free (load low)
2415  */
2416 static int
2417 test_get_load_too_high (uint32_t priority)
2418 {
2419   double ld;
2420
2421   ld = GNUNET_LOAD_get_load (datastore_get_load);
2422   if (ld < 1)
2423     return GNUNET_SYSERR;    
2424   if (ld <= priority)    
2425     return GNUNET_NO;    
2426   return GNUNET_YES;
2427 }
2428
2429
2430
2431
2432 /**
2433  * Test if the DATABASE (PUT) load on this peer is too high
2434  * to even consider processing the query at
2435  * all.  
2436  * 
2437  * @return GNUNET_YES if the load is too high to do anything (load high)
2438  *         GNUNET_NO to process normally (load normal or low)
2439  */
2440 static int
2441 test_put_load_too_high (uint32_t priority)
2442 {
2443   double ld;
2444
2445   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2446     return GNUNET_NO; /* very fast */
2447   ld = GNUNET_LOAD_get_load (datastore_put_load);
2448   if (ld < 2.0 * (1 + priority))
2449     return GNUNET_NO;
2450   GNUNET_STATISTICS_update (stats,
2451                             gettext_noop ("# storage requests dropped due to high load"),
2452                             1,
2453                             GNUNET_NO);
2454   return GNUNET_YES;
2455 }
2456
2457
2458 /* ******************* Pending Request Refresh Task ******************** */
2459
2460
2461
2462 /**
2463  * We use a random delay to make the timing of requests less
2464  * predictable.  This function returns such a random delay.  We add a base
2465  * delay of MAX_CORK_DELAY (1s).
2466  *
2467  * FIXME: make schedule dependent on the specifics of the request?
2468  * Or bandwidth and number of connected peers and load?
2469  *
2470  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2471  */
2472 static struct GNUNET_TIME_Relative
2473 get_processing_delay ()
2474 {
2475   return 
2476     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2477                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2478                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2479                                                                                        TTL_DECREMENT)));
2480 }
2481
2482
2483 /**
2484  * We're processing a GET request from another peer and have decided
2485  * to forward it to other peers.  This function is called periodically
2486  * and should forward the request to other peers until we have all
2487  * possible replies.  If we have transmitted the *only* reply to
2488  * the initiator we should destroy the pending request.  If we have
2489  * many replies in the queue to the initiator, we should delay sending
2490  * out more queries until the reply queue has shrunk some.
2491  *
2492  * @param cls our "struct ProcessGetContext *"
2493  * @param tc unused
2494  */
2495 static void
2496 forward_request_task (void *cls,
2497                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2498
2499
2500 /**
2501  * Function called after we either failed or succeeded
2502  * at transmitting a query to a peer.  
2503  *
2504  * @param cls the requests "struct PendingRequest*"
2505  * @param tpid ID of receiving peer, 0 on transmission error
2506  */
2507 static void
2508 transmit_query_continuation (void *cls,
2509                              GNUNET_PEER_Id tpid)
2510 {
2511   struct PendingRequest *pr = cls;
2512   unsigned int i;
2513
2514   GNUNET_STATISTICS_update (stats,
2515                             gettext_noop ("# queries scheduled for forwarding"),
2516                             -1,
2517                             GNUNET_NO);
2518   if (tpid == 0)   
2519     {
2520 #if DEBUG_FS
2521       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2522                   "Transmission of request failed, will try again later.\n");
2523 #endif
2524       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2525         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2526                                                  &forward_request_task,
2527                                                  pr); 
2528       return;    
2529     }
2530 #if DEBUG_FS
2531   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2532               "Transmitted query `%s'\n",
2533               GNUNET_h2s (&pr->query));
2534 #endif
2535   GNUNET_STATISTICS_update (stats,
2536                             gettext_noop ("# queries forwarded"),
2537                             1,
2538                             GNUNET_NO);
2539   for (i=0;i<pr->used_targets_off;i++)
2540     if (pr->used_targets[i].pid == tpid)
2541       break; /* found match! */    
2542   if (i == pr->used_targets_off)
2543     {
2544       /* need to create new entry */
2545       if (pr->used_targets_off == pr->used_targets_size)
2546         GNUNET_array_grow (pr->used_targets,
2547                            pr->used_targets_size,
2548                            pr->used_targets_size * 2 + 2);
2549       GNUNET_PEER_change_rc (tpid, 1);
2550       pr->used_targets[pr->used_targets_off].pid = tpid;
2551       pr->used_targets[pr->used_targets_off].num_requests = 0;
2552       i = pr->used_targets_off++;
2553     }
2554   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2555   pr->used_targets[i].num_requests++;
2556   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2557     pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2558                                              &forward_request_task,
2559                                              pr);
2560 }
2561
2562
2563 /**
2564  * How many bytes should a bloomfilter be if we have already seen
2565  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2566  * of bits set per entry.  Furthermore, we should not re-size the
2567  * filter too often (to keep it cheap).
2568  *
2569  * Since other peers will also add entries but not resize the filter,
2570  * we should generally pick a slightly larger size than what the
2571  * strict math would suggest.
2572  *
2573  * @return must be a power of two and smaller or equal to 2^15.
2574  */
2575 static size_t
2576 compute_bloomfilter_size (unsigned int entry_count)
2577 {
2578   size_t size;
2579   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2580   uint16_t max = 1 << 15;
2581
2582   if (entry_count > max)
2583     return max;
2584   size = 8;
2585   while ((size < max) && (size < ideal))
2586     size *= 2;
2587   if (size > max)
2588     return max;
2589   return size;
2590 }
2591
2592
2593 /**
2594  * Recalculate our bloom filter for filtering replies.  This function
2595  * will create a new bloom filter from scratch, so it should only be
2596  * called if we have no bloomfilter at all (and hence can create a
2597  * fresh one of minimal size without problems) OR if our peer is the
2598  * initiator (in which case we may resize to larger than mimimum size).
2599  *
2600  * @param pr request for which the BF is to be recomputed
2601  */
2602 static void
2603 refresh_bloomfilter (struct PendingRequest *pr)
2604 {
2605   unsigned int i;
2606   size_t nsize;
2607   GNUNET_HashCode mhash;
2608
2609   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2610   if (nsize == pr->bf_size)
2611     return; /* size not changed */
2612   if (pr->bf != NULL)
2613     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2614   pr->bf_size = nsize;
2615   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2616   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2617                                               pr->bf_size,
2618                                               BLOOMFILTER_K);
2619   for (i=0;i<pr->replies_seen_off;i++)
2620     {
2621       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2622                                 pr->mingle,
2623                                 &mhash);
2624       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2625     }
2626 }
2627
2628
2629 /**
2630  * Function called after we've tried to reserve a certain amount of
2631  * bandwidth for a reply.  Check if we succeeded and if so send our
2632  * query.
2633  *
2634  * @param cls the requests "struct PendingRequest*"
2635  * @param peer identifies the peer
2636  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2637  * @param amount set to the amount that was actually reserved or unreserved
2638  * @param preference current traffic preference for the given peer
2639  */
2640 static void
2641 target_reservation_cb (void *cls,
2642                        const struct
2643                        GNUNET_PeerIdentity * peer,
2644                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2645                        int amount,
2646                        uint64_t preference)
2647 {
2648   struct PendingRequest *pr = cls;
2649   struct ConnectedPeer *cp;
2650   struct PendingMessage *pm;
2651   struct GetMessage *gm;
2652   GNUNET_HashCode *ext;
2653   char *bfdata;
2654   size_t msize;
2655   unsigned int k;
2656   int no_route;
2657   uint32_t bm;
2658   unsigned int i;
2659
2660   /* (3) transmit, update ttl/priority */
2661   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2662                                           &peer->hashPubKey);
2663   if (cp == NULL)
2664     {
2665       /* Peer must have just left */
2666 #if DEBUG_FS
2667       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2668                   "Selected peer disconnected!\n");
2669 #endif
2670       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2671         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2672                                                  &forward_request_task,
2673                                                  pr);
2674       return;
2675     }
2676   cp->irc = NULL;
2677   pr->pirc = NULL;
2678   if (peer == NULL)
2679     {
2680       /* error in communication with core, try again later */
2681       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2682         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2683                                                  &forward_request_task,
2684                                                  pr);
2685       return;
2686     }
2687   no_route = GNUNET_NO;
2688   if (amount == 0)
2689     {
2690       if (pr->cp == NULL)
2691         {
2692 #if DEBUG_FS > 1
2693           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2694                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2695                       amount,
2696                       DBLOCK_SIZE);
2697 #endif
2698           GNUNET_STATISTICS_update (stats,
2699                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2700                                     1,
2701                                     GNUNET_NO);
2702           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2703             pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2704                                                      &forward_request_task,
2705                                                      pr);
2706           return;  /* this target round failed */
2707         }
2708       no_route = GNUNET_YES;
2709     }
2710   
2711   GNUNET_STATISTICS_update (stats,
2712                             gettext_noop ("# queries scheduled for forwarding"),
2713                             1,
2714                             GNUNET_NO);
2715   for (i=0;i<pr->used_targets_off;i++)
2716     if (pr->used_targets[i].pid == cp->pid) 
2717       {
2718         GNUNET_STATISTICS_update (stats,
2719                                   gettext_noop ("# queries retransmitted to same target"),
2720                                   1,
2721                                   GNUNET_NO);
2722         break;
2723       } 
2724
2725   /* build message and insert message into priority queue */
2726 #if DEBUG_FS
2727   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2728               "Forwarding request `%s' to `%4s'!\n",
2729               GNUNET_h2s (&pr->query),
2730               GNUNET_i2s (peer));
2731 #endif
2732   k = 0;
2733   bm = 0;
2734   if (GNUNET_YES == no_route)
2735     {
2736       bm |= GET_MESSAGE_BIT_RETURN_TO;
2737       k++;      
2738     }
2739   if (pr->namespace != NULL)
2740     {
2741       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2742       k++;
2743     }
2744   if (pr->target_pid != 0)
2745     {
2746       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2747       k++;
2748     }
2749   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2750   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2751   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2752   pm->msize = msize;
2753   gm = (struct GetMessage*) &pm[1];
2754   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2755   gm->header.size = htons (msize);
2756   gm->type = htonl (pr->type);
2757   pr->remaining_priority /= 2;
2758   gm->priority = htonl (pr->remaining_priority);
2759   gm->ttl = htonl (pr->ttl);
2760   gm->filter_mutator = htonl(pr->mingle); 
2761   gm->hash_bitmap = htonl (bm);
2762   gm->query = pr->query;
2763   ext = (GNUNET_HashCode*) &gm[1];
2764   k = 0;
2765   if (GNUNET_YES == no_route)
2766     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2767   if (pr->namespace != NULL)
2768     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2769   if (pr->target_pid != 0)
2770     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2771   bfdata = (char *) &ext[k];
2772   if (pr->bf != NULL)
2773     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2774                                                bfdata,
2775                                                pr->bf_size);
2776   pm->cont = &transmit_query_continuation;
2777   pm->cont_cls = pr;
2778   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2779   add_to_pending_messages_for_peer (cp, pm, pr);
2780 }
2781
2782
2783 /**
2784  * Closure used for "target_peer_select_cb".
2785  */
2786 struct PeerSelectionContext 
2787 {
2788   /**
2789    * The request for which we are selecting
2790    * peers.
2791    */
2792   struct PendingRequest *pr;
2793
2794   /**
2795    * Current "prime" target.
2796    */
2797   struct GNUNET_PeerIdentity target;
2798
2799   /**
2800    * How much do we like this target?
2801    */
2802   double target_score;
2803
2804   /**
2805    * Does it make sense to we re-try quickly again?
2806    */
2807   int fast_retry;
2808
2809 };
2810
2811
2812 /**
2813  * Function called for each connected peer to determine
2814  * which one(s) would make good targets for forwarding.
2815  *
2816  * @param cls closure (struct PeerSelectionContext)
2817  * @param key current key code (peer identity)
2818  * @param value value in the hash map (struct ConnectedPeer)
2819  * @return GNUNET_YES if we should continue to
2820  *         iterate,
2821  *         GNUNET_NO if not.
2822  */
2823 static int
2824 target_peer_select_cb (void *cls,
2825                        const GNUNET_HashCode * key,
2826                        void *value)
2827 {
2828   struct PeerSelectionContext *psc = cls;
2829   struct ConnectedPeer *cp = value;
2830   struct PendingRequest *pr = psc->pr;
2831   struct GNUNET_TIME_Relative delay;
2832   double score;
2833   unsigned int i;
2834   unsigned int pc;
2835
2836   /* 1) check that this peer is not the initiator */
2837   if (cp == pr->cp)     
2838     {
2839 #if DEBUG_FS
2840       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2841                   "Skipping initiator in forwarding selection\n");
2842 #endif
2843       return GNUNET_YES; /* skip */        
2844     }
2845   if (cp->irc != NULL)
2846     {
2847       psc->fast_retry = GNUNET_YES;
2848       return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2849     }
2850
2851   /* 2) check if we have already (recently) forwarded to this peer */
2852   /* 2a) this particular request */
2853   pc = 0;
2854   for (i=0;i<pr->used_targets_off;i++)
2855     if (pr->used_targets[i].pid == cp->pid) 
2856       {
2857         pc = pr->used_targets[i].num_requests;
2858         GNUNET_assert (pc > 0);
2859         /* FIXME: make re-enabling a peer independent of how often
2860            this function is called??? */
2861         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2862                                            RETRY_PROBABILITY_INV * pc))
2863           {
2864 #if DEBUG_FS
2865             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2866                         "NOT re-trying query that was previously transmitted %u times\n",
2867                         (unsigned int) pc);
2868 #endif
2869             return GNUNET_YES; /* skip */
2870           }
2871         break;
2872       }
2873 #if DEBUG_FS
2874   if (0 < pc)
2875     {
2876       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2877                   "Re-trying query that was previously transmitted %u times to this peer\n",
2878                   (unsigned int) pc);
2879     }
2880 #endif
2881   /* 2b) many other requests to this peer */
2882   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2883   if (delay.rel_value <= cp->avg_delay.rel_value)
2884     {
2885 #if DEBUG_FS
2886       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2887                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2888                   MAX_QUEUE_PER_PEER,
2889                   cp->avg_delay.rel_value);
2890 #endif
2891       return GNUNET_YES; /* skip */      
2892     }
2893
2894   /* 3) calculate how much we'd like to forward to this peer,
2895      starting with a random value that is strong enough
2896      to at least give any peer a chance sometimes 
2897      (compared to the other factors that come later) */
2898   /* 3a) count successful (recent) routes from cp for same source */
2899   if (pr->cp != NULL)
2900     {
2901       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2902                                         P2P_SUCCESS_LIST_SIZE);
2903       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2904         if (cp->last_p2p_replies[i] == pr->cp->pid)
2905           score += 1.0; /* likely successful based on hot path */
2906     }
2907   else
2908     {
2909       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2910                                         CS2P_SUCCESS_LIST_SIZE);
2911       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2912         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2913           score += 1.0; /* likely successful based on hot path */
2914     }
2915   /* 3b) include latency */
2916   if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2917     score += 1.0; /* likely fast based on latency */
2918   /* 3c) include priorities */
2919   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2920     score += 1.0; /* likely successful based on priorities */
2921   /* 3d) penalize for queue size */  
2922   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2923   /* 3e) include peer proximity */
2924   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2925                                                     &pr->query)) / (double) UINT32_MAX);
2926   /* 4) super-bonus for being the known target */
2927   if (pr->target_pid == cp->pid)
2928     score += 100.0;
2929   /* store best-fit in closure */
2930   score++; /* avoid zero */
2931   if (score > psc->target_score)
2932     {
2933       psc->target_score = score;
2934       psc->target.hashPubKey = *key; 
2935     }
2936 #if DEBUG_FS
2937   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2938               "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2939               GNUNET_h2s (key),
2940               score,
2941               psc->target_score);
2942 #endif
2943   return GNUNET_YES;
2944 }
2945   
2946
2947 /**
2948  * The priority level imposes a bound on the maximum
2949  * value for the ttl that can be requested.
2950  *
2951  * @param ttl_in requested ttl
2952  * @param prio given priority
2953  * @return ttl_in if ttl_in is below the limit,
2954  *         otherwise the ttl-limit for the given priority
2955  */
2956 static int32_t
2957 bound_ttl (int32_t ttl_in, uint32_t prio)
2958 {
2959   unsigned long long allowed;
2960
2961   if (ttl_in <= 0)
2962     return ttl_in;
2963   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2964   if (ttl_in > allowed)      
2965     {
2966       if (allowed >= (1 << 30))
2967         return 1 << 30;
2968       return allowed;
2969     }
2970   return ttl_in;
2971 }
2972
2973
2974 /**
2975  * Iterator called on each result obtained for a DHT
2976  * operation that expects a reply
2977  *
2978  * @param cls closure
2979  * @param exp when will this value expire
2980  * @param key key of the result
2981  * @param get_path NULL-terminated array of pointers
2982  *                 to the peers on reverse GET path (or NULL if not recorded)
2983  * @param put_path NULL-terminated array of pointers
2984  *                 to the peers on the PUT path (or NULL if not recorded)
2985  * @param type type of the result
2986  * @param size number of bytes in data
2987  * @param data pointer to the result data
2988  */
2989 static void
2990 process_dht_reply (void *cls,
2991                    struct GNUNET_TIME_Absolute exp,
2992                    const GNUNET_HashCode * key,
2993                    const struct GNUNET_PeerIdentity * const *get_path,
2994                    const struct GNUNET_PeerIdentity * const *put_path,
2995                    enum GNUNET_BLOCK_Type type,
2996                    size_t size,
2997                    const void *data);
2998
2999
3000 /**
3001  * We're processing a GET request and have decided
3002  * to forward it to other peers.  This function is called periodically
3003  * and should forward the request to other peers until we have all
3004  * possible replies.  If we have transmitted the *only* reply to
3005  * the initiator we should destroy the pending request.  If we have
3006  * many replies in the queue to the initiator, we should delay sending
3007  * out more queries until the reply queue has shrunk some.
3008  *
3009  * @param cls our "struct ProcessGetContext *"
3010  * @param tc unused
3011  */
3012 static void
3013 forward_request_task (void *cls,
3014                      const struct GNUNET_SCHEDULER_TaskContext *tc)
3015 {
3016   struct PendingRequest *pr = cls;
3017   struct PeerSelectionContext psc;
3018   struct ConnectedPeer *cp; 
3019   struct GNUNET_TIME_Relative delay;
3020
3021   pr->task = GNUNET_SCHEDULER_NO_TASK;
3022   if (pr->pirc != NULL)
3023     {
3024 #if DEBUG_FS
3025       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3026                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
3027                   GNUNET_h2s (&pr->query));
3028 #endif
3029       return; /* already pending */
3030     }
3031   if (GNUNET_YES == pr->local_only)
3032     return; /* configured to not do P2P search */
3033   /* (0) try DHT */
3034   if ( (0 == pr->anonymity_level) &&
3035        (GNUNET_YES != pr->forward_only) &&
3036        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
3037        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
3038     {
3039       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
3040                                           GNUNET_TIME_UNIT_FOREVER_REL,
3041                                           pr->type,
3042                                           &pr->query,
3043                                           DEFAULT_GET_REPLICATION,
3044                                           GNUNET_DHT_RO_NONE,
3045                                           pr->bf,
3046                                           pr->mingle,
3047                                           pr->namespace,
3048                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3049                                           &process_dht_reply,
3050                                           pr);
3051     }
3052
3053   if ( (pr->anonymity_level > 1) &&
3054        (cover_query_count < pr->anonymity_level - 1) )
3055     {
3056       delay = get_processing_delay ();
3057 #if DEBUG_FS 
3058       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3059                   "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n",
3060                   GNUNET_h2s (&pr->query),
3061                   delay.rel_value);
3062 #endif
3063       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3064                                                &forward_request_task,
3065                                                pr);
3066       return;
3067     }
3068   /* consume cover traffic */
3069   if (pr->anonymity_level > 1) 
3070     cover_query_count -= pr->anonymity_level - 1;
3071
3072   /* (1) select target */
3073   psc.pr = pr;
3074   psc.target_score = -DBL_MAX;
3075   psc.fast_retry = GNUNET_NO;
3076   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3077                                          &target_peer_select_cb,
3078                                          &psc);  
3079   if (psc.target_score == -DBL_MAX)
3080     {
3081       if (psc.fast_retry == GNUNET_YES)
3082         delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3083       else
3084         delay = get_processing_delay ();
3085 #if DEBUG_FS 
3086       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3087                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3088                   GNUNET_h2s (&pr->query),
3089                   delay.rel_value);
3090 #endif
3091       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3092                                                &forward_request_task,
3093                                                pr);
3094       return; /* nobody selected */
3095     }
3096   /* (3) update TTL/priority */
3097   if (pr->client_request_list != NULL)
3098     {
3099       /* FIXME: use better algorithm!? */
3100       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3101                                          4))
3102         pr->priority++;
3103       /* bound priority we use by priorities we see from other peers
3104          rounded up (must round up so that we can see non-zero
3105          priorities, but round up as little as possible to make it
3106          plausible that we forwarded another peers request) */
3107       if (pr->priority > current_priorities + 1.0)
3108         pr->priority = (uint32_t) current_priorities + 1.0;
3109       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3110                            pr->priority);
3111 #if DEBUG_FS
3112       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3113                   "Trying query `%s' with priority %u and TTL %d.\n",
3114                   GNUNET_h2s (&pr->query),
3115                   pr->priority,
3116                   pr->ttl);
3117 #endif
3118     }
3119
3120   /* (3) reserve reply bandwidth */
3121   if (GNUNET_NO == pr->forward_only)
3122     {
3123       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3124                                               &psc.target.hashPubKey);
3125       GNUNET_assert (NULL != cp);
3126       GNUNET_assert (cp->irc == NULL);
3127       pr->pirc = cp;
3128       cp->pr = pr;
3129       cp->irc = GNUNET_CORE_peer_change_preference (core,
3130                                                     &psc.target,
3131                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
3132                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3133                                                     DBLOCK_SIZE * 2, 
3134                                                     cp->inc_preference,
3135                                                     &target_reservation_cb,
3136                                                     pr);
3137       GNUNET_assert (cp->irc != NULL);
3138       cp->inc_preference = 0;
3139     }
3140   else
3141     {
3142       /* force forwarding */
3143       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3144       target_reservation_cb (pr, &psc.target,
3145                              zerobw, 0, 0.0);
3146     }
3147 }
3148
3149
3150 /* **************************** P2P PUT Handling ************************ */
3151
3152
3153 /**
3154  * Function called after we either failed or succeeded
3155  * at transmitting a reply to a peer.  
3156  *
3157  * @param cls the requests "struct PendingRequest*"
3158  * @param tpid ID of receiving peer, 0 on transmission error
3159  */
3160 static void
3161 transmit_reply_continuation (void *cls,
3162                              GNUNET_PEER_Id tpid)
3163 {
3164   struct PendingRequest *pr = cls;
3165   
3166   switch (pr->type)
3167     {
3168     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3169     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3170       /* only one reply expected, done with the request! */
3171       destroy_pending_request (pr);
3172       break;
3173     case GNUNET_BLOCK_TYPE_ANY:
3174     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3175     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3176       break;
3177     default:
3178       GNUNET_break (0);
3179       break;
3180     }
3181 }
3182
3183
3184 /**
3185  * Transmit the given message by copying it to the target buffer
3186  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3187  * for writing in the meantime.  In that case, do nothing
3188  * (the disconnect or shutdown handler will take care of the rest).
3189  * If we were able to transmit messages and there are still more
3190  * pending, ask core again for further calls to this function.
3191  *
3192  * @param cls closure, pointer to the 'struct ClientList*'
3193  * @param size number of bytes available in buf
3194  * @param buf where the callee should write the message
3195  * @return number of bytes written to buf
3196  */
3197 static size_t
3198 transmit_to_client (void *cls,
3199                   size_t size, void *buf)
3200 {
3201   struct ClientList *cl = cls;
3202   char *cbuf = buf;
3203   struct ClientResponseMessage *creply;
3204   size_t msize;
3205   
3206   cl->th = NULL;
3207   if (NULL == buf)
3208     {
3209 #if DEBUG_FS
3210       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3211                   "Not sending reply, client communication problem.\n");
3212 #endif
3213       return 0;
3214     }
3215   msize = 0;
3216   while ( (NULL != (creply = cl->res_head) ) &&
3217           (creply->msize <= size) )
3218     {
3219       memcpy (&cbuf[msize], &creply[1], creply->msize);
3220       msize += creply->msize;
3221       size -= creply->msize;
3222       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3223                                    cl->res_tail,
3224                                    creply);
3225       GNUNET_free (creply);
3226     }
3227   if (NULL != creply)
3228     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3229                                                   creply->msize,
3230                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3231                                                   &transmit_to_client,
3232                                                   cl);
3233 #if DEBUG_FS
3234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3235               "Transmitted %u bytes to client\n",
3236               (unsigned int) msize);
3237 #endif
3238   return msize;
3239 }
3240
3241
3242 /**
3243  * Closure for "process_reply" function.
3244  */
3245 struct ProcessReplyClosure
3246 {
3247   /**
3248    * The data for the reply.
3249    */
3250   const void *data;
3251
3252   /**
3253    * Who gave us this reply? NULL for local host (or DHT)
3254    */
3255   struct ConnectedPeer *sender;
3256
3257   /**
3258    * When the reply expires.
3259    */
3260   struct GNUNET_TIME_Absolute expiration;
3261
3262   /**
3263    * Size of data.
3264    */
3265   size_t size;
3266
3267   /**
3268    * Type of the block.
3269    */
3270   enum GNUNET_BLOCK_Type type;
3271
3272   /**
3273    * How much was this reply worth to us?
3274    */
3275   uint32_t priority;
3276
3277   /**
3278    * Anonymity requirements for this reply.
3279    */
3280   uint32_t anonymity_level;
3281
3282   /**
3283    * Evaluation result (returned).
3284    */
3285   enum GNUNET_BLOCK_EvaluationResult eval;
3286
3287   /**
3288    * Did we finish processing the associated request?
3289    */ 
3290   int finished;
3291
3292   /**
3293    * Did we find a matching request?
3294    */
3295   int request_found;
3296 };
3297
3298
3299 /**
3300  * We have received a reply; handle it!
3301  *
3302  * @param cls response (struct ProcessReplyClosure)
3303  * @param key our query
3304  * @param value value in the hash map (info about the query)
3305  * @return GNUNET_YES (we should continue to iterate)
3306  */
3307 static int
3308 process_reply (void *cls,
3309                const GNUNET_HashCode * key,
3310                void *value)
3311 {
3312   struct ProcessReplyClosure *prq = cls;
3313   struct PendingRequest *pr = value;
3314   struct PendingMessage *reply;
3315   struct ClientResponseMessage *creply;
3316   struct ClientList *cl;
3317   struct PutMessage *pm;
3318   struct ConnectedPeer *cp;
3319   struct GNUNET_TIME_Relative cur_delay;
3320 #if SUPPORT_DELAYS  
3321 struct GNUNET_TIME_Relative art_delay;
3322 #endif
3323   size_t msize;
3324   unsigned int i;
3325
3326   if (NULL == pr->client_request_list)
3327     {
3328       /* reply will go over the network, check for cover traffic */
3329       if ( (prq->anonymity_level >  1) &&
3330            (cover_content_count < prq->anonymity_level - 1) )
3331         {
3332           /* insufficient cover traffic, skip */
3333           GNUNET_STATISTICS_update (stats,
3334                                     gettext_noop ("# replies suppressed due to lack of cover traffic"),
3335                                     1,
3336                                     GNUNET_NO);
3337           return GNUNET_YES;
3338         }       
3339       if (prq->anonymity_level >  1) 
3340         cover_content_count -= prq->anonymity_level - 1;
3341     }
3342 #if DEBUG_FS
3343   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3344               "Matched result (type %u) for query `%s' with pending request\n",
3345               (unsigned int) prq->type,
3346               GNUNET_h2s (key));
3347 #endif  
3348   GNUNET_STATISTICS_update (stats,
3349                             gettext_noop ("# replies received and matched"),
3350                             1,
3351                             GNUNET_NO);
3352   if (prq->sender != NULL)
3353     {
3354       for (i=0;i<pr->used_targets_off;i++)
3355         if (pr->used_targets[i].pid == prq->sender->pid)
3356           break;
3357       if (i < pr->used_targets_off)
3358         {
3359           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3360           prq->sender->avg_delay.rel_value
3361             = (prq->sender->avg_delay.rel_value * 
3362                (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
3363           prq->sender->avg_priority
3364             = (prq->sender->avg_priority * 
3365                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3366         }
3367       if (pr->cp != NULL)
3368         {
3369           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3370                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3371                                  -1);
3372           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3373           prq->sender->last_p2p_replies
3374             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3375             = pr->cp->pid;
3376         }
3377       else
3378         {
3379           if (NULL != prq->sender->last_client_replies
3380               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3381             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3382                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3383           prq->sender->last_client_replies
3384             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3385             = pr->client_request_list->client_list->client;
3386           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3387         }
3388     }
3389   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3390                                      prq->type,
3391                                      key,
3392                                      &pr->bf,
3393                                      pr->mingle,
3394                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3395                                      prq->data,
3396                                      prq->size);
3397   switch (prq->eval)
3398     {
3399     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3400       break;
3401     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3402       while (NULL != pr->pending_head)
3403         destroy_pending_message_list_entry (pr->pending_head);
3404       if (pr->qe != NULL)
3405         {
3406           if (pr->client_request_list != NULL)
3407             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3408                                         GNUNET_YES);
3409           GNUNET_DATASTORE_cancel (pr->qe);
3410           pr->qe = NULL;
3411         }
3412       pr->do_remove = GNUNET_YES;
3413       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3414         {
3415           GNUNET_SCHEDULER_cancel (pr->task);
3416           pr->task = GNUNET_SCHEDULER_NO_TASK;
3417         }
3418       GNUNET_break (GNUNET_YES ==
3419                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3420                                                           key,
3421                                                           pr));
3422       GNUNET_LOAD_update (rt_entry_lifetime,
3423                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3424       break;
3425     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3426       GNUNET_STATISTICS_update (stats,
3427                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3428                                 1,
3429                                 GNUNET_NO);
3430 #if DEBUG_FS
3431 /*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3432                   "Duplicate response `%s', discarding.\n",
3433                   GNUNET_h2s (&mhash));*/
3434 #endif
3435       return GNUNET_YES; /* duplicate */
3436     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3437       return GNUNET_YES; /* wrong namespace */  
3438     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3439       GNUNET_break (0);
3440       return GNUNET_YES;
3441     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3442       GNUNET_break (0);
3443       return GNUNET_YES;
3444     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3445       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3446                   _("Unsupported block type %u\n"),
3447                   prq->type);
3448       return GNUNET_NO;
3449     }
3450   if (pr->client_request_list != NULL)
3451     {
3452       if (pr->replies_seen_size == pr->replies_seen_off)
3453         GNUNET_array_grow (pr->replies_seen,
3454                            pr->replies_seen_size,
3455                            pr->replies_seen_size * 2 + 4);      
3456       GNUNET_CRYPTO_hash (prq->data,
3457                           prq->size,
3458                           &pr->replies_seen[pr->replies_seen_off++]);         
3459       refresh_bloomfilter (pr);
3460     }
3461   if (NULL == prq->sender)
3462     {
3463 #if DEBUG_FS
3464       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3465                   "Found result for query `%s' in local datastore\n",
3466                   GNUNET_h2s (key));
3467 #endif
3468       GNUNET_STATISTICS_update (stats,
3469                                 gettext_noop ("# results found locally"),
3470                                 1,
3471                                 GNUNET_NO);      
3472     }
3473   prq->priority += pr->remaining_priority;
3474   pr->remaining_priority = 0;
3475   pr->results_found++;
3476   prq->request_found = GNUNET_YES;
3477   if (NULL != pr->client_request_list)
3478     {
3479       GNUNET_STATISTICS_update (stats,
3480                                 gettext_noop ("# replies received for local clients"),
3481                                 1,
3482                                 GNUNET_NO);
3483       cl = pr->client_request_list->client_list;
3484       msize = sizeof (struct PutMessage) + prq->size;
3485       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3486       creply->msize = msize;
3487       creply->client_list = cl;
3488       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3489                                          cl->res_tail,
3490                                          cl->res_tail,
3491                                          creply);      
3492       pm = (struct PutMessage*) &creply[1];
3493       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3494       pm->header.size = htons (msize);
3495       pm->type = htonl (prq->type);
3496       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3497       memcpy (&pm[1], prq->data, prq->size);      
3498       if (NULL == cl->th)
3499         {
3500 #if DEBUG_FS
3501           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3502                       "Transmitting result for query `%s' to client\n",
3503                       GNUNET_h2s (key));
3504 #endif  
3505           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3506                                                         msize,
3507                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3508                                                         &transmit_to_client,
3509                                                         cl);
3510         }
3511       GNUNET_break (cl->th != NULL);
3512       if (pr->do_remove)                
3513         {
3514           prq->finished = GNUNET_YES;
3515           destroy_pending_request (pr);         
3516         }
3517     }
3518   else
3519     {
3520       cp = pr->cp;
3521 #if DEBUG_FS
3522       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3523                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3524                   GNUNET_h2s (key),
3525                   (unsigned int) cp->pid);
3526 #endif  
3527       GNUNET_STATISTICS_update (stats,
3528                                 gettext_noop ("# replies received for other peers"),
3529                                 1,
3530                                 GNUNET_NO);
3531       msize = sizeof (struct PutMessage) + prq->size;
3532       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3533       reply->cont = &transmit_reply_continuation;
3534       reply->cont_cls = pr;
3535 #if SUPPORT_DELAYS
3536       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3537                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3538                                                                            TTL_DECREMENT));
3539       reply->delay_until 
3540         = GNUNET_TIME_relative_to_absolute (art_delay);
3541       GNUNET_STATISTICS_update (stats,
3542                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3543                                 art_delay.abs_value,
3544                                 GNUNET_NO);
3545 #endif
3546       reply->msize = msize;
3547       reply->priority = UINT32_MAX; /* send replies first! */
3548       pm = (struct PutMessage*) &reply[1];
3549       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3550       pm->header.size = htons (msize);
3551       pm->type = htonl (prq->type);
3552       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3553       memcpy (&pm[1], prq->data, prq->size);
3554       add_to_pending_messages_for_peer (cp, reply, pr);
3555     }
3556   return GNUNET_YES;
3557 }
3558
3559
3560 /**
3561  * Iterator called on each result obtained for a DHT
3562  * operation that expects a reply
3563  *
3564  * @param cls closure
3565  * @param exp when will this value expire
3566  * @param key key of the result
3567  * @param get_path NULL-terminated array of pointers
3568  *                 to the peers on reverse GET path (or NULL if not recorded)
3569  * @param put_path NULL-terminated array of pointers
3570  *                 to the peers on the PUT path (or NULL if not recorded)
3571  * @param type type of the result
3572  * @param size number of bytes in data
3573  * @param data pointer to the result data
3574  */
3575 static void
3576 process_dht_reply (void *cls,
3577                    struct GNUNET_TIME_Absolute exp,
3578                    const GNUNET_HashCode * key,
3579                    const struct GNUNET_PeerIdentity * const *get_path,
3580                    const struct GNUNET_PeerIdentity * const *put_path,
3581                    enum GNUNET_BLOCK_Type type,
3582                    size_t size,
3583                    const void *data)
3584 {
3585   struct PendingRequest *pr = cls;
3586   struct ProcessReplyClosure prq;
3587
3588   memset (&prq, 0, sizeof (prq));
3589   prq.data = data;
3590   prq.expiration = exp;
3591   prq.size = size;  
3592   prq.type = type;
3593   process_reply (&prq, key, pr);
3594 }
3595
3596
3597
3598 /**
3599  * Continuation called to notify client about result of the
3600  * operation.
3601  *
3602  * @param cls closure
3603  * @param success GNUNET_SYSERR on failure
3604  * @param msg NULL on success, otherwise an error message
3605  */
3606 static void 
3607 put_migration_continuation (void *cls,
3608                             int success,
3609                             const char *msg)
3610 {
3611   struct GNUNET_TIME_Absolute *start = cls;
3612   struct GNUNET_TIME_Relative delay;
3613   
3614   delay = GNUNET_TIME_absolute_get_duration (*start);
3615   GNUNET_free (start);
3616   GNUNET_LOAD_update (datastore_put_load,
3617                       delay.rel_value);
3618   if (GNUNET_OK == success)
3619     return;
3620   GNUNET_STATISTICS_update (stats,
3621                             gettext_noop ("# datastore 'put' failures"),
3622                             1,
3623                             GNUNET_NO);
3624 }
3625
3626
3627 /**
3628  * Handle P2P "PUT" message.
3629  *
3630  * @param cls closure, always NULL
3631  * @param other the other peer involved (sender or receiver, NULL
3632  *        for loopback messages where we are both sender and receiver)
3633  * @param message the actual message
3634  * @param atsi performance information
3635  * @return GNUNET_OK to keep the connection open,
3636  *         GNUNET_SYSERR to close it (signal serious error)
3637  */
3638 static int
3639 handle_p2p_put (void *cls,
3640                 const struct GNUNET_PeerIdentity *other,
3641                 const struct GNUNET_MessageHeader *message,
3642                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3643 {
3644   const struct PutMessage *put;
3645   uint16_t msize;
3646   size_t dsize;
3647   enum GNUNET_BLOCK_Type type;
3648   struct GNUNET_TIME_Absolute expiration;
3649   GNUNET_HashCode query;
3650   struct ProcessReplyClosure prq;
3651   struct GNUNET_TIME_Absolute *start;
3652   struct GNUNET_TIME_Relative block_time;  
3653   double putl;
3654   struct ConnectedPeer *cp; 
3655   struct PendingMessage *pm;
3656   struct MigrationStopMessage *msm;
3657
3658   msize = ntohs (message->size);
3659   if (msize < sizeof (struct PutMessage))
3660     {
3661       GNUNET_break_op(0);
3662       return GNUNET_SYSERR;
3663     }
3664   put = (const struct PutMessage*) message;
3665   dsize = msize - sizeof (struct PutMessage);
3666   type = ntohl (put->type);
3667   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3668
3669   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3670     return GNUNET_SYSERR;
3671   if (GNUNET_OK !=
3672       GNUNET_BLOCK_get_key (block_ctx,
3673                             type,
3674                             &put[1],
3675                             dsize,
3676                             &query))
3677     {
3678       GNUNET_break_op (0);
3679       return GNUNET_SYSERR;
3680     }
3681   cover_content_count++;
3682 #if DEBUG_FS
3683   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3684               "Received result for query `%s' from peer `%4s'\n",
3685               GNUNET_h2s (&query),
3686               GNUNET_i2s (other));
3687 #endif
3688   GNUNET_STATISTICS_update (stats,
3689                             gettext_noop ("# replies received (overall)"),
3690                             1,
3691                             GNUNET_NO);
3692   /* now, lookup 'query' */
3693   prq.data = (const void*) &put[1];
3694   if (other != NULL)
3695     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3696                                                     &other->hashPubKey);
3697   else
3698     prq.sender = NULL;
3699   prq.size = dsize;
3700   prq.type = type;
3701   prq.expiration = expiration;
3702   prq.priority = 0;
3703   prq.anonymity_level = 1;
3704   prq.finished = GNUNET_NO;
3705   prq.request_found = GNUNET_NO;
3706   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3707                                               &query,
3708                                               &process_reply,
3709                                               &prq);
3710   if (prq.sender != NULL)
3711     {
3712       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3713       change_host_trust (prq.sender, prq.priority);
3714     }
3715   if ( (GNUNET_YES == active_to_migration) &&
3716        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3717     {      
3718 #if DEBUG_FS
3719       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3720                   "Replicating result for query `%s' with priority %u\n",
3721                   GNUNET_h2s (&query),
3722                   prq.priority);
3723 #endif
3724       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3725       *start = GNUNET_TIME_absolute_get ();
3726       GNUNET_DATASTORE_put (dsh,
3727                             0, &query, dsize, &put[1],
3728                             type, prq.priority, 1 /* anonymity */, 
3729                             expiration, 
3730                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3731                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3732                             &put_migration_continuation, 
3733                             start);
3734     }
3735   putl = GNUNET_LOAD_get_load (datastore_put_load);
3736   if ( (GNUNET_NO == prq.request_found) &&
3737        ( (GNUNET_YES != active_to_migration) ||
3738          (putl > 2.5 * (1 + prq.priority)) ) )
3739     {
3740       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3741                                               &other->hashPubKey);
3742       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3743         return GNUNET_OK; /* already blocked */
3744       /* We're too busy; send MigrationStop message! */
3745       if (GNUNET_YES != active_to_migration) 
3746         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3747       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3748                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3749                                                                                    (unsigned int) (60000 * putl * putl)));
3750       
3751       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3752       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3753                           sizeof (struct MigrationStopMessage));
3754       pm->msize = sizeof (struct MigrationStopMessage);
3755       pm->priority = UINT32_MAX;
3756       msm = (struct MigrationStopMessage*) &pm[1];
3757       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3758       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3759       msm->duration = GNUNET_TIME_relative_hton (block_time);
3760       add_to_pending_messages_for_peer (cp,
3761                                         pm,
3762                                         NULL);
3763     }
3764   return GNUNET_OK;
3765 }
3766
3767
3768 /**
3769  * Handle P2P "MIGRATION_STOP" message.
3770  *
3771  * @param cls closure, always NULL
3772  * @param other the other peer involved (sender or receiver, NULL
3773  *        for loopback messages where we are both sender and receiver)
3774  * @param message the actual message
3775  * @param atsi performance information
3776  * @return GNUNET_OK to keep the connection open,
3777  *         GNUNET_SYSERR to close it (signal serious error)
3778  */
3779 static int
3780 handle_p2p_migration_stop (void *cls,
3781                            const struct GNUNET_PeerIdentity *other,
3782                            const struct GNUNET_MessageHeader *message,
3783                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3784 {
3785   struct ConnectedPeer *cp; 
3786   const struct MigrationStopMessage *msm;
3787
3788   msm = (const struct MigrationStopMessage*) message;
3789   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3790                                           &other->hashPubKey);
3791   if (cp == NULL)
3792     {
3793       GNUNET_break (0);
3794       return GNUNET_OK;
3795     }
3796   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3797   return GNUNET_OK;
3798 }
3799
3800
3801
3802 /* **************************** P2P GET Handling ************************ */
3803
3804
3805 /**
3806  * Closure for 'check_duplicate_request_{peer,client}'.
3807  */
3808 struct CheckDuplicateRequestClosure
3809 {
3810   /**
3811    * The new request we should check if it already exists.
3812    */
3813   const struct PendingRequest *pr;
3814
3815   /**
3816    * Existing request found by the checker, NULL if none.
3817    */
3818   struct PendingRequest *have;
3819 };
3820
3821
3822 /**
3823  * Iterator over entries in the 'query_request_map' that
3824  * tries to see if we have the same request pending from
3825  * the same client already.
3826  *
3827  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3828  * @param key current key code (query, ignored, must match)
3829  * @param value value in the hash map (a 'struct PendingRequest' 
3830  *              that already exists)
3831  * @return GNUNET_YES if we should continue to
3832  *         iterate (no match yet)
3833  *         GNUNET_NO if not (match found).
3834  */
3835 static int
3836 check_duplicate_request_client (void *cls,
3837                                 const GNUNET_HashCode * key,
3838                                 void *value)
3839 {
3840   struct CheckDuplicateRequestClosure *cdc = cls;
3841   struct PendingRequest *have = value;
3842
3843   if (have->client_request_list == NULL)
3844     return GNUNET_YES;
3845   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3846        (cdc->pr != have) )
3847     {
3848       cdc->have = have;
3849       return GNUNET_NO;
3850     }
3851   return GNUNET_YES;
3852 }
3853
3854
3855 /**
3856  * We're processing (local) results for a search request
3857  * from another peer.  Pass applicable results to the
3858  * peer and if we are done either clean up (operation
3859  * complete) or forward to other peers (more results possible).
3860  *
3861  * @param cls our closure (struct LocalGetContext)
3862  * @param key key for the content
3863  * @param size number of bytes in data
3864  * @param data content stored
3865  * @param type type of the content
3866  * @param priority priority of the content
3867  * @param anonymity anonymity-level for the content
3868  * @param expiration expiration time for the content
3869  * @param uid unique identifier for the datum;
3870  *        maybe 0 if no unique identifier is available
3871  */
3872 static void
3873 process_local_reply (void *cls,
3874                      const GNUNET_HashCode * key,
3875                      size_t size,
3876                      const void *data,
3877                      enum GNUNET_BLOCK_Type type,
3878                      uint32_t priority,
3879                      uint32_t anonymity,
3880                      struct GNUNET_TIME_Absolute
3881                      expiration, 
3882                      uint64_t uid)
3883 {
3884   struct PendingRequest *pr = cls;
3885   struct ProcessReplyClosure prq;
3886   struct CheckDuplicateRequestClosure cdrc;
3887   GNUNET_HashCode query;
3888   unsigned int old_rf;
3889   
3890   if (NULL == key)
3891     {
3892 #if DEBUG_FS > 1
3893       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3894                   "Done processing local replies, forwarding request to other peers.\n");
3895 #endif
3896       pr->qe = NULL;
3897       if (pr->client_request_list != NULL)
3898         {
3899           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3900                                       GNUNET_YES);
3901           /* Figure out if this is a duplicate request and possibly
3902              merge 'struct PendingRequest' entries */
3903           cdrc.have = NULL;
3904           cdrc.pr = pr;
3905           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3906                                                       &pr->query,
3907                                                       &check_duplicate_request_client,
3908                                                       &cdrc);
3909           if (cdrc.have != NULL)
3910             {
3911 #if DEBUG_FS
3912               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3913                           "Received request for block `%s' twice from client, will only request once.\n",
3914                           GNUNET_h2s (&pr->query));
3915 #endif
3916               
3917               destroy_pending_request (pr);
3918               return;
3919             }
3920         }
3921       if (pr->local_only == GNUNET_YES)
3922         {
3923           destroy_pending_request (pr);
3924           return;
3925         }
3926       /* no more results */
3927       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3928         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3929                                              pr);      
3930       return;
3931     }
3932 #if DEBUG_FS
3933   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3934               "New local response to `%s' of type %u.\n",
3935               GNUNET_h2s (key),
3936               type);
3937 #endif
3938   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3939     {
3940 #if DEBUG_FS
3941       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3942                   "Found ONDEMAND block, performing on-demand encoding\n");
3943 #endif
3944       GNUNET_STATISTICS_update (stats,
3945                                 gettext_noop ("# on-demand blocks matched requests"),
3946                                 1,
3947                                 GNUNET_NO);
3948       if (GNUNET_OK != 
3949           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3950                                             anonymity, expiration, uid, 
3951                                             &process_local_reply,
3952                                             pr))
3953       if (pr->qe != NULL)
3954         {
3955           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3956         }
3957       return;
3958     }
3959   old_rf = pr->results_found;
3960   memset (&prq, 0, sizeof (prq));
3961   prq.data = data;
3962   prq.expiration = expiration;
3963   prq.size = size;  
3964   if (GNUNET_OK != 
3965       GNUNET_BLOCK_get_key (block_ctx,
3966                             type,
3967                             data,
3968                             size,
3969                             &query))
3970     {
3971       GNUNET_break (0);
3972       GNUNET_DATASTORE_remove (dsh,
3973                                key,
3974                                size, data,
3975                                -1, -1, 
3976                                GNUNET_TIME_UNIT_FOREVER_REL,
3977                                NULL, NULL);
3978       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3979       return;
3980     }
3981   prq.type = type;
3982   prq.priority = priority;  
3983   prq.finished = GNUNET_NO;
3984   prq.request_found = GNUNET_NO;
3985   prq.anonymity_level = anonymity;
3986   if ( (old_rf == 0) &&
3987        (pr->results_found == 0) )
3988     update_datastore_delays (pr->start_time);
3989   process_reply (&prq, key, pr);
3990   if (prq.finished == GNUNET_YES)
3991     return;
3992   if (pr->qe == NULL)
3993     return; /* done here */
3994   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3995     {
3996       pr->local_only = GNUNET_YES; /* do not forward */
3997       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3998       return;
3999     }
4000   if ( (pr->client_request_list == NULL) &&
4001        ( (GNUNET_YES == test_get_load_too_high (0)) ||
4002          (pr->results_found > 5 + 2 * pr->priority) ) )
4003     {
4004 #if DEBUG_FS > 2
4005       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4006                   "Load too high, done with request\n");
4007 #endif
4008       GNUNET_STATISTICS_update (stats,
4009                                 gettext_noop ("# processing result set cut short due to load"),
4010                                 1,
4011                                 GNUNET_NO);
4012       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
4013       return;
4014     }
4015   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
4016 }
4017
4018
4019 /**
4020  * We've received a request with the specified priority.  Bound it
4021  * according to how much we trust the given peer.
4022  * 
4023  * @param prio_in requested priority
4024  * @param cp the peer making the request
4025  * @return effective priority
4026  */
4027 static int32_t
4028 bound_priority (uint32_t prio_in,
4029                 struct ConnectedPeer *cp)
4030 {
4031 #define N ((double)128.0)
4032   uint32_t ret;
4033   double rret;
4034   int ld;
4035
4036   ld = test_get_load_too_high (0);
4037   if (ld == GNUNET_SYSERR)
4038     {
4039       GNUNET_STATISTICS_update (stats,
4040                                 gettext_noop ("# requests done for free (low load)"),
4041                                 1,
4042                                 GNUNET_NO);
4043       return 0; /* excess resources */
4044     }
4045   if (prio_in > INT32_MAX)
4046     prio_in = INT32_MAX;
4047   ret = - change_host_trust (cp, - (int) prio_in);
4048   if (ret > 0)
4049     {
4050       if (ret > current_priorities + N)
4051         rret = current_priorities + N;
4052       else
4053         rret = ret;
4054       current_priorities 
4055         = (current_priorities * (N-1) + rret)/N;
4056     }
4057   if ( (ld == GNUNET_YES) && (ret > 0) )
4058     {
4059       /* try with charging */
4060       ld = test_get_load_too_high (ret);
4061     }
4062   if (ld == GNUNET_YES)
4063     {
4064       GNUNET_STATISTICS_update (stats,
4065                                 gettext_noop ("# request dropped, priority insufficient"),
4066                                 1,
4067                                 GNUNET_NO);
4068       /* undo charge */
4069       change_host_trust (cp, (int) ret);
4070       return -1; /* not enough resources */
4071     }
4072   else
4073     {
4074       GNUNET_STATISTICS_update (stats,
4075                                 gettext_noop ("# requests done for a price (normal load)"),
4076                                 1,
4077                                 GNUNET_NO);
4078     }
4079 #undef N
4080   return ret;
4081 }
4082
4083
4084 /**
4085  * Iterator over entries in the 'query_request_map' that
4086  * tries to see if we have the same request pending from
4087  * the same peer already.
4088  *
4089  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
4090  * @param key current key code (query, ignored, must match)
4091  * @param value value in the hash map (a 'struct PendingRequest' 
4092  *              that already exists)
4093  * @return GNUNET_YES if we should continue to
4094  *         iterate (no match yet)
4095  *         GNUNET_NO if not (match found).
4096  */
4097 static int
4098 check_duplicate_request_peer (void *cls,
4099                               const GNUNET_HashCode * key,
4100                               void *value)
4101 {
4102   struct CheckDuplicateRequestClosure *cdc = cls;
4103   struct PendingRequest *have = value;
4104
4105   if (cdc->pr->target_pid == have->target_pid)
4106     {
4107       cdc->have = have;
4108       return GNUNET_NO;
4109     }
4110   return GNUNET_YES;
4111 }
4112
4113
4114 /**
4115  * Handle P2P "GET" request.
4116  *
4117  * @param cls closure, always NULL
4118  * @param other the other peer involved (sender or receiver, NULL
4119  *        for loopback messages where we are both sender and receiver)
4120  * @param message the actual message
4121  * @param atsi performance information
4122  * @return GNUNET_OK to keep the connection open,
4123  *         GNUNET_SYSERR to close it (signal serious error)
4124  */
4125 static int
4126 handle_p2p_get (void *cls,
4127                 const struct GNUNET_PeerIdentity *other,
4128                 const struct GNUNET_MessageHeader *message,
4129                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4130 {
4131   struct PendingRequest *pr;
4132   struct ConnectedPeer *cp;
4133   struct ConnectedPeer *cps;
4134   struct CheckDuplicateRequestClosure cdc;
4135   struct GNUNET_TIME_Relative timeout;
4136   uint16_t msize;
4137   const struct GetMessage *gm;
4138   unsigned int bits;
4139   const GNUNET_HashCode *opt;
4140   uint32_t bm;
4141   size_t bfsize;
4142   uint32_t ttl_decrement;
4143   int32_t priority;
4144   enum GNUNET_BLOCK_Type type;
4145   int have_ns;
4146
4147   msize = ntohs(message->size);
4148   if (msize < sizeof (struct GetMessage))
4149     {
4150       GNUNET_break_op (0);
4151       return GNUNET_SYSERR;
4152     }
4153   gm = (const struct GetMessage*) message;
4154 #if DEBUG_FS
4155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4156               "Received request for `%s'\n",
4157               GNUNET_h2s (&gm->query));
4158 #endif
4159   type = ntohl (gm->type);
4160   bm = ntohl (gm->hash_bitmap);
4161   bits = 0;
4162   while (bm > 0)
4163     {
4164       if (1 == (bm & 1))
4165         bits++;
4166       bm >>= 1;
4167     }
4168   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4169     {
4170       GNUNET_break_op (0);
4171       return GNUNET_SYSERR;
4172     }  
4173   opt = (const GNUNET_HashCode*) &gm[1];
4174   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4175   /* bfsize must be power of 2, check! */
4176   if (0 != ( (bfsize - 1) & bfsize))
4177     {
4178       GNUNET_break_op (0);
4179       return GNUNET_SYSERR;
4180     }
4181   cover_query_count++;
4182   bm = ntohl (gm->hash_bitmap);
4183   bits = 0;
4184   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4185                                            &other->hashPubKey);
4186   if (NULL == cps)
4187     {
4188       /* peer must have just disconnected */
4189       GNUNET_STATISTICS_update (stats,
4190                                 gettext_noop ("# requests dropped due to initiator not being connected"),
4191                                 1,
4192                                 GNUNET_NO);
4193       return GNUNET_SYSERR;
4194     }
4195   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4196     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4197                                             &opt[bits++]);
4198   else
4199     cp = cps;
4200   if (cp == NULL)
4201     {
4202 #if DEBUG_FS
4203       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4204         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4205                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4206                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4207       
4208       else
4209         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4210                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4211                     GNUNET_i2s (other));
4212 #endif
4213       GNUNET_STATISTICS_update (stats,
4214                                 gettext_noop ("# requests dropped due to missing reverse route"),
4215                                 1,
4216                                 GNUNET_NO);
4217      /* FIXME: try connect? */
4218       return GNUNET_OK;
4219     }
4220   /* note that we can really only check load here since otherwise
4221      peers could find out that we are overloaded by not being
4222      disconnected after sending us a malformed query... */
4223   priority = bound_priority (ntohl (gm->priority), cps);
4224   if (priority < 0)
4225     {
4226 #if DEBUG_FS
4227       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4228                   "Dropping query from `%s', this peer is too busy.\n",
4229                   GNUNET_i2s (other));
4230 #endif
4231       return GNUNET_OK;
4232     }
4233 #if DEBUG_FS 
4234   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4235               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4236               GNUNET_h2s (&gm->query),
4237               (unsigned int) type,
4238               GNUNET_i2s (other),
4239               (unsigned int) bm);
4240 #endif
4241   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4242   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4243                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4244   if (have_ns)
4245     {
4246       pr->namespace = (GNUNET_HashCode*) &pr[1];
4247       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4248     }
4249   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4250        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4251         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4252     {
4253       /* don't have BW to send to peer, or would likely take longer than we have for it,
4254          so at best indirect the query */
4255       priority = 0;
4256       pr->forward_only = GNUNET_YES;
4257     }
4258   pr->type = type;
4259   pr->mingle = ntohl (gm->filter_mutator);
4260   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4261     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4262   pr->anonymity_level = 1;
4263   pr->priority = (uint32_t) priority;
4264   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4265   pr->query = gm->query;
4266   /* decrement ttl (always) */
4267   ttl_decrement = 2 * TTL_DECREMENT +
4268     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4269                               TTL_DECREMENT);
4270   if ( (pr->ttl < 0) &&
4271        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4272     {
4273 #if DEBUG_FS
4274       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4275                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4276                   GNUNET_i2s (other),
4277                   pr->ttl,
4278                   ttl_decrement);
4279 #endif
4280       GNUNET_STATISTICS_update (stats,
4281                                 gettext_noop ("# requests dropped due TTL underflow"),
4282                                 1,
4283                                 GNUNET_NO);
4284       /* integer underflow => drop (should be very rare)! */      
4285       GNUNET_free (pr);
4286       return GNUNET_OK;
4287     } 
4288   pr->ttl -= ttl_decrement;
4289   pr->start_time = GNUNET_TIME_absolute_get ();
4290
4291   /* get bloom filter */
4292   if (bfsize > 0)
4293     {
4294       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4295                                                   bfsize,
4296                                                   BLOOMFILTER_K);
4297       pr->bf_size = bfsize;
4298     }
4299   cdc.have = NULL;
4300   cdc.pr = pr;
4301   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4302                                               &gm->query,
4303                                               &check_duplicate_request_peer,
4304                                               &cdc);
4305   if (cdc.have != NULL)
4306     {
4307       if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4308           pr->start_time.abs_value + pr->ttl)
4309         {
4310           /* existing request has higher TTL, drop new one! */
4311           cdc.have->priority += pr->priority;
4312           destroy_pending_request (pr);
4313 #if DEBUG_FS
4314           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4315                       "Have existing request with higher TTL, dropping new request.\n",
4316                       GNUNET_i2s (other));
4317 #endif
4318           GNUNET_STATISTICS_update (stats,
4319                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4320                                     1,
4321                                     GNUNET_NO);
4322           return GNUNET_OK;
4323         }
4324       else
4325         {
4326           /* existing request has lower TTL, drop old one! */
4327           pr->priority += cdc.have->priority;
4328           /* Possible optimization: if we have applicable pending
4329              replies in 'cdc.have', we might want to move those over
4330              (this is a really rare special-case, so it is not clear
4331              that this would be worth it) */
4332           destroy_pending_request (cdc.have);
4333           /* keep processing 'pr'! */
4334         }
4335     }
4336
4337   pr->cp = cp;
4338   GNUNET_break (GNUNET_OK ==
4339                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4340                                                    &gm->query,
4341                                                    pr,
4342                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4343   GNUNET_break (GNUNET_OK ==
4344                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4345                                                    &other->hashPubKey,
4346                                                    pr,
4347                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4348   
4349   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4350                                             pr,
4351                                             pr->start_time.abs_value + pr->ttl);
4352
4353   GNUNET_STATISTICS_update (stats,
4354                             gettext_noop ("# P2P searches received"),
4355                             1,
4356                             GNUNET_NO);
4357   GNUNET_STATISTICS_update (stats,
4358                             gettext_noop ("# P2P searches active"),
4359                             1,
4360                             GNUNET_NO);
4361
4362   /* calculate change in traffic preference */
4363   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4364   /* process locally */
4365   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4366     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4367   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4368                                            (pr->priority + 1)); 
4369   if (GNUNET_YES != pr->forward_only)
4370     {
4371 #if DEBUG_FS
4372       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4373                   "Handing request for `%s' to datastore\n",
4374                   GNUNET_h2s (&gm->query));
4375 #endif
4376       pr->qe = GNUNET_DATASTORE_get (dsh,
4377                                      &gm->query,
4378                                      type,                             
4379                                      pr->priority + 1,
4380                                      MAX_DATASTORE_QUEUE,                                
4381                                      timeout,
4382                                      &process_local_reply,
4383                                      pr);
4384       if (NULL == pr->qe)
4385         {
4386           GNUNET_STATISTICS_update (stats,
4387                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4388                                     1,
4389                                     GNUNET_NO);
4390         }
4391     }
4392   else
4393     {
4394       GNUNET_STATISTICS_update (stats,
4395                                 gettext_noop ("# requests forwarded due to high load"),
4396                                 1,
4397                                 GNUNET_NO);
4398     }
4399
4400   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4401   switch (pr->type)
4402     {
4403     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4404     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4405       /* only one result, wait for datastore */
4406       if (GNUNET_YES != pr->forward_only)
4407         {
4408           GNUNET_STATISTICS_update (stats,
4409                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4410                                     1,
4411                                     GNUNET_NO);
4412           break;
4413         }
4414     default:
4415       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4416         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4417                                              pr);
4418     }
4419
4420   /* make sure we don't track too many requests */
4421   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4422     {
4423       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4424       GNUNET_assert (pr != NULL);
4425       destroy_pending_request (pr);
4426     }
4427   return GNUNET_OK;
4428 }
4429
4430
4431 /* **************************** CS GET Handling ************************ */
4432
4433
4434 /**
4435  * Handle START_SEARCH-message (search request from client).
4436  *
4437  * @param cls closure
4438  * @param client identification of the client
4439  * @param message the actual message
4440  */
4441 static void
4442 handle_start_search (void *cls,
4443                      struct GNUNET_SERVER_Client *client,
4444                      const struct GNUNET_MessageHeader *message)
4445 {
4446   static GNUNET_HashCode all_zeros;
4447   const struct SearchMessage *sm;
4448   struct ClientList *cl;
4449   struct ClientRequestList *crl;
4450   struct PendingRequest *pr;
4451   uint16_t msize;
4452   unsigned int sc;
4453   enum GNUNET_BLOCK_Type type;
4454
4455   msize = ntohs (message->size);
4456   if ( (msize < sizeof (struct SearchMessage)) ||
4457        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4458     {
4459       GNUNET_break (0);
4460       GNUNET_SERVER_receive_done (client,
4461                                   GNUNET_SYSERR);
4462       return;
4463     }
4464   GNUNET_STATISTICS_update (stats,
4465                             gettext_noop ("# client searches received"),
4466                             1,
4467                             GNUNET_NO);
4468   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4469   sm = (const struct SearchMessage*) message;
4470   type = ntohl (sm->type);
4471 #if DEBUG_FS
4472   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4473               "Received request for `%s' of type %u from local client\n",
4474               GNUNET_h2s (&sm->query),
4475               (unsigned int) type);
4476 #endif
4477   cl = client_list;
4478   while ( (cl != NULL) &&
4479           (cl->client != client) )
4480     cl = cl->next;
4481   if (cl == NULL)
4482     {
4483       cl = GNUNET_malloc (sizeof (struct ClientList));
4484       cl->client = client;
4485       GNUNET_SERVER_client_keep (client);
4486       cl->next = client_list;
4487       client_list = cl;
4488     }
4489   /* detect duplicate KBLOCK requests */
4490   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4491        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4492        (type == GNUNET_BLOCK_TYPE_ANY) )
4493     {
4494       crl = cl->rl_head;
4495       while ( (crl != NULL) &&
4496               ( (0 != memcmp (&crl->req->query,
4497                               &sm->query,
4498                               sizeof (GNUNET_HashCode))) ||
4499                 (crl->req->type != type) ) )
4500         crl = crl->next;
4501       if (crl != NULL)  
4502         { 
4503 #if DEBUG_FS
4504           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4505                       "Have existing request, merging content-seen lists.\n");
4506 #endif
4507           pr = crl->req;
4508           /* Duplicate request (used to send long list of
4509              known/blocked results); merge 'pr->replies_seen'
4510              and update bloom filter */
4511           GNUNET_array_grow (pr->replies_seen,
4512                              pr->replies_seen_size,
4513                              pr->replies_seen_off + sc);
4514           memcpy (&pr->replies_seen[pr->replies_seen_off],
4515                   &sm[1],
4516                   sc * sizeof (GNUNET_HashCode));
4517           pr->replies_seen_off += sc;
4518           refresh_bloomfilter (pr);
4519           GNUNET_STATISTICS_update (stats,
4520                                     gettext_noop ("# client searches updated (merged content seen list)"),
4521                                     1,
4522                                     GNUNET_NO);
4523           GNUNET_SERVER_receive_done (client,
4524                                       GNUNET_OK);
4525           return;
4526         }
4527     }
4528   GNUNET_STATISTICS_update (stats,
4529                             gettext_noop ("# client searches active"),
4530                             1,
4531                             GNUNET_NO);
4532   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4533                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4534   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4535   memset (crl, 0, sizeof (struct ClientRequestList));
4536   crl->client_list = cl;
4537   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4538                                cl->rl_tail,
4539                                crl);  
4540   crl->req = pr;
4541   pr->type = type;
4542   pr->client_request_list = crl;
4543   GNUNET_array_grow (pr->replies_seen,
4544                      pr->replies_seen_size,
4545                      sc);
4546   memcpy (pr->replies_seen,
4547           &sm[1],
4548           sc * sizeof (GNUNET_HashCode));
4549   pr->replies_seen_off = sc;
4550   pr->anonymity_level = ntohl (sm->anonymity_level); 
4551   pr->start_time = GNUNET_TIME_absolute_get ();
4552   refresh_bloomfilter (pr);
4553   pr->query = sm->query;
4554   if (0 == (1 & ntohl (sm->options)))
4555     pr->local_only = GNUNET_NO;
4556   else
4557     pr->local_only = GNUNET_YES;
4558   switch (type)
4559     {
4560     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4561     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4562       if (0 != memcmp (&sm->target,
4563                        &all_zeros,
4564                        sizeof (GNUNET_HashCode)))
4565         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4566       break;
4567     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4568       pr->namespace = (GNUNET_HashCode*) &pr[1];
4569       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4570       break;
4571     default:
4572       break;
4573     }
4574   GNUNET_break (GNUNET_OK ==
4575                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4576                                                    &sm->query,
4577                                                    pr,
4578                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4579   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4580     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4581   pr->qe = GNUNET_DATASTORE_get (dsh,
4582                                  &sm->query,
4583                                  type,
4584                                  -3, -1,
4585                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4586                                  &process_local_reply,
4587                                  pr);
4588 }
4589
4590
4591 /* **************************** Startup ************************ */
4592
4593 /**
4594  * Process fs requests.
4595  *
4596  * @param server the initialized server
4597  * @param c configuration to use
4598  */
4599 static int
4600 main_init (struct GNUNET_SERVER_Handle *server,
4601            const struct GNUNET_CONFIGURATION_Handle *c)
4602 {
4603   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4604     {
4605       { &handle_p2p_get, 
4606         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4607       { &handle_p2p_put, 
4608         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4609       { &handle_p2p_migration_stop, 
4610         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4611         sizeof (struct MigrationStopMessage) },
4612       { NULL, 0, 0 }
4613     };
4614   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4615     {&GNUNET_FS_handle_index_start, NULL, 
4616      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4617     {&GNUNET_FS_handle_index_list_get, NULL, 
4618      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4619     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4620      sizeof (struct UnindexMessage) },
4621     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4622      0 },
4623     {NULL, NULL, 0, 0}
4624   };
4625   unsigned long long enc = 128;
4626
4627   cfg = c;
4628   stats = GNUNET_STATISTICS_create ("fs", cfg);
4629   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4630   if ( (GNUNET_OK !=
4631         GNUNET_CONFIGURATION_get_value_number (cfg,
4632                                                "fs",
4633                                                "MAX_PENDING_REQUESTS",
4634                                                &max_pending_requests)) ||
4635        (GNUNET_OK !=
4636         GNUNET_CONFIGURATION_get_value_number (cfg,
4637                                                "fs",
4638                                                "EXPECTED_NEIGHBOUR_COUNT",
4639                                                &enc)) ||
4640        (GNUNET_OK != 
4641         GNUNET_CONFIGURATION_get_value_time (cfg,
4642                                              "fs",
4643                                              "MIN_MIGRATION_DELAY",
4644                                              &min_migration_delay)) )
4645     {
4646       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4647                   _("Configuration fails to specify certain parameters, assuming default values."));
4648     }
4649   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4650   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4651   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4652   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4653   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4654   core = GNUNET_CORE_connect (cfg,
4655                               1, /* larger? */
4656                               NULL,
4657                               NULL,
4658                               &peer_connect_handler,
4659                               &peer_disconnect_handler,
4660                               &peer_status_handler,
4661                               NULL, GNUNET_NO,
4662                               NULL, GNUNET_NO,
4663                               p2p_handlers);
4664   if (NULL == core)
4665     {
4666       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4667                   _("Failed to connect to `%s' service.\n"),
4668                   "core");
4669       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4670       connected_peers = NULL;
4671       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4672       query_request_map = NULL;
4673       GNUNET_LOAD_value_free (rt_entry_lifetime);
4674       rt_entry_lifetime = NULL;
4675       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4676       requests_by_expiration_heap = NULL;
4677       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4678       peer_request_map = NULL;
4679       if (dsh != NULL)
4680         {
4681           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4682           dsh = NULL;
4683         }
4684       return GNUNET_SYSERR;
4685     }
4686   if (active_from_migration) 
4687     {
4688       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4689                   _("Content migration is enabled, will start to gather data\n"));
4690       consider_migration_gathering ();
4691     }
4692   consider_dht_put_gathering (NULL);
4693   GNUNET_SERVER_disconnect_notify (server, 
4694                                    &handle_client_disconnect,
4695                                    NULL);
4696   GNUNET_assert (GNUNET_OK ==
4697                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4698                                                           "fs",
4699                                                           "TRUST",
4700                                                           &trustDirectory));
4701   GNUNET_DISK_directory_create (trustDirectory);
4702   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4703                                       &cron_flush_trust, NULL);
4704
4705
4706   GNUNET_SERVER_add_handlers (server, handlers);
4707   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
4708                                                  &age_cover_counters,
4709                                                  NULL);
4710   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4711                                 &shutdown_task,
4712                                 NULL);
4713   return GNUNET_OK;
4714 }
4715
4716
4717 /**
4718  * Process fs requests.
4719  *
4720  * @param cls closure
4721  * @param server the initialized server
4722  * @param cfg configuration to use
4723  */
4724 static void
4725 run (void *cls,
4726      struct GNUNET_SERVER_Handle *server,
4727      const struct GNUNET_CONFIGURATION_Handle *cfg)
4728 {
4729   active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4730                                                               "FS",
4731                                                               "CONTENT_CACHING");
4732   active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4733                                                                 "FS",
4734                                                                 "CONTENT_PUSHING");
4735   dsh = GNUNET_DATASTORE_connect (cfg);
4736   if (dsh == NULL)
4737     {
4738       GNUNET_SCHEDULER_shutdown ();
4739       return;
4740     }
4741   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4742   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4743   block_cfg = GNUNET_CONFIGURATION_create ();
4744   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4745                                          "block",
4746                                          "PLUGINS",
4747                                          "fs");
4748   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4749   GNUNET_assert (NULL != block_ctx);
4750   dht_handle = GNUNET_DHT_connect (cfg,
4751                                    FS_DHT_HT_SIZE);
4752   if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
4753        (GNUNET_OK != main_init (server, cfg)) )
4754     {    
4755       GNUNET_SCHEDULER_shutdown ();
4756       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4757       dsh = NULL;
4758       GNUNET_DHT_disconnect (dht_handle);
4759       dht_handle = NULL;
4760       GNUNET_BLOCK_context_destroy (block_ctx);
4761       block_ctx = NULL;
4762       GNUNET_CONFIGURATION_destroy (block_cfg);
4763       block_cfg = NULL;
4764       GNUNET_LOAD_value_free (datastore_get_load);
4765       datastore_get_load = NULL;
4766       GNUNET_LOAD_value_free (datastore_put_load);
4767       datastore_put_load = NULL;
4768       return;   
4769     }
4770 }
4771
4772
4773 /**
4774  * The main function for the fs service.
4775  *
4776  * @param argc number of arguments from the command line
4777  * @param argv command line arguments
4778  * @return 0 ok, 1 on error
4779  */
4780 int
4781 main (int argc, char *const *argv)
4782 {
4783   return (GNUNET_OK ==
4784           GNUNET_SERVICE_run (argc,
4785                               argv,
4786                               "fs",
4787                               GNUNET_SERVICE_OPTION_NONE,
4788                               &run, NULL)) ? 0 : 1;
4789 }
4790
4791 /* end of gnunet-service-fs.c */