code already there, doh
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include <float.h>
28 #include "gnunet_constants.h"
29 #include "gnunet_core_service.h"
30 #include "gnunet_dht_service.h"
31 #include "gnunet_datastore_service.h"
32 #include "gnunet_load_lib.h"
33 #include "gnunet_peer_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_signatures.h"
36 #include "gnunet_statistics_service.h"
37 #include "gnunet_transport_service.h"
38 #include "gnunet_util_lib.h"
39 #include "gnunet-service-fs_indexing.h"
40 #include "fs.h"
41
42 #define DEBUG_FS GNUNET_NO
43
44 /**
45  * Should we introduce random latency in processing?  Required for proper
46  * implementation of GAP, but can be disabled for performance evaluation of
47  * the basic routing algorithm.
48  *
49  * Note that with delays enabled, performance can be significantly lower
50  * (several orders of magnitude in 2-peer test runs); if you want to
51  * measure throughput of other components, set this to NO.  Also, you
52  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
53  * a rather wasteful mode of operation (that might still get the highest
54  * throughput overall).
55  *
56  * Performance measurements (for 50 MB file, 2 peers):
57  *
58  * - Without delays: 3300 kb/s
59  * - With    delays:  101 kb/s
60  */
61 #define SUPPORT_DELAYS GNUNET_NO
62
63 /**
64  * Size for the hash map for DHT requests from the FS
65  * service.  Should be about the number of concurrent
66  * DHT requests we plan to make.
67  */
68 #define FS_DHT_HT_SIZE 1024
69
70 /**
71  * At what frequency should our datastore load decrease
72  * automatically (since if we don't use it, clearly the
73  * load must be going down).
74  */
75 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
76
77 /**
78  * How often do we flush trust values to disk?
79  */
80 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
81
82 /**
83  * How quickly do we age cover traffic?  At the given 
84  * time interval, remaining cover traffic counters are
85  * decremented by 1/16th.
86  */
87 #define COVER_AGE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
88
89 /**
90  * How often do we at most PUT content into the DHT?
91  */
92 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
93
94 /**
95  * Inverse of the probability that we will submit the same query
96  * to the same peer again.  If the same peer already got the query
97  * repeatedly recently, the probability is multiplied by the inverse
98  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
99  * plus MAX_CORK_DELAY (so roughly every 3.5s).
100  *
101  * Note that this factor is a key influence to performance in small
102  * networks (especially test networks of 2 peers) because if there is
103  * only a single peer with the data, this value will determine how
104  * soon we might re-try.  For example, a value of 3 can result in 
105  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
106  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
107  * rather inefficient in larger networks, hence picking 1 is in 
108  * general not the best choice.
109  *
110  * Performance measurements (for 50 MB file, 2 peers, no delays):
111  *
112  * - 1: 3300 kb/s (consistently)
113  * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
114  * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
115  *
116  * Note that this does NOT mean that the value should be 1 since
117  * a 2-peer network is far from representative here (and this fails
118  * to take into consideration bandwidth wasted by repeatedly 
119  * sending queries to peers that don't have the content).  Also,
120  * it is expected that higher values lead to more inconsistent
121  * measurements since this only affects lost messages towards the
122  * end of the download.
123  *
124  * Finally, we should probably consider changing this and making
125  * it dependent on the number of connected peers or a related
126  * metric (bad magic constants...).
127  */
128 #define RETRY_PROBABILITY_INV 1
129
130 /**
131  * What is the maximum delay for a P2P FS message (in our interaction
132  * with core)?  FS-internal delays are another story.  The value is
133  * chosen based on the 32k block size.  Given that peers typcially
134  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
135  * transmit one message even to the lowest-bandwidth peers.
136  */
137 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
138
139 /**
140  * Maximum number of requests (from other peers, overall) that we're
141  * willing to have pending at any given point in time.  Can be changed
142  * via the configuration file (32k is just the default).
143  */
144 static unsigned long long max_pending_requests = (32 * 1024);
145
146 /**
147  * Information we keep for each pending reply.  The
148  * actual message follows at the end of this struct.
149  */
150 struct PendingMessage;
151
152 /**
153  * Function called upon completion of a transmission.
154  *
155  * @param cls closure
156  * @param pid ID of receiving peer, 0 on transmission error
157  */
158 typedef void (*TransmissionContinuation)(void * cls, 
159                                          GNUNET_PEER_Id tpid);
160
161
162 /**
163  * Information we keep for each pending message (GET/PUT).  The
164  * actual message follows at the end of this struct.
165  */
166 struct PendingMessage
167 {
168   /**
169    * This is a doubly-linked list of messages to the same peer.
170    */
171   struct PendingMessage *next;
172
173   /**
174    * This is a doubly-linked list of messages to the same peer.
175    */
176   struct PendingMessage *prev;
177
178   /**
179    * Entry in pending message list for this pending message.
180    */ 
181   struct PendingMessageList *pml;  
182
183   /**
184    * Function to call immediately once we have transmitted this
185    * message.
186    */
187   TransmissionContinuation cont;
188
189   /**
190    * Closure for cont.
191    */
192   void *cont_cls;
193
194   /**
195    * Do not transmit this pending message until this deadline.
196    */
197   struct GNUNET_TIME_Absolute delay_until;
198
199   /**
200    * Size of the reply; actual reply message follows
201    * at the end of this struct.
202    */
203   size_t msize;
204   
205   /**
206    * How important is this message for us?
207    */
208   uint32_t priority;
209  
210 };
211
212
213 /**
214  * Information about a peer that we are connected to.
215  * We track data that is useful for determining which
216  * peers should receive our requests.  We also keep
217  * a list of messages to transmit to this peer.
218  */
219 struct ConnectedPeer
220 {
221
222   /**
223    * List of the last clients for which this peer successfully
224    * answered a query.
225    */
226   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
227
228   /**
229    * List of the last PIDs for which
230    * this peer successfully answered a query;
231    * We use 0 to indicate no successful reply.
232    */
233   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
234
235   /**
236    * Average delay between sending the peer a request and
237    * getting a reply (only calculated over the requests for
238    * which we actually got a reply).   Calculated
239    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
240    */ 
241   struct GNUNET_TIME_Relative avg_delay;
242
243   /**
244    * Point in time until which this peer does not want us to migrate content
245    * to it.
246    */
247   struct GNUNET_TIME_Absolute migration_blocked;
248
249   /**
250    * Time until when we blocked this peer from migrating
251    * data to us.
252    */
253   struct GNUNET_TIME_Absolute last_migration_block;
254
255   /**
256    * Transmission times for the last MAX_QUEUE_PER_PEER
257    * requests for this peer.  Used as a ring buffer, current
258    * offset is stored in 'last_request_times_off'.  If the
259    * oldest entry is more recent than the 'avg_delay', we should
260    * not send any more requests right now.
261    */
262   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
263
264   /**
265    * Handle for an active request for transmission to this
266    * peer, or NULL.
267    */
268   struct GNUNET_CORE_TransmitHandle *cth;
269
270   /**
271    * Messages (replies, queries, content migration) we would like to
272    * send to this peer in the near future.  Sorted by priority, head.
273    */
274   struct PendingMessage *pending_messages_head;
275
276   /**
277    * Messages (replies, queries, content migration) we would like to
278    * send to this peer in the near future.  Sorted by priority, tail.
279    */
280   struct PendingMessage *pending_messages_tail;
281
282   /**
283    * How long does it typically take for us to transmit a message
284    * to this peer?  (delay between the request being issued and
285    * the callback being invoked).
286    */
287   struct GNUNET_LOAD_Value *transmission_delay;
288
289   /**
290    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
291    */
292   struct GNUNET_CORE_InformationRequestContext *irc;
293
294   /**
295    * Request for which 'irc' is currently active (or NULL).
296    */
297   struct PendingRequest *pr;
298
299   /**
300    * Time when the last transmission request was issued.
301    */
302   struct GNUNET_TIME_Absolute last_transmission_request_start;
303
304   /**
305    * ID of delay task for scheduling transmission.
306    */
307   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
308
309   /**
310    * Average priority of successful replies.  Calculated
311    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
312    */
313   double avg_priority;
314
315   /**
316    * Increase in traffic preference still to be submitted
317    * to the core service for this peer.
318    */
319   uint64_t inc_preference;
320
321   /**
322    * Trust rating for this peer
323    */
324   uint32_t trust;
325
326   /**
327    * Trust rating for this peer on disk.
328    */
329   uint32_t disk_trust;
330
331   /**
332    * The peer's identity.
333    */
334   GNUNET_PEER_Id pid;  
335
336   /**
337    * Size of the linked list of 'pending_messages'.
338    */
339   unsigned int pending_requests;
340
341   /**
342    * Which offset in "last_p2p_replies" will be updated next?
343    * (we go round-robin).
344    */
345   unsigned int last_p2p_replies_woff;
346
347   /**
348    * Which offset in "last_client_replies" will be updated next?
349    * (we go round-robin).
350    */
351   unsigned int last_client_replies_woff;
352
353   /**
354    * Current offset into 'last_request_times' ring buffer.
355    */
356   unsigned int last_request_times_off;
357
358 };
359
360
361 /**
362  * Information we keep for each pending request.  We should try to
363  * keep this struct as small as possible since its memory consumption
364  * is key to how many requests we can have pending at once.
365  */
366 struct PendingRequest;
367
368
369 /**
370  * Doubly-linked list of requests we are performing
371  * on behalf of the same client.
372  */
373 struct ClientRequestList
374 {
375
376   /**
377    * This is a doubly-linked list.
378    */
379   struct ClientRequestList *next;
380
381   /**
382    * This is a doubly-linked list.
383    */
384   struct ClientRequestList *prev;
385
386   /**
387    * Request this entry represents.
388    */
389   struct PendingRequest *req;
390
391   /**
392    * Client list this request belongs to.
393    */
394   struct ClientList *client_list;
395
396 };
397
398
399 /**
400  * Replies to be transmitted to the client.  The actual
401  * response message is allocated after this struct.
402  */
403 struct ClientResponseMessage
404 {
405   /**
406    * This is a doubly-linked list.
407    */
408   struct ClientResponseMessage *next;
409
410   /**
411    * This is a doubly-linked list.
412    */
413   struct ClientResponseMessage *prev;
414
415   /**
416    * Client list entry this response belongs to.
417    */
418   struct ClientList *client_list;
419
420   /**
421    * Number of bytes in the response.
422    */
423   size_t msize;
424 };
425
426
427 /**
428  * Linked list of clients we are performing requests
429  * for right now.
430  */
431 struct ClientList
432 {
433   /**
434    * This is a linked list.
435    */
436   struct ClientList *next;
437
438   /**
439    * ID of a client making a request, NULL if this entry is for a
440    * peer.
441    */
442   struct GNUNET_SERVER_Client *client;
443
444   /**
445    * Head of list of requests performed on behalf
446    * of this client right now.
447    */
448   struct ClientRequestList *rl_head;
449
450   /**
451    * Tail of list of requests performed on behalf
452    * of this client right now.
453    */
454   struct ClientRequestList *rl_tail;
455
456   /**
457    * Head of linked list of responses.
458    */
459   struct ClientResponseMessage *res_head;
460
461   /**
462    * Tail of linked list of responses.
463    */
464   struct ClientResponseMessage *res_tail;
465
466   /**
467    * Context for sending replies.
468    */
469   struct GNUNET_CONNECTION_TransmitHandle *th;
470
471 };
472
473
474 /**
475  * Information about a peer that we have forwarded this
476  * request to already.  
477  */
478 struct UsedTargetEntry
479 {
480   /**
481    * What was the last time we have transmitted this request to this
482    * peer?
483    */
484   struct GNUNET_TIME_Absolute last_request_time;
485
486   /**
487    * How often have we transmitted this request to this peer?
488    */
489   unsigned int num_requests;
490
491   /**
492    * PID of the target peer.
493    */
494   GNUNET_PEER_Id pid;
495
496 };
497
498
499 /**
500  * Doubly-linked list of messages we are performing
501  * due to a pending request.
502  */
503 struct PendingMessageList
504 {
505
506   /**
507    * This is a doubly-linked list of messages on behalf of the same request.
508    */
509   struct PendingMessageList *next;
510
511   /**
512    * This is a doubly-linked list of messages on behalf of the same request.
513    */
514   struct PendingMessageList *prev;
515
516   /**
517    * Message this entry represents.
518    */
519   struct PendingMessage *pm;
520
521   /**
522    * Request this entry belongs to.
523    */
524   struct PendingRequest *req;
525
526   /**
527    * Peer this message is targeted for.
528    */
529   struct ConnectedPeer *target;
530
531 };
532
533
534 /**
535  * Information we keep for each pending request.  We should try to
536  * keep this struct as small as possible since its memory consumption
537  * is key to how many requests we can have pending at once.
538  */
539 struct PendingRequest
540 {
541
542   /**
543    * If this request was made by a client, this is our entry in the
544    * client request list; otherwise NULL.
545    */
546   struct ClientRequestList *client_request_list;
547
548   /**
549    * Entry of peer responsible for this entry (if this request
550    * was made by a peer).
551    */
552   struct ConnectedPeer *cp;
553
554   /**
555    * If this is a namespace query, pointer to the hash of the public
556    * key of the namespace; otherwise NULL.  Pointer will be to the 
557    * end of this struct (so no need to free it).
558    */
559   const GNUNET_HashCode *namespace;
560
561   /**
562    * Bloomfilter we use to filter out replies that we don't care about
563    * (anymore).  NULL as long as we are interested in all replies.
564    */
565   struct GNUNET_CONTAINER_BloomFilter *bf;
566
567   /**
568    * Reference to DHT get operation for this request (or NULL).
569    */
570   struct GNUNET_DHT_GetHandle *dht_get;
571
572   /**
573    * Context of our GNUNET_CORE_peer_change_preference call.
574    */
575   struct ConnectedPeer *pirc;
576
577   /**
578    * Hash code of all replies that we have seen so far (only valid
579    * if client is not NULL since we only track replies like this for
580    * our own clients).
581    */
582   GNUNET_HashCode *replies_seen;
583
584   /**
585    * Node in the heap representing this entry; NULL
586    * if we have no heap node.
587    */
588   struct GNUNET_CONTAINER_HeapNode *hnode;
589
590   /**
591    * Head of list of messages being performed on behalf of this
592    * request.
593    */
594   struct PendingMessageList *pending_head;
595
596   /**
597    * Tail of list of messages being performed on behalf of this
598    * request.
599    */
600   struct PendingMessageList *pending_tail;
601
602   /**
603    * When did we first see this request (form this peer), or, if our
604    * client is initiating, when did we last initiate a search?
605    */
606   struct GNUNET_TIME_Absolute start_time;
607
608   /**
609    * The query that this request is for.
610    */
611   GNUNET_HashCode query;
612
613   /**
614    * The task responsible for transmitting queries
615    * for this request.
616    */
617   GNUNET_SCHEDULER_TaskIdentifier task;
618
619   /**
620    * (Interned) Peer identifier that identifies a preferred target
621    * for requests.
622    */
623   GNUNET_PEER_Id target_pid;
624
625   /**
626    * (Interned) Peer identifiers of peers that have already
627    * received our query for this content.
628    */
629   struct UsedTargetEntry *used_targets;
630   
631   /**
632    * Our entry in the queue (non-NULL while we wait for our
633    * turn to interact with the local database).
634    */
635   struct GNUNET_DATASTORE_QueueEntry *qe;
636
637   /**
638    * Size of the 'bf' (in bytes).
639    */
640   size_t bf_size;
641
642   /**
643    * Desired anonymity level; only valid for requests from a local client.
644    */
645   uint32_t anonymity_level;
646
647   /**
648    * How many entries in "used_targets" are actually valid?
649    */
650   unsigned int used_targets_off;
651
652   /**
653    * How long is the "used_targets" array?
654    */
655   unsigned int used_targets_size;
656
657   /**
658    * Number of results found for this request.
659    */
660   unsigned int results_found;
661
662   /**
663    * How many entries in "replies_seen" are actually valid?
664    */
665   unsigned int replies_seen_off;
666
667   /**
668    * How long is the "replies_seen" array?
669    */
670   unsigned int replies_seen_size;
671   
672   /**
673    * Priority with which this request was made.  If one of our clients
674    * made the request, then this is the current priority that we are
675    * using when initiating the request.  This value is used when
676    * we decide to reward other peers with trust for providing a reply.
677    */
678   uint32_t priority;
679
680   /**
681    * Priority points left for us to spend when forwarding this request
682    * to other peers.
683    */
684   uint32_t remaining_priority;
685
686   /**
687    * Number to mingle hashes for bloom-filter tests with.
688    */
689   int32_t mingle;
690
691   /**
692    * TTL with which we saw this request (or, if we initiated, TTL that
693    * we used for the request).
694    */
695   int32_t ttl;
696   
697   /**
698    * Type of the content that this request is for.
699    */
700   enum GNUNET_BLOCK_Type type;
701
702   /**
703    * Remove this request after transmission of the current response.
704    */
705   int8_t do_remove;
706
707   /**
708    * GNUNET_YES if we should not forward this request to other peers.
709    */
710   int8_t local_only;
711
712   /**
713    * GNUNET_YES if we should not forward this request to other peers.
714    */
715   int8_t forward_only;
716
717 };
718
719
720 /**
721  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
722  */
723 struct MigrationReadyBlock
724 {
725
726   /**
727    * This is a doubly-linked list.
728    */
729   struct MigrationReadyBlock *next;
730
731   /**
732    * This is a doubly-linked list.
733    */
734   struct MigrationReadyBlock *prev;
735
736   /**
737    * Query for the block.
738    */
739   GNUNET_HashCode query;
740
741   /**
742    * When does this block expire? 
743    */
744   struct GNUNET_TIME_Absolute expiration;
745
746   /**
747    * Peers we would consider forwarding this
748    * block to.  Zero for empty entries.
749    */
750   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
751
752   /**
753    * Size of the block.
754    */
755   size_t size;
756
757   /**
758    *  Number of targets already used.
759    */
760   unsigned int used_targets;
761
762   /**
763    * Type of the block.
764    */
765   enum GNUNET_BLOCK_Type type;
766 };
767
768 /**
769  * Identity of this peer.
770  */
771 static struct GNUNET_PeerIdentity my_id;
772
773 /**
774  * Our connection to the datastore.
775  */
776 static struct GNUNET_DATASTORE_Handle *dsh;
777
778 /**
779  * Our block context.
780  */
781 static struct GNUNET_BLOCK_Context *block_ctx;
782
783 /**
784  * Our block configuration.
785  */
786 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
787
788 /**
789  * Our configuration.
790  */
791 static const struct GNUNET_CONFIGURATION_Handle *cfg;
792
793 /**
794  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
795  */
796 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
797
798 /**
799  * Map of peer identifiers to "struct PendingRequest" (for that peer).
800  */
801 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
802
803 /**
804  * Map of query identifiers to "struct PendingRequest" (for that query).
805  */
806 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
807
808 /**
809  * Heap with the request that will expire next at the top.  Contains
810  * pointers of type "struct PendingRequest*"; these will *also* be
811  * aliased from the "requests_by_peer" data structures and the
812  * "requests_by_query" table.  Note that requests from our clients
813  * don't expire and are thus NOT in the "requests_by_expiration"
814  * (or the "requests_by_peer" tables).
815  */
816 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
817
818 /**
819  * Handle for reporting statistics.
820  */
821 static struct GNUNET_STATISTICS_Handle *stats;
822
823 /**
824  * Linked list of clients we are currently processing requests for.
825  */
826 static struct ClientList *client_list;
827
828 /**
829  * Pointer to handle to the core service (points to NULL until we've
830  * connected to it).
831  */
832 static struct GNUNET_CORE_Handle *core;
833
834 /**
835  * Head of linked list of blocks that can be migrated.
836  */
837 static struct MigrationReadyBlock *mig_head;
838
839 /**
840  * Tail of linked list of blocks that can be migrated.
841  */
842 static struct MigrationReadyBlock *mig_tail;
843
844 /**
845  * Request to datastore for migration (or NULL).
846  */
847 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
848
849 /**
850  * Request to datastore for DHT PUTs (or NULL).
851  */
852 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
853
854 /**
855  * Type we will request for the next DHT PUT round from the datastore.
856  */
857 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
858
859 /**
860  * Where do we store trust information?
861  */
862 static char *trustDirectory;
863
864 /**
865  * ID of task that collects blocks for migration.
866  */
867 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
868
869 /**
870  * ID of task that collects blocks for DHT PUTs.
871  */
872 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
873
874 /**
875  * What is the maximum frequency at which we are allowed to
876  * poll the datastore for migration content?
877  */
878 static struct GNUNET_TIME_Relative min_migration_delay;
879
880 /**
881  * Handle for DHT operations.
882  */
883 static struct GNUNET_DHT_Handle *dht_handle;
884
885 /**
886  * Size of the doubly-linked list of migration blocks.
887  */
888 static unsigned int mig_size;
889
890 /**
891  * Are we allowed to migrate content to this peer.
892  */
893 static int active_to_migration;
894
895 /**
896  * Are we allowed to push out content from this peer.
897  */
898 static int active_from_migration;
899
900 /**
901  * How many entires with zero anonymity do we currently estimate
902  * to have in the database?
903  */
904 static unsigned int zero_anonymity_count_estimate;
905
906 /**
907  * Typical priorities we're seeing from other peers right now.  Since
908  * most priorities will be zero, this value is the weighted average of
909  * non-zero priorities seen "recently".  In order to ensure that new
910  * values do not dramatically change the ratio, values are first
911  * "capped" to a reasonable range (+N of the current value) and then
912  * averaged into the existing value by a ratio of 1:N.  Hence
913  * receiving the largest possible priority can still only raise our
914  * "current_priorities" by at most 1.
915  */
916 static double current_priorities;
917
918 /**
919  * Datastore 'GET' load tracking.
920  */
921 static struct GNUNET_LOAD_Value *datastore_get_load;
922
923 /**
924  * Datastore 'PUT' load tracking.
925  */
926 static struct GNUNET_LOAD_Value *datastore_put_load;
927
928 /**
929  * How long do requests typically stay in the routing table?
930  */
931 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
932
933 /**
934  * How many query messages have we received 'recently' that 
935  * have not yet been claimed as cover traffic?
936  */
937 static unsigned int cover_query_count;
938
939 /**
940  * How many content messages have we received 'recently' that 
941  * have not yet been claimed as cover traffic?
942  */
943 static unsigned int cover_content_count;
944
945 /**
946  * ID of our task that we use to age the cover counters.
947  */
948 static GNUNET_SCHEDULER_TaskIdentifier cover_age_task;
949
950 static void
951 age_cover_counters (void *cls,
952                     const struct GNUNET_SCHEDULER_TaskContext *tc)
953 {
954   cover_content_count = (cover_content_count * 15) / 16;
955   cover_query_count = (cover_query_count * 15) / 16;
956   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
957                                                  &age_cover_counters,
958                                                  NULL);
959 }
960
961 /**
962  * We've just now completed a datastore request.  Update our
963  * datastore load calculations.
964  *
965  * @param start time when the datastore request was issued
966  */
967 static void
968 update_datastore_delays (struct GNUNET_TIME_Absolute start)
969 {
970   struct GNUNET_TIME_Relative delay;
971
972   delay = GNUNET_TIME_absolute_get_duration (start);
973   GNUNET_LOAD_update (datastore_get_load,
974                       delay.rel_value);
975 }
976
977
978 /**
979  * Get the filename under which we would store the GNUNET_HELLO_Message
980  * for the given host and protocol.
981  * @return filename of the form DIRECTORY/HOSTID
982  */
983 static char *
984 get_trust_filename (const struct GNUNET_PeerIdentity *id)
985 {
986   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
987   char *fn;
988
989   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
990   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
991   return fn;
992 }
993
994
995
996 /**
997  * Transmit messages by copying it to the target buffer
998  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
999  * for writing in the meantime.  In that case, do nothing
1000  * (the disconnect or shutdown handler will take care of the rest).
1001  * If we were able to transmit messages and there are still more
1002  * pending, ask core again for further calls to this function.
1003  *
1004  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1005  * @param size number of bytes available in buf
1006  * @param buf where the callee should write the message
1007  * @return number of bytes written to buf
1008  */
1009 static size_t
1010 transmit_to_peer (void *cls,
1011                   size_t size, void *buf);
1012
1013
1014 /* ******************* clean up functions ************************ */
1015
1016 /**
1017  * Delete the given migration block.
1018  *
1019  * @param mb block to delete
1020  */
1021 static void
1022 delete_migration_block (struct MigrationReadyBlock *mb)
1023 {
1024   GNUNET_CONTAINER_DLL_remove (mig_head,
1025                                mig_tail,
1026                                mb);
1027   GNUNET_PEER_decrement_rcs (mb->target_list,
1028                              MIGRATION_LIST_SIZE);
1029   mig_size--;
1030   GNUNET_free (mb);
1031 }
1032
1033
1034 /**
1035  * Compare the distance of two peers to a key.
1036  *
1037  * @param key key
1038  * @param p1 first peer
1039  * @param p2 second peer
1040  * @return GNUNET_YES if P1 is closer to key than P2
1041  */
1042 static int
1043 is_closer (const GNUNET_HashCode *key,
1044            const struct GNUNET_PeerIdentity *p1,
1045            const struct GNUNET_PeerIdentity *p2)
1046 {
1047   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1048                                     &p2->hashPubKey,
1049                                     key);
1050 }
1051
1052
1053 /**
1054  * Consider migrating content to a given peer.
1055  *
1056  * @param cls 'struct MigrationReadyBlock*' to select
1057  *            targets for (or NULL for none)
1058  * @param key ID of the peer 
1059  * @param value 'struct ConnectedPeer' of the peer
1060  * @return GNUNET_YES (always continue iteration)
1061  */
1062 static int
1063 consider_migration (void *cls,
1064                     const GNUNET_HashCode *key,
1065                     void *value)
1066 {
1067   struct MigrationReadyBlock *mb = cls;
1068   struct ConnectedPeer *cp = value;
1069   struct MigrationReadyBlock *pos;
1070   struct GNUNET_PeerIdentity cppid;
1071   struct GNUNET_PeerIdentity otherpid;
1072   struct GNUNET_PeerIdentity worstpid;
1073   size_t msize;
1074   unsigned int i;
1075   unsigned int repl;
1076   
1077   /* consider 'cp' as a migration target for mb */
1078   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1079     return GNUNET_YES; /* peer has requested no migration! */
1080   if (mb != NULL)
1081     {
1082       GNUNET_PEER_resolve (cp->pid,
1083                            &cppid);
1084       repl = MIGRATION_LIST_SIZE;
1085       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1086         {
1087           if (mb->target_list[i] == 0)
1088             {
1089               mb->target_list[i] = cp->pid;
1090               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1091               repl = MIGRATION_LIST_SIZE;
1092               break;
1093             }
1094           GNUNET_PEER_resolve (mb->target_list[i],
1095                                &otherpid);
1096           if ( (repl == MIGRATION_LIST_SIZE) &&
1097                is_closer (&mb->query,
1098                           &cppid,
1099                           &otherpid)) 
1100             {
1101               repl = i;
1102               worstpid = otherpid;
1103             }
1104           else if ( (repl != MIGRATION_LIST_SIZE) &&
1105                     (is_closer (&mb->query,
1106                                 &worstpid,
1107                                 &otherpid) ) )
1108             {
1109               repl = i;
1110               worstpid = otherpid;
1111             }       
1112         }
1113       if (repl != MIGRATION_LIST_SIZE) 
1114         {
1115           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1116           mb->target_list[repl] = cp->pid;
1117           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1118         }
1119     }
1120
1121   /* consider scheduling transmission to cp for content migration */
1122   if (cp->cth != NULL)        
1123     return GNUNET_YES; 
1124   msize = 0;
1125   pos = mig_head;
1126   while (pos != NULL)
1127     {
1128       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1129         {
1130           if (cp->pid == pos->target_list[i])
1131             {
1132               if (msize == 0)
1133                 msize = pos->size;
1134               else
1135                 msize = GNUNET_MIN (msize,
1136                                     pos->size);
1137               break;
1138             }
1139         }
1140       pos = pos->next;
1141     }
1142   if (msize == 0)
1143     return GNUNET_YES; /* no content available */
1144 #if DEBUG_FS
1145   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1146               "Trying to migrate at least %u bytes to peer `%s'\n",
1147               msize,
1148               GNUNET_h2s (key));
1149 #endif
1150   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1151     {
1152       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1153       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1154     }
1155   cp->cth 
1156     = GNUNET_CORE_notify_transmit_ready (core,
1157                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1158                                          (const struct GNUNET_PeerIdentity*) key,
1159                                          msize + sizeof (struct PutMessage),
1160                                          &transmit_to_peer,
1161                                          cp);
1162   return GNUNET_YES;
1163 }
1164
1165
1166 /**
1167  * Task that is run periodically to obtain blocks for content
1168  * migration
1169  * 
1170  * @param cls unused
1171  * @param tc scheduler context (also unused)
1172  */
1173 static void
1174 gather_migration_blocks (void *cls,
1175                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1176
1177
1178
1179
1180 /**
1181  * Task that is run periodically to obtain blocks for DHT PUTs.
1182  * 
1183  * @param cls type of blocks to gather
1184  * @param tc scheduler context (unused)
1185  */
1186 static void
1187 gather_dht_put_blocks (void *cls,
1188                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1189
1190
1191 /**
1192  * If the migration task is not currently running, consider
1193  * (re)scheduling it with the appropriate delay.
1194  */
1195 static void
1196 consider_migration_gathering ()
1197 {
1198   struct GNUNET_TIME_Relative delay;
1199
1200   if (dsh == NULL)
1201     return;
1202   if (mig_qe != NULL)
1203     return;
1204   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1205     return;
1206   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1207                                          mig_size);
1208   delay = GNUNET_TIME_relative_divide (delay,
1209                                        MAX_MIGRATION_QUEUE);
1210   delay = GNUNET_TIME_relative_max (delay,
1211                                     min_migration_delay);
1212   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1213                                            &gather_migration_blocks,
1214                                            NULL);
1215 }
1216
1217
1218 /**
1219  * If the DHT PUT gathering task is not currently running, consider
1220  * (re)scheduling it with the appropriate delay.
1221  */
1222 static void
1223 consider_dht_put_gathering (void *cls)
1224 {
1225   struct GNUNET_TIME_Relative delay;
1226
1227   if (dsh == NULL)
1228     return;
1229   if (dht_qe != NULL)
1230     return;
1231   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1232     return;
1233   if (zero_anonymity_count_estimate > 0)
1234     {
1235       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1236                                            zero_anonymity_count_estimate);
1237       delay = GNUNET_TIME_relative_min (delay,
1238                                         MAX_DHT_PUT_FREQ);
1239     }
1240   else
1241     {
1242       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1243          (hopefully) appear */
1244       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1245     }
1246   dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1247                                            &gather_dht_put_blocks,
1248                                            cls);
1249 }
1250
1251
1252 /**
1253  * Process content offered for migration.
1254  *
1255  * @param cls closure
1256  * @param key key for the content
1257  * @param size number of bytes in data
1258  * @param data content stored
1259  * @param type type of the content
1260  * @param priority priority of the content
1261  * @param anonymity anonymity-level for the content
1262  * @param expiration expiration time for the content
1263  * @param uid unique identifier for the datum;
1264  *        maybe 0 if no unique identifier is available
1265  */
1266 static void
1267 process_migration_content (void *cls,
1268                            const GNUNET_HashCode * key,
1269                            size_t size,
1270                            const void *data,
1271                            enum GNUNET_BLOCK_Type type,
1272                            uint32_t priority,
1273                            uint32_t anonymity,
1274                            struct GNUNET_TIME_Absolute
1275                            expiration, uint64_t uid)
1276 {
1277   struct MigrationReadyBlock *mb;
1278   
1279   if (key == NULL)
1280     {
1281       mig_qe = NULL;
1282       if (mig_size < MAX_MIGRATION_QUEUE)  
1283         consider_migration_gathering ();
1284       return;
1285     }
1286   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1287     {
1288       if (GNUNET_OK !=
1289           GNUNET_FS_handle_on_demand_block (key, size, data,
1290                                             type, priority, anonymity,
1291                                             expiration, uid, 
1292                                             &process_migration_content,
1293                                             NULL))
1294         {
1295           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1296         }
1297       return;
1298     }
1299 #if DEBUG_FS
1300   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1301               "Retrieved block `%s' of type %u for migration\n",
1302               GNUNET_h2s (key),
1303               type);
1304 #endif
1305   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1306   mb->query = *key;
1307   mb->expiration = expiration;
1308   mb->size = size;
1309   mb->type = type;
1310   memcpy (&mb[1], data, size);
1311   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1312                                      mig_tail,
1313                                      mig_tail,
1314                                      mb);
1315   mig_size++;
1316   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1317                                          &consider_migration,
1318                                          mb);
1319   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1320 }
1321
1322
1323 /**
1324  * Function called upon completion of the DHT PUT operation.
1325  */
1326 static void
1327 dht_put_continuation (void *cls,
1328                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1329 {
1330   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1331 }
1332
1333
1334 /**
1335  * Store content in DHT.
1336  *
1337  * @param cls closure
1338  * @param key key for the content
1339  * @param size number of bytes in data
1340  * @param data content stored
1341  * @param type type of the content
1342  * @param priority priority of the content
1343  * @param anonymity anonymity-level for the content
1344  * @param expiration expiration time for the content
1345  * @param uid unique identifier for the datum;
1346  *        maybe 0 if no unique identifier is available
1347  */
1348 static void
1349 process_dht_put_content (void *cls,
1350                          const GNUNET_HashCode * key,
1351                          size_t size,
1352                          const void *data,
1353                          enum GNUNET_BLOCK_Type type,
1354                          uint32_t priority,
1355                          uint32_t anonymity,
1356                          struct GNUNET_TIME_Absolute
1357                          expiration, uint64_t uid)
1358
1359   static unsigned int counter;
1360   static GNUNET_HashCode last_vhash;
1361   static GNUNET_HashCode vhash;
1362
1363   if (key == NULL)
1364     {
1365       dht_qe = NULL;
1366       consider_dht_put_gathering (cls);
1367       return;
1368     }
1369   /* slightly funky code to estimate the total number of values with zero
1370      anonymity from the maximum observed length of a monotonically increasing 
1371      sequence of hashes over the contents */
1372   GNUNET_CRYPTO_hash (data, size, &vhash);
1373   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1374     {
1375       if (zero_anonymity_count_estimate > 0)
1376         zero_anonymity_count_estimate /= 2;
1377       counter = 0;
1378     }
1379   last_vhash = vhash;
1380   if (counter < 31)
1381     counter++;
1382   if (zero_anonymity_count_estimate < (1 << counter))
1383     zero_anonymity_count_estimate = (1 << counter);
1384 #if DEBUG_FS
1385   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386               "Retrieved block `%s' of type %u for DHT PUT\n",
1387               GNUNET_h2s (key),
1388               type);
1389 #endif
1390   GNUNET_DHT_put (dht_handle,
1391                   key,
1392                   DEFAULT_PUT_REPLICATION,
1393                   GNUNET_DHT_RO_NONE,
1394                   type,
1395                   size,
1396                   data,
1397                   expiration,
1398                   GNUNET_TIME_UNIT_FOREVER_REL,
1399                   &dht_put_continuation,
1400                   cls);
1401 }
1402
1403
1404 /**
1405  * Task that is run periodically to obtain blocks for content
1406  * migration
1407  * 
1408  * @param cls unused
1409  * @param tc scheduler context (also unused)
1410  */
1411 static void
1412 gather_migration_blocks (void *cls,
1413                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1414 {
1415   mig_task = GNUNET_SCHEDULER_NO_TASK;
1416   if (dsh != NULL)
1417     {
1418       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1419                                             GNUNET_TIME_UNIT_FOREVER_REL,
1420                                             &process_migration_content, NULL);
1421       GNUNET_assert (mig_qe != NULL);
1422     }
1423 }
1424
1425
1426 /**
1427  * Task that is run periodically to obtain blocks for DHT PUTs.
1428  * 
1429  * @param cls type of blocks to gather
1430  * @param tc scheduler context (unused)
1431  */
1432 static void
1433 gather_dht_put_blocks (void *cls,
1434                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1435 {
1436   dht_task = GNUNET_SCHEDULER_NO_TASK;
1437   if (dsh != NULL)
1438     {
1439       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1440         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1441       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1442                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1443                                                     dht_put_type++,
1444                                                     &process_dht_put_content, NULL);
1445       GNUNET_assert (dht_qe != NULL);
1446     }
1447 }
1448
1449
1450 /**
1451  * We're done with a particular message list entry.
1452  * Free all associated resources.
1453  * 
1454  * @param pml entry to destroy
1455  */
1456 static void
1457 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1458 {
1459   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1460                                pml->req->pending_tail,
1461                                pml);
1462   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1463                                pml->target->pending_messages_tail,
1464                                pml->pm);
1465   pml->target->pending_requests--;
1466   GNUNET_free (pml->pm);
1467   GNUNET_free (pml);
1468 }
1469
1470
1471 /**
1472  * Destroy the given pending message (and call the respective
1473  * continuation).
1474  *
1475  * @param pm message to destroy
1476  * @param tpid id of peer that the message was delivered to, or 0 for none
1477  */
1478 static void
1479 destroy_pending_message (struct PendingMessage *pm,
1480                          GNUNET_PEER_Id tpid)
1481 {
1482   struct PendingMessageList *pml = pm->pml;
1483   TransmissionContinuation cont;
1484   void *cont_cls;
1485
1486   cont = pm->cont;
1487   cont_cls = pm->cont_cls;
1488   if (pml != NULL)
1489     {
1490       GNUNET_assert (pml->pm == pm);
1491       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1492       destroy_pending_message_list_entry (pml);
1493     }
1494   else
1495     {
1496       GNUNET_free (pm);
1497     }
1498   if (cont != NULL)
1499     cont (cont_cls, tpid);  
1500 }
1501
1502
1503 /**
1504  * We're done processing a particular request.
1505  * Free all associated resources.
1506  *
1507  * @param pr request to destroy
1508  */
1509 static void
1510 destroy_pending_request (struct PendingRequest *pr)
1511 {
1512   struct GNUNET_PeerIdentity pid;
1513   unsigned int i;
1514
1515   if (pr->hnode != NULL)
1516     {
1517       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1518                                          pr->hnode);
1519       pr->hnode = NULL;
1520     }
1521   if (NULL == pr->client_request_list)
1522     {
1523       GNUNET_STATISTICS_update (stats,
1524                                 gettext_noop ("# P2P searches active"),
1525                                 -1,
1526                                 GNUNET_NO);
1527     }
1528   else
1529     {
1530       GNUNET_STATISTICS_update (stats,
1531                                 gettext_noop ("# client searches active"),
1532                                 -1,
1533                                 GNUNET_NO);
1534     }
1535   if (GNUNET_YES == 
1536       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1537                                             &pr->query,
1538                                             pr))
1539     {
1540       GNUNET_LOAD_update (rt_entry_lifetime,
1541                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1542     }
1543   if (pr->qe != NULL)
1544      {
1545       GNUNET_DATASTORE_cancel (pr->qe);
1546       pr->qe = NULL;
1547     }
1548   if (pr->dht_get != NULL)
1549     {
1550       GNUNET_DHT_get_stop (pr->dht_get);
1551       pr->dht_get = NULL;
1552     }
1553   if (pr->client_request_list != NULL)
1554     {
1555       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1556                                    pr->client_request_list->client_list->rl_tail,
1557                                    pr->client_request_list);
1558       GNUNET_free (pr->client_request_list);
1559       pr->client_request_list = NULL;
1560     }
1561   if (pr->cp != NULL)
1562     {
1563       GNUNET_PEER_resolve (pr->cp->pid,
1564                            &pid);
1565       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1566                                                    &pid.hashPubKey,
1567                                                    pr);
1568       pr->cp = NULL;
1569     }
1570   if (pr->bf != NULL)
1571     {
1572       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1573       pr->bf = NULL;
1574     }
1575   if (pr->pirc != NULL)
1576     {
1577       GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1578       pr->pirc->irc = NULL;
1579       pr->pirc = NULL;
1580     }
1581   if (pr->replies_seen != NULL)
1582     {
1583       GNUNET_free (pr->replies_seen);
1584       pr->replies_seen = NULL;
1585     }
1586   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1587     {
1588       GNUNET_SCHEDULER_cancel (pr->task);
1589       pr->task = GNUNET_SCHEDULER_NO_TASK;
1590     }
1591   while (NULL != pr->pending_head)    
1592     destroy_pending_message_list_entry (pr->pending_head);
1593   GNUNET_PEER_change_rc (pr->target_pid, -1);
1594   if (pr->used_targets != NULL)
1595     {
1596       for (i=0;i<pr->used_targets_off;i++)
1597         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1598       GNUNET_free (pr->used_targets);
1599       pr->used_targets_off = 0;
1600       pr->used_targets_size = 0;
1601       pr->used_targets = NULL;
1602     }
1603   GNUNET_free (pr);
1604 }
1605
1606
1607 /**
1608  * Find latency information in 'atsi'.
1609  *
1610  * @param atsi performance data
1611  * @return connection latency
1612  */
1613 static struct GNUNET_TIME_Relative
1614 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1615 {
1616   if (atsi == NULL)
1617     return GNUNET_TIME_UNIT_SECONDS;
1618   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
1619           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
1620     atsi++;
1621   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
1622     {
1623       GNUNET_break (0);
1624       /* how can we not have latency data? */
1625       return GNUNET_TIME_UNIT_SECONDS;
1626     }
1627   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1628                                         ntohl (atsi->value));
1629 }
1630
1631
1632 /**
1633  * Method called whenever a given peer connects.
1634  *
1635  * @param cls closure, not used
1636  * @param peer peer identity this notification is about
1637  * @param atsi performance information
1638  */
1639 static void 
1640 peer_connect_handler (void *cls,
1641                       const struct
1642                       GNUNET_PeerIdentity * peer,
1643                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1644 {
1645   struct ConnectedPeer *cp;
1646   struct MigrationReadyBlock *pos;
1647   char *fn;
1648   uint32_t trust;
1649   struct GNUNET_TIME_Relative latency;
1650
1651   if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1652     return;
1653   latency = get_latency (atsi);
1654   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1655                                           &peer->hashPubKey);
1656   if (NULL != cp)
1657     {
1658       GNUNET_break (0);
1659       return;
1660     }
1661   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1662   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1663   cp->pid = GNUNET_PEER_intern (peer);
1664
1665   fn = get_trust_filename (peer);
1666   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1667       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1668     cp->disk_trust = cp->trust = ntohl (trust);
1669   GNUNET_free (fn);
1670
1671   GNUNET_break (GNUNET_OK ==
1672                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1673                                                    &peer->hashPubKey,
1674                                                    cp,
1675                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1676
1677   pos = mig_head;
1678   while (NULL != pos)
1679     {
1680       (void) consider_migration (pos, &peer->hashPubKey, cp);
1681       pos = pos->next;
1682     }
1683 }
1684
1685
1686 /**
1687  * Method called whenever a given peer has a status change.
1688  *
1689  * @param cls closure
1690  * @param peer peer identity this notification is about
1691  * @param bandwidth_in available amount of inbound bandwidth
1692  * @param bandwidth_out available amount of outbound bandwidth
1693  * @param timeout absolute time when this peer will time out
1694  *        unless we see some further activity from it
1695  * @param atsi status information
1696  */
1697 static void
1698 peer_status_handler (void *cls,
1699                      const struct
1700                      GNUNET_PeerIdentity * peer,
1701                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1702                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1703                      struct GNUNET_TIME_Absolute timeout,
1704                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1705 {
1706   struct ConnectedPeer *cp;
1707   struct GNUNET_TIME_Relative latency;
1708
1709   latency = get_latency (atsi);
1710   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1711                                           &peer->hashPubKey);
1712   if (cp == NULL)
1713     {
1714       GNUNET_break (0);
1715       return;
1716     }
1717   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1718                                  latency);  
1719 }
1720
1721
1722
1723 /**
1724  * Increase the host credit by a value.
1725  *
1726  * @param host which peer to change the trust value on
1727  * @param value is the int value by which the
1728  *  host credit is to be increased or decreased
1729  * @returns the actual change in trust (positive or negative)
1730  */
1731 static int
1732 change_host_trust (struct ConnectedPeer *host, int value)
1733 {
1734   if (value == 0)
1735     return 0;
1736   GNUNET_assert (host != NULL);
1737   if (value > 0)
1738     {
1739       if (host->trust + value < host->trust)
1740         {
1741           value = UINT32_MAX - host->trust;
1742           host->trust = UINT32_MAX;
1743         }
1744       else
1745         host->trust += value;
1746     }
1747   else
1748     {
1749       if (host->trust < -value)
1750         {
1751           value = -host->trust;
1752           host->trust = 0;
1753         }
1754       else
1755         host->trust += value;
1756     }
1757   return value;
1758 }
1759
1760
1761 /**
1762  * Write host-trust information to a file - flush the buffer entry!
1763  */
1764 static int
1765 flush_trust (void *cls,
1766              const GNUNET_HashCode *key,
1767              void *value)
1768 {
1769   struct ConnectedPeer *host = value;
1770   char *fn;
1771   uint32_t trust;
1772   struct GNUNET_PeerIdentity pid;
1773
1774   if (host->trust == host->disk_trust)
1775     return GNUNET_OK;                     /* unchanged */
1776   GNUNET_PEER_resolve (host->pid,
1777                        &pid);
1778   fn = get_trust_filename (&pid);
1779   if (host->trust == 0)
1780     {
1781       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1782         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1783                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1784     }
1785   else
1786     {
1787       trust = htonl (host->trust);
1788       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1789                                                     sizeof(uint32_t),
1790                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1791                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1792         host->disk_trust = host->trust;
1793     }
1794   GNUNET_free (fn);
1795   return GNUNET_OK;
1796 }
1797
1798 /**
1799  * Call this method periodically to scan data/hosts for new hosts.
1800  */
1801 static void
1802 cron_flush_trust (void *cls,
1803                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1804 {
1805
1806   if (NULL == connected_peers)
1807     return;
1808   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1809                                          &flush_trust,
1810                                          NULL);
1811   if (NULL == tc)
1812     return;
1813   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1814     return;
1815   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1816 }
1817
1818
1819 /**
1820  * Free (each) request made by the peer.
1821  *
1822  * @param cls closure, points to peer that the request belongs to
1823  * @param key current key code
1824  * @param value value in the hash map
1825  * @return GNUNET_YES (we should continue to iterate)
1826  */
1827 static int
1828 destroy_request (void *cls,
1829                  const GNUNET_HashCode * key,
1830                  void *value)
1831 {
1832   const struct GNUNET_PeerIdentity * peer = cls;
1833   struct PendingRequest *pr = value;
1834   
1835   GNUNET_break (GNUNET_YES ==
1836                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1837                                                       &peer->hashPubKey,
1838                                                       pr));
1839   destroy_pending_request (pr);
1840   return GNUNET_YES;
1841 }
1842
1843
1844 /**
1845  * Method called whenever a peer disconnects.
1846  *
1847  * @param cls closure, not used
1848  * @param peer peer identity this notification is about
1849  */
1850 static void
1851 peer_disconnect_handler (void *cls,
1852                          const struct
1853                          GNUNET_PeerIdentity * peer)
1854 {
1855   struct ConnectedPeer *cp;
1856   struct PendingMessage *pm;
1857   unsigned int i;
1858   struct MigrationReadyBlock *pos;
1859   struct MigrationReadyBlock *next;
1860
1861   if (0 == memcmp (&my_id, peer, sizeof (struct GNUNET_PeerIdentity)))
1862     return;
1863   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1864                                               &peer->hashPubKey,
1865                                               &destroy_request,
1866                                               (void*) peer);
1867   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1868                                           &peer->hashPubKey);
1869   if (cp == NULL)
1870     return;
1871   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1872     {
1873       if (NULL != cp->last_client_replies[i])
1874         {
1875           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1876           cp->last_client_replies[i] = NULL;
1877         }
1878     }
1879   GNUNET_break (GNUNET_YES ==
1880                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1881                                                       &peer->hashPubKey,
1882                                                       cp));
1883   if (cp->irc != NULL)
1884     {
1885       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1886       cp->irc = NULL;
1887       cp->pr->pirc = NULL;
1888       cp->pr = NULL;
1889     }
1890
1891   /* remove this peer from migration considerations; schedule
1892      alternatives */
1893   next = mig_head;
1894   while (NULL != (pos = next))
1895     {
1896       next = pos->next;
1897       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1898         {
1899           if (pos->target_list[i] == cp->pid)
1900             {
1901               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1902               pos->target_list[i] = 0;
1903             }
1904          }
1905       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1906         {
1907           delete_migration_block (pos);
1908           consider_migration_gathering ();
1909           continue;
1910         }
1911       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1912                                              &consider_migration,
1913                                              pos);
1914     }
1915   GNUNET_PEER_change_rc (cp->pid, -1);
1916   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1917   if (NULL != cp->cth)
1918     {
1919       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1920       cp->cth = NULL;
1921     }
1922   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1923     {
1924       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1925       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1926     }
1927   while (NULL != (pm = cp->pending_messages_head))
1928     destroy_pending_message (pm, 0 /* delivery failed */);
1929   GNUNET_LOAD_value_free (cp->transmission_delay);
1930   GNUNET_break (0 == cp->pending_requests);
1931   GNUNET_free (cp);
1932 }
1933
1934
1935 /**
1936  * Iterator over hash map entries that removes all occurences
1937  * of the given 'client' from the 'last_client_replies' of the
1938  * given connected peer.
1939  *
1940  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1941  * @param key current key code (unused)
1942  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1943  * @return GNUNET_YES (we should continue to iterate)
1944  */
1945 static int
1946 remove_client_from_last_client_replies (void *cls,
1947                                         const GNUNET_HashCode * key,
1948                                         void *value)
1949 {
1950   struct GNUNET_SERVER_Client *client = cls;
1951   struct ConnectedPeer *cp = value;
1952   unsigned int i;
1953
1954   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1955     {
1956       if (cp->last_client_replies[i] == client)
1957         {
1958           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1959           cp->last_client_replies[i] = NULL;
1960         }
1961     }  
1962   return GNUNET_YES;
1963 }
1964
1965
1966 /**
1967  * A client disconnected.  Remove all of its pending queries.
1968  *
1969  * @param cls closure, NULL
1970  * @param client identification of the client
1971  */
1972 static void
1973 handle_client_disconnect (void *cls,
1974                           struct GNUNET_SERVER_Client
1975                           * client)
1976 {
1977   struct ClientList *pos;
1978   struct ClientList *prev;
1979   struct ClientRequestList *rcl;
1980   struct ClientResponseMessage *creply;
1981
1982   if (client == NULL)
1983     return;
1984   prev = NULL;
1985   pos = client_list;
1986   while ( (NULL != pos) &&
1987           (pos->client != client) )
1988     {
1989       prev = pos;
1990       pos = pos->next;
1991     }
1992   if (pos == NULL)
1993     return; /* no requests pending for this client */
1994   while (NULL != (rcl = pos->rl_head))
1995     {
1996       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1997                   "Destroying pending request `%s' on disconnect\n",
1998                   GNUNET_h2s (&rcl->req->query));
1999       destroy_pending_request (rcl->req);
2000     }
2001   if (prev == NULL)
2002     client_list = pos->next;
2003   else
2004     prev->next = pos->next;
2005   if (pos->th != NULL)
2006     {
2007       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
2008       pos->th = NULL;
2009     }
2010   while (NULL != (creply = pos->res_head))
2011     {
2012       GNUNET_CONTAINER_DLL_remove (pos->res_head,
2013                                    pos->res_tail,
2014                                    creply);
2015       GNUNET_free (creply);
2016     }    
2017   GNUNET_SERVER_client_drop (pos->client);
2018   GNUNET_free (pos);
2019   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2020                                          &remove_client_from_last_client_replies,
2021                                          client);
2022 }
2023
2024
2025 /**
2026  * Iterator to free peer entries.
2027  *
2028  * @param cls closure, unused
2029  * @param key current key code
2030  * @param value value in the hash map (peer entry)
2031  * @return GNUNET_YES (we should continue to iterate)
2032  */
2033 static int 
2034 clean_peer (void *cls,
2035             const GNUNET_HashCode * key,
2036             void *value)
2037 {
2038   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
2039   return GNUNET_YES;
2040 }
2041
2042
2043 /**
2044  * Task run during shutdown.
2045  *
2046  * @param cls unused
2047  * @param tc unused
2048  */
2049 static void
2050 shutdown_task (void *cls,
2051                const struct GNUNET_SCHEDULER_TaskContext *tc)
2052 {
2053   if (mig_qe != NULL)
2054     {
2055       GNUNET_DATASTORE_cancel (mig_qe);
2056       mig_qe = NULL;
2057     }
2058   if (dht_qe != NULL)
2059     {
2060       GNUNET_DATASTORE_cancel (dht_qe);
2061       dht_qe = NULL;
2062     }
2063   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2064     {
2065       GNUNET_SCHEDULER_cancel (mig_task);
2066       mig_task = GNUNET_SCHEDULER_NO_TASK;
2067     }
2068   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2069     {
2070       GNUNET_SCHEDULER_cancel (dht_task);
2071       dht_task = GNUNET_SCHEDULER_NO_TASK;
2072     }
2073   while (client_list != NULL)
2074     handle_client_disconnect (NULL,
2075                               client_list->client);
2076   cron_flush_trust (NULL, NULL);
2077   GNUNET_assert (NULL != core);
2078   GNUNET_CORE_disconnect (core);
2079   core = NULL;
2080   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2081                                          &clean_peer,
2082                                          NULL);
2083   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2084   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2085   requests_by_expiration_heap = 0;
2086   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2087   connected_peers = NULL;
2088   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2089   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2090   query_request_map = NULL;
2091   GNUNET_LOAD_value_free (rt_entry_lifetime);
2092   rt_entry_lifetime = NULL;
2093   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2094   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2095   peer_request_map = NULL;
2096   if (stats != NULL)
2097     {
2098       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2099       stats = NULL;
2100     }
2101   if (dsh != NULL)
2102     {
2103       GNUNET_DATASTORE_disconnect (dsh,
2104                                    GNUNET_NO);
2105       dsh = NULL;
2106     }
2107   while (mig_head != NULL)
2108     delete_migration_block (mig_head);
2109   GNUNET_assert (0 == mig_size);
2110   GNUNET_DHT_disconnect (dht_handle);
2111   dht_handle = NULL;
2112   GNUNET_LOAD_value_free (datastore_get_load);
2113   datastore_get_load = NULL;
2114   GNUNET_LOAD_value_free (datastore_put_load);
2115   datastore_put_load = NULL;
2116   GNUNET_BLOCK_context_destroy (block_ctx);
2117   block_ctx = NULL;
2118   GNUNET_CONFIGURATION_destroy (block_cfg);
2119   block_cfg = NULL;
2120   cfg = NULL;  
2121   GNUNET_free_non_null (trustDirectory);
2122   trustDirectory = NULL;
2123   GNUNET_SCHEDULER_cancel (cover_age_task);
2124   cover_age_task = GNUNET_SCHEDULER_NO_TASK;
2125 }
2126
2127
2128 /* ******************* Utility functions  ******************** */
2129
2130
2131 /**
2132  * We've had to delay a request for transmission to core, but now
2133  * we should be ready.  Run it.
2134  *
2135  * @param cls the 'struct ConnectedPeer' for which a request was delayed
2136  * @param tc task context (unused)
2137  */
2138 static void
2139 delayed_transmission_request (void *cls,
2140                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2141 {
2142   struct ConnectedPeer *cp = cls;
2143   struct GNUNET_PeerIdentity pid;
2144   struct PendingMessage *pm;
2145
2146   pm = cp->pending_messages_head;
2147   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2148   GNUNET_assert (cp->cth == NULL);
2149   if (pm == NULL)
2150     return;
2151   GNUNET_PEER_resolve (cp->pid,
2152                        &pid);
2153   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2154   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2155                                                pm->priority,
2156                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2157                                                &pid,
2158                                                pm->msize,
2159                                                &transmit_to_peer,
2160                                                cp);
2161 }
2162
2163
2164 /**
2165  * Transmit messages by copying it to the target buffer
2166  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2167  * for writing in the meantime.  In that case, do nothing
2168  * (the disconnect or shutdown handler will take care of the rest).
2169  * If we were able to transmit messages and there are still more
2170  * pending, ask core again for further calls to this function.
2171  *
2172  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2173  * @param size number of bytes available in buf
2174  * @param buf where the callee should write the message
2175  * @return number of bytes written to buf
2176  */
2177 static size_t
2178 transmit_to_peer (void *cls,
2179                   size_t size, void *buf)
2180 {
2181   struct ConnectedPeer *cp = cls;
2182   char *cbuf = buf;
2183   struct PendingMessage *pm;
2184   struct PendingMessage *next_pm;
2185   struct GNUNET_TIME_Absolute now;
2186   struct GNUNET_TIME_Relative min_delay;
2187   struct MigrationReadyBlock *mb;
2188   struct MigrationReadyBlock *next;
2189   struct PutMessage migm;
2190   size_t msize;
2191   unsigned int i;
2192   struct GNUNET_PeerIdentity pid;
2193  
2194   cp->cth = NULL;
2195   if (NULL == buf)
2196     {
2197 #if DEBUG_FS
2198       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2199                   "Dropping message, core too busy.\n");
2200 #endif
2201       GNUNET_LOAD_update (cp->transmission_delay,
2202                           UINT64_MAX);
2203       
2204       if (NULL != (pm = cp->pending_messages_head))
2205         {
2206           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2207                                        cp->pending_messages_tail,
2208                                        pm);
2209           cp->pending_requests--;    
2210           destroy_pending_message (pm, 0);
2211         }
2212       if (NULL != (pm = cp->pending_messages_head))
2213         {
2214           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2215           min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2216           cp->delayed_transmission_request_task
2217             = GNUNET_SCHEDULER_add_delayed (min_delay,
2218                                             &delayed_transmission_request,
2219                                             cp);
2220         }
2221       return 0;
2222     }  
2223   GNUNET_LOAD_update (cp->transmission_delay,
2224                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2225   now = GNUNET_TIME_absolute_get ();
2226   msize = 0;
2227   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2228   next_pm = cp->pending_messages_head;
2229   while ( (NULL != (pm = next_pm) ) &&
2230           (pm->msize <= size) )
2231     {
2232       next_pm = pm->next;
2233       if (pm->delay_until.abs_value > now.abs_value)
2234         {
2235           min_delay = GNUNET_TIME_relative_min (min_delay,
2236                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2237           continue;
2238         }
2239       memcpy (&cbuf[msize], &pm[1], pm->msize);
2240       msize += pm->msize;
2241       size -= pm->msize;
2242       if (NULL == pm->pml)
2243         {
2244           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2245                                        cp->pending_messages_tail,
2246                                        pm);
2247           cp->pending_requests--;
2248         }
2249       destroy_pending_message (pm, cp->pid);
2250     }
2251   if (pm != NULL)
2252     min_delay = GNUNET_TIME_UNIT_ZERO;
2253   if (NULL != cp->pending_messages_head)
2254     {     
2255       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2256       cp->delayed_transmission_request_task
2257         = GNUNET_SCHEDULER_add_delayed (min_delay,
2258                                         &delayed_transmission_request,
2259                                         cp);
2260     }
2261   if (pm == NULL)
2262     {      
2263       GNUNET_PEER_resolve (cp->pid,
2264                            &pid);
2265       next = mig_head;
2266       while (NULL != (mb = next))
2267         {
2268           next = mb->next;
2269           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2270             {
2271               if ( (cp->pid == mb->target_list[i]) &&
2272                    (mb->size + sizeof (migm) <= size) )
2273                 {
2274                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2275                   mb->target_list[i] = 0;
2276                   mb->used_targets++;
2277                   memset (&migm, 0, sizeof (migm));
2278                   migm.header.size = htons (sizeof (migm) + mb->size);
2279                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2280                   migm.type = htonl (mb->type);
2281                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2282                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2283                   msize += sizeof (migm);
2284                   size -= sizeof (migm);
2285                   memcpy (&cbuf[msize], &mb[1], mb->size);
2286                   msize += mb->size;
2287                   size -= mb->size;
2288 #if DEBUG_FS
2289                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2290                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2291                               GNUNET_h2s (&mb->query),
2292                               (unsigned int) mb->size,
2293                               GNUNET_i2s (&pid));
2294 #endif    
2295                   break;
2296                 }
2297               else
2298                 {
2299 #if DEBUG_FS
2300                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2301                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2302                               GNUNET_h2s (&mb->query),
2303                               (unsigned int) mb->size,
2304                               GNUNET_i2s (&pid));
2305 #endif    
2306                 }
2307             }
2308           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2309                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2310             {
2311               delete_migration_block (mb);
2312               consider_migration_gathering ();
2313             }
2314         }
2315       consider_migration (NULL, 
2316                           &pid.hashPubKey,
2317                           cp);
2318     }
2319 #if DEBUG_FS
2320   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2321               "Transmitting %u bytes to peer with PID %u\n",
2322               (unsigned int) msize,
2323               (unsigned int) cp->pid);
2324 #endif
2325   return msize;
2326 }
2327
2328
2329 /**
2330  * Add a message to the set of pending messages for the given peer.
2331  *
2332  * @param cp peer to send message to
2333  * @param pm message to queue
2334  * @param pr request on which behalf this message is being queued
2335  */
2336 static void
2337 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2338                                   struct PendingMessage *pm,
2339                                   struct PendingRequest *pr)
2340 {
2341   struct PendingMessage *pos;
2342   struct PendingMessageList *pml;
2343   struct GNUNET_PeerIdentity pid;
2344
2345   GNUNET_assert (pm->next == NULL);
2346   GNUNET_assert (pm->pml == NULL);    
2347   if (pr != NULL)
2348     {
2349       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2350       pml->req = pr;
2351       pml->target = cp;
2352       pml->pm = pm;
2353       pm->pml = pml;  
2354       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2355                                    pr->pending_tail,
2356                                    pml);
2357     }
2358   pos = cp->pending_messages_head;
2359   while ( (pos != NULL) &&
2360           (pm->priority < pos->priority) )
2361     pos = pos->next;    
2362   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2363                                      cp->pending_messages_tail,
2364                                      pos,
2365                                      pm);
2366   cp->pending_requests++;
2367   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2368     {
2369       GNUNET_STATISTICS_update (stats,
2370                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2371                                 1,
2372                                 GNUNET_NO);
2373       destroy_pending_message (cp->pending_messages_tail, 0);  
2374     }
2375   GNUNET_PEER_resolve (cp->pid, &pid);
2376   if (NULL != cp->cth)
2377     {
2378       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2379       cp->cth = NULL;
2380     }
2381   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2382     {
2383       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2384       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2385     }
2386   /* need to schedule transmission */
2387   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2388   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2389                                                cp->pending_messages_head->priority,
2390                                                MAX_TRANSMIT_DELAY,
2391                                                &pid,
2392                                                cp->pending_messages_head->msize,
2393                                                &transmit_to_peer,
2394                                                cp);
2395   if (cp->cth == NULL)
2396     {
2397 #if DEBUG_FS
2398       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2399                   "Failed to schedule transmission with core!\n");
2400 #endif
2401       GNUNET_STATISTICS_update (stats,
2402                                 gettext_noop ("# CORE transmission failures"),
2403                                 1,
2404                                 GNUNET_NO);
2405     }
2406 }
2407
2408
2409 /**
2410  * Test if the DATABASE (GET) load on this peer is too high
2411  * to even consider processing the query at
2412  * all.  
2413  * 
2414  * @return GNUNET_YES if the load is too high to do anything (load high)
2415  *         GNUNET_NO to process normally (load normal)
2416  *         GNUNET_SYSERR to process for free (load low)
2417  */
2418 static int
2419 test_get_load_too_high (uint32_t priority)
2420 {
2421   double ld;
2422
2423   ld = GNUNET_LOAD_get_load (datastore_get_load);
2424   if (ld < 1)
2425     return GNUNET_SYSERR;    
2426   if (ld <= priority)    
2427     return GNUNET_NO;    
2428   return GNUNET_YES;
2429 }
2430
2431
2432
2433
2434 /**
2435  * Test if the DATABASE (PUT) load on this peer is too high
2436  * to even consider processing the query at
2437  * all.  
2438  * 
2439  * @return GNUNET_YES if the load is too high to do anything (load high)
2440  *         GNUNET_NO to process normally (load normal or low)
2441  */
2442 static int
2443 test_put_load_too_high (uint32_t priority)
2444 {
2445   double ld;
2446
2447   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2448     return GNUNET_NO; /* very fast */
2449   ld = GNUNET_LOAD_get_load (datastore_put_load);
2450   if (ld < 2.0 * (1 + priority))
2451     return GNUNET_NO;
2452   GNUNET_STATISTICS_update (stats,
2453                             gettext_noop ("# storage requests dropped due to high load"),
2454                             1,
2455                             GNUNET_NO);
2456   return GNUNET_YES;
2457 }
2458
2459
2460 /* ******************* Pending Request Refresh Task ******************** */
2461
2462
2463
2464 /**
2465  * We use a random delay to make the timing of requests less
2466  * predictable.  This function returns such a random delay.  We add a base
2467  * delay of MAX_CORK_DELAY (1s).
2468  *
2469  * FIXME: make schedule dependent on the specifics of the request?
2470  * Or bandwidth and number of connected peers and load?
2471  *
2472  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2473  */
2474 static struct GNUNET_TIME_Relative
2475 get_processing_delay ()
2476 {
2477   return 
2478     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2479                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2480                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2481                                                                                        TTL_DECREMENT)));
2482 }
2483
2484
2485 /**
2486  * We're processing a GET request from another peer and have decided
2487  * to forward it to other peers.  This function is called periodically
2488  * and should forward the request to other peers until we have all
2489  * possible replies.  If we have transmitted the *only* reply to
2490  * the initiator we should destroy the pending request.  If we have
2491  * many replies in the queue to the initiator, we should delay sending
2492  * out more queries until the reply queue has shrunk some.
2493  *
2494  * @param cls our "struct ProcessGetContext *"
2495  * @param tc unused
2496  */
2497 static void
2498 forward_request_task (void *cls,
2499                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2500
2501
2502 /**
2503  * Function called after we either failed or succeeded
2504  * at transmitting a query to a peer.  
2505  *
2506  * @param cls the requests "struct PendingRequest*"
2507  * @param tpid ID of receiving peer, 0 on transmission error
2508  */
2509 static void
2510 transmit_query_continuation (void *cls,
2511                              GNUNET_PEER_Id tpid)
2512 {
2513   struct PendingRequest *pr = cls;
2514   unsigned int i;
2515
2516   GNUNET_STATISTICS_update (stats,
2517                             gettext_noop ("# queries scheduled for forwarding"),
2518                             -1,
2519                             GNUNET_NO);
2520   if (tpid == 0)   
2521     {
2522 #if DEBUG_FS
2523       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2524                   "Transmission of request failed, will try again later.\n");
2525 #endif
2526       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2527         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2528                                                  &forward_request_task,
2529                                                  pr); 
2530       return;    
2531     }
2532 #if DEBUG_FS
2533   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2534               "Transmitted query `%s'\n",
2535               GNUNET_h2s (&pr->query));
2536 #endif
2537   GNUNET_STATISTICS_update (stats,
2538                             gettext_noop ("# queries forwarded"),
2539                             1,
2540                             GNUNET_NO);
2541   for (i=0;i<pr->used_targets_off;i++)
2542     if (pr->used_targets[i].pid == tpid)
2543       break; /* found match! */    
2544   if (i == pr->used_targets_off)
2545     {
2546       /* need to create new entry */
2547       if (pr->used_targets_off == pr->used_targets_size)
2548         GNUNET_array_grow (pr->used_targets,
2549                            pr->used_targets_size,
2550                            pr->used_targets_size * 2 + 2);
2551       GNUNET_PEER_change_rc (tpid, 1);
2552       pr->used_targets[pr->used_targets_off].pid = tpid;
2553       pr->used_targets[pr->used_targets_off].num_requests = 0;
2554       i = pr->used_targets_off++;
2555     }
2556   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2557   pr->used_targets[i].num_requests++;
2558   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2559     pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2560                                              &forward_request_task,
2561                                              pr);
2562 }
2563
2564
2565 /**
2566  * How many bytes should a bloomfilter be if we have already seen
2567  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2568  * of bits set per entry.  Furthermore, we should not re-size the
2569  * filter too often (to keep it cheap).
2570  *
2571  * Since other peers will also add entries but not resize the filter,
2572  * we should generally pick a slightly larger size than what the
2573  * strict math would suggest.
2574  *
2575  * @return must be a power of two and smaller or equal to 2^15.
2576  */
2577 static size_t
2578 compute_bloomfilter_size (unsigned int entry_count)
2579 {
2580   size_t size;
2581   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2582   uint16_t max = 1 << 15;
2583
2584   if (entry_count > max)
2585     return max;
2586   size = 8;
2587   while ((size < max) && (size < ideal))
2588     size *= 2;
2589   if (size > max)
2590     return max;
2591   return size;
2592 }
2593
2594
2595 /**
2596  * Recalculate our bloom filter for filtering replies.  This function
2597  * will create a new bloom filter from scratch, so it should only be
2598  * called if we have no bloomfilter at all (and hence can create a
2599  * fresh one of minimal size without problems) OR if our peer is the
2600  * initiator (in which case we may resize to larger than mimimum size).
2601  *
2602  * @param pr request for which the BF is to be recomputed
2603  */
2604 static void
2605 refresh_bloomfilter (struct PendingRequest *pr)
2606 {
2607   unsigned int i;
2608   size_t nsize;
2609   GNUNET_HashCode mhash;
2610
2611   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2612   if (nsize == pr->bf_size)
2613     return; /* size not changed */
2614   if (pr->bf != NULL)
2615     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2616   pr->bf_size = nsize;
2617   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2618   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2619                                               pr->bf_size,
2620                                               BLOOMFILTER_K);
2621   for (i=0;i<pr->replies_seen_off;i++)
2622     {
2623       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2624                                 pr->mingle,
2625                                 &mhash);
2626       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2627     }
2628 }
2629
2630
2631 /**
2632  * Function called after we've tried to reserve a certain amount of
2633  * bandwidth for a reply.  Check if we succeeded and if so send our
2634  * query.
2635  *
2636  * @param cls the requests "struct PendingRequest*"
2637  * @param peer identifies the peer
2638  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2639  * @param amount set to the amount that was actually reserved or unreserved
2640  * @param preference current traffic preference for the given peer
2641  */
2642 static void
2643 target_reservation_cb (void *cls,
2644                        const struct
2645                        GNUNET_PeerIdentity * peer,
2646                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2647                        int amount,
2648                        uint64_t preference)
2649 {
2650   struct PendingRequest *pr = cls;
2651   struct ConnectedPeer *cp;
2652   struct PendingMessage *pm;
2653   struct GetMessage *gm;
2654   GNUNET_HashCode *ext;
2655   char *bfdata;
2656   size_t msize;
2657   unsigned int k;
2658   int no_route;
2659   uint32_t bm;
2660   unsigned int i;
2661
2662   /* (3) transmit, update ttl/priority */
2663   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2664                                           &peer->hashPubKey);
2665   if (cp == NULL)
2666     {
2667       /* Peer must have just left */
2668 #if DEBUG_FS
2669       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2670                   "Selected peer disconnected!\n");
2671 #endif
2672       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2673         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2674                                                  &forward_request_task,
2675                                                  pr);
2676       return;
2677     }
2678   cp->irc = NULL;
2679   pr->pirc = NULL;
2680   if (peer == NULL)
2681     {
2682       /* error in communication with core, try again later */
2683       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2684         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2685                                                  &forward_request_task,
2686                                                  pr);
2687       return;
2688     }
2689   no_route = GNUNET_NO;
2690   if (amount == 0)
2691     {
2692       if (pr->cp == NULL)
2693         {
2694 #if DEBUG_FS > 1
2695           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2696                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2697                       amount,
2698                       DBLOCK_SIZE);
2699 #endif
2700           GNUNET_STATISTICS_update (stats,
2701                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2702                                     1,
2703                                     GNUNET_NO);
2704           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2705             pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2706                                                      &forward_request_task,
2707                                                      pr);
2708           return;  /* this target round failed */
2709         }
2710       no_route = GNUNET_YES;
2711     }
2712   
2713   GNUNET_STATISTICS_update (stats,
2714                             gettext_noop ("# queries scheduled for forwarding"),
2715                             1,
2716                             GNUNET_NO);
2717   for (i=0;i<pr->used_targets_off;i++)
2718     if (pr->used_targets[i].pid == cp->pid) 
2719       {
2720         GNUNET_STATISTICS_update (stats,
2721                                   gettext_noop ("# queries retransmitted to same target"),
2722                                   1,
2723                                   GNUNET_NO);
2724         break;
2725       } 
2726
2727   /* build message and insert message into priority queue */
2728 #if DEBUG_FS
2729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2730               "Forwarding request `%s' to `%4s'!\n",
2731               GNUNET_h2s (&pr->query),
2732               GNUNET_i2s (peer));
2733 #endif
2734   k = 0;
2735   bm = 0;
2736   if (GNUNET_YES == no_route)
2737     {
2738       bm |= GET_MESSAGE_BIT_RETURN_TO;
2739       k++;      
2740     }
2741   if (pr->namespace != NULL)
2742     {
2743       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2744       k++;
2745     }
2746   if (pr->target_pid != 0)
2747     {
2748       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2749       k++;
2750     }
2751   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2752   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2753   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2754   pm->msize = msize;
2755   gm = (struct GetMessage*) &pm[1];
2756   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2757   gm->header.size = htons (msize);
2758   gm->type = htonl (pr->type);
2759   pr->remaining_priority /= 2;
2760   gm->priority = htonl (pr->remaining_priority);
2761   gm->ttl = htonl (pr->ttl);
2762   gm->filter_mutator = htonl(pr->mingle); 
2763   gm->hash_bitmap = htonl (bm);
2764   gm->query = pr->query;
2765   ext = (GNUNET_HashCode*) &gm[1];
2766   k = 0;
2767   if (GNUNET_YES == no_route)
2768     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2769   if (pr->namespace != NULL)
2770     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2771   if (pr->target_pid != 0)
2772     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2773   bfdata = (char *) &ext[k];
2774   if (pr->bf != NULL)
2775     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2776                                                bfdata,
2777                                                pr->bf_size);
2778   pm->cont = &transmit_query_continuation;
2779   pm->cont_cls = pr;
2780   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2781   add_to_pending_messages_for_peer (cp, pm, pr);
2782 }
2783
2784
2785 /**
2786  * Closure used for "target_peer_select_cb".
2787  */
2788 struct PeerSelectionContext 
2789 {
2790   /**
2791    * The request for which we are selecting
2792    * peers.
2793    */
2794   struct PendingRequest *pr;
2795
2796   /**
2797    * Current "prime" target.
2798    */
2799   struct GNUNET_PeerIdentity target;
2800
2801   /**
2802    * How much do we like this target?
2803    */
2804   double target_score;
2805
2806   /**
2807    * Does it make sense to we re-try quickly again?
2808    */
2809   int fast_retry;
2810
2811 };
2812
2813
2814 /**
2815  * Function called for each connected peer to determine
2816  * which one(s) would make good targets for forwarding.
2817  *
2818  * @param cls closure (struct PeerSelectionContext)
2819  * @param key current key code (peer identity)
2820  * @param value value in the hash map (struct ConnectedPeer)
2821  * @return GNUNET_YES if we should continue to
2822  *         iterate,
2823  *         GNUNET_NO if not.
2824  */
2825 static int
2826 target_peer_select_cb (void *cls,
2827                        const GNUNET_HashCode * key,
2828                        void *value)
2829 {
2830   struct PeerSelectionContext *psc = cls;
2831   struct ConnectedPeer *cp = value;
2832   struct PendingRequest *pr = psc->pr;
2833   struct GNUNET_TIME_Relative delay;
2834   double score;
2835   unsigned int i;
2836   unsigned int pc;
2837
2838   /* 1) check that this peer is not the initiator */
2839   if (cp == pr->cp)     
2840     {
2841 #if DEBUG_FS
2842       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2843                   "Skipping initiator in forwarding selection\n");
2844 #endif
2845       return GNUNET_YES; /* skip */        
2846     }
2847   if (cp->irc != NULL)
2848     {
2849       psc->fast_retry = GNUNET_YES;
2850       return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2851     }
2852
2853   /* 2) check if we have already (recently) forwarded to this peer */
2854   /* 2a) this particular request */
2855   pc = 0;
2856   for (i=0;i<pr->used_targets_off;i++)
2857     if (pr->used_targets[i].pid == cp->pid) 
2858       {
2859         pc = pr->used_targets[i].num_requests;
2860         GNUNET_assert (pc > 0);
2861         /* FIXME: make re-enabling a peer independent of how often
2862            this function is called??? */
2863         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2864                                            RETRY_PROBABILITY_INV * pc))
2865           {
2866 #if DEBUG_FS
2867             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2868                         "NOT re-trying query that was previously transmitted %u times\n",
2869                         (unsigned int) pc);
2870 #endif
2871             return GNUNET_YES; /* skip */
2872           }
2873         break;
2874       }
2875 #if DEBUG_FS
2876   if (0 < pc)
2877     {
2878       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2879                   "Re-trying query that was previously transmitted %u times to this peer\n",
2880                   (unsigned int) pc);
2881     }
2882 #endif
2883   /* 2b) many other requests to this peer */
2884   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2885   if (delay.rel_value <= cp->avg_delay.rel_value)
2886     {
2887 #if DEBUG_FS
2888       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2889                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2890                   MAX_QUEUE_PER_PEER,
2891                   cp->avg_delay.rel_value);
2892 #endif
2893       return GNUNET_YES; /* skip */      
2894     }
2895
2896   /* 3) calculate how much we'd like to forward to this peer,
2897      starting with a random value that is strong enough
2898      to at least give any peer a chance sometimes 
2899      (compared to the other factors that come later) */
2900   /* 3a) count successful (recent) routes from cp for same source */
2901   if (pr->cp != NULL)
2902     {
2903       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2904                                         P2P_SUCCESS_LIST_SIZE);
2905       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2906         if (cp->last_p2p_replies[i] == pr->cp->pid)
2907           score += 1.0; /* likely successful based on hot path */
2908     }
2909   else
2910     {
2911       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2912                                         CS2P_SUCCESS_LIST_SIZE);
2913       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2914         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2915           score += 1.0; /* likely successful based on hot path */
2916     }
2917   /* 3b) include latency */
2918   if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2919     score += 1.0; /* likely fast based on latency */
2920   /* 3c) include priorities */
2921   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2922     score += 1.0; /* likely successful based on priorities */
2923   /* 3d) penalize for queue size */  
2924   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2925   /* 3e) include peer proximity */
2926   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2927                                                     &pr->query)) / (double) UINT32_MAX);
2928   /* 4) super-bonus for being the known target */
2929   if (pr->target_pid == cp->pid)
2930     score += 100.0;
2931   /* store best-fit in closure */
2932   score++; /* avoid zero */
2933   if (score > psc->target_score)
2934     {
2935       psc->target_score = score;
2936       psc->target.hashPubKey = *key; 
2937     }
2938 #if DEBUG_FS
2939   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2940               "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2941               GNUNET_h2s (key),
2942               score,
2943               psc->target_score);
2944 #endif
2945   return GNUNET_YES;
2946 }
2947   
2948
2949 /**
2950  * The priority level imposes a bound on the maximum
2951  * value for the ttl that can be requested.
2952  *
2953  * @param ttl_in requested ttl
2954  * @param prio given priority
2955  * @return ttl_in if ttl_in is below the limit,
2956  *         otherwise the ttl-limit for the given priority
2957  */
2958 static int32_t
2959 bound_ttl (int32_t ttl_in, uint32_t prio)
2960 {
2961   unsigned long long allowed;
2962
2963   if (ttl_in <= 0)
2964     return ttl_in;
2965   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2966   if (ttl_in > allowed)      
2967     {
2968       if (allowed >= (1 << 30))
2969         return 1 << 30;
2970       return allowed;
2971     }
2972   return ttl_in;
2973 }
2974
2975
2976 /**
2977  * Iterator called on each result obtained for a DHT
2978  * operation that expects a reply
2979  *
2980  * @param cls closure
2981  * @param exp when will this value expire
2982  * @param key key of the result
2983  * @param get_path NULL-terminated array of pointers
2984  *                 to the peers on reverse GET path (or NULL if not recorded)
2985  * @param put_path NULL-terminated array of pointers
2986  *                 to the peers on the PUT path (or NULL if not recorded)
2987  * @param type type of the result
2988  * @param size number of bytes in data
2989  * @param data pointer to the result data
2990  */
2991 static void
2992 process_dht_reply (void *cls,
2993                    struct GNUNET_TIME_Absolute exp,
2994                    const GNUNET_HashCode * key,
2995                    const struct GNUNET_PeerIdentity * const *get_path,
2996                    const struct GNUNET_PeerIdentity * const *put_path,
2997                    enum GNUNET_BLOCK_Type type,
2998                    size_t size,
2999                    const void *data);
3000
3001
3002 /**
3003  * We're processing a GET request and have decided
3004  * to forward it to other peers.  This function is called periodically
3005  * and should forward the request to other peers until we have all
3006  * possible replies.  If we have transmitted the *only* reply to
3007  * the initiator we should destroy the pending request.  If we have
3008  * many replies in the queue to the initiator, we should delay sending
3009  * out more queries until the reply queue has shrunk some.
3010  *
3011  * @param cls our "struct ProcessGetContext *"
3012  * @param tc unused
3013  */
3014 static void
3015 forward_request_task (void *cls,
3016                      const struct GNUNET_SCHEDULER_TaskContext *tc)
3017 {
3018   struct PendingRequest *pr = cls;
3019   struct PeerSelectionContext psc;
3020   struct ConnectedPeer *cp; 
3021   struct GNUNET_TIME_Relative delay;
3022
3023   pr->task = GNUNET_SCHEDULER_NO_TASK;
3024   if (pr->pirc != NULL)
3025     {
3026 #if DEBUG_FS
3027       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3028                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
3029                   GNUNET_h2s (&pr->query));
3030 #endif
3031       return; /* already pending */
3032     }
3033   if (GNUNET_YES == pr->local_only)
3034     return; /* configured to not do P2P search */
3035   /* (0) try DHT */
3036   if ( (0 == pr->anonymity_level) &&
3037        (GNUNET_YES != pr->forward_only) &&
3038        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
3039        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
3040     {
3041       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
3042                                           GNUNET_TIME_UNIT_FOREVER_REL,
3043                                           pr->type,
3044                                           &pr->query,
3045                                           DEFAULT_GET_REPLICATION,
3046                                           GNUNET_DHT_RO_NONE,
3047                                           pr->bf,
3048                                           pr->mingle,
3049                                           pr->namespace,
3050                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3051                                           &process_dht_reply,
3052                                           pr);
3053     }
3054
3055   if ( (pr->anonymity_level > 1) &&
3056        (cover_query_count < pr->anonymity_level - 1) )
3057     {
3058       delay = get_processing_delay ();
3059 #if DEBUG_FS 
3060       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3061                   "Not enough cover traffic to forward query `%s', will try again in %llu ms!\n",
3062                   GNUNET_h2s (&pr->query),
3063                   delay.rel_value);
3064 #endif
3065       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3066                                                &forward_request_task,
3067                                                pr);
3068       return;
3069     }
3070   /* consume cover traffic */
3071   if (pr->anonymity_level > 1) 
3072     cover_query_count -= pr->anonymity_level - 1;
3073
3074   /* (1) select target */
3075   psc.pr = pr;
3076   psc.target_score = -DBL_MAX;
3077   psc.fast_retry = GNUNET_NO;
3078   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3079                                          &target_peer_select_cb,
3080                                          &psc);  
3081   if (psc.target_score == -DBL_MAX)
3082     {
3083       if (psc.fast_retry == GNUNET_YES)
3084         delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3085       else
3086         delay = get_processing_delay ();
3087 #if DEBUG_FS 
3088       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3089                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3090                   GNUNET_h2s (&pr->query),
3091                   delay.rel_value);
3092 #endif
3093       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3094                                                &forward_request_task,
3095                                                pr);
3096       return; /* nobody selected */
3097     }
3098   /* (3) update TTL/priority */
3099   if (pr->client_request_list != NULL)
3100     {
3101       /* FIXME: use better algorithm!? */
3102       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3103                                          4))
3104         pr->priority++;
3105       /* bound priority we use by priorities we see from other peers
3106          rounded up (must round up so that we can see non-zero
3107          priorities, but round up as little as possible to make it
3108          plausible that we forwarded another peers request) */
3109       if (pr->priority > current_priorities + 1.0)
3110         pr->priority = (uint32_t) current_priorities + 1.0;
3111       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3112                            pr->priority);
3113 #if DEBUG_FS
3114       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3115                   "Trying query `%s' with priority %u and TTL %d.\n",
3116                   GNUNET_h2s (&pr->query),
3117                   pr->priority,
3118                   pr->ttl);
3119 #endif
3120     }
3121
3122   /* (3) reserve reply bandwidth */
3123   if (GNUNET_NO == pr->forward_only)
3124     {
3125       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3126                                               &psc.target.hashPubKey);
3127       GNUNET_assert (NULL != cp);
3128       GNUNET_assert (cp->irc == NULL);
3129       pr->pirc = cp;
3130       cp->pr = pr;
3131       cp->irc = GNUNET_CORE_peer_change_preference (core,
3132                                                     &psc.target,
3133                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
3134                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3135                                                     DBLOCK_SIZE * 2, 
3136                                                     cp->inc_preference,
3137                                                     &target_reservation_cb,
3138                                                     pr);
3139       GNUNET_assert (cp->irc != NULL);
3140       cp->inc_preference = 0;
3141     }
3142   else
3143     {
3144       /* force forwarding */
3145       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3146       target_reservation_cb (pr, &psc.target,
3147                              zerobw, 0, 0.0);
3148     }
3149 }
3150
3151
3152 /* **************************** P2P PUT Handling ************************ */
3153
3154
3155 /**
3156  * Function called after we either failed or succeeded
3157  * at transmitting a reply to a peer.  
3158  *
3159  * @param cls the requests "struct PendingRequest*"
3160  * @param tpid ID of receiving peer, 0 on transmission error
3161  */
3162 static void
3163 transmit_reply_continuation (void *cls,
3164                              GNUNET_PEER_Id tpid)
3165 {
3166   struct PendingRequest *pr = cls;
3167   
3168   switch (pr->type)
3169     {
3170     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3171     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3172       /* only one reply expected, done with the request! */
3173       destroy_pending_request (pr);
3174       break;
3175     case GNUNET_BLOCK_TYPE_ANY:
3176     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3177     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3178       break;
3179     default:
3180       GNUNET_break (0);
3181       break;
3182     }
3183 }
3184
3185
3186 /**
3187  * Transmit the given message by copying it to the target buffer
3188  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3189  * for writing in the meantime.  In that case, do nothing
3190  * (the disconnect or shutdown handler will take care of the rest).
3191  * If we were able to transmit messages and there are still more
3192  * pending, ask core again for further calls to this function.
3193  *
3194  * @param cls closure, pointer to the 'struct ClientList*'
3195  * @param size number of bytes available in buf
3196  * @param buf where the callee should write the message
3197  * @return number of bytes written to buf
3198  */
3199 static size_t
3200 transmit_to_client (void *cls,
3201                   size_t size, void *buf)
3202 {
3203   struct ClientList *cl = cls;
3204   char *cbuf = buf;
3205   struct ClientResponseMessage *creply;
3206   size_t msize;
3207   
3208   cl->th = NULL;
3209   if (NULL == buf)
3210     {
3211 #if DEBUG_FS
3212       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3213                   "Not sending reply, client communication problem.\n");
3214 #endif
3215       return 0;
3216     }
3217   msize = 0;
3218   while ( (NULL != (creply = cl->res_head) ) &&
3219           (creply->msize <= size) )
3220     {
3221       memcpy (&cbuf[msize], &creply[1], creply->msize);
3222       msize += creply->msize;
3223       size -= creply->msize;
3224       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3225                                    cl->res_tail,
3226                                    creply);
3227       GNUNET_free (creply);
3228     }
3229   if (NULL != creply)
3230     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3231                                                   creply->msize,
3232                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3233                                                   &transmit_to_client,
3234                                                   cl);
3235 #if DEBUG_FS
3236   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3237               "Transmitted %u bytes to client\n",
3238               (unsigned int) msize);
3239 #endif
3240   return msize;
3241 }
3242
3243
3244 /**
3245  * Closure for "process_reply" function.
3246  */
3247 struct ProcessReplyClosure
3248 {
3249   /**
3250    * The data for the reply.
3251    */
3252   const void *data;
3253
3254   /**
3255    * Who gave us this reply? NULL for local host (or DHT)
3256    */
3257   struct ConnectedPeer *sender;
3258
3259   /**
3260    * When the reply expires.
3261    */
3262   struct GNUNET_TIME_Absolute expiration;
3263
3264   /**
3265    * Size of data.
3266    */
3267   size_t size;
3268
3269   /**
3270    * Type of the block.
3271    */
3272   enum GNUNET_BLOCK_Type type;
3273
3274   /**
3275    * How much was this reply worth to us?
3276    */
3277   uint32_t priority;
3278
3279   /**
3280    * Anonymity requirements for this reply.
3281    */
3282   uint32_t anonymity_level;
3283
3284   /**
3285    * Evaluation result (returned).
3286    */
3287   enum GNUNET_BLOCK_EvaluationResult eval;
3288
3289   /**
3290    * Did we finish processing the associated request?
3291    */ 
3292   int finished;
3293
3294   /**
3295    * Did we find a matching request?
3296    */
3297   int request_found;
3298 };
3299
3300
3301 /**
3302  * We have received a reply; handle it!
3303  *
3304  * @param cls response (struct ProcessReplyClosure)
3305  * @param key our query
3306  * @param value value in the hash map (info about the query)
3307  * @return GNUNET_YES (we should continue to iterate)
3308  */
3309 static int
3310 process_reply (void *cls,
3311                const GNUNET_HashCode * key,
3312                void *value)
3313 {
3314   struct ProcessReplyClosure *prq = cls;
3315   struct PendingRequest *pr = value;
3316   struct PendingMessage *reply;
3317   struct ClientResponseMessage *creply;
3318   struct ClientList *cl;
3319   struct PutMessage *pm;
3320   struct ConnectedPeer *cp;
3321   struct GNUNET_TIME_Relative cur_delay;
3322 #if SUPPORT_DELAYS  
3323 struct GNUNET_TIME_Relative art_delay;
3324 #endif
3325   size_t msize;
3326   unsigned int i;
3327
3328   if (NULL == pr->client_request_list)
3329     {
3330       /* reply will go over the network, check for cover traffic */
3331       if ( (prq->anonymity_level >  1) &&
3332            (cover_content_count < prq->anonymity_level - 1) )
3333         {
3334           /* insufficient cover traffic, skip */
3335           GNUNET_STATISTICS_update (stats,
3336                                     gettext_noop ("# replies suppressed due to lack of cover traffic"),
3337                                     1,
3338                                     GNUNET_NO);
3339           return GNUNET_YES;
3340         }       
3341       if (prq->anonymity_level >  1) 
3342         cover_content_count -= prq->anonymity_level - 1;
3343     }
3344 #if DEBUG_FS
3345   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3346               "Matched result (type %u) for query `%s' with pending request\n",
3347               (unsigned int) prq->type,
3348               GNUNET_h2s (key));
3349 #endif  
3350   GNUNET_STATISTICS_update (stats,
3351                             gettext_noop ("# replies received and matched"),
3352                             1,
3353                             GNUNET_NO);
3354   if (prq->sender != NULL)
3355     {
3356       for (i=0;i<pr->used_targets_off;i++)
3357         if (pr->used_targets[i].pid == prq->sender->pid)
3358           break;
3359       if (i < pr->used_targets_off)
3360         {
3361           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3362           prq->sender->avg_delay.rel_value
3363             = (prq->sender->avg_delay.rel_value * 
3364                (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
3365           prq->sender->avg_priority
3366             = (prq->sender->avg_priority * 
3367                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3368         }
3369       if (pr->cp != NULL)
3370         {
3371           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3372                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3373                                  -1);
3374           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3375           prq->sender->last_p2p_replies
3376             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3377             = pr->cp->pid;
3378         }
3379       else
3380         {
3381           if (NULL != prq->sender->last_client_replies
3382               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3383             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3384                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3385           prq->sender->last_client_replies
3386             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3387             = pr->client_request_list->client_list->client;
3388           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3389         }
3390     }
3391   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3392                                      prq->type,
3393                                      key,
3394                                      &pr->bf,
3395                                      pr->mingle,
3396                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3397                                      prq->data,
3398                                      prq->size);
3399   switch (prq->eval)
3400     {
3401     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3402       break;
3403     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3404       while (NULL != pr->pending_head)
3405         destroy_pending_message_list_entry (pr->pending_head);
3406       if (pr->qe != NULL)
3407         {
3408           if (pr->client_request_list != NULL)
3409             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3410                                         GNUNET_YES);
3411           GNUNET_DATASTORE_cancel (pr->qe);
3412           pr->qe = NULL;
3413         }
3414       pr->do_remove = GNUNET_YES;
3415       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3416         {
3417           GNUNET_SCHEDULER_cancel (pr->task);
3418           pr->task = GNUNET_SCHEDULER_NO_TASK;
3419         }
3420       GNUNET_break (GNUNET_YES ==
3421                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3422                                                           key,
3423                                                           pr));
3424       GNUNET_LOAD_update (rt_entry_lifetime,
3425                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3426       break;
3427     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3428       GNUNET_STATISTICS_update (stats,
3429                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3430                                 1,
3431                                 GNUNET_NO);
3432 #if DEBUG_FS
3433 /*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3434                   "Duplicate response `%s', discarding.\n",
3435                   GNUNET_h2s (&mhash));*/
3436 #endif
3437       return GNUNET_YES; /* duplicate */
3438     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3439       return GNUNET_YES; /* wrong namespace */  
3440     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3441       GNUNET_break (0);
3442       return GNUNET_YES;
3443     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3444       GNUNET_break (0);
3445       return GNUNET_YES;
3446     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3447       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3448                   _("Unsupported block type %u\n"),
3449                   prq->type);
3450       return GNUNET_NO;
3451     }
3452   if (pr->client_request_list != NULL)
3453     {
3454       if (pr->replies_seen_size == pr->replies_seen_off)
3455         GNUNET_array_grow (pr->replies_seen,
3456                            pr->replies_seen_size,
3457                            pr->replies_seen_size * 2 + 4);      
3458       GNUNET_CRYPTO_hash (prq->data,
3459                           prq->size,
3460                           &pr->replies_seen[pr->replies_seen_off++]);         
3461       refresh_bloomfilter (pr);
3462     }
3463   if (NULL == prq->sender)
3464     {
3465 #if DEBUG_FS
3466       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3467                   "Found result for query `%s' in local datastore\n",
3468                   GNUNET_h2s (key));
3469 #endif
3470       GNUNET_STATISTICS_update (stats,
3471                                 gettext_noop ("# results found locally"),
3472                                 1,
3473                                 GNUNET_NO);      
3474     }
3475   prq->priority += pr->remaining_priority;
3476   pr->remaining_priority = 0;
3477   pr->results_found++;
3478   prq->request_found = GNUNET_YES;
3479   if (NULL != pr->client_request_list)
3480     {
3481       GNUNET_STATISTICS_update (stats,
3482                                 gettext_noop ("# replies received for local clients"),
3483                                 1,
3484                                 GNUNET_NO);
3485       cl = pr->client_request_list->client_list;
3486       msize = sizeof (struct PutMessage) + prq->size;
3487       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3488       creply->msize = msize;
3489       creply->client_list = cl;
3490       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3491                                          cl->res_tail,
3492                                          cl->res_tail,
3493                                          creply);      
3494       pm = (struct PutMessage*) &creply[1];
3495       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3496       pm->header.size = htons (msize);
3497       pm->type = htonl (prq->type);
3498       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3499       memcpy (&pm[1], prq->data, prq->size);      
3500       if (NULL == cl->th)
3501         {
3502 #if DEBUG_FS
3503           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3504                       "Transmitting result for query `%s' to client\n",
3505                       GNUNET_h2s (key));
3506 #endif  
3507           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3508                                                         msize,
3509                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3510                                                         &transmit_to_client,
3511                                                         cl);
3512         }
3513       GNUNET_break (cl->th != NULL);
3514       if (pr->do_remove)                
3515         {
3516           prq->finished = GNUNET_YES;
3517           destroy_pending_request (pr);         
3518         }
3519     }
3520   else
3521     {
3522       cp = pr->cp;
3523 #if DEBUG_FS
3524       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3525                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3526                   GNUNET_h2s (key),
3527                   (unsigned int) cp->pid);
3528 #endif  
3529       GNUNET_STATISTICS_update (stats,
3530                                 gettext_noop ("# replies received for other peers"),
3531                                 1,
3532                                 GNUNET_NO);
3533       msize = sizeof (struct PutMessage) + prq->size;
3534       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3535       reply->cont = &transmit_reply_continuation;
3536       reply->cont_cls = pr;
3537 #if SUPPORT_DELAYS
3538       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3539                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3540                                                                            TTL_DECREMENT));
3541       reply->delay_until 
3542         = GNUNET_TIME_relative_to_absolute (art_delay);
3543       GNUNET_STATISTICS_update (stats,
3544                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3545                                 art_delay.abs_value,
3546                                 GNUNET_NO);
3547 #endif
3548       reply->msize = msize;
3549       reply->priority = UINT32_MAX; /* send replies first! */
3550       pm = (struct PutMessage*) &reply[1];
3551       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3552       pm->header.size = htons (msize);
3553       pm->type = htonl (prq->type);
3554       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3555       memcpy (&pm[1], prq->data, prq->size);
3556       add_to_pending_messages_for_peer (cp, reply, pr);
3557     }
3558   return GNUNET_YES;
3559 }
3560
3561
3562 /**
3563  * Iterator called on each result obtained for a DHT
3564  * operation that expects a reply
3565  *
3566  * @param cls closure
3567  * @param exp when will this value expire
3568  * @param key key of the result
3569  * @param get_path NULL-terminated array of pointers
3570  *                 to the peers on reverse GET path (or NULL if not recorded)
3571  * @param put_path NULL-terminated array of pointers
3572  *                 to the peers on the PUT path (or NULL if not recorded)
3573  * @param type type of the result
3574  * @param size number of bytes in data
3575  * @param data pointer to the result data
3576  */
3577 static void
3578 process_dht_reply (void *cls,
3579                    struct GNUNET_TIME_Absolute exp,
3580                    const GNUNET_HashCode * key,
3581                    const struct GNUNET_PeerIdentity * const *get_path,
3582                    const struct GNUNET_PeerIdentity * const *put_path,
3583                    enum GNUNET_BLOCK_Type type,
3584                    size_t size,
3585                    const void *data)
3586 {
3587   struct PendingRequest *pr = cls;
3588   struct ProcessReplyClosure prq;
3589
3590   memset (&prq, 0, sizeof (prq));
3591   prq.data = data;
3592   prq.expiration = exp;
3593   prq.size = size;  
3594   prq.type = type;
3595   process_reply (&prq, key, pr);
3596 }
3597
3598
3599
3600 /**
3601  * Continuation called to notify client about result of the
3602  * operation.
3603  *
3604  * @param cls closure
3605  * @param success GNUNET_SYSERR on failure
3606  * @param msg NULL on success, otherwise an error message
3607  */
3608 static void 
3609 put_migration_continuation (void *cls,
3610                             int success,
3611                             const char *msg)
3612 {
3613   struct GNUNET_TIME_Absolute *start = cls;
3614   struct GNUNET_TIME_Relative delay;
3615   
3616   delay = GNUNET_TIME_absolute_get_duration (*start);
3617   GNUNET_free (start);
3618   GNUNET_LOAD_update (datastore_put_load,
3619                       delay.rel_value);
3620   if (GNUNET_OK == success)
3621     return;
3622   GNUNET_STATISTICS_update (stats,
3623                             gettext_noop ("# datastore 'put' failures"),
3624                             1,
3625                             GNUNET_NO);
3626 }
3627
3628
3629 /**
3630  * Handle P2P "PUT" message.
3631  *
3632  * @param cls closure, always NULL
3633  * @param other the other peer involved (sender or receiver, NULL
3634  *        for loopback messages where we are both sender and receiver)
3635  * @param message the actual message
3636  * @param atsi performance information
3637  * @return GNUNET_OK to keep the connection open,
3638  *         GNUNET_SYSERR to close it (signal serious error)
3639  */
3640 static int
3641 handle_p2p_put (void *cls,
3642                 const struct GNUNET_PeerIdentity *other,
3643                 const struct GNUNET_MessageHeader *message,
3644                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3645 {
3646   const struct PutMessage *put;
3647   uint16_t msize;
3648   size_t dsize;
3649   enum GNUNET_BLOCK_Type type;
3650   struct GNUNET_TIME_Absolute expiration;
3651   GNUNET_HashCode query;
3652   struct ProcessReplyClosure prq;
3653   struct GNUNET_TIME_Absolute *start;
3654   struct GNUNET_TIME_Relative block_time;  
3655   double putl;
3656   struct ConnectedPeer *cp; 
3657   struct PendingMessage *pm;
3658   struct MigrationStopMessage *msm;
3659
3660   msize = ntohs (message->size);
3661   if (msize < sizeof (struct PutMessage))
3662     {
3663       GNUNET_break_op(0);
3664       return GNUNET_SYSERR;
3665     }
3666   put = (const struct PutMessage*) message;
3667   dsize = msize - sizeof (struct PutMessage);
3668   type = ntohl (put->type);
3669   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3670
3671   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3672     return GNUNET_SYSERR;
3673   if (GNUNET_OK !=
3674       GNUNET_BLOCK_get_key (block_ctx,
3675                             type,
3676                             &put[1],
3677                             dsize,
3678                             &query))
3679     {
3680       GNUNET_break_op (0);
3681       return GNUNET_SYSERR;
3682     }
3683   cover_content_count++;
3684 #if DEBUG_FS
3685   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3686               "Received result for query `%s' from peer `%4s'\n",
3687               GNUNET_h2s (&query),
3688               GNUNET_i2s (other));
3689 #endif
3690   GNUNET_STATISTICS_update (stats,
3691                             gettext_noop ("# replies received (overall)"),
3692                             1,
3693                             GNUNET_NO);
3694   /* now, lookup 'query' */
3695   prq.data = (const void*) &put[1];
3696   if (other != NULL)
3697     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3698                                                     &other->hashPubKey);
3699   else
3700     prq.sender = NULL;
3701   prq.size = dsize;
3702   prq.type = type;
3703   prq.expiration = expiration;
3704   prq.priority = 0;
3705   prq.anonymity_level = 1;
3706   prq.finished = GNUNET_NO;
3707   prq.request_found = GNUNET_NO;
3708   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3709                                               &query,
3710                                               &process_reply,
3711                                               &prq);
3712   if (prq.sender != NULL)
3713     {
3714       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3715       change_host_trust (prq.sender, prq.priority);
3716     }
3717   if ( (GNUNET_YES == active_to_migration) &&
3718        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3719     {      
3720 #if DEBUG_FS
3721       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3722                   "Replicating result for query `%s' with priority %u\n",
3723                   GNUNET_h2s (&query),
3724                   prq.priority);
3725 #endif
3726       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3727       *start = GNUNET_TIME_absolute_get ();
3728       GNUNET_DATASTORE_put (dsh,
3729                             0, &query, dsize, &put[1],
3730                             type, prq.priority, 1 /* anonymity */, 
3731                             expiration, 
3732                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3733                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3734                             &put_migration_continuation, 
3735                             start);
3736     }
3737   putl = GNUNET_LOAD_get_load (datastore_put_load);
3738   if ( (NULL != (cp = prq.sender)) &&
3739        (GNUNET_NO == prq.request_found) &&
3740        ( (GNUNET_YES != active_to_migration) ||
3741          (putl > 2.5 * (1 + prq.priority)) ) ) 
3742     {
3743       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3744         return GNUNET_OK; /* already blocked */
3745       /* We're too busy; send MigrationStop message! */
3746       if (GNUNET_YES != active_to_migration) 
3747         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3748       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3749                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3750                                                                                    (unsigned int) (60000 * putl * putl)));
3751       
3752       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3753       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3754                           sizeof (struct MigrationStopMessage));
3755       pm->msize = sizeof (struct MigrationStopMessage);
3756       pm->priority = UINT32_MAX;
3757       msm = (struct MigrationStopMessage*) &pm[1];
3758       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3759       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3760       msm->duration = GNUNET_TIME_relative_hton (block_time);
3761       add_to_pending_messages_for_peer (cp,
3762                                         pm,
3763                                         NULL);
3764     }
3765   return GNUNET_OK;
3766 }
3767
3768
3769 /**
3770  * Handle P2P "MIGRATION_STOP" message.
3771  *
3772  * @param cls closure, always NULL
3773  * @param other the other peer involved (sender or receiver, NULL
3774  *        for loopback messages where we are both sender and receiver)
3775  * @param message the actual message
3776  * @param atsi performance information
3777  * @return GNUNET_OK to keep the connection open,
3778  *         GNUNET_SYSERR to close it (signal serious error)
3779  */
3780 static int
3781 handle_p2p_migration_stop (void *cls,
3782                            const struct GNUNET_PeerIdentity *other,
3783                            const struct GNUNET_MessageHeader *message,
3784                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3785 {
3786   struct ConnectedPeer *cp; 
3787   const struct MigrationStopMessage *msm;
3788
3789   msm = (const struct MigrationStopMessage*) message;
3790   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3791                                           &other->hashPubKey);
3792   if (cp == NULL)
3793     {
3794       GNUNET_break (0);
3795       return GNUNET_OK;
3796     }
3797   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3798   return GNUNET_OK;
3799 }
3800
3801
3802
3803 /* **************************** P2P GET Handling ************************ */
3804
3805
3806 /**
3807  * Closure for 'check_duplicate_request_{peer,client}'.
3808  */
3809 struct CheckDuplicateRequestClosure
3810 {
3811   /**
3812    * The new request we should check if it already exists.
3813    */
3814   const struct PendingRequest *pr;
3815
3816   /**
3817    * Existing request found by the checker, NULL if none.
3818    */
3819   struct PendingRequest *have;
3820 };
3821
3822
3823 /**
3824  * Iterator over entries in the 'query_request_map' that
3825  * tries to see if we have the same request pending from
3826  * the same client already.
3827  *
3828  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3829  * @param key current key code (query, ignored, must match)
3830  * @param value value in the hash map (a 'struct PendingRequest' 
3831  *              that already exists)
3832  * @return GNUNET_YES if we should continue to
3833  *         iterate (no match yet)
3834  *         GNUNET_NO if not (match found).
3835  */
3836 static int
3837 check_duplicate_request_client (void *cls,
3838                                 const GNUNET_HashCode * key,
3839                                 void *value)
3840 {
3841   struct CheckDuplicateRequestClosure *cdc = cls;
3842   struct PendingRequest *have = value;
3843
3844   if (have->client_request_list == NULL)
3845     return GNUNET_YES;
3846   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3847        (cdc->pr != have) )
3848     {
3849       cdc->have = have;
3850       return GNUNET_NO;
3851     }
3852   return GNUNET_YES;
3853 }
3854
3855
3856 /**
3857  * We're processing (local) results for a search request
3858  * from another peer.  Pass applicable results to the
3859  * peer and if we are done either clean up (operation
3860  * complete) or forward to other peers (more results possible).
3861  *
3862  * @param cls our closure (struct LocalGetContext)
3863  * @param key key for the content
3864  * @param size number of bytes in data
3865  * @param data content stored
3866  * @param type type of the content
3867  * @param priority priority of the content
3868  * @param anonymity anonymity-level for the content
3869  * @param expiration expiration time for the content
3870  * @param uid unique identifier for the datum;
3871  *        maybe 0 if no unique identifier is available
3872  */
3873 static void
3874 process_local_reply (void *cls,
3875                      const GNUNET_HashCode * key,
3876                      size_t size,
3877                      const void *data,
3878                      enum GNUNET_BLOCK_Type type,
3879                      uint32_t priority,
3880                      uint32_t anonymity,
3881                      struct GNUNET_TIME_Absolute
3882                      expiration, 
3883                      uint64_t uid)
3884 {
3885   struct PendingRequest *pr = cls;
3886   struct ProcessReplyClosure prq;
3887   struct CheckDuplicateRequestClosure cdrc;
3888   GNUNET_HashCode query;
3889   unsigned int old_rf;
3890   
3891   if (NULL == key)
3892     {
3893 #if DEBUG_FS > 1
3894       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3895                   "Done processing local replies, forwarding request to other peers.\n");
3896 #endif
3897       pr->qe = NULL;
3898       if (pr->client_request_list != NULL)
3899         {
3900           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3901                                       GNUNET_YES);
3902           /* Figure out if this is a duplicate request and possibly
3903              merge 'struct PendingRequest' entries */
3904           cdrc.have = NULL;
3905           cdrc.pr = pr;
3906           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3907                                                       &pr->query,
3908                                                       &check_duplicate_request_client,
3909                                                       &cdrc);
3910           if (cdrc.have != NULL)
3911             {
3912 #if DEBUG_FS
3913               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3914                           "Received request for block `%s' twice from client, will only request once.\n",
3915                           GNUNET_h2s (&pr->query));
3916 #endif
3917               
3918               destroy_pending_request (pr);
3919               return;
3920             }
3921         }
3922       if (pr->local_only == GNUNET_YES)
3923         {
3924           destroy_pending_request (pr);
3925           return;
3926         }
3927       /* no more results */
3928       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3929         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3930                                              pr);      
3931       return;
3932     }
3933 #if DEBUG_FS
3934   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3935               "New local response to `%s' of type %u.\n",
3936               GNUNET_h2s (key),
3937               type);
3938 #endif
3939   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3940     {
3941 #if DEBUG_FS
3942       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3943                   "Found ONDEMAND block, performing on-demand encoding\n");
3944 #endif
3945       GNUNET_STATISTICS_update (stats,
3946                                 gettext_noop ("# on-demand blocks matched requests"),
3947                                 1,
3948                                 GNUNET_NO);
3949       if (GNUNET_OK != 
3950           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3951                                             anonymity, expiration, uid, 
3952                                             &process_local_reply,
3953                                             pr))
3954       if (pr->qe != NULL)
3955         {
3956           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3957         }
3958       return;
3959     }
3960   old_rf = pr->results_found;
3961   memset (&prq, 0, sizeof (prq));
3962   prq.data = data;
3963   prq.expiration = expiration;
3964   prq.size = size;  
3965   if (GNUNET_OK != 
3966       GNUNET_BLOCK_get_key (block_ctx,
3967                             type,
3968                             data,
3969                             size,
3970                             &query))
3971     {
3972       GNUNET_break (0);
3973       GNUNET_DATASTORE_remove (dsh,
3974                                key,
3975                                size, data,
3976                                -1, -1, 
3977                                GNUNET_TIME_UNIT_FOREVER_REL,
3978                                NULL, NULL);
3979       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3980       return;
3981     }
3982   prq.type = type;
3983   prq.priority = priority;  
3984   prq.finished = GNUNET_NO;
3985   prq.request_found = GNUNET_NO;
3986   prq.anonymity_level = anonymity;
3987   if ( (old_rf == 0) &&
3988        (pr->results_found == 0) )
3989     update_datastore_delays (pr->start_time);
3990   process_reply (&prq, key, pr);
3991   if (prq.finished == GNUNET_YES)
3992     return;
3993   if (pr->qe == NULL)
3994     return; /* done here */
3995   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3996     {
3997       pr->local_only = GNUNET_YES; /* do not forward */
3998       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3999       return;
4000     }
4001   if ( (pr->client_request_list == NULL) &&
4002        ( (GNUNET_YES == test_get_load_too_high (0)) ||
4003          (pr->results_found > 5 + 2 * pr->priority) ) )
4004     {
4005 #if DEBUG_FS > 2
4006       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4007                   "Load too high, done with request\n");
4008 #endif
4009       GNUNET_STATISTICS_update (stats,
4010                                 gettext_noop ("# processing result set cut short due to load"),
4011                                 1,
4012                                 GNUNET_NO);
4013       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
4014       return;
4015     }
4016   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
4017 }
4018
4019
4020 /**
4021  * We've received a request with the specified priority.  Bound it
4022  * according to how much we trust the given peer.
4023  * 
4024  * @param prio_in requested priority
4025  * @param cp the peer making the request
4026  * @return effective priority
4027  */
4028 static int32_t
4029 bound_priority (uint32_t prio_in,
4030                 struct ConnectedPeer *cp)
4031 {
4032 #define N ((double)128.0)
4033   uint32_t ret;
4034   double rret;
4035   int ld;
4036
4037   ld = test_get_load_too_high (0);
4038   if (ld == GNUNET_SYSERR)
4039     {
4040       GNUNET_STATISTICS_update (stats,
4041                                 gettext_noop ("# requests done for free (low load)"),
4042                                 1,
4043                                 GNUNET_NO);
4044       return 0; /* excess resources */
4045     }
4046   if (prio_in > INT32_MAX)
4047     prio_in = INT32_MAX;
4048   ret = - change_host_trust (cp, - (int) prio_in);
4049   if (ret > 0)
4050     {
4051       if (ret > current_priorities + N)
4052         rret = current_priorities + N;
4053       else
4054         rret = ret;
4055       current_priorities 
4056         = (current_priorities * (N-1) + rret)/N;
4057     }
4058   if ( (ld == GNUNET_YES) && (ret > 0) )
4059     {
4060       /* try with charging */
4061       ld = test_get_load_too_high (ret);
4062     }
4063   if (ld == GNUNET_YES)
4064     {
4065       GNUNET_STATISTICS_update (stats,
4066                                 gettext_noop ("# request dropped, priority insufficient"),
4067                                 1,
4068                                 GNUNET_NO);
4069       /* undo charge */
4070       change_host_trust (cp, (int) ret);
4071       return -1; /* not enough resources */
4072     }
4073   else
4074     {
4075       GNUNET_STATISTICS_update (stats,
4076                                 gettext_noop ("# requests done for a price (normal load)"),
4077                                 1,
4078                                 GNUNET_NO);
4079     }
4080 #undef N
4081   return ret;
4082 }
4083
4084
4085 /**
4086  * Iterator over entries in the 'query_request_map' that
4087  * tries to see if we have the same request pending from
4088  * the same peer already.
4089  *
4090  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
4091  * @param key current key code (query, ignored, must match)
4092  * @param value value in the hash map (a 'struct PendingRequest' 
4093  *              that already exists)
4094  * @return GNUNET_YES if we should continue to
4095  *         iterate (no match yet)
4096  *         GNUNET_NO if not (match found).
4097  */
4098 static int
4099 check_duplicate_request_peer (void *cls,
4100                               const GNUNET_HashCode * key,
4101                               void *value)
4102 {
4103   struct CheckDuplicateRequestClosure *cdc = cls;
4104   struct PendingRequest *have = value;
4105
4106   if (cdc->pr->target_pid == have->target_pid)
4107     {
4108       cdc->have = have;
4109       return GNUNET_NO;
4110     }
4111   return GNUNET_YES;
4112 }
4113
4114
4115 /**
4116  * Handle P2P "GET" request.
4117  *
4118  * @param cls closure, always NULL
4119  * @param other the other peer involved (sender or receiver, NULL
4120  *        for loopback messages where we are both sender and receiver)
4121  * @param message the actual message
4122  * @param atsi performance information
4123  * @return GNUNET_OK to keep the connection open,
4124  *         GNUNET_SYSERR to close it (signal serious error)
4125  */
4126 static int
4127 handle_p2p_get (void *cls,
4128                 const struct GNUNET_PeerIdentity *other,
4129                 const struct GNUNET_MessageHeader *message,
4130                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4131 {
4132   struct PendingRequest *pr;
4133   struct ConnectedPeer *cp;
4134   struct ConnectedPeer *cps;
4135   struct CheckDuplicateRequestClosure cdc;
4136   struct GNUNET_TIME_Relative timeout;
4137   uint16_t msize;
4138   const struct GetMessage *gm;
4139   unsigned int bits;
4140   const GNUNET_HashCode *opt;
4141   uint32_t bm;
4142   size_t bfsize;
4143   uint32_t ttl_decrement;
4144   int32_t priority;
4145   enum GNUNET_BLOCK_Type type;
4146   int have_ns;
4147
4148   msize = ntohs(message->size);
4149   if (msize < sizeof (struct GetMessage))
4150     {
4151       GNUNET_break_op (0);
4152       return GNUNET_SYSERR;
4153     }
4154   gm = (const struct GetMessage*) message;
4155 #if DEBUG_FS
4156   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4157               "Received request for `%s'\n",
4158               GNUNET_h2s (&gm->query));
4159 #endif
4160   type = ntohl (gm->type);
4161   bm = ntohl (gm->hash_bitmap);
4162   bits = 0;
4163   while (bm > 0)
4164     {
4165       if (1 == (bm & 1))
4166         bits++;
4167       bm >>= 1;
4168     }
4169   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4170     {
4171       GNUNET_break_op (0);
4172       return GNUNET_SYSERR;
4173     }  
4174   opt = (const GNUNET_HashCode*) &gm[1];
4175   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4176   /* bfsize must be power of 2, check! */
4177   if (0 != ( (bfsize - 1) & bfsize))
4178     {
4179       GNUNET_break_op (0);
4180       return GNUNET_SYSERR;
4181     }
4182   cover_query_count++;
4183   bm = ntohl (gm->hash_bitmap);
4184   bits = 0;
4185   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4186                                            &other->hashPubKey);
4187   if (NULL == cps)
4188     {
4189       /* peer must have just disconnected */
4190       GNUNET_STATISTICS_update (stats,
4191                                 gettext_noop ("# requests dropped due to initiator not being connected"),
4192                                 1,
4193                                 GNUNET_NO);
4194       return GNUNET_SYSERR;
4195     }
4196   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4197     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4198                                             &opt[bits++]);
4199   else
4200     cp = cps;
4201   if (cp == NULL)
4202     {
4203 #if DEBUG_FS
4204       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4205         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4206                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4207                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4208       
4209       else
4210         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4211                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4212                     GNUNET_i2s (other));
4213 #endif
4214       GNUNET_STATISTICS_update (stats,
4215                                 gettext_noop ("# requests dropped due to missing reverse route"),
4216                                 1,
4217                                 GNUNET_NO);
4218      /* FIXME: try connect? */
4219       return GNUNET_OK;
4220     }
4221   /* note that we can really only check load here since otherwise
4222      peers could find out that we are overloaded by not being
4223      disconnected after sending us a malformed query... */
4224   priority = bound_priority (ntohl (gm->priority), cps);
4225   if (priority < 0)
4226     {
4227 #if DEBUG_FS
4228       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4229                   "Dropping query from `%s', this peer is too busy.\n",
4230                   GNUNET_i2s (other));
4231 #endif
4232       return GNUNET_OK;
4233     }
4234 #if DEBUG_FS 
4235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4236               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4237               GNUNET_h2s (&gm->query),
4238               (unsigned int) type,
4239               GNUNET_i2s (other),
4240               (unsigned int) bm);
4241 #endif
4242   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4243   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4244                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4245   if (have_ns)
4246     {
4247       pr->namespace = (GNUNET_HashCode*) &pr[1];
4248       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4249     }
4250   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4251        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4252         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4253     {
4254       /* don't have BW to send to peer, or would likely take longer than we have for it,
4255          so at best indirect the query */
4256       priority = 0;
4257       pr->forward_only = GNUNET_YES;
4258     }
4259   pr->type = type;
4260   pr->mingle = ntohl (gm->filter_mutator);
4261   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4262     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4263   pr->anonymity_level = 1;
4264   pr->priority = (uint32_t) priority;
4265   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4266   pr->query = gm->query;
4267   /* decrement ttl (always) */
4268   ttl_decrement = 2 * TTL_DECREMENT +
4269     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4270                               TTL_DECREMENT);
4271   if ( (pr->ttl < 0) &&
4272        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4273     {
4274 #if DEBUG_FS
4275       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4276                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4277                   GNUNET_i2s (other),
4278                   pr->ttl,
4279                   ttl_decrement);
4280 #endif
4281       GNUNET_STATISTICS_update (stats,
4282                                 gettext_noop ("# requests dropped due TTL underflow"),
4283                                 1,
4284                                 GNUNET_NO);
4285       /* integer underflow => drop (should be very rare)! */      
4286       GNUNET_free (pr);
4287       return GNUNET_OK;
4288     } 
4289   pr->ttl -= ttl_decrement;
4290   pr->start_time = GNUNET_TIME_absolute_get ();
4291
4292   /* get bloom filter */
4293   if (bfsize > 0)
4294     {
4295       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4296                                                   bfsize,
4297                                                   BLOOMFILTER_K);
4298       pr->bf_size = bfsize;
4299     }
4300   cdc.have = NULL;
4301   cdc.pr = pr;
4302   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4303                                               &gm->query,
4304                                               &check_duplicate_request_peer,
4305                                               &cdc);
4306   if (cdc.have != NULL)
4307     {
4308       if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4309           pr->start_time.abs_value + pr->ttl)
4310         {
4311           /* existing request has higher TTL, drop new one! */
4312           cdc.have->priority += pr->priority;
4313           destroy_pending_request (pr);
4314 #if DEBUG_FS
4315           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4316                       "Have existing request with higher TTL, dropping new request.\n",
4317                       GNUNET_i2s (other));
4318 #endif
4319           GNUNET_STATISTICS_update (stats,
4320                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4321                                     1,
4322                                     GNUNET_NO);
4323           return GNUNET_OK;
4324         }
4325       else
4326         {
4327           /* existing request has lower TTL, drop old one! */
4328           pr->priority += cdc.have->priority;
4329           /* Possible optimization: if we have applicable pending
4330              replies in 'cdc.have', we might want to move those over
4331              (this is a really rare special-case, so it is not clear
4332              that this would be worth it) */
4333           destroy_pending_request (cdc.have);
4334           /* keep processing 'pr'! */
4335         }
4336     }
4337
4338   pr->cp = cp;
4339   GNUNET_break (GNUNET_OK ==
4340                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4341                                                    &gm->query,
4342                                                    pr,
4343                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4344   GNUNET_break (GNUNET_OK ==
4345                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4346                                                    &other->hashPubKey,
4347                                                    pr,
4348                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4349   
4350   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4351                                             pr,
4352                                             pr->start_time.abs_value + pr->ttl);
4353
4354   GNUNET_STATISTICS_update (stats,
4355                             gettext_noop ("# P2P searches received"),
4356                             1,
4357                             GNUNET_NO);
4358   GNUNET_STATISTICS_update (stats,
4359                             gettext_noop ("# P2P searches active"),
4360                             1,
4361                             GNUNET_NO);
4362
4363   /* calculate change in traffic preference */
4364   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4365   /* process locally */
4366   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4367     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4368   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4369                                            (pr->priority + 1)); 
4370   if (GNUNET_YES != pr->forward_only)
4371     {
4372 #if DEBUG_FS
4373       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4374                   "Handing request for `%s' to datastore\n",
4375                   GNUNET_h2s (&gm->query));
4376 #endif
4377       pr->qe = GNUNET_DATASTORE_get (dsh,
4378                                      &gm->query,
4379                                      type,                             
4380                                      pr->priority + 1,
4381                                      MAX_DATASTORE_QUEUE,                                
4382                                      timeout,
4383                                      &process_local_reply,
4384                                      pr);
4385       if (NULL == pr->qe)
4386         {
4387           GNUNET_STATISTICS_update (stats,
4388                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4389                                     1,
4390                                     GNUNET_NO);
4391         }
4392     }
4393   else
4394     {
4395       GNUNET_STATISTICS_update (stats,
4396                                 gettext_noop ("# requests forwarded due to high load"),
4397                                 1,
4398                                 GNUNET_NO);
4399     }
4400
4401   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4402   switch (pr->type)
4403     {
4404     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4405     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4406       /* only one result, wait for datastore */
4407       if (GNUNET_YES != pr->forward_only)
4408         {
4409           GNUNET_STATISTICS_update (stats,
4410                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4411                                     1,
4412                                     GNUNET_NO);
4413           break;
4414         }
4415     default:
4416       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4417         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4418                                              pr);
4419     }
4420
4421   /* make sure we don't track too many requests */
4422   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4423     {
4424       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4425       GNUNET_assert (pr != NULL);
4426       destroy_pending_request (pr);
4427     }
4428   return GNUNET_OK;
4429 }
4430
4431
4432 /* **************************** CS GET Handling ************************ */
4433
4434
4435 /**
4436  * Handle START_SEARCH-message (search request from client).
4437  *
4438  * @param cls closure
4439  * @param client identification of the client
4440  * @param message the actual message
4441  */
4442 static void
4443 handle_start_search (void *cls,
4444                      struct GNUNET_SERVER_Client *client,
4445                      const struct GNUNET_MessageHeader *message)
4446 {
4447   static GNUNET_HashCode all_zeros;
4448   const struct SearchMessage *sm;
4449   struct ClientList *cl;
4450   struct ClientRequestList *crl;
4451   struct PendingRequest *pr;
4452   uint16_t msize;
4453   unsigned int sc;
4454   enum GNUNET_BLOCK_Type type;
4455
4456   msize = ntohs (message->size);
4457   if ( (msize < sizeof (struct SearchMessage)) ||
4458        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4459     {
4460       GNUNET_break (0);
4461       GNUNET_SERVER_receive_done (client,
4462                                   GNUNET_SYSERR);
4463       return;
4464     }
4465   GNUNET_STATISTICS_update (stats,
4466                             gettext_noop ("# client searches received"),
4467                             1,
4468                             GNUNET_NO);
4469   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4470   sm = (const struct SearchMessage*) message;
4471   type = ntohl (sm->type);
4472 #if DEBUG_FS
4473   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4474               "Received request for `%s' of type %u from local client\n",
4475               GNUNET_h2s (&sm->query),
4476               (unsigned int) type);
4477 #endif
4478   cl = client_list;
4479   while ( (cl != NULL) &&
4480           (cl->client != client) )
4481     cl = cl->next;
4482   if (cl == NULL)
4483     {
4484       cl = GNUNET_malloc (sizeof (struct ClientList));
4485       cl->client = client;
4486       GNUNET_SERVER_client_keep (client);
4487       cl->next = client_list;
4488       client_list = cl;
4489     }
4490   /* detect duplicate KBLOCK requests */
4491   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4492        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4493        (type == GNUNET_BLOCK_TYPE_ANY) )
4494     {
4495       crl = cl->rl_head;
4496       while ( (crl != NULL) &&
4497               ( (0 != memcmp (&crl->req->query,
4498                               &sm->query,
4499                               sizeof (GNUNET_HashCode))) ||
4500                 (crl->req->type != type) ) )
4501         crl = crl->next;
4502       if (crl != NULL)  
4503         { 
4504 #if DEBUG_FS
4505           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4506                       "Have existing request, merging content-seen lists.\n");
4507 #endif
4508           pr = crl->req;
4509           /* Duplicate request (used to send long list of
4510              known/blocked results); merge 'pr->replies_seen'
4511              and update bloom filter */
4512           GNUNET_array_grow (pr->replies_seen,
4513                              pr->replies_seen_size,
4514                              pr->replies_seen_off + sc);
4515           memcpy (&pr->replies_seen[pr->replies_seen_off],
4516                   &sm[1],
4517                   sc * sizeof (GNUNET_HashCode));
4518           pr->replies_seen_off += sc;
4519           refresh_bloomfilter (pr);
4520           GNUNET_STATISTICS_update (stats,
4521                                     gettext_noop ("# client searches updated (merged content seen list)"),
4522                                     1,
4523                                     GNUNET_NO);
4524           GNUNET_SERVER_receive_done (client,
4525                                       GNUNET_OK);
4526           return;
4527         }
4528     }
4529   GNUNET_STATISTICS_update (stats,
4530                             gettext_noop ("# client searches active"),
4531                             1,
4532                             GNUNET_NO);
4533   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4534                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4535   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4536   memset (crl, 0, sizeof (struct ClientRequestList));
4537   crl->client_list = cl;
4538   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4539                                cl->rl_tail,
4540                                crl);  
4541   crl->req = pr;
4542   pr->type = type;
4543   pr->client_request_list = crl;
4544   GNUNET_array_grow (pr->replies_seen,
4545                      pr->replies_seen_size,
4546                      sc);
4547   memcpy (pr->replies_seen,
4548           &sm[1],
4549           sc * sizeof (GNUNET_HashCode));
4550   pr->replies_seen_off = sc;
4551   pr->anonymity_level = ntohl (sm->anonymity_level); 
4552   pr->start_time = GNUNET_TIME_absolute_get ();
4553   refresh_bloomfilter (pr);
4554   pr->query = sm->query;
4555   if (0 == (1 & ntohl (sm->options)))
4556     pr->local_only = GNUNET_NO;
4557   else
4558     pr->local_only = GNUNET_YES;
4559   switch (type)
4560     {
4561     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4562     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4563       if (0 != memcmp (&sm->target,
4564                        &all_zeros,
4565                        sizeof (GNUNET_HashCode)))
4566         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4567       break;
4568     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4569       pr->namespace = (GNUNET_HashCode*) &pr[1];
4570       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4571       break;
4572     default:
4573       break;
4574     }
4575   GNUNET_break (GNUNET_OK ==
4576                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4577                                                    &sm->query,
4578                                                    pr,
4579                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4580   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4581     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4582   pr->qe = GNUNET_DATASTORE_get (dsh,
4583                                  &sm->query,
4584                                  type,
4585                                  -3, -1,
4586                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4587                                  &process_local_reply,
4588                                  pr);
4589 }
4590
4591
4592 /* **************************** Startup ************************ */
4593
4594
4595
4596 /**
4597  * Function called after GNUNET_CORE_connect has succeeded
4598  * (or failed for good).  Note that the private key of the
4599  * peer is intentionally not exposed here; if you need it,
4600  * your process should try to read the private key file
4601  * directly (which should work if you are authorized...).
4602  *
4603  * @param cls closure
4604  * @param server handle to the server, NULL if we failed
4605  * @param my_identity ID of this peer, NULL if we failed
4606  * @param publicKey public key of this peer, NULL if we failed
4607  */
4608 static void
4609 peer_init_handler (void *cls,
4610                    struct GNUNET_CORE_Handle * server,
4611                    const struct GNUNET_PeerIdentity *
4612                    my_identity,
4613                    const struct
4614                    GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
4615                    publicKey)
4616 {
4617   my_id = *my_identity;
4618 }
4619
4620
4621
4622
4623 /**
4624  * Process fs requests.
4625  *
4626  * @param server the initialized server
4627  * @param c configuration to use
4628  */
4629 static int
4630 main_init (struct GNUNET_SERVER_Handle *server,
4631            const struct GNUNET_CONFIGURATION_Handle *c)
4632 {
4633   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4634     {
4635       { &handle_p2p_get, 
4636         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4637       { &handle_p2p_put, 
4638         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4639       { &handle_p2p_migration_stop, 
4640         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4641         sizeof (struct MigrationStopMessage) },
4642       { NULL, 0, 0 }
4643     };
4644   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4645     {&GNUNET_FS_handle_index_start, NULL, 
4646      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4647     {&GNUNET_FS_handle_index_list_get, NULL, 
4648      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4649     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4650      sizeof (struct UnindexMessage) },
4651     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4652      0 },
4653     {NULL, NULL, 0, 0}
4654   };
4655   unsigned long long enc = 128;
4656
4657   cfg = c;
4658   stats = GNUNET_STATISTICS_create ("fs", cfg);
4659   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4660   if ( (GNUNET_OK !=
4661         GNUNET_CONFIGURATION_get_value_number (cfg,
4662                                                "fs",
4663                                                "MAX_PENDING_REQUESTS",
4664                                                &max_pending_requests)) ||
4665        (GNUNET_OK !=
4666         GNUNET_CONFIGURATION_get_value_number (cfg,
4667                                                "fs",
4668                                                "EXPECTED_NEIGHBOUR_COUNT",
4669                                                &enc)) ||
4670        (GNUNET_OK != 
4671         GNUNET_CONFIGURATION_get_value_time (cfg,
4672                                              "fs",
4673                                              "MIN_MIGRATION_DELAY",
4674                                              &min_migration_delay)) )
4675     {
4676       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4677                   _("Configuration fails to specify certain parameters, assuming default values."));
4678     }
4679   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4680   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4681   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4682   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4683   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4684   core = GNUNET_CORE_connect (cfg,
4685                               1, /* larger? */
4686                               NULL,
4687                               &peer_init_handler,
4688                               &peer_connect_handler,
4689                               &peer_disconnect_handler,
4690                               &peer_status_handler,
4691                               NULL, GNUNET_NO,
4692                               NULL, GNUNET_NO,
4693                               p2p_handlers);
4694   if (NULL == core)
4695     {
4696       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4697                   _("Failed to connect to `%s' service.\n"),
4698                   "core");
4699       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4700       connected_peers = NULL;
4701       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4702       query_request_map = NULL;
4703       GNUNET_LOAD_value_free (rt_entry_lifetime);
4704       rt_entry_lifetime = NULL;
4705       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4706       requests_by_expiration_heap = NULL;
4707       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4708       peer_request_map = NULL;
4709       if (dsh != NULL)
4710         {
4711           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4712           dsh = NULL;
4713         }
4714       return GNUNET_SYSERR;
4715     }
4716   if (active_from_migration) 
4717     {
4718       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4719                   _("Content migration is enabled, will start to gather data\n"));
4720       consider_migration_gathering ();
4721     }
4722   consider_dht_put_gathering (NULL);
4723   GNUNET_SERVER_disconnect_notify (server, 
4724                                    &handle_client_disconnect,
4725                                    NULL);
4726   GNUNET_assert (GNUNET_OK ==
4727                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4728                                                           "fs",
4729                                                           "TRUST",
4730                                                           &trustDirectory));
4731   GNUNET_DISK_directory_create (trustDirectory);
4732   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4733                                       &cron_flush_trust, NULL);
4734
4735
4736   GNUNET_SERVER_add_handlers (server, handlers);
4737   cover_age_task = GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY,
4738                                                  &age_cover_counters,
4739                                                  NULL);
4740   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4741                                 &shutdown_task,
4742                                 NULL);
4743   return GNUNET_OK;
4744 }
4745
4746
4747 /**
4748  * Process fs requests.
4749  *
4750  * @param cls closure
4751  * @param server the initialized server
4752  * @param cfg configuration to use
4753  */
4754 static void
4755 run (void *cls,
4756      struct GNUNET_SERVER_Handle *server,
4757      const struct GNUNET_CONFIGURATION_Handle *cfg)
4758 {
4759   active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4760                                                               "FS",
4761                                                               "CONTENT_CACHING");
4762   active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4763                                                                 "FS",
4764                                                                 "CONTENT_PUSHING");
4765   dsh = GNUNET_DATASTORE_connect (cfg);
4766   if (dsh == NULL)
4767     {
4768       GNUNET_SCHEDULER_shutdown ();
4769       return;
4770     }
4771   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4772   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4773   block_cfg = GNUNET_CONFIGURATION_create ();
4774   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4775                                          "block",
4776                                          "PLUGINS",
4777                                          "fs");
4778   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4779   GNUNET_assert (NULL != block_ctx);
4780   dht_handle = GNUNET_DHT_connect (cfg,
4781                                    FS_DHT_HT_SIZE);
4782   if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
4783        (GNUNET_OK != main_init (server, cfg)) )
4784     {    
4785       GNUNET_SCHEDULER_shutdown ();
4786       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4787       dsh = NULL;
4788       GNUNET_DHT_disconnect (dht_handle);
4789       dht_handle = NULL;
4790       GNUNET_BLOCK_context_destroy (block_ctx);
4791       block_ctx = NULL;
4792       GNUNET_CONFIGURATION_destroy (block_cfg);
4793       block_cfg = NULL;
4794       GNUNET_LOAD_value_free (datastore_get_load);
4795       datastore_get_load = NULL;
4796       GNUNET_LOAD_value_free (datastore_put_load);
4797       datastore_put_load = NULL;
4798       return;   
4799     }
4800 }
4801
4802
4803 /**
4804  * The main function for the fs service.
4805  *
4806  * @param argc number of arguments from the command line
4807  * @param argv command line arguments
4808  * @return 0 ok, 1 on error
4809  */
4810 int
4811 main (int argc, char *const *argv)
4812 {
4813   return (GNUNET_OK ==
4814           GNUNET_SERVICE_run (argc,
4815                               argv,
4816                               "fs",
4817                               GNUNET_SERVICE_OPTION_NONE,
4818                               &run, NULL)) ? 0 : 1;
4819 }
4820
4821 /* end of gnunet-service-fs.c */