another place
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - more statistics
28  */
29 #include "platform.h"
30 #include <float.h>
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "gnunet_dht_service.h"
34 #include "gnunet_datastore_service.h"
35 #include "gnunet_load_lib.h"
36 #include "gnunet_peer_lib.h"
37 #include "gnunet_protocols.h"
38 #include "gnunet_signatures.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet-service-fs_indexing.h"
42 #include "fs.h"
43
44 #define DEBUG_FS GNUNET_NO
45
46 /**
47  * Should we introduce random latency in processing?  Required for proper
48  * implementation of GAP, but can be disabled for performance evaluation of
49  * the basic routing algorithm.
50  *
51  * Note that with delays enabled, performance can be significantly lower
52  * (several orders of magnitude in 2-peer test runs); if you want to
53  * measure throughput of other components, set this to NO.  Also, you
54  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
55  * a rather wasteful mode of operation (that might still get the highest
56  * throughput overall).
57  */
58 #define SUPPORT_DELAYS GNUNET_YES
59
60 /**
61  * Size for the hash map for DHT requests from the FS
62  * service.  Should be about the number of concurrent
63  * DHT requests we plan to make.
64  */
65 #define FS_DHT_HT_SIZE 1024
66
67 /**
68  * How often do we flush trust values to disk?
69  */
70 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
71
72 /**
73  * How often do we at most PUT content into the DHT?
74  */
75 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
76
77 /**
78  * Inverse of the probability that we will submit the same query
79  * to the same peer again.  If the same peer already got the query
80  * repeatedly recently, the probability is multiplied by the inverse
81  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
82  * plus MAX_CORK_DELAY (so roughly every 3.5s).
83  *
84  * Note that this factor is a key influence to performance in small
85  * networks (especially test networks of 2 peers) because if there is
86  * only a single peer with the data, this value will determine how
87  * soon we might re-try.  For example, a value of 3 can result in 
88  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
89  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
90  * rather inefficient in larger networks, hence picking 1 is in 
91  * general not the best choice.
92  */
93 #define RETRY_PROBABILITY_INV 3
94
95 /**
96  * What is the maximum delay for a P2P FS message (in our interaction
97  * with core)?  FS-internal delays are another story.  The value is
98  * chosen based on the 32k block size.  Given that peers typcially
99  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
100  * transmit one message even to the lowest-bandwidth peers.
101  */
102 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
103
104 /**
105  * Maximum number of requests (from other peers, overall) that we're
106  * willing to have pending at any given point in time.  Can be changed
107  * via the configuration file (32k is just the default).
108  */
109 static unsigned long long max_pending_requests = (32 * 1024);
110
111
112 /**
113  * Information we keep for each pending reply.  The
114  * actual message follows at the end of this struct.
115  */
116 struct PendingMessage;
117
118 /**
119  * Function called upon completion of a transmission.
120  *
121  * @param cls closure
122  * @param pid ID of receiving peer, 0 on transmission error
123  */
124 typedef void (*TransmissionContinuation)(void * cls, 
125                                          GNUNET_PEER_Id tpid);
126
127
128 /**
129  * Information we keep for each pending message (GET/PUT).  The
130  * actual message follows at the end of this struct.
131  */
132 struct PendingMessage
133 {
134   /**
135    * This is a doubly-linked list of messages to the same peer.
136    */
137   struct PendingMessage *next;
138
139   /**
140    * This is a doubly-linked list of messages to the same peer.
141    */
142   struct PendingMessage *prev;
143
144   /**
145    * Entry in pending message list for this pending message.
146    */ 
147   struct PendingMessageList *pml;  
148
149   /**
150    * Function to call immediately once we have transmitted this
151    * message.
152    */
153   TransmissionContinuation cont;
154
155   /**
156    * Closure for cont.
157    */
158   void *cont_cls;
159
160   /**
161    * Do not transmit this pending message until this deadline.
162    */
163   struct GNUNET_TIME_Absolute delay_until;
164
165   /**
166    * Size of the reply; actual reply message follows
167    * at the end of this struct.
168    */
169   size_t msize;
170   
171   /**
172    * How important is this message for us?
173    */
174   uint32_t priority;
175  
176 };
177
178
179 /**
180  * Information about a peer that we are connected to.
181  * We track data that is useful for determining which
182  * peers should receive our requests.  We also keep
183  * a list of messages to transmit to this peer.
184  */
185 struct ConnectedPeer
186 {
187
188   /**
189    * List of the last clients for which this peer successfully
190    * answered a query.
191    */
192   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
193
194   /**
195    * List of the last PIDs for which
196    * this peer successfully answered a query;
197    * We use 0 to indicate no successful reply.
198    */
199   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
200
201   /**
202    * Average delay between sending the peer a request and
203    * getting a reply (only calculated over the requests for
204    * which we actually got a reply).   Calculated
205    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
206    *
207    * FIXME: actually, this is currently the delay between us originally
208    * receiving (not forwarding!) a request and us receiving a reply from
209    * this peer (regardless of when we transmitted this request to this peer!)
210    */ 
211   struct GNUNET_TIME_Relative avg_delay;
212
213   /**
214    * Point in time until which this peer does not want us to migrate content
215    * to it.
216    */
217   struct GNUNET_TIME_Absolute migration_blocked;
218
219   /**
220    * Time until when we blocked this peer from migrating
221    * data to us.
222    */
223   struct GNUNET_TIME_Absolute last_migration_block;
224
225   /**
226    * Transmission times for the last MAX_QUEUE_PER_PEER
227    * requests for this peer.  Used as a ring buffer, current
228    * offset is stored in 'last_request_times_off'.  If the
229    * oldest entry is more recent than the 'avg_delay', we should
230    * not send any more requests right now.
231    */
232   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
233
234   /**
235    * Handle for an active request for transmission to this
236    * peer, or NULL.
237    */
238   struct GNUNET_CORE_TransmitHandle *cth;
239
240   /**
241    * Messages (replies, queries, content migration) we would like to
242    * send to this peer in the near future.  Sorted by priority, head.
243    */
244   struct PendingMessage *pending_messages_head;
245
246   /**
247    * Messages (replies, queries, content migration) we would like to
248    * send to this peer in the near future.  Sorted by priority, tail.
249    */
250   struct PendingMessage *pending_messages_tail;
251
252   /**
253    * How long does it typically take for us to transmit a message
254    * to this peer?  (delay between the request being issued and
255    * the callback being invoked).
256    */
257   struct GNUNET_LOAD_Value *transmission_delay;
258
259   /**
260    * Time when the last transmission request was issued.
261    */
262   struct GNUNET_TIME_Absolute last_transmission_request_start;
263
264   /**
265    * ID of delay task for scheduling transmission.
266    */
267   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
268
269   /**
270    * Average priority of successful replies.  Calculated
271    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
272    */
273   double avg_priority;
274
275   /**
276    * Increase in traffic preference still to be submitted
277    * to the core service for this peer.
278    */
279   uint64_t inc_preference;
280
281   /**
282    * Trust rating for this peer
283    */
284   uint32_t trust;
285
286   /**
287    * Trust rating for this peer on disk.
288    */
289   uint32_t disk_trust;
290
291   /**
292    * The peer's identity.
293    */
294   GNUNET_PEER_Id pid;  
295
296   /**
297    * Size of the linked list of 'pending_messages'.
298    */
299   unsigned int pending_requests;
300
301   /**
302    * Which offset in "last_p2p_replies" will be updated next?
303    * (we go round-robin).
304    */
305   unsigned int last_p2p_replies_woff;
306
307   /**
308    * Which offset in "last_client_replies" will be updated next?
309    * (we go round-robin).
310    */
311   unsigned int last_client_replies_woff;
312
313   /**
314    * Current offset into 'last_request_times' ring buffer.
315    */
316   unsigned int last_request_times_off;
317
318 };
319
320
321 /**
322  * Information we keep for each pending request.  We should try to
323  * keep this struct as small as possible since its memory consumption
324  * is key to how many requests we can have pending at once.
325  */
326 struct PendingRequest;
327
328
329 /**
330  * Doubly-linked list of requests we are performing
331  * on behalf of the same client.
332  */
333 struct ClientRequestList
334 {
335
336   /**
337    * This is a doubly-linked list.
338    */
339   struct ClientRequestList *next;
340
341   /**
342    * This is a doubly-linked list.
343    */
344   struct ClientRequestList *prev;
345
346   /**
347    * Request this entry represents.
348    */
349   struct PendingRequest *req;
350
351   /**
352    * Client list this request belongs to.
353    */
354   struct ClientList *client_list;
355
356 };
357
358
359 /**
360  * Replies to be transmitted to the client.  The actual
361  * response message is allocated after this struct.
362  */
363 struct ClientResponseMessage
364 {
365   /**
366    * This is a doubly-linked list.
367    */
368   struct ClientResponseMessage *next;
369
370   /**
371    * This is a doubly-linked list.
372    */
373   struct ClientResponseMessage *prev;
374
375   /**
376    * Client list entry this response belongs to.
377    */
378   struct ClientList *client_list;
379
380   /**
381    * Number of bytes in the response.
382    */
383   size_t msize;
384 };
385
386
387 /**
388  * Linked list of clients we are performing requests
389  * for right now.
390  */
391 struct ClientList
392 {
393   /**
394    * This is a linked list.
395    */
396   struct ClientList *next;
397
398   /**
399    * ID of a client making a request, NULL if this entry is for a
400    * peer.
401    */
402   struct GNUNET_SERVER_Client *client;
403
404   /**
405    * Head of list of requests performed on behalf
406    * of this client right now.
407    */
408   struct ClientRequestList *rl_head;
409
410   /**
411    * Tail of list of requests performed on behalf
412    * of this client right now.
413    */
414   struct ClientRequestList *rl_tail;
415
416   /**
417    * Head of linked list of responses.
418    */
419   struct ClientResponseMessage *res_head;
420
421   /**
422    * Tail of linked list of responses.
423    */
424   struct ClientResponseMessage *res_tail;
425
426   /**
427    * Context for sending replies.
428    */
429   struct GNUNET_CONNECTION_TransmitHandle *th;
430
431 };
432
433
434 /**
435  * Information about a peer that we have forwarded this
436  * request to already.  
437  */
438 struct UsedTargetEntry
439 {
440   /**
441    * What was the last time we have transmitted this request to this
442    * peer?
443    */
444   struct GNUNET_TIME_Absolute last_request_time;
445
446   /**
447    * How often have we transmitted this request to this peer?
448    */
449   unsigned int num_requests;
450
451   /**
452    * PID of the target peer.
453    */
454   GNUNET_PEER_Id pid;
455
456 };
457
458
459
460
461
462 /**
463  * Doubly-linked list of messages we are performing
464  * due to a pending request.
465  */
466 struct PendingMessageList
467 {
468
469   /**
470    * This is a doubly-linked list of messages on behalf of the same request.
471    */
472   struct PendingMessageList *next;
473
474   /**
475    * This is a doubly-linked list of messages on behalf of the same request.
476    */
477   struct PendingMessageList *prev;
478
479   /**
480    * Message this entry represents.
481    */
482   struct PendingMessage *pm;
483
484   /**
485    * Request this entry belongs to.
486    */
487   struct PendingRequest *req;
488
489   /**
490    * Peer this message is targeted for.
491    */
492   struct ConnectedPeer *target;
493
494 };
495
496
497 /**
498  * Information we keep for each pending request.  We should try to
499  * keep this struct as small as possible since its memory consumption
500  * is key to how many requests we can have pending at once.
501  */
502 struct PendingRequest
503 {
504
505   /**
506    * If this request was made by a client, this is our entry in the
507    * client request list; otherwise NULL.
508    */
509   struct ClientRequestList *client_request_list;
510
511   /**
512    * Entry of peer responsible for this entry (if this request
513    * was made by a peer).
514    */
515   struct ConnectedPeer *cp;
516
517   /**
518    * If this is a namespace query, pointer to the hash of the public
519    * key of the namespace; otherwise NULL.  Pointer will be to the 
520    * end of this struct (so no need to free it).
521    */
522   const GNUNET_HashCode *namespace;
523
524   /**
525    * Bloomfilter we use to filter out replies that we don't care about
526    * (anymore).  NULL as long as we are interested in all replies.
527    */
528   struct GNUNET_CONTAINER_BloomFilter *bf;
529
530   /**
531    * Context of our GNUNET_CORE_peer_change_preference call.
532    */
533   struct GNUNET_CORE_InformationRequestContext *irc;
534
535   /**
536    * Reference to DHT get operation for this request (or NULL).
537    */
538   struct GNUNET_DHT_GetHandle *dht_get;
539
540   /**
541    * Hash code of all replies that we have seen so far (only valid
542    * if client is not NULL since we only track replies like this for
543    * our own clients).
544    */
545   GNUNET_HashCode *replies_seen;
546
547   /**
548    * Node in the heap representing this entry; NULL
549    * if we have no heap node.
550    */
551   struct GNUNET_CONTAINER_HeapNode *hnode;
552
553   /**
554    * Head of list of messages being performed on behalf of this
555    * request.
556    */
557   struct PendingMessageList *pending_head;
558
559   /**
560    * Tail of list of messages being performed on behalf of this
561    * request.
562    */
563   struct PendingMessageList *pending_tail;
564
565   /**
566    * When did we first see this request (form this peer), or, if our
567    * client is initiating, when did we last initiate a search?
568    */
569   struct GNUNET_TIME_Absolute start_time;
570
571   /**
572    * The query that this request is for.
573    */
574   GNUNET_HashCode query;
575
576   /**
577    * The task responsible for transmitting queries
578    * for this request.
579    */
580   GNUNET_SCHEDULER_TaskIdentifier task;
581
582   /**
583    * (Interned) Peer identifier that identifies a preferred target
584    * for requests.
585    */
586   GNUNET_PEER_Id target_pid;
587
588   /**
589    * (Interned) Peer identifiers of peers that have already
590    * received our query for this content.
591    */
592   struct UsedTargetEntry *used_targets;
593   
594   /**
595    * Our entry in the queue (non-NULL while we wait for our
596    * turn to interact with the local database).
597    */
598   struct GNUNET_DATASTORE_QueueEntry *qe;
599
600   /**
601    * Size of the 'bf' (in bytes).
602    */
603   size_t bf_size;
604
605   /**
606    * Desired anonymity level; only valid for requests from a local client.
607    */
608   uint32_t anonymity_level;
609
610   /**
611    * How many entries in "used_targets" are actually valid?
612    */
613   unsigned int used_targets_off;
614
615   /**
616    * How long is the "used_targets" array?
617    */
618   unsigned int used_targets_size;
619
620   /**
621    * Number of results found for this request.
622    */
623   unsigned int results_found;
624
625   /**
626    * How many entries in "replies_seen" are actually valid?
627    */
628   unsigned int replies_seen_off;
629
630   /**
631    * How long is the "replies_seen" array?
632    */
633   unsigned int replies_seen_size;
634   
635   /**
636    * Priority with which this request was made.  If one of our clients
637    * made the request, then this is the current priority that we are
638    * using when initiating the request.  This value is used when
639    * we decide to reward other peers with trust for providing a reply.
640    */
641   uint32_t priority;
642
643   /**
644    * Priority points left for us to spend when forwarding this request
645    * to other peers.
646    */
647   uint32_t remaining_priority;
648
649   /**
650    * Number to mingle hashes for bloom-filter tests with.
651    */
652   int32_t mingle;
653
654   /**
655    * TTL with which we saw this request (or, if we initiated, TTL that
656    * we used for the request).
657    */
658   int32_t ttl;
659   
660   /**
661    * Type of the content that this request is for.
662    */
663   enum GNUNET_BLOCK_Type type;
664
665   /**
666    * Remove this request after transmission of the current response.
667    */
668   int8_t do_remove;
669
670   /**
671    * GNUNET_YES if we should not forward this request to other peers.
672    */
673   int8_t local_only;
674
675   /**
676    * GNUNET_YES if we should not forward this request to other peers.
677    */
678   int8_t forward_only;
679
680 };
681
682
683 /**
684  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
685  */
686 struct MigrationReadyBlock
687 {
688
689   /**
690    * This is a doubly-linked list.
691    */
692   struct MigrationReadyBlock *next;
693
694   /**
695    * This is a doubly-linked list.
696    */
697   struct MigrationReadyBlock *prev;
698
699   /**
700    * Query for the block.
701    */
702   GNUNET_HashCode query;
703
704   /**
705    * When does this block expire? 
706    */
707   struct GNUNET_TIME_Absolute expiration;
708
709   /**
710    * Peers we would consider forwarding this
711    * block to.  Zero for empty entries.
712    */
713   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
714
715   /**
716    * Size of the block.
717    */
718   size_t size;
719
720   /**
721    *  Number of targets already used.
722    */
723   unsigned int used_targets;
724
725   /**
726    * Type of the block.
727    */
728   enum GNUNET_BLOCK_Type type;
729 };
730
731
732 /**
733  * Our connection to the datastore.
734  */
735 static struct GNUNET_DATASTORE_Handle *dsh;
736
737 /**
738  * Our block context.
739  */
740 static struct GNUNET_BLOCK_Context *block_ctx;
741
742 /**
743  * Our block configuration.
744  */
745 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
746
747 /**
748  * Our scheduler.
749  */
750 static struct GNUNET_SCHEDULER_Handle *sched;
751
752 /**
753  * Our configuration.
754  */
755 static const struct GNUNET_CONFIGURATION_Handle *cfg;
756
757 /**
758  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
759  */
760 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
761
762 /**
763  * Map of peer identifiers to "struct PendingRequest" (for that peer).
764  */
765 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
766
767 /**
768  * Map of query identifiers to "struct PendingRequest" (for that query).
769  */
770 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
771
772 /**
773  * Heap with the request that will expire next at the top.  Contains
774  * pointers of type "struct PendingRequest*"; these will *also* be
775  * aliased from the "requests_by_peer" data structures and the
776  * "requests_by_query" table.  Note that requests from our clients
777  * don't expire and are thus NOT in the "requests_by_expiration"
778  * (or the "requests_by_peer" tables).
779  */
780 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
781
782 /**
783  * Handle for reporting statistics.
784  */
785 static struct GNUNET_STATISTICS_Handle *stats;
786
787 /**
788  * Linked list of clients we are currently processing requests for.
789  */
790 static struct ClientList *client_list;
791
792 /**
793  * Pointer to handle to the core service (points to NULL until we've
794  * connected to it).
795  */
796 static struct GNUNET_CORE_Handle *core;
797
798 /**
799  * Head of linked list of blocks that can be migrated.
800  */
801 static struct MigrationReadyBlock *mig_head;
802
803 /**
804  * Tail of linked list of blocks that can be migrated.
805  */
806 static struct MigrationReadyBlock *mig_tail;
807
808 /**
809  * Request to datastore for migration (or NULL).
810  */
811 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
812
813 /**
814  * Request to datastore for DHT PUTs (or NULL).
815  */
816 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
817
818 /**
819  * Type we will request for the next DHT PUT round from the datastore.
820  */
821 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
822
823 /**
824  * Where do we store trust information?
825  */
826 static char *trustDirectory;
827
828 /**
829  * ID of task that collects blocks for migration.
830  */
831 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
832
833 /**
834  * ID of task that collects blocks for DHT PUTs.
835  */
836 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
837
838 /**
839  * What is the maximum frequency at which we are allowed to
840  * poll the datastore for migration content?
841  */
842 static struct GNUNET_TIME_Relative min_migration_delay;
843
844 /**
845  * Handle for DHT operations.
846  */
847 static struct GNUNET_DHT_Handle *dht_handle;
848
849 /**
850  * Size of the doubly-linked list of migration blocks.
851  */
852 static unsigned int mig_size;
853
854 /**
855  * Are we allowed to migrate content to this peer.
856  */
857 static int active_migration;
858
859 /**
860  * How many entires with zero anonymity do we currently estimate
861  * to have in the database?
862  */
863 static unsigned int zero_anonymity_count_estimate;
864
865 /**
866  * Typical priorities we're seeing from other peers right now.  Since
867  * most priorities will be zero, this value is the weighted average of
868  * non-zero priorities seen "recently".  In order to ensure that new
869  * values do not dramatically change the ratio, values are first
870  * "capped" to a reasonable range (+N of the current value) and then
871  * averaged into the existing value by a ratio of 1:N.  Hence
872  * receiving the largest possible priority can still only raise our
873  * "current_priorities" by at most 1.
874  */
875 static double current_priorities;
876
877 /**
878  * Datastore 'GET' load tracking.
879  */
880 static struct GNUNET_LOAD_Value *datastore_get_load;
881
882 /**
883  * Datastore 'PUT' load tracking.
884  */
885 static struct GNUNET_LOAD_Value *datastore_put_load;
886
887 /**
888  * How long do requests typically stay in the routing table?
889  */
890 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
891
892 /**
893  * We've just now completed a datastore request.  Update our
894  * datastore load calculations.
895  *
896  * @param start time when the datastore request was issued
897  */
898 static void
899 update_datastore_delays (struct GNUNET_TIME_Absolute start)
900 {
901   struct GNUNET_TIME_Relative delay;
902
903   delay = GNUNET_TIME_absolute_get_duration (start);
904   GNUNET_LOAD_update (datastore_get_load,
905                       delay.value);
906 }
907
908
909 /**
910  * Get the filename under which we would store the GNUNET_HELLO_Message
911  * for the given host and protocol.
912  * @return filename of the form DIRECTORY/HOSTID
913  */
914 static char *
915 get_trust_filename (const struct GNUNET_PeerIdentity *id)
916 {
917   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
918   char *fn;
919
920   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
921   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
922   return fn;
923 }
924
925
926
927 /**
928  * Transmit messages by copying it to the target buffer
929  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
930  * for writing in the meantime.  In that case, do nothing
931  * (the disconnect or shutdown handler will take care of the rest).
932  * If we were able to transmit messages and there are still more
933  * pending, ask core again for further calls to this function.
934  *
935  * @param cls closure, pointer to the 'struct ConnectedPeer*'
936  * @param size number of bytes available in buf
937  * @param buf where the callee should write the message
938  * @return number of bytes written to buf
939  */
940 static size_t
941 transmit_to_peer (void *cls,
942                   size_t size, void *buf);
943
944
945 /* ******************* clean up functions ************************ */
946
947 /**
948  * Delete the given migration block.
949  *
950  * @param mb block to delete
951  */
952 static void
953 delete_migration_block (struct MigrationReadyBlock *mb)
954 {
955   GNUNET_CONTAINER_DLL_remove (mig_head,
956                                mig_tail,
957                                mb);
958   GNUNET_PEER_decrement_rcs (mb->target_list,
959                              MIGRATION_LIST_SIZE);
960   mig_size--;
961   GNUNET_free (mb);
962 }
963
964
965 /**
966  * Compare the distance of two peers to a key.
967  *
968  * @param key key
969  * @param p1 first peer
970  * @param p2 second peer
971  * @return GNUNET_YES if P1 is closer to key than P2
972  */
973 static int
974 is_closer (const GNUNET_HashCode *key,
975            const struct GNUNET_PeerIdentity *p1,
976            const struct GNUNET_PeerIdentity *p2)
977 {
978   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
979                                     &p2->hashPubKey,
980                                     key);
981 }
982
983
984 /**
985  * Consider migrating content to a given peer.
986  *
987  * @param cls 'struct MigrationReadyBlock*' to select
988  *            targets for (or NULL for none)
989  * @param key ID of the peer 
990  * @param value 'struct ConnectedPeer' of the peer
991  * @return GNUNET_YES (always continue iteration)
992  */
993 static int
994 consider_migration (void *cls,
995                     const GNUNET_HashCode *key,
996                     void *value)
997 {
998   struct MigrationReadyBlock *mb = cls;
999   struct ConnectedPeer *cp = value;
1000   struct MigrationReadyBlock *pos;
1001   struct GNUNET_PeerIdentity cppid;
1002   struct GNUNET_PeerIdentity otherpid;
1003   struct GNUNET_PeerIdentity worstpid;
1004   size_t msize;
1005   unsigned int i;
1006   unsigned int repl;
1007   
1008   /* consider 'cp' as a migration target for mb */
1009   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
1010     return GNUNET_YES; /* peer has requested no migration! */
1011   if (mb != NULL)
1012     {
1013       GNUNET_PEER_resolve (cp->pid,
1014                            &cppid);
1015       repl = MIGRATION_LIST_SIZE;
1016       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1017         {
1018           if (mb->target_list[i] == 0)
1019             {
1020               mb->target_list[i] = cp->pid;
1021               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1022               repl = MIGRATION_LIST_SIZE;
1023               break;
1024             }
1025           GNUNET_PEER_resolve (mb->target_list[i],
1026                                &otherpid);
1027           if ( (repl == MIGRATION_LIST_SIZE) &&
1028                is_closer (&mb->query,
1029                           &cppid,
1030                           &otherpid)) 
1031             {
1032               repl = i;
1033               worstpid = otherpid;
1034             }
1035           else if ( (repl != MIGRATION_LIST_SIZE) &&
1036                     (is_closer (&mb->query,
1037                                 &worstpid,
1038                                 &otherpid) ) )
1039             {
1040               repl = i;
1041               worstpid = otherpid;
1042             }       
1043         }
1044       if (repl != MIGRATION_LIST_SIZE) 
1045         {
1046           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1047           mb->target_list[repl] = cp->pid;
1048           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1049         }
1050     }
1051
1052   /* consider scheduling transmission to cp for content migration */
1053   if (cp->cth != NULL)        
1054     return GNUNET_YES; 
1055   msize = 0;
1056   pos = mig_head;
1057   while (pos != NULL)
1058     {
1059       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1060         {
1061           if (cp->pid == pos->target_list[i])
1062             {
1063               if (msize == 0)
1064                 msize = pos->size;
1065               else
1066                 msize = GNUNET_MIN (msize,
1067                                     pos->size);
1068               break;
1069             }
1070         }
1071       pos = pos->next;
1072     }
1073   if (msize == 0)
1074     return GNUNET_YES; /* no content available */
1075 #if DEBUG_FS
1076   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1077               "Trying to migrate at least %u bytes to peer `%s'\n",
1078               msize,
1079               GNUNET_h2s (key));
1080 #endif
1081   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1082     {
1083       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1084       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1085     }
1086   cp->cth 
1087     = GNUNET_CORE_notify_transmit_ready (core,
1088                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1089                                          (const struct GNUNET_PeerIdentity*) key,
1090                                          msize + sizeof (struct PutMessage),
1091                                          &transmit_to_peer,
1092                                          cp);
1093   return GNUNET_YES;
1094 }
1095
1096
1097 /**
1098  * Task that is run periodically to obtain blocks for content
1099  * migration
1100  * 
1101  * @param cls unused
1102  * @param tc scheduler context (also unused)
1103  */
1104 static void
1105 gather_migration_blocks (void *cls,
1106                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1107
1108
1109
1110
1111 /**
1112  * Task that is run periodically to obtain blocks for DHT PUTs.
1113  * 
1114  * @param cls type of blocks to gather
1115  * @param tc scheduler context (unused)
1116  */
1117 static void
1118 gather_dht_put_blocks (void *cls,
1119                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1120
1121
1122 /**
1123  * If the migration task is not currently running, consider
1124  * (re)scheduling it with the appropriate delay.
1125  */
1126 static void
1127 consider_migration_gathering ()
1128 {
1129   struct GNUNET_TIME_Relative delay;
1130
1131   if (dsh == NULL)
1132     return;
1133   if (mig_qe != NULL)
1134     return;
1135   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1136     return;
1137   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1138                                          mig_size);
1139   delay = GNUNET_TIME_relative_divide (delay,
1140                                        MAX_MIGRATION_QUEUE);
1141   delay = GNUNET_TIME_relative_max (delay,
1142                                     min_migration_delay);
1143   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1144                                            delay,
1145                                            &gather_migration_blocks,
1146                                            NULL);
1147 }
1148
1149
1150 /**
1151  * If the DHT PUT gathering task is not currently running, consider
1152  * (re)scheduling it with the appropriate delay.
1153  */
1154 static void
1155 consider_dht_put_gathering (void *cls)
1156 {
1157   struct GNUNET_TIME_Relative delay;
1158
1159   if (dsh == NULL)
1160     return;
1161   if (dht_qe != NULL)
1162     return;
1163   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1164     return;
1165   if (zero_anonymity_count_estimate > 0)
1166     {
1167       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1168                                            zero_anonymity_count_estimate);
1169       delay = GNUNET_TIME_relative_min (delay,
1170                                         MAX_DHT_PUT_FREQ);
1171     }
1172   else
1173     {
1174       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1175          (hopefully) appear */
1176       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1177     }
1178   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1179                                            delay,
1180                                            &gather_dht_put_blocks,
1181                                            cls);
1182 }
1183
1184
1185 /**
1186  * Process content offered for migration.
1187  *
1188  * @param cls closure
1189  * @param key key for the content
1190  * @param size number of bytes in data
1191  * @param data content stored
1192  * @param type type of the content
1193  * @param priority priority of the content
1194  * @param anonymity anonymity-level for the content
1195  * @param expiration expiration time for the content
1196  * @param uid unique identifier for the datum;
1197  *        maybe 0 if no unique identifier is available
1198  */
1199 static void
1200 process_migration_content (void *cls,
1201                            const GNUNET_HashCode * key,
1202                            size_t size,
1203                            const void *data,
1204                            enum GNUNET_BLOCK_Type type,
1205                            uint32_t priority,
1206                            uint32_t anonymity,
1207                            struct GNUNET_TIME_Absolute
1208                            expiration, uint64_t uid)
1209 {
1210   struct MigrationReadyBlock *mb;
1211   
1212   if (key == NULL)
1213     {
1214       mig_qe = NULL;
1215       if (mig_size < MAX_MIGRATION_QUEUE)  
1216         consider_migration_gathering ();
1217       return;
1218     }
1219   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1220     {
1221       if (GNUNET_OK !=
1222           GNUNET_FS_handle_on_demand_block (key, size, data,
1223                                             type, priority, anonymity,
1224                                             expiration, uid, 
1225                                             &process_migration_content,
1226                                             NULL))
1227         {
1228           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1229         }
1230       return;
1231     }
1232 #if DEBUG_FS
1233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1234               "Retrieved block `%s' of type %u for migration\n",
1235               GNUNET_h2s (key),
1236               type);
1237 #endif
1238   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1239   mb->query = *key;
1240   mb->expiration = expiration;
1241   mb->size = size;
1242   mb->type = type;
1243   memcpy (&mb[1], data, size);
1244   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1245                                      mig_tail,
1246                                      mig_tail,
1247                                      mb);
1248   mig_size++;
1249   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1250                                          &consider_migration,
1251                                          mb);
1252   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1253 }
1254
1255
1256 /**
1257  * Function called upon completion of the DHT PUT operation.
1258  */
1259 static void
1260 dht_put_continuation (void *cls,
1261                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1262 {
1263   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1264 }
1265
1266
1267 /**
1268  * Store content in DHT.
1269  *
1270  * @param cls closure
1271  * @param key key for the content
1272  * @param size number of bytes in data
1273  * @param data content stored
1274  * @param type type of the content
1275  * @param priority priority of the content
1276  * @param anonymity anonymity-level for the content
1277  * @param expiration expiration time for the content
1278  * @param uid unique identifier for the datum;
1279  *        maybe 0 if no unique identifier is available
1280  */
1281 static void
1282 process_dht_put_content (void *cls,
1283                          const GNUNET_HashCode * key,
1284                          size_t size,
1285                          const void *data,
1286                          enum GNUNET_BLOCK_Type type,
1287                          uint32_t priority,
1288                          uint32_t anonymity,
1289                          struct GNUNET_TIME_Absolute
1290                          expiration, uint64_t uid)
1291
1292   static unsigned int counter;
1293   static GNUNET_HashCode last_vhash;
1294   static GNUNET_HashCode vhash;
1295
1296   if (key == NULL)
1297     {
1298       dht_qe = NULL;
1299       consider_dht_put_gathering (cls);
1300       return;
1301     }
1302   /* slightly funky code to estimate the total number of values with zero
1303      anonymity from the maximum observed length of a monotonically increasing 
1304      sequence of hashes over the contents */
1305   GNUNET_CRYPTO_hash (data, size, &vhash);
1306   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1307     {
1308       if (zero_anonymity_count_estimate > 0)
1309         zero_anonymity_count_estimate /= 2;
1310       counter = 0;
1311     }
1312   last_vhash = vhash;
1313   if (counter < 31)
1314     counter++;
1315   if (zero_anonymity_count_estimate < (1 << counter))
1316     zero_anonymity_count_estimate = (1 << counter);
1317 #if DEBUG_FS
1318   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319               "Retrieved block `%s' of type %u for DHT PUT\n",
1320               GNUNET_h2s (key),
1321               type);
1322 #endif
1323   GNUNET_DHT_put (dht_handle,
1324                   key,
1325                   GNUNET_DHT_RO_NONE,
1326                   type,
1327                   size,
1328                   data,
1329                   expiration,
1330                   GNUNET_TIME_UNIT_FOREVER_REL,
1331                   &dht_put_continuation,
1332                   cls);
1333 }
1334
1335
1336 /**
1337  * Task that is run periodically to obtain blocks for content
1338  * migration
1339  * 
1340  * @param cls unused
1341  * @param tc scheduler context (also unused)
1342  */
1343 static void
1344 gather_migration_blocks (void *cls,
1345                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1346 {
1347   mig_task = GNUNET_SCHEDULER_NO_TASK;
1348   if (dsh != NULL)
1349     {
1350       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1351                                             GNUNET_TIME_UNIT_FOREVER_REL,
1352                                             &process_migration_content, NULL);
1353       GNUNET_assert (mig_qe != NULL);
1354     }
1355 }
1356
1357
1358 /**
1359  * Task that is run periodically to obtain blocks for DHT PUTs.
1360  * 
1361  * @param cls type of blocks to gather
1362  * @param tc scheduler context (unused)
1363  */
1364 static void
1365 gather_dht_put_blocks (void *cls,
1366                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1367 {
1368   dht_task = GNUNET_SCHEDULER_NO_TASK;
1369   if (dsh != NULL)
1370     {
1371       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1372         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1373       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1374                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1375                                                     dht_put_type++,
1376                                                     &process_dht_put_content, NULL);
1377       GNUNET_assert (dht_qe != NULL);
1378     }
1379 }
1380
1381
1382 /**
1383  * We're done with a particular message list entry.
1384  * Free all associated resources.
1385  * 
1386  * @param pml entry to destroy
1387  */
1388 static void
1389 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1390 {
1391   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1392                                pml->req->pending_tail,
1393                                pml);
1394   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1395                                pml->target->pending_messages_tail,
1396                                pml->pm);
1397   pml->target->pending_requests--;
1398   GNUNET_free (pml->pm);
1399   GNUNET_free (pml);
1400 }
1401
1402
1403 /**
1404  * Destroy the given pending message (and call the respective
1405  * continuation).
1406  *
1407  * @param pm message to destroy
1408  * @param tpid id of peer that the message was delivered to, or 0 for none
1409  */
1410 static void
1411 destroy_pending_message (struct PendingMessage *pm,
1412                          GNUNET_PEER_Id tpid)
1413 {
1414   struct PendingMessageList *pml = pm->pml;
1415   TransmissionContinuation cont;
1416   void *cont_cls;
1417
1418   cont = pm->cont;
1419   cont_cls = pm->cont_cls;
1420   if (pml != NULL)
1421     {
1422       GNUNET_assert (pml->pm == pm);
1423       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1424       destroy_pending_message_list_entry (pml);
1425     }
1426   else
1427     {
1428       GNUNET_free (pm);
1429     }
1430   if (cont != NULL)
1431     cont (cont_cls, tpid);  
1432 }
1433
1434
1435 /**
1436  * We're done processing a particular request.
1437  * Free all associated resources.
1438  *
1439  * @param pr request to destroy
1440  */
1441 static void
1442 destroy_pending_request (struct PendingRequest *pr)
1443 {
1444   struct GNUNET_PeerIdentity pid;
1445   unsigned int i;
1446
1447   if (pr->hnode != NULL)
1448     {
1449       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1450                                          pr->hnode);
1451       pr->hnode = NULL;
1452     }
1453   if (NULL == pr->client_request_list)
1454     {
1455       GNUNET_STATISTICS_update (stats,
1456                                 gettext_noop ("# P2P searches active"),
1457                                 -1,
1458                                 GNUNET_NO);
1459     }
1460   else
1461     {
1462       GNUNET_STATISTICS_update (stats,
1463                                 gettext_noop ("# client searches active"),
1464                                 -1,
1465                                 GNUNET_NO);
1466     }
1467   if (GNUNET_YES == 
1468       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1469                                             &pr->query,
1470                                             pr))
1471     {
1472       GNUNET_LOAD_update (rt_entry_lifetime,
1473                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1474     }
1475   if (pr->qe != NULL)
1476      {
1477       GNUNET_DATASTORE_cancel (pr->qe);
1478       pr->qe = NULL;
1479     }
1480   if (pr->dht_get != NULL)
1481     {
1482       GNUNET_DHT_get_stop (pr->dht_get);
1483       pr->dht_get = NULL;
1484     }
1485   if (pr->client_request_list != NULL)
1486     {
1487       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1488                                    pr->client_request_list->client_list->rl_tail,
1489                                    pr->client_request_list);
1490       GNUNET_free (pr->client_request_list);
1491       pr->client_request_list = NULL;
1492     }
1493   if (pr->cp != NULL)
1494     {
1495       GNUNET_PEER_resolve (pr->cp->pid,
1496                            &pid);
1497       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1498                                                    &pid.hashPubKey,
1499                                                    pr);
1500       pr->cp = NULL;
1501     }
1502   if (pr->bf != NULL)
1503     {
1504       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1505       pr->bf = NULL;
1506     }
1507   if (pr->irc != NULL)
1508     {
1509       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1510       pr->irc = NULL;
1511     }
1512   if (pr->replies_seen != NULL)
1513     {
1514       GNUNET_free (pr->replies_seen);
1515       pr->replies_seen = NULL;
1516     }
1517   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1518     {
1519       GNUNET_SCHEDULER_cancel (sched,
1520                                pr->task);
1521       pr->task = GNUNET_SCHEDULER_NO_TASK;
1522     }
1523   while (NULL != pr->pending_head)    
1524     destroy_pending_message_list_entry (pr->pending_head);
1525   GNUNET_PEER_change_rc (pr->target_pid, -1);
1526   if (pr->used_targets != NULL)
1527     {
1528       for (i=0;i<pr->used_targets_off;i++)
1529         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1530       GNUNET_free (pr->used_targets);
1531       pr->used_targets_off = 0;
1532       pr->used_targets_size = 0;
1533       pr->used_targets = NULL;
1534     }
1535   GNUNET_free (pr);
1536 }
1537
1538
1539 /**
1540  * Method called whenever a given peer connects.
1541  *
1542  * @param cls closure, not used
1543  * @param peer peer identity this notification is about
1544  * @param latency reported latency of the connection with 'other'
1545  * @param distance reported distance (DV) to 'other' 
1546  */
1547 static void 
1548 peer_connect_handler (void *cls,
1549                       const struct
1550                       GNUNET_PeerIdentity * peer,
1551                       struct GNUNET_TIME_Relative latency,
1552                       uint32_t distance)
1553 {
1554   struct ConnectedPeer *cp;
1555   struct MigrationReadyBlock *pos;
1556   char *fn;
1557   uint32_t trust;
1558   
1559   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1560   cp->transmission_delay = GNUNET_LOAD_value_init ();
1561   cp->pid = GNUNET_PEER_intern (peer);
1562
1563   fn = get_trust_filename (peer);
1564   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1565       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1566     cp->disk_trust = cp->trust = ntohl (trust);
1567   GNUNET_free (fn);
1568
1569   GNUNET_break (GNUNET_OK ==
1570                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1571                                                    &peer->hashPubKey,
1572                                                    cp,
1573                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1574
1575   pos = mig_head;
1576   while (NULL != pos)
1577     {
1578       (void) consider_migration (pos, &peer->hashPubKey, cp);
1579       pos = pos->next;
1580     }
1581 }
1582
1583
1584 /**
1585  * Increase the host credit by a value.
1586  *
1587  * @param host which peer to change the trust value on
1588  * @param value is the int value by which the
1589  *  host credit is to be increased or decreased
1590  * @returns the actual change in trust (positive or negative)
1591  */
1592 static int
1593 change_host_trust (struct ConnectedPeer *host, int value)
1594 {
1595   unsigned int old_trust;
1596
1597   if (value == 0)
1598     return 0;
1599   GNUNET_assert (host != NULL);
1600   old_trust = host->trust;
1601   if (value > 0)
1602     {
1603       if (host->trust + value < host->trust)
1604         {
1605           value = UINT32_MAX - host->trust;
1606           host->trust = UINT32_MAX;
1607         }
1608       else
1609         host->trust += value;
1610     }
1611   else
1612     {
1613       if (host->trust < -value)
1614         {
1615           value = -host->trust;
1616           host->trust = 0;
1617         }
1618       else
1619         host->trust += value;
1620     }
1621   return value;
1622 }
1623
1624
1625 /**
1626  * Write host-trust information to a file - flush the buffer entry!
1627  */
1628 static int
1629 flush_trust (void *cls,
1630              const GNUNET_HashCode *key,
1631              void *value)
1632 {
1633   struct ConnectedPeer *host = value;
1634   char *fn;
1635   uint32_t trust;
1636   struct GNUNET_PeerIdentity pid;
1637
1638   if (host->trust == host->disk_trust)
1639     return GNUNET_OK;                     /* unchanged */
1640   GNUNET_PEER_resolve (host->pid,
1641                        &pid);
1642   fn = get_trust_filename (&pid);
1643   if (host->trust == 0)
1644     {
1645       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1646         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1647                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1648     }
1649   else
1650     {
1651       trust = htonl (host->trust);
1652       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1653                                                     sizeof(uint32_t),
1654                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1655                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1656         host->disk_trust = host->trust;
1657     }
1658   GNUNET_free (fn);
1659   return GNUNET_OK;
1660 }
1661
1662 /**
1663  * Call this method periodically to scan data/hosts for new hosts.
1664  */
1665 static void
1666 cron_flush_trust (void *cls,
1667                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1668 {
1669
1670   if (NULL == connected_peers)
1671     return;
1672   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1673                                          &flush_trust,
1674                                          NULL);
1675   if (NULL == tc)
1676     return;
1677   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1678     return;
1679   GNUNET_SCHEDULER_add_delayed (tc->sched,
1680                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1681 }
1682
1683
1684 /**
1685  * Free (each) request made by the peer.
1686  *
1687  * @param cls closure, points to peer that the request belongs to
1688  * @param key current key code
1689  * @param value value in the hash map
1690  * @return GNUNET_YES (we should continue to iterate)
1691  */
1692 static int
1693 destroy_request (void *cls,
1694                  const GNUNET_HashCode * key,
1695                  void *value)
1696 {
1697   const struct GNUNET_PeerIdentity * peer = cls;
1698   struct PendingRequest *pr = value;
1699   
1700   GNUNET_break (GNUNET_YES ==
1701                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1702                                                       &peer->hashPubKey,
1703                                                       pr));
1704   destroy_pending_request (pr);
1705   return GNUNET_YES;
1706 }
1707
1708
1709 /**
1710  * Method called whenever a peer disconnects.
1711  *
1712  * @param cls closure, not used
1713  * @param peer peer identity this notification is about
1714  */
1715 static void
1716 peer_disconnect_handler (void *cls,
1717                          const struct
1718                          GNUNET_PeerIdentity * peer)
1719 {
1720   struct ConnectedPeer *cp;
1721   struct PendingMessage *pm;
1722   unsigned int i;
1723   struct MigrationReadyBlock *pos;
1724   struct MigrationReadyBlock *next;
1725
1726   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1727                                               &peer->hashPubKey,
1728                                               &destroy_request,
1729                                               (void*) peer);
1730   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1731                                           &peer->hashPubKey);
1732   if (cp == NULL)
1733     return;
1734   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1735     {
1736       if (NULL != cp->last_client_replies[i])
1737         {
1738           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1739           cp->last_client_replies[i] = NULL;
1740         }
1741     }
1742   GNUNET_break (GNUNET_YES ==
1743                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1744                                                       &peer->hashPubKey,
1745                                                       cp));
1746   /* remove this peer from migration considerations; schedule
1747      alternatives */
1748   next = mig_head;
1749   while (NULL != (pos = next))
1750     {
1751       next = pos->next;
1752       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1753         {
1754           if (pos->target_list[i] == cp->pid)
1755             {
1756               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1757               pos->target_list[i] = 0;
1758             }
1759          }
1760       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1761         {
1762           delete_migration_block (pos);
1763           consider_migration_gathering ();
1764           continue;
1765         }
1766       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1767                                              &consider_migration,
1768                                              pos);
1769     }
1770   GNUNET_PEER_change_rc (cp->pid, -1);
1771   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1772   if (NULL != cp->cth)
1773     {
1774       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1775       cp->cth = NULL;
1776     }
1777   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1778     {
1779       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1780       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1781     }
1782   while (NULL != (pm = cp->pending_messages_head))
1783     destroy_pending_message (pm, 0 /* delivery failed */);
1784   GNUNET_LOAD_value_free (cp->transmission_delay);
1785   GNUNET_break (0 == cp->pending_requests);
1786   GNUNET_free (cp);
1787 }
1788
1789
1790 /**
1791  * Iterator over hash map entries that removes all occurences
1792  * of the given 'client' from the 'last_client_replies' of the
1793  * given connected peer.
1794  *
1795  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1796  * @param key current key code (unused)
1797  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1798  * @return GNUNET_YES (we should continue to iterate)
1799  */
1800 static int
1801 remove_client_from_last_client_replies (void *cls,
1802                                         const GNUNET_HashCode * key,
1803                                         void *value)
1804 {
1805   struct GNUNET_SERVER_Client *client = cls;
1806   struct ConnectedPeer *cp = value;
1807   unsigned int i;
1808
1809   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1810     {
1811       if (cp->last_client_replies[i] == client)
1812         {
1813           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1814           cp->last_client_replies[i] = NULL;
1815         }
1816     }  
1817   return GNUNET_YES;
1818 }
1819
1820
1821 /**
1822  * A client disconnected.  Remove all of its pending queries.
1823  *
1824  * @param cls closure, NULL
1825  * @param client identification of the client
1826  */
1827 static void
1828 handle_client_disconnect (void *cls,
1829                           struct GNUNET_SERVER_Client
1830                           * client)
1831 {
1832   struct ClientList *pos;
1833   struct ClientList *prev;
1834   struct ClientRequestList *rcl;
1835   struct ClientResponseMessage *creply;
1836
1837   if (client == NULL)
1838     return;
1839   prev = NULL;
1840   pos = client_list;
1841   while ( (NULL != pos) &&
1842           (pos->client != client) )
1843     {
1844       prev = pos;
1845       pos = pos->next;
1846     }
1847   if (pos == NULL)
1848     return; /* no requests pending for this client */
1849   while (NULL != (rcl = pos->rl_head))
1850     {
1851       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1852                   "Destroying pending request `%s' on disconnect\n",
1853                   GNUNET_h2s (&rcl->req->query));
1854       destroy_pending_request (rcl->req);
1855     }
1856   if (prev == NULL)
1857     client_list = pos->next;
1858   else
1859     prev->next = pos->next;
1860   if (pos->th != NULL)
1861     {
1862       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1863       pos->th = NULL;
1864     }
1865   while (NULL != (creply = pos->res_head))
1866     {
1867       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1868                                    pos->res_tail,
1869                                    creply);
1870       GNUNET_free (creply);
1871     }    
1872   GNUNET_SERVER_client_drop (pos->client);
1873   GNUNET_free (pos);
1874   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1875                                          &remove_client_from_last_client_replies,
1876                                          client);
1877 }
1878
1879
1880 /**
1881  * Iterator to free peer entries.
1882  *
1883  * @param cls closure, unused
1884  * @param key current key code
1885  * @param value value in the hash map (peer entry)
1886  * @return GNUNET_YES (we should continue to iterate)
1887  */
1888 static int 
1889 clean_peer (void *cls,
1890             const GNUNET_HashCode * key,
1891             void *value)
1892 {
1893   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1894   return GNUNET_YES;
1895 }
1896
1897
1898 /**
1899  * Task run during shutdown.
1900  *
1901  * @param cls unused
1902  * @param tc unused
1903  */
1904 static void
1905 shutdown_task (void *cls,
1906                const struct GNUNET_SCHEDULER_TaskContext *tc)
1907 {
1908   if (mig_qe != NULL)
1909     {
1910       GNUNET_DATASTORE_cancel (mig_qe);
1911       mig_qe = NULL;
1912     }
1913   if (dht_qe != NULL)
1914     {
1915       GNUNET_DATASTORE_cancel (dht_qe);
1916       dht_qe = NULL;
1917     }
1918   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1919     {
1920       GNUNET_SCHEDULER_cancel (sched, mig_task);
1921       mig_task = GNUNET_SCHEDULER_NO_TASK;
1922     }
1923   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1924     {
1925       GNUNET_SCHEDULER_cancel (sched, dht_task);
1926       dht_task = GNUNET_SCHEDULER_NO_TASK;
1927     }
1928   while (client_list != NULL)
1929     handle_client_disconnect (NULL,
1930                               client_list->client);
1931   cron_flush_trust (NULL, NULL);
1932   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1933                                          &clean_peer,
1934                                          NULL);
1935   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1936   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1937   requests_by_expiration_heap = 0;
1938   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1939   connected_peers = NULL;
1940   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1941   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1942   query_request_map = NULL;
1943   GNUNET_LOAD_value_free (rt_entry_lifetime);
1944   rt_entry_lifetime = NULL;
1945   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1946   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1947   peer_request_map = NULL;
1948   GNUNET_assert (NULL != core);
1949   GNUNET_CORE_disconnect (core);
1950   core = NULL;
1951   if (stats != NULL)
1952     {
1953       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1954       stats = NULL;
1955     }
1956   if (dsh != NULL)
1957     {
1958       GNUNET_DATASTORE_disconnect (dsh,
1959                                    GNUNET_NO);
1960       dsh = NULL;
1961     }
1962   while (mig_head != NULL)
1963     delete_migration_block (mig_head);
1964   GNUNET_assert (0 == mig_size);
1965   GNUNET_DHT_disconnect (dht_handle);
1966   dht_handle = NULL;
1967   GNUNET_LOAD_value_free (datastore_get_load);
1968   datastore_get_load = NULL;
1969   GNUNET_LOAD_value_free (datastore_put_load);
1970   datastore_put_load = NULL;
1971   GNUNET_BLOCK_context_destroy (block_ctx);
1972   block_ctx = NULL;
1973   GNUNET_CONFIGURATION_destroy (block_cfg);
1974   block_cfg = NULL;
1975   sched = NULL;
1976   cfg = NULL;  
1977   GNUNET_free_non_null (trustDirectory);
1978   trustDirectory = NULL;
1979 }
1980
1981
1982 /* ******************* Utility functions  ******************** */
1983
1984
1985 /**
1986  * We've had to delay a request for transmission to core, but now
1987  * we should be ready.  Run it.
1988  *
1989  * @param cls the 'struct ConnectedPeer' for which a request was delayed
1990  * @param tc task context (unused)
1991  */
1992 static void
1993 delayed_transmission_request (void *cls,
1994                               const struct GNUNET_SCHEDULER_TaskContext *tc)
1995 {
1996   struct ConnectedPeer *cp = cls;
1997   struct GNUNET_PeerIdentity pid;
1998   struct PendingMessage *pm;
1999
2000   pm = cp->pending_messages_head;
2001   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2002   GNUNET_assert (cp->cth == NULL);
2003   if (pm == NULL)
2004     return;
2005   GNUNET_PEER_resolve (cp->pid,
2006                        &pid);
2007   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2008   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2009                                                pm->priority,
2010                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2011                                                &pid,
2012                                                pm->msize,
2013                                                &transmit_to_peer,
2014                                                cp);
2015 }
2016
2017
2018 /**
2019  * Transmit messages by copying it to the target buffer
2020  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2021  * for writing in the meantime.  In that case, do nothing
2022  * (the disconnect or shutdown handler will take care of the rest).
2023  * If we were able to transmit messages and there are still more
2024  * pending, ask core again for further calls to this function.
2025  *
2026  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2027  * @param size number of bytes available in buf
2028  * @param buf where the callee should write the message
2029  * @return number of bytes written to buf
2030  */
2031 static size_t
2032 transmit_to_peer (void *cls,
2033                   size_t size, void *buf)
2034 {
2035   struct ConnectedPeer *cp = cls;
2036   char *cbuf = buf;
2037   struct PendingMessage *pm;
2038   struct PendingMessage *next_pm;
2039   struct GNUNET_TIME_Absolute now;
2040   struct GNUNET_TIME_Relative min_delay;
2041   struct MigrationReadyBlock *mb;
2042   struct MigrationReadyBlock *next;
2043   struct PutMessage migm;
2044   size_t msize;
2045   unsigned int i;
2046   struct GNUNET_PeerIdentity pid;
2047  
2048   cp->cth = NULL;
2049   if (NULL == buf)
2050     {
2051 #if DEBUG_FS
2052       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2053                   "Dropping message, core too busy.\n");
2054 #endif
2055       GNUNET_LOAD_update (cp->transmission_delay,
2056                           UINT64_MAX);
2057       return 0;
2058     }  
2059   GNUNET_LOAD_update (cp->transmission_delay,
2060                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2061   now = GNUNET_TIME_absolute_get ();
2062   msize = 0;
2063   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2064   next_pm = cp->pending_messages_head;
2065   while ( (NULL != (pm = next_pm) ) &&
2066           (pm->msize <= size) )
2067     {
2068       next_pm = pm->next;
2069       if (pm->delay_until.value > now.value)
2070         {
2071           min_delay = GNUNET_TIME_relative_min (min_delay,
2072                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2073           continue;
2074         }
2075       memcpy (&cbuf[msize], &pm[1], pm->msize);
2076       msize += pm->msize;
2077       size -= pm->msize;
2078       if (NULL == pm->pml)
2079         {
2080           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2081                                        cp->pending_messages_tail,
2082                                        pm);
2083           cp->pending_requests--;
2084         }
2085       destroy_pending_message (pm, cp->pid);
2086     }
2087   if (pm != NULL)
2088     min_delay = GNUNET_TIME_UNIT_ZERO;
2089   if (NULL != cp->pending_messages_head)
2090     {     
2091       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2092       cp->delayed_transmission_request_task
2093         = GNUNET_SCHEDULER_add_delayed (sched,
2094                                         min_delay,
2095                                         &delayed_transmission_request,
2096                                         cp);
2097     }
2098   if (pm == NULL)
2099     {      
2100       GNUNET_PEER_resolve (cp->pid,
2101                            &pid);
2102       next = mig_head;
2103       while (NULL != (mb = next))
2104         {
2105           next = mb->next;
2106           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2107             {
2108               if ( (cp->pid == mb->target_list[i]) &&
2109                    (mb->size + sizeof (migm) <= size) )
2110                 {
2111                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2112                   mb->target_list[i] = 0;
2113                   mb->used_targets++;
2114                   memset (&migm, 0, sizeof (migm));
2115                   migm.header.size = htons (sizeof (migm) + mb->size);
2116                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2117                   migm.type = htonl (mb->type);
2118                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2119                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2120                   msize += sizeof (migm);
2121                   size -= sizeof (migm);
2122                   memcpy (&cbuf[msize], &mb[1], mb->size);
2123                   msize += mb->size;
2124                   size -= mb->size;
2125 #if DEBUG_FS
2126                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2127                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2128                               GNUNET_h2s (&mb->query),
2129                               (unsigned int) mb->size,
2130                               GNUNET_i2s (&pid));
2131 #endif    
2132                   break;
2133                 }
2134               else
2135                 {
2136 #if DEBUG_FS
2137                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2138                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2139                               GNUNET_h2s (&mb->query),
2140                               (unsigned int) mb->size,
2141                               GNUNET_i2s (&pid));
2142 #endif    
2143                 }
2144             }
2145           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2146                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2147             {
2148               delete_migration_block (mb);
2149               consider_migration_gathering ();
2150             }
2151         }
2152       consider_migration (NULL, 
2153                           &pid.hashPubKey,
2154                           cp);
2155     }
2156 #if DEBUG_FS
2157   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2158               "Transmitting %u bytes to peer with PID %u\n",
2159               (unsigned int) msize,
2160               (unsigned int) cp->pid);
2161 #endif
2162   return msize;
2163 }
2164
2165
2166 /**
2167  * Add a message to the set of pending messages for the given peer.
2168  *
2169  * @param cp peer to send message to
2170  * @param pm message to queue
2171  * @param pr request on which behalf this message is being queued
2172  */
2173 static void
2174 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2175                                   struct PendingMessage *pm,
2176                                   struct PendingRequest *pr)
2177 {
2178   struct PendingMessage *pos;
2179   struct PendingMessageList *pml;
2180   struct GNUNET_PeerIdentity pid;
2181
2182   GNUNET_assert (pm->next == NULL);
2183   GNUNET_assert (pm->pml == NULL);    
2184   if (pr != NULL)
2185     {
2186       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2187       pml->req = pr;
2188       pml->target = cp;
2189       pml->pm = pm;
2190       pm->pml = pml;  
2191       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2192                                    pr->pending_tail,
2193                                    pml);
2194     }
2195   pos = cp->pending_messages_head;
2196   while ( (pos != NULL) &&
2197           (pm->priority < pos->priority) )
2198     pos = pos->next;    
2199   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2200                                      cp->pending_messages_tail,
2201                                      pos,
2202                                      pm);
2203   cp->pending_requests++;
2204   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2205     {
2206       GNUNET_STATISTICS_update (stats,
2207                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2208                                 1,
2209                                 GNUNET_NO);
2210       destroy_pending_message (cp->pending_messages_tail, 0);  
2211     }
2212   GNUNET_PEER_resolve (cp->pid, &pid);
2213   if (NULL != cp->cth)
2214     {
2215       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2216       cp->cth = NULL;
2217     }
2218   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2219     {
2220       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2221       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2222     }
2223   /* need to schedule transmission */
2224   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2225   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2226                                                cp->pending_messages_head->priority,
2227                                                MAX_TRANSMIT_DELAY,
2228                                                &pid,
2229                                                cp->pending_messages_head->msize,
2230                                                &transmit_to_peer,
2231                                                cp);
2232   if (cp->cth == NULL)
2233     {
2234 #if DEBUG_FS
2235       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2236                   "Failed to schedule transmission with core!\n");
2237 #endif
2238       GNUNET_STATISTICS_update (stats,
2239                                 gettext_noop ("# CORE transmission failures"),
2240                                 1,
2241                                 GNUNET_NO);
2242     }
2243 }
2244
2245
2246 /**
2247  * Test if the DATABASE (GET) load on this peer is too high
2248  * to even consider processing the query at
2249  * all.  
2250  * 
2251  * @return GNUNET_YES if the load is too high to do anything (load high)
2252  *         GNUNET_NO to process normally (load normal)
2253  *         GNUNET_SYSERR to process for free (load low)
2254  */
2255 static int
2256 test_get_load_too_high (uint32_t priority)
2257 {
2258   double ld;
2259
2260   ld = GNUNET_LOAD_get_load (datastore_get_load);
2261   if (ld < 1)
2262     {
2263       GNUNET_STATISTICS_update (stats,
2264                                 gettext_noop ("# requests done for free (low load)"),
2265                                 1,
2266                                 GNUNET_NO);
2267       return GNUNET_SYSERR;
2268     }
2269   if (ld <= priority)
2270     {
2271       GNUNET_STATISTICS_update (stats,
2272                                 gettext_noop ("# requests done for a price (normal load)"),
2273                                 1,
2274                                 GNUNET_NO);
2275       return GNUNET_NO;
2276     }
2277   GNUNET_STATISTICS_update (stats,
2278                             gettext_noop ("# priority determined to be high"),
2279                             1,
2280                             GNUNET_NO);
2281   return GNUNET_YES;
2282 }
2283
2284
2285
2286
2287 /**
2288  * Test if the DATABASE (PUT) load on this peer is too high
2289  * to even consider processing the query at
2290  * all.  
2291  * 
2292  * @return GNUNET_YES if the load is too high to do anything (load high)
2293  *         GNUNET_NO to process normally (load normal or low)
2294  */
2295 static int
2296 test_put_load_too_high (uint32_t priority)
2297 {
2298   double ld;
2299
2300   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2301     return GNUNET_NO; /* very fast */
2302   ld = GNUNET_LOAD_get_load (datastore_put_load);
2303   if (ld < 2.0 * (1 + priority))
2304     return GNUNET_NO;
2305   GNUNET_STATISTICS_update (stats,
2306                             gettext_noop ("# storage requests dropped due to high load"),
2307                             1,
2308                             GNUNET_NO);
2309   return GNUNET_YES;
2310 }
2311
2312
2313 /* ******************* Pending Request Refresh Task ******************** */
2314
2315
2316
2317 /**
2318  * We use a random delay to make the timing of requests less
2319  * predictable.  This function returns such a random delay.  We add a base
2320  * delay of MAX_CORK_DELAY (1s).
2321  *
2322  * FIXME: make schedule dependent on the specifics of the request?
2323  * Or bandwidth and number of connected peers and load?
2324  *
2325  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2326  */
2327 static struct GNUNET_TIME_Relative
2328 get_processing_delay ()
2329 {
2330   return 
2331     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2332                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2333                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2334                                                                                        TTL_DECREMENT)));
2335 }
2336
2337
2338 /**
2339  * We're processing a GET request from another peer and have decided
2340  * to forward it to other peers.  This function is called periodically
2341  * and should forward the request to other peers until we have all
2342  * possible replies.  If we have transmitted the *only* reply to
2343  * the initiator we should destroy the pending request.  If we have
2344  * many replies in the queue to the initiator, we should delay sending
2345  * out more queries until the reply queue has shrunk some.
2346  *
2347  * @param cls our "struct ProcessGetContext *"
2348  * @param tc unused
2349  */
2350 static void
2351 forward_request_task (void *cls,
2352                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2353
2354
2355 /**
2356  * Function called after we either failed or succeeded
2357  * at transmitting a query to a peer.  
2358  *
2359  * @param cls the requests "struct PendingRequest*"
2360  * @param tpid ID of receiving peer, 0 on transmission error
2361  */
2362 static void
2363 transmit_query_continuation (void *cls,
2364                              GNUNET_PEER_Id tpid)
2365 {
2366   struct PendingRequest *pr = cls;
2367   unsigned int i;
2368
2369   GNUNET_STATISTICS_update (stats,
2370                             gettext_noop ("# queries scheduled for forwarding"),
2371                             -1,
2372                             GNUNET_NO);
2373   if (tpid == 0)   
2374     {
2375 #if DEBUG_FS
2376       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2377                   "Transmission of request failed, will try again later.\n");
2378 #endif
2379       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2380         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2381                                                  get_processing_delay (),
2382                                                  &forward_request_task,
2383                                                  pr); 
2384       return;    
2385     }
2386 #if DEBUG_FS
2387   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2388               "Transmitted query `%s'\n",
2389               GNUNET_h2s (&pr->query));
2390 #endif
2391   GNUNET_STATISTICS_update (stats,
2392                             gettext_noop ("# queries forwarded"),
2393                             1,
2394                             GNUNET_NO);
2395   for (i=0;i<pr->used_targets_off;i++)
2396     if (pr->used_targets[i].pid == tpid)
2397       break; /* found match! */    
2398   if (i == pr->used_targets_off)
2399     {
2400       /* need to create new entry */
2401       if (pr->used_targets_off == pr->used_targets_size)
2402         GNUNET_array_grow (pr->used_targets,
2403                            pr->used_targets_size,
2404                            pr->used_targets_size * 2 + 2);
2405       GNUNET_PEER_change_rc (tpid, 1);
2406       pr->used_targets[pr->used_targets_off].pid = tpid;
2407       pr->used_targets[pr->used_targets_off].num_requests = 0;
2408       i = pr->used_targets_off++;
2409     }
2410   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2411   pr->used_targets[i].num_requests++;
2412   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2413     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2414                                              get_processing_delay (),
2415                                              &forward_request_task,
2416                                              pr);
2417 }
2418
2419
2420 /**
2421  * How many bytes should a bloomfilter be if we have already seen
2422  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2423  * of bits set per entry.  Furthermore, we should not re-size the
2424  * filter too often (to keep it cheap).
2425  *
2426  * Since other peers will also add entries but not resize the filter,
2427  * we should generally pick a slightly larger size than what the
2428  * strict math would suggest.
2429  *
2430  * @return must be a power of two and smaller or equal to 2^15.
2431  */
2432 static size_t
2433 compute_bloomfilter_size (unsigned int entry_count)
2434 {
2435   size_t size;
2436   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2437   uint16_t max = 1 << 15;
2438
2439   if (entry_count > max)
2440     return max;
2441   size = 8;
2442   while ((size < max) && (size < ideal))
2443     size *= 2;
2444   if (size > max)
2445     return max;
2446   return size;
2447 }
2448
2449
2450 /**
2451  * Recalculate our bloom filter for filtering replies.  This function
2452  * will create a new bloom filter from scratch, so it should only be
2453  * called if we have no bloomfilter at all (and hence can create a
2454  * fresh one of minimal size without problems) OR if our peer is the
2455  * initiator (in which case we may resize to larger than mimimum size).
2456  *
2457  * @param pr request for which the BF is to be recomputed
2458  */
2459 static void
2460 refresh_bloomfilter (struct PendingRequest *pr)
2461 {
2462   unsigned int i;
2463   size_t nsize;
2464   GNUNET_HashCode mhash;
2465
2466   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2467   if (nsize == pr->bf_size)
2468     return; /* size not changed */
2469   if (pr->bf != NULL)
2470     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2471   pr->bf_size = nsize;
2472   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2473   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2474                                               pr->bf_size,
2475                                               BLOOMFILTER_K);
2476   for (i=0;i<pr->replies_seen_off;i++)
2477     {
2478       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2479                                 pr->mingle,
2480                                 &mhash);
2481       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2482     }
2483 }
2484
2485
2486 /**
2487  * Function called after we've tried to reserve a certain amount of
2488  * bandwidth for a reply.  Check if we succeeded and if so send our
2489  * query.
2490  *
2491  * @param cls the requests "struct PendingRequest*"
2492  * @param peer identifies the peer
2493  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2494  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2495  * @param amount set to the amount that was actually reserved or unreserved
2496  * @param preference current traffic preference for the given peer
2497  */
2498 static void
2499 target_reservation_cb (void *cls,
2500                        const struct
2501                        GNUNET_PeerIdentity * peer,
2502                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2503                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2504                        int amount,
2505                        uint64_t preference)
2506 {
2507   struct PendingRequest *pr = cls;
2508   struct ConnectedPeer *cp;
2509   struct PendingMessage *pm;
2510   struct GetMessage *gm;
2511   GNUNET_HashCode *ext;
2512   char *bfdata;
2513   size_t msize;
2514   unsigned int k;
2515   int no_route;
2516   uint32_t bm;
2517   unsigned int i;
2518
2519   pr->irc = NULL;
2520   if (peer == NULL)
2521     {
2522       /* error in communication with core, try again later */
2523       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2524         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2525                                                  get_processing_delay (),
2526                                                  &forward_request_task,
2527                                                  pr);
2528       return;
2529     }
2530   /* (3) transmit, update ttl/priority */
2531   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2532                                           &peer->hashPubKey);
2533   if (cp == NULL)
2534     {
2535       /* Peer must have just left */
2536 #if DEBUG_FS
2537       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2538                   "Selected peer disconnected!\n");
2539 #endif
2540       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2541         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2542                                                  get_processing_delay (),
2543                                                  &forward_request_task,
2544                                                  pr);
2545       return;
2546     }
2547   no_route = GNUNET_NO;
2548   if (amount == 0)
2549     {
2550       if (pr->cp == NULL)
2551         {
2552 #if DEBUG_FS > 1
2553           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2554                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2555                       amount,
2556                       DBLOCK_SIZE);
2557 #endif
2558           GNUNET_STATISTICS_update (stats,
2559                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2560                                     1,
2561                                     GNUNET_NO);
2562           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2563             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2564                                                      get_processing_delay (),
2565                                                      &forward_request_task,
2566                                                      pr);
2567           return;  /* this target round failed */
2568         }
2569       no_route = GNUNET_YES;
2570     }
2571   
2572   GNUNET_STATISTICS_update (stats,
2573                             gettext_noop ("# queries scheduled for forwarding"),
2574                             1,
2575                             GNUNET_NO);
2576   for (i=0;i<pr->used_targets_off;i++)
2577     if (pr->used_targets[i].pid == cp->pid) 
2578       {
2579         GNUNET_STATISTICS_update (stats,
2580                                   gettext_noop ("# queries retransmitted to same target"),
2581                                   1,
2582                                   GNUNET_NO);
2583         break;
2584       } 
2585
2586   /* build message and insert message into priority queue */
2587 #if DEBUG_FS
2588   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2589               "Forwarding request `%s' to `%4s'!\n",
2590               GNUNET_h2s (&pr->query),
2591               GNUNET_i2s (peer));
2592 #endif
2593   k = 0;
2594   bm = 0;
2595   if (GNUNET_YES == no_route)
2596     {
2597       bm |= GET_MESSAGE_BIT_RETURN_TO;
2598       k++;      
2599     }
2600   if (pr->namespace != NULL)
2601     {
2602       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2603       k++;
2604     }
2605   if (pr->target_pid != 0)
2606     {
2607       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2608       k++;
2609     }
2610   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2611   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2612   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2613   pm->msize = msize;
2614   gm = (struct GetMessage*) &pm[1];
2615   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2616   gm->header.size = htons (msize);
2617   gm->type = htonl (pr->type);
2618   pr->remaining_priority /= 2;
2619   gm->priority = htonl (pr->remaining_priority);
2620   gm->ttl = htonl (pr->ttl);
2621   gm->filter_mutator = htonl(pr->mingle); 
2622   gm->hash_bitmap = htonl (bm);
2623   gm->query = pr->query;
2624   ext = (GNUNET_HashCode*) &gm[1];
2625   k = 0;
2626   if (GNUNET_YES == no_route)
2627     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2628   if (pr->namespace != NULL)
2629     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2630   if (pr->target_pid != 0)
2631     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2632   bfdata = (char *) &ext[k];
2633   if (pr->bf != NULL)
2634     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2635                                                bfdata,
2636                                                pr->bf_size);
2637   pm->cont = &transmit_query_continuation;
2638   pm->cont_cls = pr;
2639   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2640   add_to_pending_messages_for_peer (cp, pm, pr);
2641 }
2642
2643
2644 /**
2645  * Closure used for "target_peer_select_cb".
2646  */
2647 struct PeerSelectionContext 
2648 {
2649   /**
2650    * The request for which we are selecting
2651    * peers.
2652    */
2653   struct PendingRequest *pr;
2654
2655   /**
2656    * Current "prime" target.
2657    */
2658   struct GNUNET_PeerIdentity target;
2659
2660   /**
2661    * How much do we like this target?
2662    */
2663   double target_score;
2664
2665 };
2666
2667
2668 /**
2669  * Function called for each connected peer to determine
2670  * which one(s) would make good targets for forwarding.
2671  *
2672  * @param cls closure (struct PeerSelectionContext)
2673  * @param key current key code (peer identity)
2674  * @param value value in the hash map (struct ConnectedPeer)
2675  * @return GNUNET_YES if we should continue to
2676  *         iterate,
2677  *         GNUNET_NO if not.
2678  */
2679 static int
2680 target_peer_select_cb (void *cls,
2681                        const GNUNET_HashCode * key,
2682                        void *value)
2683 {
2684   struct PeerSelectionContext *psc = cls;
2685   struct ConnectedPeer *cp = value;
2686   struct PendingRequest *pr = psc->pr;
2687   struct GNUNET_TIME_Relative delay;
2688   double score;
2689   unsigned int i;
2690   unsigned int pc;
2691
2692   /* 1) check that this peer is not the initiator */
2693   if (cp == pr->cp)
2694     {
2695 #if DEBUG_FS
2696       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2697                   "Skipping initiator in forwarding selection\n");
2698 #endif
2699       return GNUNET_YES; /* skip */        
2700     }
2701
2702   /* 2) check if we have already (recently) forwarded to this peer */
2703   /* 2a) this particular request */
2704   pc = 0;
2705   for (i=0;i<pr->used_targets_off;i++)
2706     if (pr->used_targets[i].pid == cp->pid) 
2707       {
2708         pc = pr->used_targets[i].num_requests;
2709         GNUNET_assert (pc > 0);
2710         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2711                                            RETRY_PROBABILITY_INV * pc))
2712           {
2713 #if DEBUG_FS
2714             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2715                         "NOT re-trying query that was previously transmitted %u times\n",
2716                         (unsigned int) pc);
2717 #endif
2718             return GNUNET_YES; /* skip */
2719           }
2720         break;
2721       }
2722 #if DEBUG_FS
2723   if (0 < pc)
2724     {
2725       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2726                   "Re-trying query that was previously transmitted %u times to this peer\n",
2727                   (unsigned int) pc);
2728     }
2729 #endif
2730   /* 2b) many other requests to this peer */
2731   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2732   if (delay.value <= cp->avg_delay.value)
2733     {
2734 #if DEBUG_FS
2735       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2736                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2737                   MAX_QUEUE_PER_PEER,
2738                   cp->avg_delay.value);
2739 #endif
2740       return GNUNET_YES; /* skip */      
2741     }
2742
2743   /* 3) calculate how much we'd like to forward to this peer,
2744      starting with a random value that is strong enough
2745      to at least give any peer a chance sometimes 
2746      (compared to the other factors that come later) */
2747   /* 3a) count successful (recent) routes from cp for same source */
2748   if (pr->cp != NULL)
2749     {
2750       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2751                                         P2P_SUCCESS_LIST_SIZE);
2752       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2753         if (cp->last_p2p_replies[i] == pr->cp->pid)
2754           score += 1.0; /* likely successful based on hot path */
2755     }
2756   else
2757     {
2758       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2759                                         CS2P_SUCCESS_LIST_SIZE);
2760       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2761         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2762           score += 1.0; /* likely successful based on hot path */
2763     }
2764   /* 3b) include latency */
2765   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2766     score += 1.0; /* likely fast based on latency */
2767   /* 3c) include priorities */
2768   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2769     score += 1.0; /* likely successful based on priorities */
2770   /* 3d) penalize for queue size */  
2771   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2772   /* 3e) include peer proximity */
2773   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2774                                                     &pr->query)) / (double) UINT32_MAX);
2775   /* 4) super-bonus for being the known target */
2776   if (pr->target_pid == cp->pid)
2777     score += 100.0;
2778   /* store best-fit in closure */
2779 #if DEBUG_FS
2780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2781               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2782               GNUNET_h2s (key),
2783               score,
2784               psc->target_score);
2785 #endif  
2786   score++; /* avoid zero */
2787   if (score > psc->target_score)
2788     {
2789       psc->target_score = score;
2790       psc->target.hashPubKey = *key; 
2791     }
2792   return GNUNET_YES;
2793 }
2794   
2795
2796 /**
2797  * The priority level imposes a bound on the maximum
2798  * value for the ttl that can be requested.
2799  *
2800  * @param ttl_in requested ttl
2801  * @param prio given priority
2802  * @return ttl_in if ttl_in is below the limit,
2803  *         otherwise the ttl-limit for the given priority
2804  */
2805 static int32_t
2806 bound_ttl (int32_t ttl_in, uint32_t prio)
2807 {
2808   unsigned long long allowed;
2809
2810   if (ttl_in <= 0)
2811     return ttl_in;
2812   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2813   if (ttl_in > allowed)      
2814     {
2815       if (allowed >= (1 << 30))
2816         return 1 << 30;
2817       return allowed;
2818     }
2819   return ttl_in;
2820 }
2821
2822
2823 /**
2824  * Iterator called on each result obtained for a DHT
2825  * operation that expects a reply
2826  *
2827  * @param cls closure
2828  * @param exp when will this value expire
2829  * @param key key of the result
2830  * @param get_path NULL-terminated array of pointers
2831  *                 to the peers on reverse GET path (or NULL if not recorded)
2832  * @param put_path NULL-terminated array of pointers
2833  *                 to the peers on the PUT path (or NULL if not recorded)
2834  * @param type type of the result
2835  * @param size number of bytes in data
2836  * @param data pointer to the result data
2837  */
2838 static void
2839 process_dht_reply (void *cls,
2840                    struct GNUNET_TIME_Absolute exp,
2841                    const GNUNET_HashCode * key,
2842                    const struct GNUNET_PeerIdentity * const *get_path,
2843                    const struct GNUNET_PeerIdentity * const *put_path,
2844                    enum GNUNET_BLOCK_Type type,
2845                    size_t size,
2846                    const void *data);
2847
2848
2849 /**
2850  * We're processing a GET request and have decided
2851  * to forward it to other peers.  This function is called periodically
2852  * and should forward the request to other peers until we have all
2853  * possible replies.  If we have transmitted the *only* reply to
2854  * the initiator we should destroy the pending request.  If we have
2855  * many replies in the queue to the initiator, we should delay sending
2856  * out more queries until the reply queue has shrunk some.
2857  *
2858  * @param cls our "struct ProcessGetContext *"
2859  * @param tc unused
2860  */
2861 static void
2862 forward_request_task (void *cls,
2863                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2864 {
2865   struct PendingRequest *pr = cls;
2866   struct PeerSelectionContext psc;
2867   struct ConnectedPeer *cp; 
2868   struct GNUNET_TIME_Relative delay;
2869
2870   pr->task = GNUNET_SCHEDULER_NO_TASK;
2871   if (pr->irc != NULL)
2872     {
2873 #if DEBUG_FS
2874       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2875                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2876                   GNUNET_h2s (&pr->query));
2877 #endif
2878       return; /* already pending */
2879     }
2880   if (GNUNET_YES == pr->local_only)
2881     return; /* configured to not do P2P search */
2882   /* (0) try DHT */
2883   if ( (0 == pr->anonymity_level) &&
2884        (GNUNET_YES != pr->forward_only) &&
2885        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2886        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2887     {
2888       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2889                                           GNUNET_TIME_UNIT_FOREVER_REL,
2890                                           pr->type,
2891                                           &pr->query,
2892                                           GNUNET_DHT_RO_NONE,
2893                                           pr->bf,
2894                                           pr->mingle,
2895                                           pr->namespace,
2896                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2897                                           &process_dht_reply,
2898                                           pr);
2899     }
2900   /* (1) select target */
2901   psc.pr = pr;
2902   psc.target_score = -DBL_MAX;
2903   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2904                                          &target_peer_select_cb,
2905                                          &psc);  
2906   if (psc.target_score == -DBL_MAX)
2907     {
2908       delay = get_processing_delay ();
2909 #if DEBUG_FS 
2910       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2911                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2912                   GNUNET_h2s (&pr->query),
2913                   delay.value);
2914 #endif
2915       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2916                                                delay,
2917                                                &forward_request_task,
2918                                                pr);
2919       return; /* nobody selected */
2920     }
2921   /* (3) update TTL/priority */
2922   if (pr->client_request_list != NULL)
2923     {
2924       /* FIXME: use better algorithm!? */
2925       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2926                                          4))
2927         pr->priority++;
2928       /* bound priority we use by priorities we see from other peers
2929          rounded up (must round up so that we can see non-zero
2930          priorities, but round up as little as possible to make it
2931          plausible that we forwarded another peers request) */
2932       if (pr->priority > current_priorities + 1.0)
2933         pr->priority = (uint32_t) current_priorities + 1.0;
2934       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2935                            pr->priority);
2936 #if DEBUG_FS
2937       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2938                   "Trying query `%s' with priority %u and TTL %d.\n",
2939                   GNUNET_h2s (&pr->query),
2940                   pr->priority,
2941                   pr->ttl);
2942 #endif
2943     }
2944
2945   /* (3) reserve reply bandwidth */
2946   if (GNUNET_NO == pr->forward_only)
2947     {
2948       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2949                                               &psc.target.hashPubKey);
2950       GNUNET_assert (NULL != cp);
2951       pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2952                                                     &psc.target,
2953                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2954                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2955                                                     DBLOCK_SIZE * 2, 
2956                                                     cp->inc_preference,
2957                                                     &target_reservation_cb,
2958                                                     pr);
2959       cp->inc_preference = 0;
2960     }
2961   else
2962     {
2963       /* force forwarding */
2964       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2965       target_reservation_cb (pr, &psc.target,
2966                              zerobw, zerobw, 0, 0.0);
2967     }
2968 }
2969
2970
2971 /* **************************** P2P PUT Handling ************************ */
2972
2973
2974 /**
2975  * Function called after we either failed or succeeded
2976  * at transmitting a reply to a peer.  
2977  *
2978  * @param cls the requests "struct PendingRequest*"
2979  * @param tpid ID of receiving peer, 0 on transmission error
2980  */
2981 static void
2982 transmit_reply_continuation (void *cls,
2983                              GNUNET_PEER_Id tpid)
2984 {
2985   struct PendingRequest *pr = cls;
2986   
2987   switch (pr->type)
2988     {
2989     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2990     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2991       /* only one reply expected, done with the request! */
2992       destroy_pending_request (pr);
2993       break;
2994     case GNUNET_BLOCK_TYPE_ANY:
2995     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2996     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2997       break;
2998     default:
2999       GNUNET_break (0);
3000       break;
3001     }
3002 }
3003
3004
3005 /**
3006  * Transmit the given message by copying it to the target buffer
3007  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3008  * for writing in the meantime.  In that case, do nothing
3009  * (the disconnect or shutdown handler will take care of the rest).
3010  * If we were able to transmit messages and there are still more
3011  * pending, ask core again for further calls to this function.
3012  *
3013  * @param cls closure, pointer to the 'struct ClientList*'
3014  * @param size number of bytes available in buf
3015  * @param buf where the callee should write the message
3016  * @return number of bytes written to buf
3017  */
3018 static size_t
3019 transmit_to_client (void *cls,
3020                   size_t size, void *buf)
3021 {
3022   struct ClientList *cl = cls;
3023   char *cbuf = buf;
3024   struct ClientResponseMessage *creply;
3025   size_t msize;
3026   
3027   cl->th = NULL;
3028   if (NULL == buf)
3029     {
3030 #if DEBUG_FS
3031       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3032                   "Not sending reply, client communication problem.\n");
3033 #endif
3034       return 0;
3035     }
3036   msize = 0;
3037   while ( (NULL != (creply = cl->res_head) ) &&
3038           (creply->msize <= size) )
3039     {
3040       memcpy (&cbuf[msize], &creply[1], creply->msize);
3041       msize += creply->msize;
3042       size -= creply->msize;
3043       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3044                                    cl->res_tail,
3045                                    creply);
3046       GNUNET_free (creply);
3047     }
3048   if (NULL != creply)
3049     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3050                                                   creply->msize,
3051                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3052                                                   &transmit_to_client,
3053                                                   cl);
3054 #if DEBUG_FS
3055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3056               "Transmitted %u bytes to client\n",
3057               (unsigned int) msize);
3058 #endif
3059   return msize;
3060 }
3061
3062
3063 /**
3064  * Closure for "process_reply" function.
3065  */
3066 struct ProcessReplyClosure
3067 {
3068   /**
3069    * The data for the reply.
3070    */
3071   const void *data;
3072
3073   /**
3074    * Who gave us this reply? NULL for local host (or DHT)
3075    */
3076   struct ConnectedPeer *sender;
3077
3078   /**
3079    * When the reply expires.
3080    */
3081   struct GNUNET_TIME_Absolute expiration;
3082
3083   /**
3084    * Size of data.
3085    */
3086   size_t size;
3087
3088   /**
3089    * Type of the block.
3090    */
3091   enum GNUNET_BLOCK_Type type;
3092
3093   /**
3094    * How much was this reply worth to us?
3095    */
3096   uint32_t priority;
3097
3098   /**
3099    * Evaluation result (returned).
3100    */
3101   enum GNUNET_BLOCK_EvaluationResult eval;
3102
3103   /**
3104    * Did we finish processing the associated request?
3105    */ 
3106   int finished;
3107
3108   /**
3109    * Did we find a matching request?
3110    */
3111   int request_found;
3112 };
3113
3114
3115 /**
3116  * We have received a reply; handle it!
3117  *
3118  * @param cls response (struct ProcessReplyClosure)
3119  * @param key our query
3120  * @param value value in the hash map (info about the query)
3121  * @return GNUNET_YES (we should continue to iterate)
3122  */
3123 static int
3124 process_reply (void *cls,
3125                const GNUNET_HashCode * key,
3126                void *value)
3127 {
3128   struct ProcessReplyClosure *prq = cls;
3129   struct PendingRequest *pr = value;
3130   struct PendingMessage *reply;
3131   struct ClientResponseMessage *creply;
3132   struct ClientList *cl;
3133   struct PutMessage *pm;
3134   struct ConnectedPeer *cp;
3135   struct GNUNET_TIME_Relative cur_delay;
3136 #if SUPPORT_DELAYS  
3137 struct GNUNET_TIME_Relative art_delay;
3138 #endif
3139   size_t msize;
3140   unsigned int i;
3141
3142 #if DEBUG_FS
3143   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3144               "Matched result (type %u) for query `%s' with pending request\n",
3145               (unsigned int) prq->type,
3146               GNUNET_h2s (key));
3147 #endif  
3148   GNUNET_STATISTICS_update (stats,
3149                             gettext_noop ("# replies received and matched"),
3150                             1,
3151                             GNUNET_NO);
3152   if (prq->sender != NULL)
3153     {
3154       for (i=0;i<pr->used_targets_off;i++)
3155         if (pr->used_targets[i].pid == prq->sender->pid)
3156           break;
3157       if (i < pr->used_targets_off)
3158         {
3159           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3160           prq->sender->avg_delay.value
3161             = (prq->sender->avg_delay.value * 
3162                (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
3163           prq->sender->avg_priority
3164             = (prq->sender->avg_priority * 
3165                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3166         }
3167       if (pr->cp != NULL)
3168         {
3169           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3170                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3171                                  -1);
3172           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3173           prq->sender->last_p2p_replies
3174             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3175             = pr->cp->pid;
3176         }
3177       else
3178         {
3179           if (NULL != prq->sender->last_client_replies
3180               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3181             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3182                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3183           prq->sender->last_client_replies
3184             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3185             = pr->client_request_list->client_list->client;
3186           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3187         }
3188     }
3189   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3190                                      prq->type,
3191                                      key,
3192                                      &pr->bf,
3193                                      pr->mingle,
3194                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3195                                      prq->data,
3196                                      prq->size);
3197   switch (prq->eval)
3198     {
3199     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3200       break;
3201     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3202       while (NULL != pr->pending_head)
3203         destroy_pending_message_list_entry (pr->pending_head);
3204       if (pr->qe != NULL)
3205         {
3206           if (pr->client_request_list != NULL)
3207             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3208                                         GNUNET_YES);
3209           GNUNET_DATASTORE_cancel (pr->qe);
3210           pr->qe = NULL;
3211         }
3212       pr->do_remove = GNUNET_YES;
3213       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3214         {
3215           GNUNET_SCHEDULER_cancel (sched,
3216                                    pr->task);
3217           pr->task = GNUNET_SCHEDULER_NO_TASK;
3218         }
3219       GNUNET_break (GNUNET_YES ==
3220                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3221                                                           key,
3222                                                           pr));
3223       GNUNET_LOAD_update (rt_entry_lifetime,
3224                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3225       break;
3226     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3227       GNUNET_STATISTICS_update (stats,
3228                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3229                                 1,
3230                                 GNUNET_NO);
3231 #if DEBUG_FS
3232       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3233                   "Duplicate response `%s', discarding.\n",
3234                   GNUNET_h2s (&mhash));
3235 #endif
3236       return GNUNET_YES; /* duplicate */
3237     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3238       return GNUNET_YES; /* wrong namespace */  
3239     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3240       GNUNET_break (0);
3241       return GNUNET_YES;
3242     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3243       GNUNET_break (0);
3244       return GNUNET_YES;
3245     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3246       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3247                   _("Unsupported block type %u\n"),
3248                   prq->type);
3249       return GNUNET_NO;
3250     }
3251   if (pr->client_request_list != NULL)
3252     {
3253       if (pr->replies_seen_size == pr->replies_seen_off)
3254         GNUNET_array_grow (pr->replies_seen,
3255                            pr->replies_seen_size,
3256                            pr->replies_seen_size * 2 + 4);      
3257       GNUNET_CRYPTO_hash (prq->data,
3258                           prq->size,
3259                           &pr->replies_seen[pr->replies_seen_off++]);         
3260       refresh_bloomfilter (pr);
3261     }
3262   if (NULL == prq->sender)
3263     {
3264 #if DEBUG_FS
3265       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3266                   "Found result for query `%s' in local datastore\n",
3267                   GNUNET_h2s (key));
3268 #endif
3269       GNUNET_STATISTICS_update (stats,
3270                                 gettext_noop ("# results found locally"),
3271                                 1,
3272                                 GNUNET_NO);      
3273     }
3274   prq->priority += pr->remaining_priority;
3275   pr->remaining_priority = 0;
3276   pr->results_found++;
3277   prq->request_found = GNUNET_YES;
3278   if (NULL != pr->client_request_list)
3279     {
3280       GNUNET_STATISTICS_update (stats,
3281                                 gettext_noop ("# replies received for local clients"),
3282                                 1,
3283                                 GNUNET_NO);
3284       cl = pr->client_request_list->client_list;
3285       msize = sizeof (struct PutMessage) + prq->size;
3286       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3287       creply->msize = msize;
3288       creply->client_list = cl;
3289       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3290                                          cl->res_tail,
3291                                          cl->res_tail,
3292                                          creply);      
3293       pm = (struct PutMessage*) &creply[1];
3294       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3295       pm->header.size = htons (msize);
3296       pm->type = htonl (prq->type);
3297       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3298       memcpy (&pm[1], prq->data, prq->size);      
3299       if (NULL == cl->th)
3300         {
3301 #if DEBUG_FS
3302           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3303                       "Transmitting result for query `%s' to client\n",
3304                       GNUNET_h2s (key));
3305 #endif  
3306           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3307                                                         msize,
3308                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3309                                                         &transmit_to_client,
3310                                                         cl);
3311         }
3312       GNUNET_break (cl->th != NULL);
3313       if (pr->do_remove)                
3314         {
3315           prq->finished = GNUNET_YES;
3316           destroy_pending_request (pr);         
3317         }
3318     }
3319   else
3320     {
3321       cp = pr->cp;
3322 #if DEBUG_FS
3323       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3324                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3325                   GNUNET_h2s (key),
3326                   (unsigned int) cp->pid);
3327 #endif  
3328       GNUNET_STATISTICS_update (stats,
3329                                 gettext_noop ("# replies received for other peers"),
3330                                 1,
3331                                 GNUNET_NO);
3332       msize = sizeof (struct PutMessage) + prq->size;
3333       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3334       reply->cont = &transmit_reply_continuation;
3335       reply->cont_cls = pr;
3336 #if SUPPORT_DELAYS
3337       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3338                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3339                                                                            TTL_DECREMENT));
3340       reply->delay_until 
3341         = GNUNET_TIME_relative_to_absolute (art_delay);
3342       GNUNET_STATISTICS_update (stats,
3343                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3344                                 art_delay.value,
3345                                 GNUNET_NO);
3346 #endif
3347       reply->msize = msize;
3348       reply->priority = UINT32_MAX; /* send replies first! */
3349       pm = (struct PutMessage*) &reply[1];
3350       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3351       pm->header.size = htons (msize);
3352       pm->type = htonl (prq->type);
3353       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3354       memcpy (&pm[1], prq->data, prq->size);
3355       add_to_pending_messages_for_peer (cp, reply, pr);
3356     }
3357   return GNUNET_YES;
3358 }
3359
3360
3361 /**
3362  * Iterator called on each result obtained for a DHT
3363  * operation that expects a reply
3364  *
3365  * @param cls closure
3366  * @param exp when will this value expire
3367  * @param key key of the result
3368  * @param get_path NULL-terminated array of pointers
3369  *                 to the peers on reverse GET path (or NULL if not recorded)
3370  * @param put_path NULL-terminated array of pointers
3371  *                 to the peers on the PUT path (or NULL if not recorded)
3372  * @param type type of the result
3373  * @param size number of bytes in data
3374  * @param data pointer to the result data
3375  */
3376 static void
3377 process_dht_reply (void *cls,
3378                    struct GNUNET_TIME_Absolute exp,
3379                    const GNUNET_HashCode * key,
3380                    const struct GNUNET_PeerIdentity * const *get_path,
3381                    const struct GNUNET_PeerIdentity * const *put_path,
3382                    enum GNUNET_BLOCK_Type type,
3383                    size_t size,
3384                    const void *data)
3385 {
3386   struct PendingRequest *pr = cls;
3387   struct ProcessReplyClosure prq;
3388
3389   memset (&prq, 0, sizeof (prq));
3390   prq.data = data;
3391   prq.expiration = exp;
3392   prq.size = size;  
3393   prq.type = type;
3394   process_reply (&prq, key, pr);
3395 }
3396
3397
3398
3399 /**
3400  * Continuation called to notify client about result of the
3401  * operation.
3402  *
3403  * @param cls closure
3404  * @param success GNUNET_SYSERR on failure
3405  * @param msg NULL on success, otherwise an error message
3406  */
3407 static void 
3408 put_migration_continuation (void *cls,
3409                             int success,
3410                             const char *msg)
3411 {
3412   struct GNUNET_TIME_Absolute *start = cls;
3413   struct GNUNET_TIME_Relative delay;
3414   
3415   delay = GNUNET_TIME_absolute_get_duration (*start);
3416   GNUNET_free (start);
3417   GNUNET_LOAD_update (datastore_put_load,
3418                       delay.value);
3419   if (GNUNET_OK == success)
3420     return;
3421   GNUNET_STATISTICS_update (stats,
3422                             gettext_noop ("# datastore 'put' failures"),
3423                             1,
3424                             GNUNET_NO);
3425 }
3426
3427
3428 /**
3429  * Handle P2P "PUT" message.
3430  *
3431  * @param cls closure, always NULL
3432  * @param other the other peer involved (sender or receiver, NULL
3433  *        for loopback messages where we are both sender and receiver)
3434  * @param message the actual message
3435  * @param latency reported latency of the connection with 'other'
3436  * @param distance reported distance (DV) to 'other' 
3437  * @return GNUNET_OK to keep the connection open,
3438  *         GNUNET_SYSERR to close it (signal serious error)
3439  */
3440 static int
3441 handle_p2p_put (void *cls,
3442                 const struct GNUNET_PeerIdentity *other,
3443                 const struct GNUNET_MessageHeader *message,
3444                 struct GNUNET_TIME_Relative latency,
3445                 uint32_t distance)
3446 {
3447   const struct PutMessage *put;
3448   uint16_t msize;
3449   size_t dsize;
3450   enum GNUNET_BLOCK_Type type;
3451   struct GNUNET_TIME_Absolute expiration;
3452   GNUNET_HashCode query;
3453   struct ProcessReplyClosure prq;
3454   struct GNUNET_TIME_Absolute *start;
3455   struct GNUNET_TIME_Relative block_time;  
3456   double putl;
3457   struct ConnectedPeer *cp; 
3458   struct PendingMessage *pm;
3459   struct MigrationStopMessage *msm;
3460
3461   msize = ntohs (message->size);
3462   if (msize < sizeof (struct PutMessage))
3463     {
3464       GNUNET_break_op(0);
3465       return GNUNET_SYSERR;
3466     }
3467   put = (const struct PutMessage*) message;
3468   dsize = msize - sizeof (struct PutMessage);
3469   type = ntohl (put->type);
3470   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3471
3472   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3473     return GNUNET_SYSERR;
3474   if (GNUNET_OK !=
3475       GNUNET_BLOCK_get_key (block_ctx,
3476                             type,
3477                             &put[1],
3478                             dsize,
3479                             &query))
3480     {
3481       GNUNET_break_op (0);
3482       return GNUNET_SYSERR;
3483     }
3484 #if DEBUG_FS
3485   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3486               "Received result for query `%s' from peer `%4s'\n",
3487               GNUNET_h2s (&query),
3488               GNUNET_i2s (other));
3489 #endif
3490   GNUNET_STATISTICS_update (stats,
3491                             gettext_noop ("# replies received (overall)"),
3492                             1,
3493                             GNUNET_NO);
3494   /* now, lookup 'query' */
3495   prq.data = (const void*) &put[1];
3496   if (other != NULL)
3497     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3498                                                     &other->hashPubKey);
3499   else
3500     prq.sender = NULL;
3501   prq.size = dsize;
3502   prq.type = type;
3503   prq.expiration = expiration;
3504   prq.priority = 0;
3505   prq.finished = GNUNET_NO;
3506   prq.request_found = GNUNET_NO;
3507   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3508                                               &query,
3509                                               &process_reply,
3510                                               &prq);
3511   if (prq.sender != NULL)
3512     {
3513       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3514       prq.sender->trust += prq.priority;
3515     }
3516   if ( (GNUNET_YES == active_migration) &&
3517        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3518     {      
3519 #if DEBUG_FS
3520       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3521                   "Replicating result for query `%s' with priority %u\n",
3522                   GNUNET_h2s (&query),
3523                   prq.priority);
3524 #endif
3525       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3526       *start = GNUNET_TIME_absolute_get ();
3527       GNUNET_DATASTORE_put (dsh,
3528                             0, &query, dsize, &put[1],
3529                             type, prq.priority, 1 /* anonymity */, 
3530                             expiration, 
3531                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3532                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3533                             &put_migration_continuation, 
3534                             start);
3535     }
3536   putl = GNUNET_LOAD_get_load (datastore_put_load);
3537   if ( (GNUNET_NO == prq.request_found) &&
3538        ( (GNUNET_YES != active_migration) ||
3539          (putl > 2.5 * (1 + prq.priority)) ) )
3540     {
3541       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3542                                               &other->hashPubKey);
3543       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3544         return GNUNET_OK; /* already blocked */
3545       /* We're too busy; send MigrationStop message! */
3546       if (GNUNET_YES != active_migration) 
3547         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3548       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3549                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3550                                                                                    (unsigned int) (60000 * putl * putl)));
3551       
3552       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3553       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3554                           sizeof (struct MigrationStopMessage));
3555       pm->msize = sizeof (struct MigrationStopMessage);
3556       pm->priority = UINT32_MAX;
3557       msm = (struct MigrationStopMessage*) &pm[1];
3558       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3559       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3560       msm->duration = GNUNET_TIME_relative_hton (block_time);
3561       add_to_pending_messages_for_peer (cp,
3562                                         pm,
3563                                         NULL);
3564     }
3565   return GNUNET_OK;
3566 }
3567
3568
3569 /**
3570  * Handle P2P "MIGRATION_STOP" message.
3571  *
3572  * @param cls closure, always NULL
3573  * @param other the other peer involved (sender or receiver, NULL
3574  *        for loopback messages where we are both sender and receiver)
3575  * @param message the actual message
3576  * @param latency reported latency of the connection with 'other'
3577  * @param distance reported distance (DV) to 'other' 
3578  * @return GNUNET_OK to keep the connection open,
3579  *         GNUNET_SYSERR to close it (signal serious error)
3580  */
3581 static int
3582 handle_p2p_migration_stop (void *cls,
3583                            const struct GNUNET_PeerIdentity *other,
3584                            const struct GNUNET_MessageHeader *message,
3585                            struct GNUNET_TIME_Relative latency,
3586                            uint32_t distance)
3587 {
3588   struct ConnectedPeer *cp; 
3589   const struct MigrationStopMessage *msm;
3590
3591   msm = (const struct MigrationStopMessage*) message;
3592   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3593                                           &other->hashPubKey);
3594   if (cp == NULL)
3595     {
3596       GNUNET_break (0);
3597       return GNUNET_OK;
3598     }
3599   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3600   return GNUNET_OK;
3601 }
3602
3603
3604
3605 /* **************************** P2P GET Handling ************************ */
3606
3607
3608 /**
3609  * Closure for 'check_duplicate_request_{peer,client}'.
3610  */
3611 struct CheckDuplicateRequestClosure
3612 {
3613   /**
3614    * The new request we should check if it already exists.
3615    */
3616   const struct PendingRequest *pr;
3617
3618   /**
3619    * Existing request found by the checker, NULL if none.
3620    */
3621   struct PendingRequest *have;
3622 };
3623
3624
3625 /**
3626  * Iterator over entries in the 'query_request_map' that
3627  * tries to see if we have the same request pending from
3628  * the same client already.
3629  *
3630  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3631  * @param key current key code (query, ignored, must match)
3632  * @param value value in the hash map (a 'struct PendingRequest' 
3633  *              that already exists)
3634  * @return GNUNET_YES if we should continue to
3635  *         iterate (no match yet)
3636  *         GNUNET_NO if not (match found).
3637  */
3638 static int
3639 check_duplicate_request_client (void *cls,
3640                                 const GNUNET_HashCode * key,
3641                                 void *value)
3642 {
3643   struct CheckDuplicateRequestClosure *cdc = cls;
3644   struct PendingRequest *have = value;
3645
3646   if (have->client_request_list == NULL)
3647     return GNUNET_YES;
3648   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3649        (cdc->pr != have) )
3650     {
3651       cdc->have = have;
3652       return GNUNET_NO;
3653     }
3654   return GNUNET_YES;
3655 }
3656
3657
3658 /**
3659  * We're processing (local) results for a search request
3660  * from another peer.  Pass applicable results to the
3661  * peer and if we are done either clean up (operation
3662  * complete) or forward to other peers (more results possible).
3663  *
3664  * @param cls our closure (struct LocalGetContext)
3665  * @param key key for the content
3666  * @param size number of bytes in data
3667  * @param data content stored
3668  * @param type type of the content
3669  * @param priority priority of the content
3670  * @param anonymity anonymity-level for the content
3671  * @param expiration expiration time for the content
3672  * @param uid unique identifier for the datum;
3673  *        maybe 0 if no unique identifier is available
3674  */
3675 static void
3676 process_local_reply (void *cls,
3677                      const GNUNET_HashCode * key,
3678                      size_t size,
3679                      const void *data,
3680                      enum GNUNET_BLOCK_Type type,
3681                      uint32_t priority,
3682                      uint32_t anonymity,
3683                      struct GNUNET_TIME_Absolute
3684                      expiration, 
3685                      uint64_t uid)
3686 {
3687   struct PendingRequest *pr = cls;
3688   struct ProcessReplyClosure prq;
3689   struct CheckDuplicateRequestClosure cdrc;
3690   GNUNET_HashCode query;
3691   unsigned int old_rf;
3692   
3693   if (NULL == key)
3694     {
3695 #if DEBUG_FS > 1
3696       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3697                   "Done processing local replies, forwarding request to other peers.\n");
3698 #endif
3699       pr->qe = NULL;
3700       if (pr->client_request_list != NULL)
3701         {
3702           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3703                                       GNUNET_YES);
3704           /* Figure out if this is a duplicate request and possibly
3705              merge 'struct PendingRequest' entries */
3706           cdrc.have = NULL;
3707           cdrc.pr = pr;
3708           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3709                                                       &pr->query,
3710                                                       &check_duplicate_request_client,
3711                                                       &cdrc);
3712           if (cdrc.have != NULL)
3713             {
3714 #if DEBUG_FS
3715               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3716                           "Received request for block `%s' twice from client, will only request once.\n",
3717                           GNUNET_h2s (&pr->query));
3718 #endif
3719               
3720               destroy_pending_request (pr);
3721               return;
3722             }
3723         }
3724       if (pr->local_only == GNUNET_YES)
3725         {
3726           destroy_pending_request (pr);
3727           return;
3728         }
3729       /* no more results */
3730       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3731         pr->task = GNUNET_SCHEDULER_add_now (sched,
3732                                              &forward_request_task,
3733                                              pr);      
3734       return;
3735     }
3736 #if DEBUG_FS
3737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3738               "New local response to `%s' of type %u.\n",
3739               GNUNET_h2s (key),
3740               type);
3741 #endif
3742   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3743     {
3744 #if DEBUG_FS
3745       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3746                   "Found ONDEMAND block, performing on-demand encoding\n");
3747 #endif
3748       GNUNET_STATISTICS_update (stats,
3749                                 gettext_noop ("# on-demand blocks matched requests"),
3750                                 1,
3751                                 GNUNET_NO);
3752       if (GNUNET_OK != 
3753           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3754                                             anonymity, expiration, uid, 
3755                                             &process_local_reply,
3756                                             pr))
3757       if (pr->qe != NULL)
3758         {
3759           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3760         }
3761       return;
3762     }
3763   old_rf = pr->results_found;
3764   memset (&prq, 0, sizeof (prq));
3765   prq.data = data;
3766   prq.expiration = expiration;
3767   prq.size = size;  
3768   if (GNUNET_OK != 
3769       GNUNET_BLOCK_get_key (block_ctx,
3770                             type,
3771                             data,
3772                             size,
3773                             &query))
3774     {
3775       GNUNET_break (0);
3776       GNUNET_DATASTORE_remove (dsh,
3777                                key,
3778                                size, data,
3779                                -1, -1, 
3780                                GNUNET_TIME_UNIT_FOREVER_REL,
3781                                NULL, NULL);
3782       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3783       return;
3784     }
3785   prq.type = type;
3786   prq.priority = priority;  
3787   prq.finished = GNUNET_NO;
3788   prq.request_found = GNUNET_NO;
3789   if ( (old_rf == 0) &&
3790        (pr->results_found == 0) )
3791     update_datastore_delays (pr->start_time);
3792   process_reply (&prq, key, pr);
3793   if (prq.finished == GNUNET_YES)
3794     return;
3795   if (pr->qe == NULL)
3796     return; /* done here */
3797   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3798     {
3799       pr->local_only = GNUNET_YES; /* do not forward */
3800       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3801       return;
3802     }
3803   if ( (pr->client_request_list == NULL) &&
3804        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3805          (pr->results_found > 5 + 2 * pr->priority) ) )
3806     {
3807 #if DEBUG_FS > 2
3808       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3809                   "Load too high, done with request\n");
3810 #endif
3811       GNUNET_STATISTICS_update (stats,
3812                                 gettext_noop ("# processing result set cut short due to load"),
3813                                 1,
3814                                 GNUNET_NO);
3815       /* FIXME: if this is activated, we might stall large downloads
3816          indefinitely since (presumably) the load can never go down again! */
3817 #if 0
3818       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3819       return;
3820 #endif
3821     }
3822   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3823 }
3824
3825
3826 /**
3827  * We've received a request with the specified priority.  Bound it
3828  * according to how much we trust the given peer.
3829  * 
3830  * @param prio_in requested priority
3831  * @param cp the peer making the request
3832  * @return effective priority
3833  */
3834 static int32_t
3835 bound_priority (uint32_t prio_in,
3836                 struct ConnectedPeer *cp)
3837 {
3838 #define N ((double)128.0)
3839   uint32_t ret;
3840   double rret;
3841   int ld;
3842
3843   ld = test_get_load_too_high (0);
3844   if (ld == GNUNET_SYSERR)
3845     return 0; /* excess resources */
3846   ret = change_host_trust (cp, prio_in);
3847   if (ret > 0)
3848     {
3849       if (ret > current_priorities + N)
3850         rret = current_priorities + N;
3851       else
3852         rret = ret;
3853       current_priorities 
3854         = (current_priorities * (N-1) + rret)/N;
3855     }
3856   if ( (ld == GNUNET_YES) && (ret > 0) )
3857     {
3858       /* try with charging */
3859       ld = test_get_load_too_high (ret);
3860     }
3861   if (ld == GNUNET_YES)
3862     {
3863       /* undo charge */
3864       if (ret != 0)
3865         change_host_trust (cp, -ret);
3866       return -1; /* not enough resources */
3867     }
3868 #undef N
3869   return ret;
3870 }
3871
3872
3873 /**
3874  * Iterator over entries in the 'query_request_map' that
3875  * tries to see if we have the same request pending from
3876  * the same peer already.
3877  *
3878  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3879  * @param key current key code (query, ignored, must match)
3880  * @param value value in the hash map (a 'struct PendingRequest' 
3881  *              that already exists)
3882  * @return GNUNET_YES if we should continue to
3883  *         iterate (no match yet)
3884  *         GNUNET_NO if not (match found).
3885  */
3886 static int
3887 check_duplicate_request_peer (void *cls,
3888                               const GNUNET_HashCode * key,
3889                               void *value)
3890 {
3891   struct CheckDuplicateRequestClosure *cdc = cls;
3892   struct PendingRequest *have = value;
3893
3894   if (cdc->pr->target_pid == have->target_pid)
3895     {
3896       cdc->have = have;
3897       return GNUNET_NO;
3898     }
3899   return GNUNET_YES;
3900 }
3901
3902
3903 /**
3904  * Handle P2P "GET" request.
3905  *
3906  * @param cls closure, always NULL
3907  * @param other the other peer involved (sender or receiver, NULL
3908  *        for loopback messages where we are both sender and receiver)
3909  * @param message the actual message
3910  * @param latency reported latency of the connection with 'other'
3911  * @param distance reported distance (DV) to 'other' 
3912  * @return GNUNET_OK to keep the connection open,
3913  *         GNUNET_SYSERR to close it (signal serious error)
3914  */
3915 static int
3916 handle_p2p_get (void *cls,
3917                 const struct GNUNET_PeerIdentity *other,
3918                 const struct GNUNET_MessageHeader *message,
3919                 struct GNUNET_TIME_Relative latency,
3920                 uint32_t distance)
3921 {
3922   struct PendingRequest *pr;
3923   struct ConnectedPeer *cp;
3924   struct ConnectedPeer *cps;
3925   struct CheckDuplicateRequestClosure cdc;
3926   struct GNUNET_TIME_Relative timeout;
3927   uint16_t msize;
3928   const struct GetMessage *gm;
3929   unsigned int bits;
3930   const GNUNET_HashCode *opt;
3931   uint32_t bm;
3932   size_t bfsize;
3933   uint32_t ttl_decrement;
3934   int32_t priority;
3935   enum GNUNET_BLOCK_Type type;
3936   int have_ns;
3937
3938   msize = ntohs(message->size);
3939   if (msize < sizeof (struct GetMessage))
3940     {
3941       GNUNET_break_op (0);
3942       return GNUNET_SYSERR;
3943     }
3944   gm = (const struct GetMessage*) message;
3945 #if DEBUG_FS
3946   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3947               "Received request for `%s'\n",
3948               GNUNET_h2s (&gm->query));
3949 #endif
3950   type = ntohl (gm->type);
3951   bm = ntohl (gm->hash_bitmap);
3952   bits = 0;
3953   while (bm > 0)
3954     {
3955       if (1 == (bm & 1))
3956         bits++;
3957       bm >>= 1;
3958     }
3959   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3960     {
3961       GNUNET_break_op (0);
3962       return GNUNET_SYSERR;
3963     }  
3964   opt = (const GNUNET_HashCode*) &gm[1];
3965   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3966   bm = ntohl (gm->hash_bitmap);
3967   bits = 0;
3968   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3969                                            &other->hashPubKey);
3970   if (NULL == cps)
3971     {
3972       /* peer must have just disconnected */
3973       GNUNET_STATISTICS_update (stats,
3974                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3975                                 1,
3976                                 GNUNET_NO);
3977       return GNUNET_SYSERR;
3978     }
3979   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3980     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3981                                             &opt[bits++]);
3982   else
3983     cp = cps;
3984   if (cp == NULL)
3985     {
3986 #if DEBUG_FS
3987       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3988         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3989                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3990                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3991       
3992       else
3993         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3994                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3995                     GNUNET_i2s (other));
3996 #endif
3997       GNUNET_STATISTICS_update (stats,
3998                                 gettext_noop ("# requests dropped due to missing reverse route"),
3999                                 1,
4000                                 GNUNET_NO);
4001      /* FIXME: try connect? */
4002       return GNUNET_OK;
4003     }
4004   /* note that we can really only check load here since otherwise
4005      peers could find out that we are overloaded by not being
4006      disconnected after sending us a malformed query... */
4007   priority = bound_priority (ntohl (gm->priority), cps);
4008   if (priority < 0)
4009     {
4010 #if DEBUG_FS
4011       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4012                   "Dropping query from `%s', this peer is too busy.\n",
4013                   GNUNET_i2s (other));
4014 #endif
4015       GNUNET_STATISTICS_update (stats,
4016                                 gettext_noop ("# requests dropped due to high load"),
4017                                 1,
4018                                 GNUNET_NO);
4019 #if 0
4020       /* FIXME: this causes problems... */
4021       return GNUNET_OK;
4022 #endif
4023     }
4024 #if DEBUG_FS 
4025   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4026               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4027               GNUNET_h2s (&gm->query),
4028               (unsigned int) type,
4029               GNUNET_i2s (other),
4030               (unsigned int) bm);
4031 #endif
4032   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4033   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4034                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4035   if (have_ns)
4036     {
4037       pr->namespace = (GNUNET_HashCode*) &pr[1];
4038       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4039     }
4040   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4041        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4042         GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4043     {
4044       /* don't have BW to send to peer, or would likely take longer than we have for it,
4045          so at best indirect the query */
4046       priority = 0;
4047       /* FIXME: if this line is enabled, the 'perf' test for larger files simply "hangs";
4048          the cause seems to be that the load goes up (to the point where we do this)
4049          and then never goes down again... (outch) */
4050       // pr->forward_only = GNUNET_YES;
4051     }
4052   pr->type = type;
4053   pr->mingle = ntohl (gm->filter_mutator);
4054   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4055     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4056   pr->anonymity_level = 1;
4057   pr->priority = (uint32_t) priority;
4058   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4059   pr->query = gm->query;
4060   /* decrement ttl (always) */
4061   ttl_decrement = 2 * TTL_DECREMENT +
4062     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4063                               TTL_DECREMENT);
4064   if ( (pr->ttl < 0) &&
4065        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4066     {
4067 #if DEBUG_FS
4068       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4069                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4070                   GNUNET_i2s (other),
4071                   pr->ttl,
4072                   ttl_decrement);
4073 #endif
4074       GNUNET_STATISTICS_update (stats,
4075                                 gettext_noop ("# requests dropped due TTL underflow"),
4076                                 1,
4077                                 GNUNET_NO);
4078       /* integer underflow => drop (should be very rare)! */      
4079       GNUNET_free (pr);
4080       return GNUNET_OK;
4081     } 
4082   pr->ttl -= ttl_decrement;
4083   pr->start_time = GNUNET_TIME_absolute_get ();
4084
4085   /* get bloom filter */
4086   if (bfsize > 0)
4087     {
4088       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4089                                                   bfsize,
4090                                                   BLOOMFILTER_K);
4091       pr->bf_size = bfsize;
4092     }
4093   cdc.have = NULL;
4094   cdc.pr = pr;
4095   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4096                                               &gm->query,
4097                                               &check_duplicate_request_peer,
4098                                               &cdc);
4099   if (cdc.have != NULL)
4100     {
4101       if (cdc.have->start_time.value + cdc.have->ttl >=
4102           pr->start_time.value + pr->ttl)
4103         {
4104           /* existing request has higher TTL, drop new one! */
4105           cdc.have->priority += pr->priority;
4106           destroy_pending_request (pr);
4107 #if DEBUG_FS
4108           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4109                       "Have existing request with higher TTL, dropping new request.\n",
4110                       GNUNET_i2s (other));
4111 #endif
4112           GNUNET_STATISTICS_update (stats,
4113                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4114                                     1,
4115                                     GNUNET_NO);
4116           return GNUNET_OK;
4117         }
4118       else
4119         {
4120           /* existing request has lower TTL, drop old one! */
4121           pr->priority += cdc.have->priority;
4122           /* Possible optimization: if we have applicable pending
4123              replies in 'cdc.have', we might want to move those over
4124              (this is a really rare special-case, so it is not clear
4125              that this would be worth it) */
4126           destroy_pending_request (cdc.have);
4127           /* keep processing 'pr'! */
4128         }
4129     }
4130
4131   pr->cp = cp;
4132   GNUNET_break (GNUNET_OK ==
4133                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4134                                                    &gm->query,
4135                                                    pr,
4136                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4137   GNUNET_break (GNUNET_OK ==
4138                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4139                                                    &other->hashPubKey,
4140                                                    pr,
4141                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4142   
4143   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4144                                             pr,
4145                                             pr->start_time.value + pr->ttl);
4146
4147   GNUNET_STATISTICS_update (stats,
4148                             gettext_noop ("# P2P searches received"),
4149                             1,
4150                             GNUNET_NO);
4151   GNUNET_STATISTICS_update (stats,
4152                             gettext_noop ("# P2P searches active"),
4153                             1,
4154                             GNUNET_NO);
4155
4156   /* calculate change in traffic preference */
4157   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4158   /* process locally */
4159   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4160     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4161   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4162                                            (pr->priority + 1)); 
4163   if (GNUNET_YES != pr->forward_only)
4164     {
4165 #if DEBUG_FS
4166       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4167                   "Handing request for `%s' to datastore\n",
4168                   GNUNET_h2s (&gm->query));
4169 #endif
4170       pr->qe = GNUNET_DATASTORE_get (dsh,
4171                                      &gm->query,
4172                                      type,                             
4173                                      pr->priority + 1,
4174                                      MAX_DATASTORE_QUEUE,                                
4175                                      timeout,
4176                                      &process_local_reply,
4177                                      pr);
4178       if (NULL == pr->qe)
4179         {
4180           GNUNET_STATISTICS_update (stats,
4181                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4182                                     1,
4183                                     GNUNET_NO);
4184         }
4185     }
4186   else
4187     {
4188       GNUNET_STATISTICS_update (stats,
4189                                 gettext_noop ("# requests forwarded due to high load"),
4190                                 1,
4191                                 GNUNET_NO);
4192     }
4193
4194   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4195   switch (pr->type)
4196     {
4197     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4198     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4199       /* only one result, wait for datastore */
4200       if (GNUNET_YES != pr->forward_only)
4201         {
4202           GNUNET_STATISTICS_update (stats,
4203                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4204                                     1,
4205                                     GNUNET_NO);
4206           break;
4207         }
4208     default:
4209       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4210         pr->task = GNUNET_SCHEDULER_add_now (sched,
4211                                              &forward_request_task,
4212                                              pr);
4213     }
4214
4215   /* make sure we don't track too many requests */
4216   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4217     {
4218       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4219       GNUNET_assert (pr != NULL);
4220       destroy_pending_request (pr);
4221     }
4222   return GNUNET_OK;
4223 }
4224
4225
4226 /* **************************** CS GET Handling ************************ */
4227
4228
4229 /**
4230  * Handle START_SEARCH-message (search request from client).
4231  *
4232  * @param cls closure
4233  * @param client identification of the client
4234  * @param message the actual message
4235  */
4236 static void
4237 handle_start_search (void *cls,
4238                      struct GNUNET_SERVER_Client *client,
4239                      const struct GNUNET_MessageHeader *message)
4240 {
4241   static GNUNET_HashCode all_zeros;
4242   const struct SearchMessage *sm;
4243   struct ClientList *cl;
4244   struct ClientRequestList *crl;
4245   struct PendingRequest *pr;
4246   uint16_t msize;
4247   unsigned int sc;
4248   enum GNUNET_BLOCK_Type type;
4249
4250   msize = ntohs (message->size);
4251   if ( (msize < sizeof (struct SearchMessage)) ||
4252        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4253     {
4254       GNUNET_break (0);
4255       GNUNET_SERVER_receive_done (client,
4256                                   GNUNET_SYSERR);
4257       return;
4258     }
4259   GNUNET_STATISTICS_update (stats,
4260                             gettext_noop ("# client searches received"),
4261                             1,
4262                             GNUNET_NO);
4263   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4264   sm = (const struct SearchMessage*) message;
4265   type = ntohl (sm->type);
4266 #if DEBUG_FS
4267   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4268               "Received request for `%s' of type %u from local client\n",
4269               GNUNET_h2s (&sm->query),
4270               (unsigned int) type);
4271 #endif
4272   cl = client_list;
4273   while ( (cl != NULL) &&
4274           (cl->client != client) )
4275     cl = cl->next;
4276   if (cl == NULL)
4277     {
4278       cl = GNUNET_malloc (sizeof (struct ClientList));
4279       cl->client = client;
4280       GNUNET_SERVER_client_keep (client);
4281       cl->next = client_list;
4282       client_list = cl;
4283     }
4284   /* detect duplicate KBLOCK requests */
4285   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4286        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4287        (type == GNUNET_BLOCK_TYPE_ANY) )
4288     {
4289       crl = cl->rl_head;
4290       while ( (crl != NULL) &&
4291               ( (0 != memcmp (&crl->req->query,
4292                               &sm->query,
4293                               sizeof (GNUNET_HashCode))) ||
4294                 (crl->req->type != type) ) )
4295         crl = crl->next;
4296       if (crl != NULL)  
4297         { 
4298 #if DEBUG_FS
4299           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4300                       "Have existing request, merging content-seen lists.\n");
4301 #endif
4302           pr = crl->req;
4303           /* Duplicate request (used to send long list of
4304              known/blocked results); merge 'pr->replies_seen'
4305              and update bloom filter */
4306           GNUNET_array_grow (pr->replies_seen,
4307                              pr->replies_seen_size,
4308                              pr->replies_seen_off + sc);
4309           memcpy (&pr->replies_seen[pr->replies_seen_off],
4310                   &sm[1],
4311                   sc * sizeof (GNUNET_HashCode));
4312           pr->replies_seen_off += sc;
4313           refresh_bloomfilter (pr);
4314           GNUNET_STATISTICS_update (stats,
4315                                     gettext_noop ("# client searches updated (merged content seen list)"),
4316                                     1,
4317                                     GNUNET_NO);
4318           GNUNET_SERVER_receive_done (client,
4319                                       GNUNET_OK);
4320           return;
4321         }
4322     }
4323   GNUNET_STATISTICS_update (stats,
4324                             gettext_noop ("# client searches active"),
4325                             1,
4326                             GNUNET_NO);
4327   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4328                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4329   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4330   memset (crl, 0, sizeof (struct ClientRequestList));
4331   crl->client_list = cl;
4332   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4333                                cl->rl_tail,
4334                                crl);  
4335   crl->req = pr;
4336   pr->type = type;
4337   pr->client_request_list = crl;
4338   GNUNET_array_grow (pr->replies_seen,
4339                      pr->replies_seen_size,
4340                      sc);
4341   memcpy (pr->replies_seen,
4342           &sm[1],
4343           sc * sizeof (GNUNET_HashCode));
4344   pr->replies_seen_off = sc;
4345   pr->anonymity_level = ntohl (sm->anonymity_level); 
4346   pr->start_time = GNUNET_TIME_absolute_get ();
4347   refresh_bloomfilter (pr);
4348   pr->query = sm->query;
4349   if (0 == (1 & ntohl (sm->options)))
4350     pr->local_only = GNUNET_NO;
4351   else
4352     pr->local_only = GNUNET_YES;
4353   switch (type)
4354     {
4355     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4356     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4357       if (0 != memcmp (&sm->target,
4358                        &all_zeros,
4359                        sizeof (GNUNET_HashCode)))
4360         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4361       break;
4362     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4363       pr->namespace = (GNUNET_HashCode*) &pr[1];
4364       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4365       break;
4366     default:
4367       break;
4368     }
4369   GNUNET_break (GNUNET_OK ==
4370                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4371                                                    &sm->query,
4372                                                    pr,
4373                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4374   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4375     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4376   pr->qe = GNUNET_DATASTORE_get (dsh,
4377                                  &sm->query,
4378                                  type,
4379                                  -3, -1,
4380                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4381                                  &process_local_reply,
4382                                  pr);
4383 }
4384
4385
4386 /* **************************** Startup ************************ */
4387
4388 /**
4389  * Process fs requests.
4390  *
4391  * @param s scheduler to use
4392  * @param server the initialized server
4393  * @param c configuration to use
4394  */
4395 static int
4396 main_init (struct GNUNET_SCHEDULER_Handle *s,
4397            struct GNUNET_SERVER_Handle *server,
4398            const struct GNUNET_CONFIGURATION_Handle *c)
4399 {
4400   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4401     {
4402       { &handle_p2p_get, 
4403         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4404       { &handle_p2p_put, 
4405         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4406       { &handle_p2p_migration_stop, 
4407         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4408         sizeof (struct MigrationStopMessage) },
4409       { NULL, 0, 0 }
4410     };
4411   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4412     {&GNUNET_FS_handle_index_start, NULL, 
4413      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4414     {&GNUNET_FS_handle_index_list_get, NULL, 
4415      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4416     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4417      sizeof (struct UnindexMessage) },
4418     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4419      0 },
4420     {NULL, NULL, 0, 0}
4421   };
4422   unsigned long long enc = 128;
4423
4424   sched = s;
4425   cfg = c;
4426   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4427   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4428   if ( (GNUNET_OK !=
4429         GNUNET_CONFIGURATION_get_value_number (cfg,
4430                                                "fs",
4431                                                "MAX_PENDING_REQUESTS",
4432                                                &max_pending_requests)) ||
4433        (GNUNET_OK !=
4434         GNUNET_CONFIGURATION_get_value_number (cfg,
4435                                                "fs",
4436                                                "EXPECTED_NEIGHBOUR_COUNT",
4437                                                &enc)) ||
4438        (GNUNET_OK != 
4439         GNUNET_CONFIGURATION_get_value_time (cfg,
4440                                              "fs",
4441                                              "MIN_MIGRATION_DELAY",
4442                                              &min_migration_delay)) )
4443     {
4444       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4445                   _("Configuration fails to specify certain parameters, assuming default values."));
4446     }
4447   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4448   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4449   rt_entry_lifetime = GNUNET_LOAD_value_init ();
4450   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4451   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4452   core = GNUNET_CORE_connect (sched,
4453                               cfg,
4454                               GNUNET_TIME_UNIT_FOREVER_REL,
4455                               NULL,
4456                               NULL,
4457                               &peer_connect_handler,
4458                               &peer_disconnect_handler,
4459                               NULL,
4460                               NULL, GNUNET_NO,
4461                               NULL, GNUNET_NO,
4462                               p2p_handlers);
4463   if (NULL == core)
4464     {
4465       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4466                   _("Failed to connect to `%s' service.\n"),
4467                   "core");
4468       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4469       connected_peers = NULL;
4470       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4471       query_request_map = NULL;
4472       GNUNET_LOAD_value_free (rt_entry_lifetime);
4473       rt_entry_lifetime = NULL;
4474       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4475       requests_by_expiration_heap = NULL;
4476       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4477       peer_request_map = NULL;
4478       if (dsh != NULL)
4479         {
4480           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4481           dsh = NULL;
4482         }
4483       return GNUNET_SYSERR;
4484     }
4485   /* FIXME: distinguish between sending and storing in options? */
4486   if (active_migration) 
4487     {
4488       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4489                   _("Content migration is enabled, will start to gather data\n"));
4490       consider_migration_gathering ();
4491     }
4492   consider_dht_put_gathering (NULL);
4493   GNUNET_SERVER_disconnect_notify (server, 
4494                                    &handle_client_disconnect,
4495                                    NULL);
4496   GNUNET_assert (GNUNET_OK ==
4497                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4498                                                           "fs",
4499                                                           "TRUST",
4500                                                           &trustDirectory));
4501   GNUNET_DISK_directory_create (trustDirectory);
4502   GNUNET_SCHEDULER_add_with_priority (sched,
4503                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4504                                       &cron_flush_trust, NULL);
4505
4506
4507   GNUNET_SERVER_add_handlers (server, handlers);
4508   GNUNET_SCHEDULER_add_delayed (sched,
4509                                 GNUNET_TIME_UNIT_FOREVER_REL,
4510                                 &shutdown_task,
4511                                 NULL);
4512   return GNUNET_OK;
4513 }
4514
4515
4516 /**
4517  * Process fs requests.
4518  *
4519  * @param cls closure
4520  * @param sched scheduler to use
4521  * @param server the initialized server
4522  * @param cfg configuration to use
4523  */
4524 static void
4525 run (void *cls,
4526      struct GNUNET_SCHEDULER_Handle *sched,
4527      struct GNUNET_SERVER_Handle *server,
4528      const struct GNUNET_CONFIGURATION_Handle *cfg)
4529 {
4530   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4531                                                            "FS",
4532                                                            "ACTIVEMIGRATION");
4533   dsh = GNUNET_DATASTORE_connect (cfg,
4534                                   sched);
4535   if (dsh == NULL)
4536     {
4537       GNUNET_SCHEDULER_shutdown (sched);
4538       return;
4539     }
4540   datastore_get_load = GNUNET_LOAD_value_init ();
4541   datastore_put_load = GNUNET_LOAD_value_init ();
4542   block_cfg = GNUNET_CONFIGURATION_create ();
4543   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4544                                          "block",
4545                                          "PLUGINS",
4546                                          "fs");
4547   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4548   GNUNET_assert (NULL != block_ctx);
4549   dht_handle = GNUNET_DHT_connect (sched,
4550                                    cfg,
4551                                    FS_DHT_HT_SIZE);
4552   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4553        (GNUNET_OK != main_init (sched, server, cfg)) )
4554     {    
4555       GNUNET_SCHEDULER_shutdown (sched);
4556       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4557       dsh = NULL;
4558       GNUNET_DHT_disconnect (dht_handle);
4559       dht_handle = NULL;
4560       GNUNET_BLOCK_context_destroy (block_ctx);
4561       block_ctx = NULL;
4562       GNUNET_CONFIGURATION_destroy (block_cfg);
4563       block_cfg = NULL;
4564       GNUNET_LOAD_value_free (datastore_get_load);
4565       datastore_get_load = NULL;
4566       GNUNET_LOAD_value_free (datastore_put_load);
4567       datastore_put_load = NULL;
4568       return;   
4569     }
4570 }
4571
4572
4573 /**
4574  * The main function for the fs service.
4575  *
4576  * @param argc number of arguments from the command line
4577  * @param argv command line arguments
4578  * @return 0 ok, 1 on error
4579  */
4580 int
4581 main (int argc, char *const *argv)
4582 {
4583   return (GNUNET_OK ==
4584           GNUNET_SERVICE_run (argc,
4585                               argv,
4586                               "fs",
4587                               GNUNET_SERVICE_OPTION_NONE,
4588                               &run, NULL)) ? 0 : 1;
4589 }
4590
4591 /* end of gnunet-service-fs.c */