3886e1de32940f2652b48699fceeefb10ee91e5f
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - more statistics
28  */
29 #include "platform.h"
30 #include <float.h>
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "gnunet_dht_service.h"
34 #include "gnunet_datastore_service.h"
35 #include "gnunet_load_lib.h"
36 #include "gnunet_peer_lib.h"
37 #include "gnunet_protocols.h"
38 #include "gnunet_signatures.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet-service-fs_indexing.h"
42 #include "fs.h"
43
44 #define DEBUG_FS GNUNET_NO
45
46 /**
47  * Should we introduce random latency in processing?  Required for proper
48  * implementation of GAP, but can be disabled for performance evaluation of
49  * the basic routing algorithm.
50  *
51  * Note that with delays enabled, performance can be significantly lower
52  * (several orders of magnitude in 2-peer test runs); if you want to
53  * measure throughput of other components, set this to NO.  Also, you
54  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
55  * a rather wasteful mode of operation (that might still get the highest
56  * throughput overall).
57  *
58  * Performance measurements (for 50 MB file, 2 peers):
59  *
60  * - Without delays: 3300 kb/s
61  * - With    delays:  101 kb/s
62  */
63 #define SUPPORT_DELAYS GNUNET_NO
64
65 /**
66  * Size for the hash map for DHT requests from the FS
67  * service.  Should be about the number of concurrent
68  * DHT requests we plan to make.
69  */
70 #define FS_DHT_HT_SIZE 1024
71
72 /**
73  * At what frequency should our datastore load decrease
74  * automatically (since if we don't use it, clearly the
75  * load must be going down).
76  */
77 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
78
79 /**
80  * How often do we flush trust values to disk?
81  */
82 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
83
84 /**
85  * How often do we at most PUT content into the DHT?
86  */
87 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
88
89 /**
90  * Inverse of the probability that we will submit the same query
91  * to the same peer again.  If the same peer already got the query
92  * repeatedly recently, the probability is multiplied by the inverse
93  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
94  * plus MAX_CORK_DELAY (so roughly every 3.5s).
95  *
96  * Note that this factor is a key influence to performance in small
97  * networks (especially test networks of 2 peers) because if there is
98  * only a single peer with the data, this value will determine how
99  * soon we might re-try.  For example, a value of 3 can result in 
100  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
101  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
102  * rather inefficient in larger networks, hence picking 1 is in 
103  * general not the best choice.
104  *
105  * Performance measurements (for 50 MB file, 2 peers, no delays):
106  *
107  * - 1: 3300 kb/s (consistently)
108  * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
109  * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
110  *
111  * Note that this does NOT mean that the value should be 1 since
112  * a 2-peer network is far from representative here (and this fails
113  * to take into consideration bandwidth wasted by repeatedly 
114  * sending queries to peers that don't have the content).  Also,
115  * it is expected that higher values lead to more inconsistent
116  * measurements since this only affects lost messages towards the
117  * end of the download.
118  *
119  * Finally, we should probably consider changing this and making
120  * it dependent on the number of connected peers or a related
121  * metric (bad magic constants...).
122  */
123 #define RETRY_PROBABILITY_INV 1
124
125 /**
126  * What is the maximum delay for a P2P FS message (in our interaction
127  * with core)?  FS-internal delays are another story.  The value is
128  * chosen based on the 32k block size.  Given that peers typcially
129  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
130  * transmit one message even to the lowest-bandwidth peers.
131  */
132 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
133
134 /**
135  * Maximum number of requests (from other peers, overall) that we're
136  * willing to have pending at any given point in time.  Can be changed
137  * via the configuration file (32k is just the default).
138  */
139 static unsigned long long max_pending_requests = (32 * 1024);
140
141
142 /**
143  * Information we keep for each pending reply.  The
144  * actual message follows at the end of this struct.
145  */
146 struct PendingMessage;
147
148 /**
149  * Function called upon completion of a transmission.
150  *
151  * @param cls closure
152  * @param pid ID of receiving peer, 0 on transmission error
153  */
154 typedef void (*TransmissionContinuation)(void * cls, 
155                                          GNUNET_PEER_Id tpid);
156
157
158 /**
159  * Information we keep for each pending message (GET/PUT).  The
160  * actual message follows at the end of this struct.
161  */
162 struct PendingMessage
163 {
164   /**
165    * This is a doubly-linked list of messages to the same peer.
166    */
167   struct PendingMessage *next;
168
169   /**
170    * This is a doubly-linked list of messages to the same peer.
171    */
172   struct PendingMessage *prev;
173
174   /**
175    * Entry in pending message list for this pending message.
176    */ 
177   struct PendingMessageList *pml;  
178
179   /**
180    * Function to call immediately once we have transmitted this
181    * message.
182    */
183   TransmissionContinuation cont;
184
185   /**
186    * Closure for cont.
187    */
188   void *cont_cls;
189
190   /**
191    * Do not transmit this pending message until this deadline.
192    */
193   struct GNUNET_TIME_Absolute delay_until;
194
195   /**
196    * Size of the reply; actual reply message follows
197    * at the end of this struct.
198    */
199   size_t msize;
200   
201   /**
202    * How important is this message for us?
203    */
204   uint32_t priority;
205  
206 };
207
208
209 /**
210  * Information about a peer that we are connected to.
211  * We track data that is useful for determining which
212  * peers should receive our requests.  We also keep
213  * a list of messages to transmit to this peer.
214  */
215 struct ConnectedPeer
216 {
217
218   /**
219    * List of the last clients for which this peer successfully
220    * answered a query.
221    */
222   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
223
224   /**
225    * List of the last PIDs for which
226    * this peer successfully answered a query;
227    * We use 0 to indicate no successful reply.
228    */
229   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
230
231   /**
232    * Average delay between sending the peer a request and
233    * getting a reply (only calculated over the requests for
234    * which we actually got a reply).   Calculated
235    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
236    */ 
237   struct GNUNET_TIME_Relative avg_delay;
238
239   /**
240    * Point in time until which this peer does not want us to migrate content
241    * to it.
242    */
243   struct GNUNET_TIME_Absolute migration_blocked;
244
245   /**
246    * Time until when we blocked this peer from migrating
247    * data to us.
248    */
249   struct GNUNET_TIME_Absolute last_migration_block;
250
251   /**
252    * Transmission times for the last MAX_QUEUE_PER_PEER
253    * requests for this peer.  Used as a ring buffer, current
254    * offset is stored in 'last_request_times_off'.  If the
255    * oldest entry is more recent than the 'avg_delay', we should
256    * not send any more requests right now.
257    */
258   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
259
260   /**
261    * Handle for an active request for transmission to this
262    * peer, or NULL.
263    */
264   struct GNUNET_CORE_TransmitHandle *cth;
265
266   /**
267    * Messages (replies, queries, content migration) we would like to
268    * send to this peer in the near future.  Sorted by priority, head.
269    */
270   struct PendingMessage *pending_messages_head;
271
272   /**
273    * Messages (replies, queries, content migration) we would like to
274    * send to this peer in the near future.  Sorted by priority, tail.
275    */
276   struct PendingMessage *pending_messages_tail;
277
278   /**
279    * How long does it typically take for us to transmit a message
280    * to this peer?  (delay between the request being issued and
281    * the callback being invoked).
282    */
283   struct GNUNET_LOAD_Value *transmission_delay;
284
285   /**
286    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
287    */
288   struct GNUNET_CORE_InformationRequestContext *irc;
289
290   /**
291    * Request for which 'irc' is currently active (or NULL).
292    */
293   struct PendingRequest *pr;
294
295   /**
296    * Time when the last transmission request was issued.
297    */
298   struct GNUNET_TIME_Absolute last_transmission_request_start;
299
300   /**
301    * ID of delay task for scheduling transmission.
302    */
303   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
304
305   /**
306    * Average priority of successful replies.  Calculated
307    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
308    */
309   double avg_priority;
310
311   /**
312    * Increase in traffic preference still to be submitted
313    * to the core service for this peer.
314    */
315   uint64_t inc_preference;
316
317   /**
318    * Trust rating for this peer
319    */
320   uint32_t trust;
321
322   /**
323    * Trust rating for this peer on disk.
324    */
325   uint32_t disk_trust;
326
327   /**
328    * The peer's identity.
329    */
330   GNUNET_PEER_Id pid;  
331
332   /**
333    * Size of the linked list of 'pending_messages'.
334    */
335   unsigned int pending_requests;
336
337   /**
338    * Which offset in "last_p2p_replies" will be updated next?
339    * (we go round-robin).
340    */
341   unsigned int last_p2p_replies_woff;
342
343   /**
344    * Which offset in "last_client_replies" will be updated next?
345    * (we go round-robin).
346    */
347   unsigned int last_client_replies_woff;
348
349   /**
350    * Current offset into 'last_request_times' ring buffer.
351    */
352   unsigned int last_request_times_off;
353
354 };
355
356
357 /**
358  * Information we keep for each pending request.  We should try to
359  * keep this struct as small as possible since its memory consumption
360  * is key to how many requests we can have pending at once.
361  */
362 struct PendingRequest;
363
364
365 /**
366  * Doubly-linked list of requests we are performing
367  * on behalf of the same client.
368  */
369 struct ClientRequestList
370 {
371
372   /**
373    * This is a doubly-linked list.
374    */
375   struct ClientRequestList *next;
376
377   /**
378    * This is a doubly-linked list.
379    */
380   struct ClientRequestList *prev;
381
382   /**
383    * Request this entry represents.
384    */
385   struct PendingRequest *req;
386
387   /**
388    * Client list this request belongs to.
389    */
390   struct ClientList *client_list;
391
392 };
393
394
395 /**
396  * Replies to be transmitted to the client.  The actual
397  * response message is allocated after this struct.
398  */
399 struct ClientResponseMessage
400 {
401   /**
402    * This is a doubly-linked list.
403    */
404   struct ClientResponseMessage *next;
405
406   /**
407    * This is a doubly-linked list.
408    */
409   struct ClientResponseMessage *prev;
410
411   /**
412    * Client list entry this response belongs to.
413    */
414   struct ClientList *client_list;
415
416   /**
417    * Number of bytes in the response.
418    */
419   size_t msize;
420 };
421
422
423 /**
424  * Linked list of clients we are performing requests
425  * for right now.
426  */
427 struct ClientList
428 {
429   /**
430    * This is a linked list.
431    */
432   struct ClientList *next;
433
434   /**
435    * ID of a client making a request, NULL if this entry is for a
436    * peer.
437    */
438   struct GNUNET_SERVER_Client *client;
439
440   /**
441    * Head of list of requests performed on behalf
442    * of this client right now.
443    */
444   struct ClientRequestList *rl_head;
445
446   /**
447    * Tail of list of requests performed on behalf
448    * of this client right now.
449    */
450   struct ClientRequestList *rl_tail;
451
452   /**
453    * Head of linked list of responses.
454    */
455   struct ClientResponseMessage *res_head;
456
457   /**
458    * Tail of linked list of responses.
459    */
460   struct ClientResponseMessage *res_tail;
461
462   /**
463    * Context for sending replies.
464    */
465   struct GNUNET_CONNECTION_TransmitHandle *th;
466
467 };
468
469
470 /**
471  * Information about a peer that we have forwarded this
472  * request to already.  
473  */
474 struct UsedTargetEntry
475 {
476   /**
477    * What was the last time we have transmitted this request to this
478    * peer?
479    */
480   struct GNUNET_TIME_Absolute last_request_time;
481
482   /**
483    * How often have we transmitted this request to this peer?
484    */
485   unsigned int num_requests;
486
487   /**
488    * PID of the target peer.
489    */
490   GNUNET_PEER_Id pid;
491
492 };
493
494
495
496
497
498 /**
499  * Doubly-linked list of messages we are performing
500  * due to a pending request.
501  */
502 struct PendingMessageList
503 {
504
505   /**
506    * This is a doubly-linked list of messages on behalf of the same request.
507    */
508   struct PendingMessageList *next;
509
510   /**
511    * This is a doubly-linked list of messages on behalf of the same request.
512    */
513   struct PendingMessageList *prev;
514
515   /**
516    * Message this entry represents.
517    */
518   struct PendingMessage *pm;
519
520   /**
521    * Request this entry belongs to.
522    */
523   struct PendingRequest *req;
524
525   /**
526    * Peer this message is targeted for.
527    */
528   struct ConnectedPeer *target;
529
530 };
531
532
533 /**
534  * Information we keep for each pending request.  We should try to
535  * keep this struct as small as possible since its memory consumption
536  * is key to how many requests we can have pending at once.
537  */
538 struct PendingRequest
539 {
540
541   /**
542    * If this request was made by a client, this is our entry in the
543    * client request list; otherwise NULL.
544    */
545   struct ClientRequestList *client_request_list;
546
547   /**
548    * Entry of peer responsible for this entry (if this request
549    * was made by a peer).
550    */
551   struct ConnectedPeer *cp;
552
553   /**
554    * If this is a namespace query, pointer to the hash of the public
555    * key of the namespace; otherwise NULL.  Pointer will be to the 
556    * end of this struct (so no need to free it).
557    */
558   const GNUNET_HashCode *namespace;
559
560   /**
561    * Bloomfilter we use to filter out replies that we don't care about
562    * (anymore).  NULL as long as we are interested in all replies.
563    */
564   struct GNUNET_CONTAINER_BloomFilter *bf;
565
566   /**
567    * Reference to DHT get operation for this request (or NULL).
568    */
569   struct GNUNET_DHT_GetHandle *dht_get;
570
571   /**
572    * Context of our GNUNET_CORE_peer_change_preference call.
573    */
574   struct ConnectedPeer *pirc;
575
576   /**
577    * Hash code of all replies that we have seen so far (only valid
578    * if client is not NULL since we only track replies like this for
579    * our own clients).
580    */
581   GNUNET_HashCode *replies_seen;
582
583   /**
584    * Node in the heap representing this entry; NULL
585    * if we have no heap node.
586    */
587   struct GNUNET_CONTAINER_HeapNode *hnode;
588
589   /**
590    * Head of list of messages being performed on behalf of this
591    * request.
592    */
593   struct PendingMessageList *pending_head;
594
595   /**
596    * Tail of list of messages being performed on behalf of this
597    * request.
598    */
599   struct PendingMessageList *pending_tail;
600
601   /**
602    * When did we first see this request (form this peer), or, if our
603    * client is initiating, when did we last initiate a search?
604    */
605   struct GNUNET_TIME_Absolute start_time;
606
607   /**
608    * The query that this request is for.
609    */
610   GNUNET_HashCode query;
611
612   /**
613    * The task responsible for transmitting queries
614    * for this request.
615    */
616   GNUNET_SCHEDULER_TaskIdentifier task;
617
618   /**
619    * (Interned) Peer identifier that identifies a preferred target
620    * for requests.
621    */
622   GNUNET_PEER_Id target_pid;
623
624   /**
625    * (Interned) Peer identifiers of peers that have already
626    * received our query for this content.
627    */
628   struct UsedTargetEntry *used_targets;
629   
630   /**
631    * Our entry in the queue (non-NULL while we wait for our
632    * turn to interact with the local database).
633    */
634   struct GNUNET_DATASTORE_QueueEntry *qe;
635
636   /**
637    * Size of the 'bf' (in bytes).
638    */
639   size_t bf_size;
640
641   /**
642    * Desired anonymity level; only valid for requests from a local client.
643    */
644   uint32_t anonymity_level;
645
646   /**
647    * How many entries in "used_targets" are actually valid?
648    */
649   unsigned int used_targets_off;
650
651   /**
652    * How long is the "used_targets" array?
653    */
654   unsigned int used_targets_size;
655
656   /**
657    * Number of results found for this request.
658    */
659   unsigned int results_found;
660
661   /**
662    * How many entries in "replies_seen" are actually valid?
663    */
664   unsigned int replies_seen_off;
665
666   /**
667    * How long is the "replies_seen" array?
668    */
669   unsigned int replies_seen_size;
670   
671   /**
672    * Priority with which this request was made.  If one of our clients
673    * made the request, then this is the current priority that we are
674    * using when initiating the request.  This value is used when
675    * we decide to reward other peers with trust for providing a reply.
676    */
677   uint32_t priority;
678
679   /**
680    * Priority points left for us to spend when forwarding this request
681    * to other peers.
682    */
683   uint32_t remaining_priority;
684
685   /**
686    * Number to mingle hashes for bloom-filter tests with.
687    */
688   int32_t mingle;
689
690   /**
691    * TTL with which we saw this request (or, if we initiated, TTL that
692    * we used for the request).
693    */
694   int32_t ttl;
695   
696   /**
697    * Type of the content that this request is for.
698    */
699   enum GNUNET_BLOCK_Type type;
700
701   /**
702    * Remove this request after transmission of the current response.
703    */
704   int8_t do_remove;
705
706   /**
707    * GNUNET_YES if we should not forward this request to other peers.
708    */
709   int8_t local_only;
710
711   /**
712    * GNUNET_YES if we should not forward this request to other peers.
713    */
714   int8_t forward_only;
715
716 };
717
718
719 /**
720  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
721  */
722 struct MigrationReadyBlock
723 {
724
725   /**
726    * This is a doubly-linked list.
727    */
728   struct MigrationReadyBlock *next;
729
730   /**
731    * This is a doubly-linked list.
732    */
733   struct MigrationReadyBlock *prev;
734
735   /**
736    * Query for the block.
737    */
738   GNUNET_HashCode query;
739
740   /**
741    * When does this block expire? 
742    */
743   struct GNUNET_TIME_Absolute expiration;
744
745   /**
746    * Peers we would consider forwarding this
747    * block to.  Zero for empty entries.
748    */
749   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
750
751   /**
752    * Size of the block.
753    */
754   size_t size;
755
756   /**
757    *  Number of targets already used.
758    */
759   unsigned int used_targets;
760
761   /**
762    * Type of the block.
763    */
764   enum GNUNET_BLOCK_Type type;
765 };
766
767
768 /**
769  * Our connection to the datastore.
770  */
771 static struct GNUNET_DATASTORE_Handle *dsh;
772
773 /**
774  * Our block context.
775  */
776 static struct GNUNET_BLOCK_Context *block_ctx;
777
778 /**
779  * Our block configuration.
780  */
781 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
782
783 /**
784  * Our configuration.
785  */
786 static const struct GNUNET_CONFIGURATION_Handle *cfg;
787
788 /**
789  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
790  */
791 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
792
793 /**
794  * Map of peer identifiers to "struct PendingRequest" (for that peer).
795  */
796 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
797
798 /**
799  * Map of query identifiers to "struct PendingRequest" (for that query).
800  */
801 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
802
803 /**
804  * Heap with the request that will expire next at the top.  Contains
805  * pointers of type "struct PendingRequest*"; these will *also* be
806  * aliased from the "requests_by_peer" data structures and the
807  * "requests_by_query" table.  Note that requests from our clients
808  * don't expire and are thus NOT in the "requests_by_expiration"
809  * (or the "requests_by_peer" tables).
810  */
811 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
812
813 /**
814  * Handle for reporting statistics.
815  */
816 static struct GNUNET_STATISTICS_Handle *stats;
817
818 /**
819  * Linked list of clients we are currently processing requests for.
820  */
821 static struct ClientList *client_list;
822
823 /**
824  * Pointer to handle to the core service (points to NULL until we've
825  * connected to it).
826  */
827 static struct GNUNET_CORE_Handle *core;
828
829 /**
830  * Head of linked list of blocks that can be migrated.
831  */
832 static struct MigrationReadyBlock *mig_head;
833
834 /**
835  * Tail of linked list of blocks that can be migrated.
836  */
837 static struct MigrationReadyBlock *mig_tail;
838
839 /**
840  * Request to datastore for migration (or NULL).
841  */
842 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
843
844 /**
845  * Request to datastore for DHT PUTs (or NULL).
846  */
847 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
848
849 /**
850  * Type we will request for the next DHT PUT round from the datastore.
851  */
852 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
853
854 /**
855  * Where do we store trust information?
856  */
857 static char *trustDirectory;
858
859 /**
860  * ID of task that collects blocks for migration.
861  */
862 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
863
864 /**
865  * ID of task that collects blocks for DHT PUTs.
866  */
867 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
868
869 /**
870  * What is the maximum frequency at which we are allowed to
871  * poll the datastore for migration content?
872  */
873 static struct GNUNET_TIME_Relative min_migration_delay;
874
875 /**
876  * Handle for DHT operations.
877  */
878 static struct GNUNET_DHT_Handle *dht_handle;
879
880 /**
881  * Size of the doubly-linked list of migration blocks.
882  */
883 static unsigned int mig_size;
884
885 /**
886  * Are we allowed to migrate content to this peer.
887  */
888 static int active_migration;
889
890 /**
891  * How many entires with zero anonymity do we currently estimate
892  * to have in the database?
893  */
894 static unsigned int zero_anonymity_count_estimate;
895
896 /**
897  * Typical priorities we're seeing from other peers right now.  Since
898  * most priorities will be zero, this value is the weighted average of
899  * non-zero priorities seen "recently".  In order to ensure that new
900  * values do not dramatically change the ratio, values are first
901  * "capped" to a reasonable range (+N of the current value) and then
902  * averaged into the existing value by a ratio of 1:N.  Hence
903  * receiving the largest possible priority can still only raise our
904  * "current_priorities" by at most 1.
905  */
906 static double current_priorities;
907
908 /**
909  * Datastore 'GET' load tracking.
910  */
911 static struct GNUNET_LOAD_Value *datastore_get_load;
912
913 /**
914  * Datastore 'PUT' load tracking.
915  */
916 static struct GNUNET_LOAD_Value *datastore_put_load;
917
918 /**
919  * How long do requests typically stay in the routing table?
920  */
921 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
922
923 /**
924  * We've just now completed a datastore request.  Update our
925  * datastore load calculations.
926  *
927  * @param start time when the datastore request was issued
928  */
929 static void
930 update_datastore_delays (struct GNUNET_TIME_Absolute start)
931 {
932   struct GNUNET_TIME_Relative delay;
933
934   delay = GNUNET_TIME_absolute_get_duration (start);
935   GNUNET_LOAD_update (datastore_get_load,
936                       delay.rel_value);
937 }
938
939
940 /**
941  * Get the filename under which we would store the GNUNET_HELLO_Message
942  * for the given host and protocol.
943  * @return filename of the form DIRECTORY/HOSTID
944  */
945 static char *
946 get_trust_filename (const struct GNUNET_PeerIdentity *id)
947 {
948   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
949   char *fn;
950
951   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
952   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
953   return fn;
954 }
955
956
957
958 /**
959  * Transmit messages by copying it to the target buffer
960  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
961  * for writing in the meantime.  In that case, do nothing
962  * (the disconnect or shutdown handler will take care of the rest).
963  * If we were able to transmit messages and there are still more
964  * pending, ask core again for further calls to this function.
965  *
966  * @param cls closure, pointer to the 'struct ConnectedPeer*'
967  * @param size number of bytes available in buf
968  * @param buf where the callee should write the message
969  * @return number of bytes written to buf
970  */
971 static size_t
972 transmit_to_peer (void *cls,
973                   size_t size, void *buf);
974
975
976 /* ******************* clean up functions ************************ */
977
978 /**
979  * Delete the given migration block.
980  *
981  * @param mb block to delete
982  */
983 static void
984 delete_migration_block (struct MigrationReadyBlock *mb)
985 {
986   GNUNET_CONTAINER_DLL_remove (mig_head,
987                                mig_tail,
988                                mb);
989   GNUNET_PEER_decrement_rcs (mb->target_list,
990                              MIGRATION_LIST_SIZE);
991   mig_size--;
992   GNUNET_free (mb);
993 }
994
995
996 /**
997  * Compare the distance of two peers to a key.
998  *
999  * @param key key
1000  * @param p1 first peer
1001  * @param p2 second peer
1002  * @return GNUNET_YES if P1 is closer to key than P2
1003  */
1004 static int
1005 is_closer (const GNUNET_HashCode *key,
1006            const struct GNUNET_PeerIdentity *p1,
1007            const struct GNUNET_PeerIdentity *p2)
1008 {
1009   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1010                                     &p2->hashPubKey,
1011                                     key);
1012 }
1013
1014
1015 /**
1016  * Consider migrating content to a given peer.
1017  *
1018  * @param cls 'struct MigrationReadyBlock*' to select
1019  *            targets for (or NULL for none)
1020  * @param key ID of the peer 
1021  * @param value 'struct ConnectedPeer' of the peer
1022  * @return GNUNET_YES (always continue iteration)
1023  */
1024 static int
1025 consider_migration (void *cls,
1026                     const GNUNET_HashCode *key,
1027                     void *value)
1028 {
1029   struct MigrationReadyBlock *mb = cls;
1030   struct ConnectedPeer *cp = value;
1031   struct MigrationReadyBlock *pos;
1032   struct GNUNET_PeerIdentity cppid;
1033   struct GNUNET_PeerIdentity otherpid;
1034   struct GNUNET_PeerIdentity worstpid;
1035   size_t msize;
1036   unsigned int i;
1037   unsigned int repl;
1038   
1039   /* consider 'cp' as a migration target for mb */
1040   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1041     return GNUNET_YES; /* peer has requested no migration! */
1042   if (mb != NULL)
1043     {
1044       GNUNET_PEER_resolve (cp->pid,
1045                            &cppid);
1046       repl = MIGRATION_LIST_SIZE;
1047       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1048         {
1049           if (mb->target_list[i] == 0)
1050             {
1051               mb->target_list[i] = cp->pid;
1052               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1053               repl = MIGRATION_LIST_SIZE;
1054               break;
1055             }
1056           GNUNET_PEER_resolve (mb->target_list[i],
1057                                &otherpid);
1058           if ( (repl == MIGRATION_LIST_SIZE) &&
1059                is_closer (&mb->query,
1060                           &cppid,
1061                           &otherpid)) 
1062             {
1063               repl = i;
1064               worstpid = otherpid;
1065             }
1066           else if ( (repl != MIGRATION_LIST_SIZE) &&
1067                     (is_closer (&mb->query,
1068                                 &worstpid,
1069                                 &otherpid) ) )
1070             {
1071               repl = i;
1072               worstpid = otherpid;
1073             }       
1074         }
1075       if (repl != MIGRATION_LIST_SIZE) 
1076         {
1077           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1078           mb->target_list[repl] = cp->pid;
1079           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1080         }
1081     }
1082
1083   /* consider scheduling transmission to cp for content migration */
1084   if (cp->cth != NULL)        
1085     return GNUNET_YES; 
1086   msize = 0;
1087   pos = mig_head;
1088   while (pos != NULL)
1089     {
1090       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1091         {
1092           if (cp->pid == pos->target_list[i])
1093             {
1094               if (msize == 0)
1095                 msize = pos->size;
1096               else
1097                 msize = GNUNET_MIN (msize,
1098                                     pos->size);
1099               break;
1100             }
1101         }
1102       pos = pos->next;
1103     }
1104   if (msize == 0)
1105     return GNUNET_YES; /* no content available */
1106 #if DEBUG_FS
1107   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1108               "Trying to migrate at least %u bytes to peer `%s'\n",
1109               msize,
1110               GNUNET_h2s (key));
1111 #endif
1112   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1113     {
1114       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1115       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1116     }
1117   cp->cth 
1118     = GNUNET_CORE_notify_transmit_ready (core,
1119                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1120                                          (const struct GNUNET_PeerIdentity*) key,
1121                                          msize + sizeof (struct PutMessage),
1122                                          &transmit_to_peer,
1123                                          cp);
1124   return GNUNET_YES;
1125 }
1126
1127
1128 /**
1129  * Task that is run periodically to obtain blocks for content
1130  * migration
1131  * 
1132  * @param cls unused
1133  * @param tc scheduler context (also unused)
1134  */
1135 static void
1136 gather_migration_blocks (void *cls,
1137                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1138
1139
1140
1141
1142 /**
1143  * Task that is run periodically to obtain blocks for DHT PUTs.
1144  * 
1145  * @param cls type of blocks to gather
1146  * @param tc scheduler context (unused)
1147  */
1148 static void
1149 gather_dht_put_blocks (void *cls,
1150                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1151
1152
1153 /**
1154  * If the migration task is not currently running, consider
1155  * (re)scheduling it with the appropriate delay.
1156  */
1157 static void
1158 consider_migration_gathering ()
1159 {
1160   struct GNUNET_TIME_Relative delay;
1161
1162   if (dsh == NULL)
1163     return;
1164   if (mig_qe != NULL)
1165     return;
1166   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1167     return;
1168   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1169                                          mig_size);
1170   delay = GNUNET_TIME_relative_divide (delay,
1171                                        MAX_MIGRATION_QUEUE);
1172   delay = GNUNET_TIME_relative_max (delay,
1173                                     min_migration_delay);
1174   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1175                                            &gather_migration_blocks,
1176                                            NULL);
1177 }
1178
1179
1180 /**
1181  * If the DHT PUT gathering task is not currently running, consider
1182  * (re)scheduling it with the appropriate delay.
1183  */
1184 static void
1185 consider_dht_put_gathering (void *cls)
1186 {
1187   struct GNUNET_TIME_Relative delay;
1188
1189   if (dsh == NULL)
1190     return;
1191   if (dht_qe != NULL)
1192     return;
1193   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1194     return;
1195   if (zero_anonymity_count_estimate > 0)
1196     {
1197       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1198                                            zero_anonymity_count_estimate);
1199       delay = GNUNET_TIME_relative_min (delay,
1200                                         MAX_DHT_PUT_FREQ);
1201     }
1202   else
1203     {
1204       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1205          (hopefully) appear */
1206       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1207     }
1208   dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1209                                            &gather_dht_put_blocks,
1210                                            cls);
1211 }
1212
1213
1214 /**
1215  * Process content offered for migration.
1216  *
1217  * @param cls closure
1218  * @param key key for the content
1219  * @param size number of bytes in data
1220  * @param data content stored
1221  * @param type type of the content
1222  * @param priority priority of the content
1223  * @param anonymity anonymity-level for the content
1224  * @param expiration expiration time for the content
1225  * @param uid unique identifier for the datum;
1226  *        maybe 0 if no unique identifier is available
1227  */
1228 static void
1229 process_migration_content (void *cls,
1230                            const GNUNET_HashCode * key,
1231                            size_t size,
1232                            const void *data,
1233                            enum GNUNET_BLOCK_Type type,
1234                            uint32_t priority,
1235                            uint32_t anonymity,
1236                            struct GNUNET_TIME_Absolute
1237                            expiration, uint64_t uid)
1238 {
1239   struct MigrationReadyBlock *mb;
1240   
1241   if (key == NULL)
1242     {
1243       mig_qe = NULL;
1244       if (mig_size < MAX_MIGRATION_QUEUE)  
1245         consider_migration_gathering ();
1246       return;
1247     }
1248   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1249     {
1250       if (GNUNET_OK !=
1251           GNUNET_FS_handle_on_demand_block (key, size, data,
1252                                             type, priority, anonymity,
1253                                             expiration, uid, 
1254                                             &process_migration_content,
1255                                             NULL))
1256         {
1257           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1258         }
1259       return;
1260     }
1261 #if DEBUG_FS
1262   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1263               "Retrieved block `%s' of type %u for migration\n",
1264               GNUNET_h2s (key),
1265               type);
1266 #endif
1267   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1268   mb->query = *key;
1269   mb->expiration = expiration;
1270   mb->size = size;
1271   mb->type = type;
1272   memcpy (&mb[1], data, size);
1273   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1274                                      mig_tail,
1275                                      mig_tail,
1276                                      mb);
1277   mig_size++;
1278   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1279                                          &consider_migration,
1280                                          mb);
1281   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1282 }
1283
1284
1285 /**
1286  * Function called upon completion of the DHT PUT operation.
1287  */
1288 static void
1289 dht_put_continuation (void *cls,
1290                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1291 {
1292   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1293 }
1294
1295
1296 /**
1297  * Store content in DHT.
1298  *
1299  * @param cls closure
1300  * @param key key for the content
1301  * @param size number of bytes in data
1302  * @param data content stored
1303  * @param type type of the content
1304  * @param priority priority of the content
1305  * @param anonymity anonymity-level for the content
1306  * @param expiration expiration time for the content
1307  * @param uid unique identifier for the datum;
1308  *        maybe 0 if no unique identifier is available
1309  */
1310 static void
1311 process_dht_put_content (void *cls,
1312                          const GNUNET_HashCode * key,
1313                          size_t size,
1314                          const void *data,
1315                          enum GNUNET_BLOCK_Type type,
1316                          uint32_t priority,
1317                          uint32_t anonymity,
1318                          struct GNUNET_TIME_Absolute
1319                          expiration, uint64_t uid)
1320
1321   static unsigned int counter;
1322   static GNUNET_HashCode last_vhash;
1323   static GNUNET_HashCode vhash;
1324
1325   if (key == NULL)
1326     {
1327       dht_qe = NULL;
1328       consider_dht_put_gathering (cls);
1329       return;
1330     }
1331   /* slightly funky code to estimate the total number of values with zero
1332      anonymity from the maximum observed length of a monotonically increasing 
1333      sequence of hashes over the contents */
1334   GNUNET_CRYPTO_hash (data, size, &vhash);
1335   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1336     {
1337       if (zero_anonymity_count_estimate > 0)
1338         zero_anonymity_count_estimate /= 2;
1339       counter = 0;
1340     }
1341   last_vhash = vhash;
1342   if (counter < 31)
1343     counter++;
1344   if (zero_anonymity_count_estimate < (1 << counter))
1345     zero_anonymity_count_estimate = (1 << counter);
1346 #if DEBUG_FS
1347   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1348               "Retrieved block `%s' of type %u for DHT PUT\n",
1349               GNUNET_h2s (key),
1350               type);
1351 #endif
1352   GNUNET_DHT_put (dht_handle,
1353                   key,
1354                   DEFAULT_PUT_REPLICATION,
1355                   GNUNET_DHT_RO_NONE,
1356                   type,
1357                   size,
1358                   data,
1359                   expiration,
1360                   GNUNET_TIME_UNIT_FOREVER_REL,
1361                   &dht_put_continuation,
1362                   cls);
1363 }
1364
1365
1366 /**
1367  * Task that is run periodically to obtain blocks for content
1368  * migration
1369  * 
1370  * @param cls unused
1371  * @param tc scheduler context (also unused)
1372  */
1373 static void
1374 gather_migration_blocks (void *cls,
1375                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1376 {
1377   mig_task = GNUNET_SCHEDULER_NO_TASK;
1378   if (dsh != NULL)
1379     {
1380       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1381                                             GNUNET_TIME_UNIT_FOREVER_REL,
1382                                             &process_migration_content, NULL);
1383       GNUNET_assert (mig_qe != NULL);
1384     }
1385 }
1386
1387
1388 /**
1389  * Task that is run periodically to obtain blocks for DHT PUTs.
1390  * 
1391  * @param cls type of blocks to gather
1392  * @param tc scheduler context (unused)
1393  */
1394 static void
1395 gather_dht_put_blocks (void *cls,
1396                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1397 {
1398   dht_task = GNUNET_SCHEDULER_NO_TASK;
1399   if (dsh != NULL)
1400     {
1401       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1402         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1403       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1404                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1405                                                     dht_put_type++,
1406                                                     &process_dht_put_content, NULL);
1407       GNUNET_assert (dht_qe != NULL);
1408     }
1409 }
1410
1411
1412 /**
1413  * We're done with a particular message list entry.
1414  * Free all associated resources.
1415  * 
1416  * @param pml entry to destroy
1417  */
1418 static void
1419 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1420 {
1421   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1422                                pml->req->pending_tail,
1423                                pml);
1424   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1425                                pml->target->pending_messages_tail,
1426                                pml->pm);
1427   pml->target->pending_requests--;
1428   GNUNET_free (pml->pm);
1429   GNUNET_free (pml);
1430 }
1431
1432
1433 /**
1434  * Destroy the given pending message (and call the respective
1435  * continuation).
1436  *
1437  * @param pm message to destroy
1438  * @param tpid id of peer that the message was delivered to, or 0 for none
1439  */
1440 static void
1441 destroy_pending_message (struct PendingMessage *pm,
1442                          GNUNET_PEER_Id tpid)
1443 {
1444   struct PendingMessageList *pml = pm->pml;
1445   TransmissionContinuation cont;
1446   void *cont_cls;
1447
1448   cont = pm->cont;
1449   cont_cls = pm->cont_cls;
1450   if (pml != NULL)
1451     {
1452       GNUNET_assert (pml->pm == pm);
1453       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1454       destroy_pending_message_list_entry (pml);
1455     }
1456   else
1457     {
1458       GNUNET_free (pm);
1459     }
1460   if (cont != NULL)
1461     cont (cont_cls, tpid);  
1462 }
1463
1464
1465 /**
1466  * We're done processing a particular request.
1467  * Free all associated resources.
1468  *
1469  * @param pr request to destroy
1470  */
1471 static void
1472 destroy_pending_request (struct PendingRequest *pr)
1473 {
1474   struct GNUNET_PeerIdentity pid;
1475   unsigned int i;
1476
1477   if (pr->hnode != NULL)
1478     {
1479       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1480                                          pr->hnode);
1481       pr->hnode = NULL;
1482     }
1483   if (NULL == pr->client_request_list)
1484     {
1485       GNUNET_STATISTICS_update (stats,
1486                                 gettext_noop ("# P2P searches active"),
1487                                 -1,
1488                                 GNUNET_NO);
1489     }
1490   else
1491     {
1492       GNUNET_STATISTICS_update (stats,
1493                                 gettext_noop ("# client searches active"),
1494                                 -1,
1495                                 GNUNET_NO);
1496     }
1497   if (GNUNET_YES == 
1498       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1499                                             &pr->query,
1500                                             pr))
1501     {
1502       GNUNET_LOAD_update (rt_entry_lifetime,
1503                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1504     }
1505   if (pr->qe != NULL)
1506      {
1507       GNUNET_DATASTORE_cancel (pr->qe);
1508       pr->qe = NULL;
1509     }
1510   if (pr->dht_get != NULL)
1511     {
1512       GNUNET_DHT_get_stop (pr->dht_get);
1513       pr->dht_get = NULL;
1514     }
1515   if (pr->client_request_list != NULL)
1516     {
1517       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1518                                    pr->client_request_list->client_list->rl_tail,
1519                                    pr->client_request_list);
1520       GNUNET_free (pr->client_request_list);
1521       pr->client_request_list = NULL;
1522     }
1523   if (pr->cp != NULL)
1524     {
1525       GNUNET_PEER_resolve (pr->cp->pid,
1526                            &pid);
1527       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1528                                                    &pid.hashPubKey,
1529                                                    pr);
1530       pr->cp = NULL;
1531     }
1532   if (pr->bf != NULL)
1533     {
1534       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1535       pr->bf = NULL;
1536     }
1537   if (pr->pirc != NULL)
1538     {
1539       GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1540       pr->pirc->irc = NULL;
1541       pr->pirc = NULL;
1542     }
1543   if (pr->replies_seen != NULL)
1544     {
1545       GNUNET_free (pr->replies_seen);
1546       pr->replies_seen = NULL;
1547     }
1548   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1549     {
1550       GNUNET_SCHEDULER_cancel (pr->task);
1551       pr->task = GNUNET_SCHEDULER_NO_TASK;
1552     }
1553   while (NULL != pr->pending_head)    
1554     destroy_pending_message_list_entry (pr->pending_head);
1555   GNUNET_PEER_change_rc (pr->target_pid, -1);
1556   if (pr->used_targets != NULL)
1557     {
1558       for (i=0;i<pr->used_targets_off;i++)
1559         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1560       GNUNET_free (pr->used_targets);
1561       pr->used_targets_off = 0;
1562       pr->used_targets_size = 0;
1563       pr->used_targets = NULL;
1564     }
1565   GNUNET_free (pr);
1566 }
1567
1568
1569 /**
1570  * Find latency information in 'atsi'.
1571  *
1572  * @param atsi performance data
1573  * @return connection latency
1574  */
1575 static struct GNUNET_TIME_Relative
1576 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1577 {
1578   /* FIXME: extract latency data from 'atsi' */
1579   return GNUNET_TIME_UNIT_SECONDS;
1580 }
1581
1582
1583 /**
1584  * Method called whenever a given peer connects.
1585  *
1586  * @param cls closure, not used
1587  * @param peer peer identity this notification is about
1588  * @param atsi performance information
1589  */
1590 static void 
1591 peer_connect_handler (void *cls,
1592                       const struct
1593                       GNUNET_PeerIdentity * peer,
1594                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1595 {
1596   struct ConnectedPeer *cp;
1597   struct MigrationReadyBlock *pos;
1598   char *fn;
1599   uint32_t trust;
1600   struct GNUNET_TIME_Relative latency;
1601
1602   latency = get_latency (atsi);
1603   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1604                                           &peer->hashPubKey);
1605   if (NULL != cp)
1606     {
1607       GNUNET_break (0);
1608       return;
1609     }
1610   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1611   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1612   cp->pid = GNUNET_PEER_intern (peer);
1613
1614   fn = get_trust_filename (peer);
1615   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1616       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1617     cp->disk_trust = cp->trust = ntohl (trust);
1618   GNUNET_free (fn);
1619
1620   GNUNET_break (GNUNET_OK ==
1621                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1622                                                    &peer->hashPubKey,
1623                                                    cp,
1624                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1625
1626   pos = mig_head;
1627   while (NULL != pos)
1628     {
1629       (void) consider_migration (pos, &peer->hashPubKey, cp);
1630       pos = pos->next;
1631     }
1632 }
1633
1634
1635 /**
1636  * Method called whenever a given peer has a status change.
1637  *
1638  * @param cls closure
1639  * @param peer peer identity this notification is about
1640  * @param bandwidth_in available amount of inbound bandwidth
1641  * @param bandwidth_out available amount of outbound bandwidth
1642  * @param timeout absolute time when this peer will time out
1643  *        unless we see some further activity from it
1644  * @param atsi status information
1645  */
1646 static void
1647 peer_status_handler (void *cls,
1648                      const struct
1649                      GNUNET_PeerIdentity * peer,
1650                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1651                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1652                      struct GNUNET_TIME_Absolute timeout,
1653                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1654 {
1655   struct ConnectedPeer *cp;
1656   struct GNUNET_TIME_Relative latency;
1657
1658   latency = get_latency (atsi);
1659   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1660                                           &peer->hashPubKey);
1661   if (cp == NULL)
1662     {
1663       GNUNET_break (0);
1664       return;
1665     }
1666   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1667                                  latency);  
1668 }
1669
1670
1671
1672 /**
1673  * Increase the host credit by a value.
1674  *
1675  * @param host which peer to change the trust value on
1676  * @param value is the int value by which the
1677  *  host credit is to be increased or decreased
1678  * @returns the actual change in trust (positive or negative)
1679  */
1680 static int
1681 change_host_trust (struct ConnectedPeer *host, int value)
1682 {
1683   if (value == 0)
1684     return 0;
1685   GNUNET_assert (host != NULL);
1686   if (value > 0)
1687     {
1688       if (host->trust + value < host->trust)
1689         {
1690           value = UINT32_MAX - host->trust;
1691           host->trust = UINT32_MAX;
1692         }
1693       else
1694         host->trust += value;
1695     }
1696   else
1697     {
1698       if (host->trust < -value)
1699         {
1700           value = -host->trust;
1701           host->trust = 0;
1702         }
1703       else
1704         host->trust += value;
1705     }
1706   return value;
1707 }
1708
1709
1710 /**
1711  * Write host-trust information to a file - flush the buffer entry!
1712  */
1713 static int
1714 flush_trust (void *cls,
1715              const GNUNET_HashCode *key,
1716              void *value)
1717 {
1718   struct ConnectedPeer *host = value;
1719   char *fn;
1720   uint32_t trust;
1721   struct GNUNET_PeerIdentity pid;
1722
1723   if (host->trust == host->disk_trust)
1724     return GNUNET_OK;                     /* unchanged */
1725   GNUNET_PEER_resolve (host->pid,
1726                        &pid);
1727   fn = get_trust_filename (&pid);
1728   if (host->trust == 0)
1729     {
1730       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1731         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1732                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1733     }
1734   else
1735     {
1736       trust = htonl (host->trust);
1737       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1738                                                     sizeof(uint32_t),
1739                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1740                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1741         host->disk_trust = host->trust;
1742     }
1743   GNUNET_free (fn);
1744   return GNUNET_OK;
1745 }
1746
1747 /**
1748  * Call this method periodically to scan data/hosts for new hosts.
1749  */
1750 static void
1751 cron_flush_trust (void *cls,
1752                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1753 {
1754
1755   if (NULL == connected_peers)
1756     return;
1757   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1758                                          &flush_trust,
1759                                          NULL);
1760   if (NULL == tc)
1761     return;
1762   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1763     return;
1764   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1765 }
1766
1767
1768 /**
1769  * Free (each) request made by the peer.
1770  *
1771  * @param cls closure, points to peer that the request belongs to
1772  * @param key current key code
1773  * @param value value in the hash map
1774  * @return GNUNET_YES (we should continue to iterate)
1775  */
1776 static int
1777 destroy_request (void *cls,
1778                  const GNUNET_HashCode * key,
1779                  void *value)
1780 {
1781   const struct GNUNET_PeerIdentity * peer = cls;
1782   struct PendingRequest *pr = value;
1783   
1784   GNUNET_break (GNUNET_YES ==
1785                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1786                                                       &peer->hashPubKey,
1787                                                       pr));
1788   destroy_pending_request (pr);
1789   return GNUNET_YES;
1790 }
1791
1792
1793 /**
1794  * Method called whenever a peer disconnects.
1795  *
1796  * @param cls closure, not used
1797  * @param peer peer identity this notification is about
1798  */
1799 static void
1800 peer_disconnect_handler (void *cls,
1801                          const struct
1802                          GNUNET_PeerIdentity * peer)
1803 {
1804   struct ConnectedPeer *cp;
1805   struct PendingMessage *pm;
1806   unsigned int i;
1807   struct MigrationReadyBlock *pos;
1808   struct MigrationReadyBlock *next;
1809
1810   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1811                                               &peer->hashPubKey,
1812                                               &destroy_request,
1813                                               (void*) peer);
1814   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1815                                           &peer->hashPubKey);
1816   if (cp == NULL)
1817     return;
1818   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1819     {
1820       if (NULL != cp->last_client_replies[i])
1821         {
1822           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1823           cp->last_client_replies[i] = NULL;
1824         }
1825     }
1826   GNUNET_break (GNUNET_YES ==
1827                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1828                                                       &peer->hashPubKey,
1829                                                       cp));
1830   if (cp->irc != NULL)
1831     {
1832       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1833       cp->irc = NULL;
1834       cp->pr->pirc = NULL;
1835       cp->pr = NULL;
1836     }
1837
1838   /* remove this peer from migration considerations; schedule
1839      alternatives */
1840   next = mig_head;
1841   while (NULL != (pos = next))
1842     {
1843       next = pos->next;
1844       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1845         {
1846           if (pos->target_list[i] == cp->pid)
1847             {
1848               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1849               pos->target_list[i] = 0;
1850             }
1851          }
1852       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1853         {
1854           delete_migration_block (pos);
1855           consider_migration_gathering ();
1856           continue;
1857         }
1858       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1859                                              &consider_migration,
1860                                              pos);
1861     }
1862   GNUNET_PEER_change_rc (cp->pid, -1);
1863   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1864   if (NULL != cp->cth)
1865     {
1866       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1867       cp->cth = NULL;
1868     }
1869   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1870     {
1871       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1872       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1873     }
1874   while (NULL != (pm = cp->pending_messages_head))
1875     destroy_pending_message (pm, 0 /* delivery failed */);
1876   GNUNET_LOAD_value_free (cp->transmission_delay);
1877   GNUNET_break (0 == cp->pending_requests);
1878   GNUNET_free (cp);
1879 }
1880
1881
1882 /**
1883  * Iterator over hash map entries that removes all occurences
1884  * of the given 'client' from the 'last_client_replies' of the
1885  * given connected peer.
1886  *
1887  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1888  * @param key current key code (unused)
1889  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1890  * @return GNUNET_YES (we should continue to iterate)
1891  */
1892 static int
1893 remove_client_from_last_client_replies (void *cls,
1894                                         const GNUNET_HashCode * key,
1895                                         void *value)
1896 {
1897   struct GNUNET_SERVER_Client *client = cls;
1898   struct ConnectedPeer *cp = value;
1899   unsigned int i;
1900
1901   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1902     {
1903       if (cp->last_client_replies[i] == client)
1904         {
1905           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1906           cp->last_client_replies[i] = NULL;
1907         }
1908     }  
1909   return GNUNET_YES;
1910 }
1911
1912
1913 /**
1914  * A client disconnected.  Remove all of its pending queries.
1915  *
1916  * @param cls closure, NULL
1917  * @param client identification of the client
1918  */
1919 static void
1920 handle_client_disconnect (void *cls,
1921                           struct GNUNET_SERVER_Client
1922                           * client)
1923 {
1924   struct ClientList *pos;
1925   struct ClientList *prev;
1926   struct ClientRequestList *rcl;
1927   struct ClientResponseMessage *creply;
1928
1929   if (client == NULL)
1930     return;
1931   prev = NULL;
1932   pos = client_list;
1933   while ( (NULL != pos) &&
1934           (pos->client != client) )
1935     {
1936       prev = pos;
1937       pos = pos->next;
1938     }
1939   if (pos == NULL)
1940     return; /* no requests pending for this client */
1941   while (NULL != (rcl = pos->rl_head))
1942     {
1943       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1944                   "Destroying pending request `%s' on disconnect\n",
1945                   GNUNET_h2s (&rcl->req->query));
1946       destroy_pending_request (rcl->req);
1947     }
1948   if (prev == NULL)
1949     client_list = pos->next;
1950   else
1951     prev->next = pos->next;
1952   if (pos->th != NULL)
1953     {
1954       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1955       pos->th = NULL;
1956     }
1957   while (NULL != (creply = pos->res_head))
1958     {
1959       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1960                                    pos->res_tail,
1961                                    creply);
1962       GNUNET_free (creply);
1963     }    
1964   GNUNET_SERVER_client_drop (pos->client);
1965   GNUNET_free (pos);
1966   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1967                                          &remove_client_from_last_client_replies,
1968                                          client);
1969 }
1970
1971
1972 /**
1973  * Iterator to free peer entries.
1974  *
1975  * @param cls closure, unused
1976  * @param key current key code
1977  * @param value value in the hash map (peer entry)
1978  * @return GNUNET_YES (we should continue to iterate)
1979  */
1980 static int 
1981 clean_peer (void *cls,
1982             const GNUNET_HashCode * key,
1983             void *value)
1984 {
1985   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1986   return GNUNET_YES;
1987 }
1988
1989
1990 /**
1991  * Task run during shutdown.
1992  *
1993  * @param cls unused
1994  * @param tc unused
1995  */
1996 static void
1997 shutdown_task (void *cls,
1998                const struct GNUNET_SCHEDULER_TaskContext *tc)
1999 {
2000   if (mig_qe != NULL)
2001     {
2002       GNUNET_DATASTORE_cancel (mig_qe);
2003       mig_qe = NULL;
2004     }
2005   if (dht_qe != NULL)
2006     {
2007       GNUNET_DATASTORE_cancel (dht_qe);
2008       dht_qe = NULL;
2009     }
2010   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2011     {
2012       GNUNET_SCHEDULER_cancel (mig_task);
2013       mig_task = GNUNET_SCHEDULER_NO_TASK;
2014     }
2015   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2016     {
2017       GNUNET_SCHEDULER_cancel (dht_task);
2018       dht_task = GNUNET_SCHEDULER_NO_TASK;
2019     }
2020   while (client_list != NULL)
2021     handle_client_disconnect (NULL,
2022                               client_list->client);
2023   cron_flush_trust (NULL, NULL);
2024   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2025                                          &clean_peer,
2026                                          NULL);
2027   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2028   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2029   requests_by_expiration_heap = 0;
2030   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2031   connected_peers = NULL;
2032   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2033   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2034   query_request_map = NULL;
2035   GNUNET_LOAD_value_free (rt_entry_lifetime);
2036   rt_entry_lifetime = NULL;
2037   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2038   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2039   peer_request_map = NULL;
2040   GNUNET_assert (NULL != core);
2041   GNUNET_CORE_disconnect (core);
2042   core = NULL;
2043   if (stats != NULL)
2044     {
2045       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2046       stats = NULL;
2047     }
2048   if (dsh != NULL)
2049     {
2050       GNUNET_DATASTORE_disconnect (dsh,
2051                                    GNUNET_NO);
2052       dsh = NULL;
2053     }
2054   while (mig_head != NULL)
2055     delete_migration_block (mig_head);
2056   GNUNET_assert (0 == mig_size);
2057   GNUNET_DHT_disconnect (dht_handle);
2058   dht_handle = NULL;
2059   GNUNET_LOAD_value_free (datastore_get_load);
2060   datastore_get_load = NULL;
2061   GNUNET_LOAD_value_free (datastore_put_load);
2062   datastore_put_load = NULL;
2063   GNUNET_BLOCK_context_destroy (block_ctx);
2064   block_ctx = NULL;
2065   GNUNET_CONFIGURATION_destroy (block_cfg);
2066   block_cfg = NULL;
2067   cfg = NULL;  
2068   GNUNET_free_non_null (trustDirectory);
2069   trustDirectory = NULL;
2070 }
2071
2072
2073 /* ******************* Utility functions  ******************** */
2074
2075
2076 /**
2077  * We've had to delay a request for transmission to core, but now
2078  * we should be ready.  Run it.
2079  *
2080  * @param cls the 'struct ConnectedPeer' for which a request was delayed
2081  * @param tc task context (unused)
2082  */
2083 static void
2084 delayed_transmission_request (void *cls,
2085                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2086 {
2087   struct ConnectedPeer *cp = cls;
2088   struct GNUNET_PeerIdentity pid;
2089   struct PendingMessage *pm;
2090
2091   pm = cp->pending_messages_head;
2092   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2093   GNUNET_assert (cp->cth == NULL);
2094   if (pm == NULL)
2095     return;
2096   GNUNET_PEER_resolve (cp->pid,
2097                        &pid);
2098   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2099   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2100                                                pm->priority,
2101                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2102                                                &pid,
2103                                                pm->msize,
2104                                                &transmit_to_peer,
2105                                                cp);
2106 }
2107
2108
2109 /**
2110  * Transmit messages by copying it to the target buffer
2111  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2112  * for writing in the meantime.  In that case, do nothing
2113  * (the disconnect or shutdown handler will take care of the rest).
2114  * If we were able to transmit messages and there are still more
2115  * pending, ask core again for further calls to this function.
2116  *
2117  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2118  * @param size number of bytes available in buf
2119  * @param buf where the callee should write the message
2120  * @return number of bytes written to buf
2121  */
2122 static size_t
2123 transmit_to_peer (void *cls,
2124                   size_t size, void *buf)
2125 {
2126   struct ConnectedPeer *cp = cls;
2127   char *cbuf = buf;
2128   struct PendingMessage *pm;
2129   struct PendingMessage *next_pm;
2130   struct GNUNET_TIME_Absolute now;
2131   struct GNUNET_TIME_Relative min_delay;
2132   struct MigrationReadyBlock *mb;
2133   struct MigrationReadyBlock *next;
2134   struct PutMessage migm;
2135   size_t msize;
2136   unsigned int i;
2137   struct GNUNET_PeerIdentity pid;
2138  
2139   cp->cth = NULL;
2140   if (NULL == buf)
2141     {
2142 #if DEBUG_FS
2143       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2144                   "Dropping message, core too busy.\n");
2145 #endif
2146       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2147                   "Dropping message, core too busy.\n");
2148       GNUNET_LOAD_update (cp->transmission_delay,
2149                           UINT64_MAX);
2150       
2151       if (NULL != (pm = cp->pending_messages_head))
2152         {
2153           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2154                                        cp->pending_messages_tail,
2155                                        pm);
2156           cp->pending_requests--;    
2157           destroy_pending_message (pm, 0);
2158         }
2159       if (NULL != (pm = cp->pending_messages_head))
2160         {
2161           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2162           min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2163           cp->delayed_transmission_request_task
2164             = GNUNET_SCHEDULER_add_delayed (min_delay,
2165                                             &delayed_transmission_request,
2166                                             cp);
2167         }
2168       return 0;
2169     }  
2170   GNUNET_LOAD_update (cp->transmission_delay,
2171                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2172   now = GNUNET_TIME_absolute_get ();
2173   msize = 0;
2174   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2175   next_pm = cp->pending_messages_head;
2176   while ( (NULL != (pm = next_pm) ) &&
2177           (pm->msize <= size) )
2178     {
2179       next_pm = pm->next;
2180       if (pm->delay_until.abs_value > now.abs_value)
2181         {
2182           min_delay = GNUNET_TIME_relative_min (min_delay,
2183                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2184           continue;
2185         }
2186       memcpy (&cbuf[msize], &pm[1], pm->msize);
2187       msize += pm->msize;
2188       size -= pm->msize;
2189       if (NULL == pm->pml)
2190         {
2191           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2192                                        cp->pending_messages_tail,
2193                                        pm);
2194           cp->pending_requests--;
2195         }
2196       destroy_pending_message (pm, cp->pid);
2197     }
2198   if (pm != NULL)
2199     min_delay = GNUNET_TIME_UNIT_ZERO;
2200   if (NULL != cp->pending_messages_head)
2201     {     
2202       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2203       cp->delayed_transmission_request_task
2204         = GNUNET_SCHEDULER_add_delayed (min_delay,
2205                                         &delayed_transmission_request,
2206                                         cp);
2207     }
2208   if (pm == NULL)
2209     {      
2210       GNUNET_PEER_resolve (cp->pid,
2211                            &pid);
2212       next = mig_head;
2213       while (NULL != (mb = next))
2214         {
2215           next = mb->next;
2216           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2217             {
2218               if ( (cp->pid == mb->target_list[i]) &&
2219                    (mb->size + sizeof (migm) <= size) )
2220                 {
2221                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2222                   mb->target_list[i] = 0;
2223                   mb->used_targets++;
2224                   memset (&migm, 0, sizeof (migm));
2225                   migm.header.size = htons (sizeof (migm) + mb->size);
2226                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2227                   migm.type = htonl (mb->type);
2228                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2229                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2230                   msize += sizeof (migm);
2231                   size -= sizeof (migm);
2232                   memcpy (&cbuf[msize], &mb[1], mb->size);
2233                   msize += mb->size;
2234                   size -= mb->size;
2235 #if DEBUG_FS
2236                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2237                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2238                               GNUNET_h2s (&mb->query),
2239                               (unsigned int) mb->size,
2240                               GNUNET_i2s (&pid));
2241 #endif    
2242                   break;
2243                 }
2244               else
2245                 {
2246 #if DEBUG_FS
2247                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2248                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2249                               GNUNET_h2s (&mb->query),
2250                               (unsigned int) mb->size,
2251                               GNUNET_i2s (&pid));
2252 #endif    
2253                 }
2254             }
2255           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2256                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2257             {
2258               delete_migration_block (mb);
2259               consider_migration_gathering ();
2260             }
2261         }
2262       consider_migration (NULL, 
2263                           &pid.hashPubKey,
2264                           cp);
2265     }
2266 #if DEBUG_FS
2267   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2268               "Transmitting %u bytes to peer with PID %u\n",
2269               (unsigned int) msize,
2270               (unsigned int) cp->pid);
2271 #endif
2272   return msize;
2273 }
2274
2275
2276 /**
2277  * Add a message to the set of pending messages for the given peer.
2278  *
2279  * @param cp peer to send message to
2280  * @param pm message to queue
2281  * @param pr request on which behalf this message is being queued
2282  */
2283 static void
2284 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2285                                   struct PendingMessage *pm,
2286                                   struct PendingRequest *pr)
2287 {
2288   struct PendingMessage *pos;
2289   struct PendingMessageList *pml;
2290   struct GNUNET_PeerIdentity pid;
2291
2292   GNUNET_assert (pm->next == NULL);
2293   GNUNET_assert (pm->pml == NULL);    
2294   if (pr != NULL)
2295     {
2296       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2297       pml->req = pr;
2298       pml->target = cp;
2299       pml->pm = pm;
2300       pm->pml = pml;  
2301       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2302                                    pr->pending_tail,
2303                                    pml);
2304     }
2305   pos = cp->pending_messages_head;
2306   while ( (pos != NULL) &&
2307           (pm->priority < pos->priority) )
2308     pos = pos->next;    
2309   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2310                                      cp->pending_messages_tail,
2311                                      pos,
2312                                      pm);
2313   cp->pending_requests++;
2314   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2315     {
2316       GNUNET_STATISTICS_update (stats,
2317                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2318                                 1,
2319                                 GNUNET_NO);
2320       destroy_pending_message (cp->pending_messages_tail, 0);  
2321     }
2322   GNUNET_PEER_resolve (cp->pid, &pid);
2323   if (NULL != cp->cth)
2324     {
2325       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2326       cp->cth = NULL;
2327     }
2328   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2329     {
2330       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2331       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2332     }
2333   /* need to schedule transmission */
2334   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2335   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2336                                                cp->pending_messages_head->priority,
2337                                                MAX_TRANSMIT_DELAY,
2338                                                &pid,
2339                                                cp->pending_messages_head->msize,
2340                                                &transmit_to_peer,
2341                                                cp);
2342   if (cp->cth == NULL)
2343     {
2344 #if DEBUG_FS
2345       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2346                   "Failed to schedule transmission with core!\n");
2347 #endif
2348       GNUNET_STATISTICS_update (stats,
2349                                 gettext_noop ("# CORE transmission failures"),
2350                                 1,
2351                                 GNUNET_NO);
2352     }
2353 }
2354
2355
2356 /**
2357  * Test if the DATABASE (GET) load on this peer is too high
2358  * to even consider processing the query at
2359  * all.  
2360  * 
2361  * @return GNUNET_YES if the load is too high to do anything (load high)
2362  *         GNUNET_NO to process normally (load normal)
2363  *         GNUNET_SYSERR to process for free (load low)
2364  */
2365 static int
2366 test_get_load_too_high (uint32_t priority)
2367 {
2368   double ld;
2369
2370   ld = GNUNET_LOAD_get_load (datastore_get_load);
2371   if (ld < 1)
2372     return GNUNET_SYSERR;    
2373   if (ld <= priority)    
2374     return GNUNET_NO;    
2375   return GNUNET_YES;
2376 }
2377
2378
2379
2380
2381 /**
2382  * Test if the DATABASE (PUT) load on this peer is too high
2383  * to even consider processing the query at
2384  * all.  
2385  * 
2386  * @return GNUNET_YES if the load is too high to do anything (load high)
2387  *         GNUNET_NO to process normally (load normal or low)
2388  */
2389 static int
2390 test_put_load_too_high (uint32_t priority)
2391 {
2392   double ld;
2393
2394   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2395     return GNUNET_NO; /* very fast */
2396   ld = GNUNET_LOAD_get_load (datastore_put_load);
2397   if (ld < 2.0 * (1 + priority))
2398     return GNUNET_NO;
2399   GNUNET_STATISTICS_update (stats,
2400                             gettext_noop ("# storage requests dropped due to high load"),
2401                             1,
2402                             GNUNET_NO);
2403   return GNUNET_YES;
2404 }
2405
2406
2407 /* ******************* Pending Request Refresh Task ******************** */
2408
2409
2410
2411 /**
2412  * We use a random delay to make the timing of requests less
2413  * predictable.  This function returns such a random delay.  We add a base
2414  * delay of MAX_CORK_DELAY (1s).
2415  *
2416  * FIXME: make schedule dependent on the specifics of the request?
2417  * Or bandwidth and number of connected peers and load?
2418  *
2419  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2420  */
2421 static struct GNUNET_TIME_Relative
2422 get_processing_delay ()
2423 {
2424   return 
2425     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2426                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2427                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2428                                                                                        TTL_DECREMENT)));
2429 }
2430
2431
2432 /**
2433  * We're processing a GET request from another peer and have decided
2434  * to forward it to other peers.  This function is called periodically
2435  * and should forward the request to other peers until we have all
2436  * possible replies.  If we have transmitted the *only* reply to
2437  * the initiator we should destroy the pending request.  If we have
2438  * many replies in the queue to the initiator, we should delay sending
2439  * out more queries until the reply queue has shrunk some.
2440  *
2441  * @param cls our "struct ProcessGetContext *"
2442  * @param tc unused
2443  */
2444 static void
2445 forward_request_task (void *cls,
2446                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2447
2448
2449 /**
2450  * Function called after we either failed or succeeded
2451  * at transmitting a query to a peer.  
2452  *
2453  * @param cls the requests "struct PendingRequest*"
2454  * @param tpid ID of receiving peer, 0 on transmission error
2455  */
2456 static void
2457 transmit_query_continuation (void *cls,
2458                              GNUNET_PEER_Id tpid)
2459 {
2460   struct PendingRequest *pr = cls;
2461   unsigned int i;
2462
2463   GNUNET_STATISTICS_update (stats,
2464                             gettext_noop ("# queries scheduled for forwarding"),
2465                             -1,
2466                             GNUNET_NO);
2467   if (tpid == 0)   
2468     {
2469 #if DEBUG_FS
2470       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2471                   "Transmission of request failed, will try again later.\n");
2472 #endif
2473       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2474         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2475                                                  &forward_request_task,
2476                                                  pr); 
2477       return;    
2478     }
2479 #if DEBUG_FS
2480   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2481               "Transmitted query `%s'\n",
2482               GNUNET_h2s (&pr->query));
2483 #endif
2484   GNUNET_STATISTICS_update (stats,
2485                             gettext_noop ("# queries forwarded"),
2486                             1,
2487                             GNUNET_NO);
2488   for (i=0;i<pr->used_targets_off;i++)
2489     if (pr->used_targets[i].pid == tpid)
2490       break; /* found match! */    
2491   if (i == pr->used_targets_off)
2492     {
2493       /* need to create new entry */
2494       if (pr->used_targets_off == pr->used_targets_size)
2495         GNUNET_array_grow (pr->used_targets,
2496                            pr->used_targets_size,
2497                            pr->used_targets_size * 2 + 2);
2498       GNUNET_PEER_change_rc (tpid, 1);
2499       pr->used_targets[pr->used_targets_off].pid = tpid;
2500       pr->used_targets[pr->used_targets_off].num_requests = 0;
2501       i = pr->used_targets_off++;
2502     }
2503   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2504   pr->used_targets[i].num_requests++;
2505   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2506     pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2507                                              &forward_request_task,
2508                                              pr);
2509 }
2510
2511
2512 /**
2513  * How many bytes should a bloomfilter be if we have already seen
2514  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2515  * of bits set per entry.  Furthermore, we should not re-size the
2516  * filter too often (to keep it cheap).
2517  *
2518  * Since other peers will also add entries but not resize the filter,
2519  * we should generally pick a slightly larger size than what the
2520  * strict math would suggest.
2521  *
2522  * @return must be a power of two and smaller or equal to 2^15.
2523  */
2524 static size_t
2525 compute_bloomfilter_size (unsigned int entry_count)
2526 {
2527   size_t size;
2528   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2529   uint16_t max = 1 << 15;
2530
2531   if (entry_count > max)
2532     return max;
2533   size = 8;
2534   while ((size < max) && (size < ideal))
2535     size *= 2;
2536   if (size > max)
2537     return max;
2538   return size;
2539 }
2540
2541
2542 /**
2543  * Recalculate our bloom filter for filtering replies.  This function
2544  * will create a new bloom filter from scratch, so it should only be
2545  * called if we have no bloomfilter at all (and hence can create a
2546  * fresh one of minimal size without problems) OR if our peer is the
2547  * initiator (in which case we may resize to larger than mimimum size).
2548  *
2549  * @param pr request for which the BF is to be recomputed
2550  */
2551 static void
2552 refresh_bloomfilter (struct PendingRequest *pr)
2553 {
2554   unsigned int i;
2555   size_t nsize;
2556   GNUNET_HashCode mhash;
2557
2558   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2559   if (nsize == pr->bf_size)
2560     return; /* size not changed */
2561   if (pr->bf != NULL)
2562     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2563   pr->bf_size = nsize;
2564   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2565   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2566                                               pr->bf_size,
2567                                               BLOOMFILTER_K);
2568   for (i=0;i<pr->replies_seen_off;i++)
2569     {
2570       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2571                                 pr->mingle,
2572                                 &mhash);
2573       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2574     }
2575 }
2576
2577
2578 /**
2579  * Function called after we've tried to reserve a certain amount of
2580  * bandwidth for a reply.  Check if we succeeded and if so send our
2581  * query.
2582  *
2583  * @param cls the requests "struct PendingRequest*"
2584  * @param peer identifies the peer
2585  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2586  * @param amount set to the amount that was actually reserved or unreserved
2587  * @param preference current traffic preference for the given peer
2588  */
2589 static void
2590 target_reservation_cb (void *cls,
2591                        const struct
2592                        GNUNET_PeerIdentity * peer,
2593                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2594                        int amount,
2595                        uint64_t preference)
2596 {
2597   struct PendingRequest *pr = cls;
2598   struct ConnectedPeer *cp;
2599   struct PendingMessage *pm;
2600   struct GetMessage *gm;
2601   GNUNET_HashCode *ext;
2602   char *bfdata;
2603   size_t msize;
2604   unsigned int k;
2605   int no_route;
2606   uint32_t bm;
2607   unsigned int i;
2608
2609   /* (3) transmit, update ttl/priority */
2610   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2611                                           &peer->hashPubKey);
2612   if (cp == NULL)
2613     {
2614       /* Peer must have just left */
2615 #if DEBUG_FS
2616       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2617                   "Selected peer disconnected!\n");
2618 #endif
2619       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2620         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2621                                                  &forward_request_task,
2622                                                  pr);
2623       return;
2624     }
2625   cp->irc = NULL;
2626   pr->pirc = NULL;
2627   if (peer == NULL)
2628     {
2629       /* error in communication with core, try again later */
2630       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2631         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2632                                                  &forward_request_task,
2633                                                  pr);
2634       return;
2635     }
2636   no_route = GNUNET_NO;
2637   if (amount == 0)
2638     {
2639       if (pr->cp == NULL)
2640         {
2641 #if DEBUG_FS > 1
2642           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2643                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2644                       amount,
2645                       DBLOCK_SIZE);
2646 #endif
2647           GNUNET_STATISTICS_update (stats,
2648                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2649                                     1,
2650                                     GNUNET_NO);
2651           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2652             pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2653                                                      &forward_request_task,
2654                                                      pr);
2655           return;  /* this target round failed */
2656         }
2657       no_route = GNUNET_YES;
2658     }
2659   
2660   GNUNET_STATISTICS_update (stats,
2661                             gettext_noop ("# queries scheduled for forwarding"),
2662                             1,
2663                             GNUNET_NO);
2664   for (i=0;i<pr->used_targets_off;i++)
2665     if (pr->used_targets[i].pid == cp->pid) 
2666       {
2667         GNUNET_STATISTICS_update (stats,
2668                                   gettext_noop ("# queries retransmitted to same target"),
2669                                   1,
2670                                   GNUNET_NO);
2671         break;
2672       } 
2673
2674   /* build message and insert message into priority queue */
2675 #if DEBUG_FS
2676   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2677               "Forwarding request `%s' to `%4s'!\n",
2678               GNUNET_h2s (&pr->query),
2679               GNUNET_i2s (peer));
2680 #endif
2681   k = 0;
2682   bm = 0;
2683   if (GNUNET_YES == no_route)
2684     {
2685       bm |= GET_MESSAGE_BIT_RETURN_TO;
2686       k++;      
2687     }
2688   if (pr->namespace != NULL)
2689     {
2690       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2691       k++;
2692     }
2693   if (pr->target_pid != 0)
2694     {
2695       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2696       k++;
2697     }
2698   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2699   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2700   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2701   pm->msize = msize;
2702   gm = (struct GetMessage*) &pm[1];
2703   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2704   gm->header.size = htons (msize);
2705   gm->type = htonl (pr->type);
2706   pr->remaining_priority /= 2;
2707   gm->priority = htonl (pr->remaining_priority);
2708   gm->ttl = htonl (pr->ttl);
2709   gm->filter_mutator = htonl(pr->mingle); 
2710   gm->hash_bitmap = htonl (bm);
2711   gm->query = pr->query;
2712   ext = (GNUNET_HashCode*) &gm[1];
2713   k = 0;
2714   if (GNUNET_YES == no_route)
2715     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2716   if (pr->namespace != NULL)
2717     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2718   if (pr->target_pid != 0)
2719     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2720   bfdata = (char *) &ext[k];
2721   if (pr->bf != NULL)
2722     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2723                                                bfdata,
2724                                                pr->bf_size);
2725   pm->cont = &transmit_query_continuation;
2726   pm->cont_cls = pr;
2727   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2728   add_to_pending_messages_for_peer (cp, pm, pr);
2729 }
2730
2731
2732 /**
2733  * Closure used for "target_peer_select_cb".
2734  */
2735 struct PeerSelectionContext 
2736 {
2737   /**
2738    * The request for which we are selecting
2739    * peers.
2740    */
2741   struct PendingRequest *pr;
2742
2743   /**
2744    * Current "prime" target.
2745    */
2746   struct GNUNET_PeerIdentity target;
2747
2748   /**
2749    * How much do we like this target?
2750    */
2751   double target_score;
2752
2753   /**
2754    * Does it make sense to we re-try quickly again?
2755    */
2756   int fast_retry;
2757
2758 };
2759
2760
2761 /**
2762  * Function called for each connected peer to determine
2763  * which one(s) would make good targets for forwarding.
2764  *
2765  * @param cls closure (struct PeerSelectionContext)
2766  * @param key current key code (peer identity)
2767  * @param value value in the hash map (struct ConnectedPeer)
2768  * @return GNUNET_YES if we should continue to
2769  *         iterate,
2770  *         GNUNET_NO if not.
2771  */
2772 static int
2773 target_peer_select_cb (void *cls,
2774                        const GNUNET_HashCode * key,
2775                        void *value)
2776 {
2777   struct PeerSelectionContext *psc = cls;
2778   struct ConnectedPeer *cp = value;
2779   struct PendingRequest *pr = psc->pr;
2780   struct GNUNET_TIME_Relative delay;
2781   double score;
2782   unsigned int i;
2783   unsigned int pc;
2784
2785   /* 1) check that this peer is not the initiator */
2786   if (cp == pr->cp)     
2787     {
2788 #if DEBUG_FS
2789       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2790                   "Skipping initiator in forwarding selection\n");
2791 #endif
2792       return GNUNET_YES; /* skip */        
2793     }
2794   if (cp->irc != NULL)
2795     {
2796       psc->fast_retry = GNUNET_YES;
2797       return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2798     }
2799
2800   /* 2) check if we have already (recently) forwarded to this peer */
2801   /* 2a) this particular request */
2802   pc = 0;
2803   for (i=0;i<pr->used_targets_off;i++)
2804     if (pr->used_targets[i].pid == cp->pid) 
2805       {
2806         pc = pr->used_targets[i].num_requests;
2807         GNUNET_assert (pc > 0);
2808         /* FIXME: make re-enabling a peer independent of how often
2809            this function is called??? */
2810         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2811                                            RETRY_PROBABILITY_INV * pc))
2812           {
2813 #if DEBUG_FS
2814             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2815                         "NOT re-trying query that was previously transmitted %u times\n",
2816                         (unsigned int) pc);
2817 #endif
2818             return GNUNET_YES; /* skip */
2819           }
2820         break;
2821       }
2822 #if DEBUG_FS
2823   if (0 < pc)
2824     {
2825       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2826                   "Re-trying query that was previously transmitted %u times to this peer\n",
2827                   (unsigned int) pc);
2828     }
2829 #endif
2830   /* 2b) many other requests to this peer */
2831   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2832   if (delay.rel_value <= cp->avg_delay.rel_value)
2833     {
2834 #if DEBUG_FS
2835       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2836                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2837                   MAX_QUEUE_PER_PEER,
2838                   cp->avg_delay.rel_value);
2839 #endif
2840       return GNUNET_YES; /* skip */      
2841     }
2842
2843   /* 3) calculate how much we'd like to forward to this peer,
2844      starting with a random value that is strong enough
2845      to at least give any peer a chance sometimes 
2846      (compared to the other factors that come later) */
2847   /* 3a) count successful (recent) routes from cp for same source */
2848   if (pr->cp != NULL)
2849     {
2850       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2851                                         P2P_SUCCESS_LIST_SIZE);
2852       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2853         if (cp->last_p2p_replies[i] == pr->cp->pid)
2854           score += 1.0; /* likely successful based on hot path */
2855     }
2856   else
2857     {
2858       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2859                                         CS2P_SUCCESS_LIST_SIZE);
2860       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2861         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2862           score += 1.0; /* likely successful based on hot path */
2863     }
2864   /* 3b) include latency */
2865   if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2866     score += 1.0; /* likely fast based on latency */
2867   /* 3c) include priorities */
2868   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2869     score += 1.0; /* likely successful based on priorities */
2870   /* 3d) penalize for queue size */  
2871   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2872   /* 3e) include peer proximity */
2873   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2874                                                     &pr->query)) / (double) UINT32_MAX);
2875   /* 4) super-bonus for being the known target */
2876   if (pr->target_pid == cp->pid)
2877     score += 100.0;
2878   /* store best-fit in closure */
2879   score++; /* avoid zero */
2880   if (score > psc->target_score)
2881     {
2882       psc->target_score = score;
2883       psc->target.hashPubKey = *key; 
2884     }
2885 #if DEBUG_FS
2886   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2887               "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2888               GNUNET_h2s (key),
2889               score,
2890               psc->target_score);
2891 #endif
2892   return GNUNET_YES;
2893 }
2894   
2895
2896 /**
2897  * The priority level imposes a bound on the maximum
2898  * value for the ttl that can be requested.
2899  *
2900  * @param ttl_in requested ttl
2901  * @param prio given priority
2902  * @return ttl_in if ttl_in is below the limit,
2903  *         otherwise the ttl-limit for the given priority
2904  */
2905 static int32_t
2906 bound_ttl (int32_t ttl_in, uint32_t prio)
2907 {
2908   unsigned long long allowed;
2909
2910   if (ttl_in <= 0)
2911     return ttl_in;
2912   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2913   if (ttl_in > allowed)      
2914     {
2915       if (allowed >= (1 << 30))
2916         return 1 << 30;
2917       return allowed;
2918     }
2919   return ttl_in;
2920 }
2921
2922
2923 /**
2924  * Iterator called on each result obtained for a DHT
2925  * operation that expects a reply
2926  *
2927  * @param cls closure
2928  * @param exp when will this value expire
2929  * @param key key of the result
2930  * @param get_path NULL-terminated array of pointers
2931  *                 to the peers on reverse GET path (or NULL if not recorded)
2932  * @param put_path NULL-terminated array of pointers
2933  *                 to the peers on the PUT path (or NULL if not recorded)
2934  * @param type type of the result
2935  * @param size number of bytes in data
2936  * @param data pointer to the result data
2937  */
2938 static void
2939 process_dht_reply (void *cls,
2940                    struct GNUNET_TIME_Absolute exp,
2941                    const GNUNET_HashCode * key,
2942                    const struct GNUNET_PeerIdentity * const *get_path,
2943                    const struct GNUNET_PeerIdentity * const *put_path,
2944                    enum GNUNET_BLOCK_Type type,
2945                    size_t size,
2946                    const void *data);
2947
2948
2949 /**
2950  * We're processing a GET request and have decided
2951  * to forward it to other peers.  This function is called periodically
2952  * and should forward the request to other peers until we have all
2953  * possible replies.  If we have transmitted the *only* reply to
2954  * the initiator we should destroy the pending request.  If we have
2955  * many replies in the queue to the initiator, we should delay sending
2956  * out more queries until the reply queue has shrunk some.
2957  *
2958  * @param cls our "struct ProcessGetContext *"
2959  * @param tc unused
2960  */
2961 static void
2962 forward_request_task (void *cls,
2963                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2964 {
2965   struct PendingRequest *pr = cls;
2966   struct PeerSelectionContext psc;
2967   struct ConnectedPeer *cp; 
2968   struct GNUNET_TIME_Relative delay;
2969
2970   pr->task = GNUNET_SCHEDULER_NO_TASK;
2971   if (pr->pirc != NULL)
2972     {
2973 #if DEBUG_FS
2974       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2975                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2976                   GNUNET_h2s (&pr->query));
2977 #endif
2978       return; /* already pending */
2979     }
2980   if (GNUNET_YES == pr->local_only)
2981     return; /* configured to not do P2P search */
2982   /* (0) try DHT */
2983   if ( (0 == pr->anonymity_level) &&
2984        (GNUNET_YES != pr->forward_only) &&
2985        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2986        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2987     {
2988       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2989                                           GNUNET_TIME_UNIT_FOREVER_REL,
2990                                           pr->type,
2991                                           &pr->query,
2992                                           DEFAULT_GET_REPLICATION,
2993                                           GNUNET_DHT_RO_NONE,
2994                                           pr->bf,
2995                                           pr->mingle,
2996                                           pr->namespace,
2997                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2998                                           &process_dht_reply,
2999                                           pr);
3000     }
3001   /* (1) select target */
3002   psc.pr = pr;
3003   psc.target_score = -DBL_MAX;
3004   psc.fast_retry = GNUNET_NO;
3005   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3006                                          &target_peer_select_cb,
3007                                          &psc);  
3008   if (psc.target_score == -DBL_MAX)
3009     {
3010       if (psc.fast_retry == GNUNET_YES)
3011         delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3012       else
3013         delay = get_processing_delay ();
3014 #if DEBUG_FS 
3015       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3016                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3017                   GNUNET_h2s (&pr->query),
3018                   delay.rel_value);
3019 #endif
3020       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3021                                                &forward_request_task,
3022                                                pr);
3023       return; /* nobody selected */
3024     }
3025   /* (3) update TTL/priority */
3026   if (pr->client_request_list != NULL)
3027     {
3028       /* FIXME: use better algorithm!? */
3029       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3030                                          4))
3031         pr->priority++;
3032       /* bound priority we use by priorities we see from other peers
3033          rounded up (must round up so that we can see non-zero
3034          priorities, but round up as little as possible to make it
3035          plausible that we forwarded another peers request) */
3036       if (pr->priority > current_priorities + 1.0)
3037         pr->priority = (uint32_t) current_priorities + 1.0;
3038       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3039                            pr->priority);
3040 #if DEBUG_FS
3041       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3042                   "Trying query `%s' with priority %u and TTL %d.\n",
3043                   GNUNET_h2s (&pr->query),
3044                   pr->priority,
3045                   pr->ttl);
3046 #endif
3047     }
3048
3049   /* (3) reserve reply bandwidth */
3050   if (GNUNET_NO == pr->forward_only)
3051     {
3052       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3053                                               &psc.target.hashPubKey);
3054       GNUNET_assert (NULL != cp);
3055       GNUNET_assert (cp->irc == NULL);
3056       pr->pirc = cp;
3057       cp->pr = pr;
3058       cp->irc = GNUNET_CORE_peer_change_preference (core,
3059                                                     &psc.target,
3060                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
3061                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3062                                                     DBLOCK_SIZE * 2, 
3063                                                     cp->inc_preference,
3064                                                     &target_reservation_cb,
3065                                                     pr);
3066       GNUNET_assert (cp->irc != NULL);
3067       cp->inc_preference = 0;
3068     }
3069   else
3070     {
3071       /* force forwarding */
3072       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3073       target_reservation_cb (pr, &psc.target,
3074                              zerobw, 0, 0.0);
3075     }
3076 }
3077
3078
3079 /* **************************** P2P PUT Handling ************************ */
3080
3081
3082 /**
3083  * Function called after we either failed or succeeded
3084  * at transmitting a reply to a peer.  
3085  *
3086  * @param cls the requests "struct PendingRequest*"
3087  * @param tpid ID of receiving peer, 0 on transmission error
3088  */
3089 static void
3090 transmit_reply_continuation (void *cls,
3091                              GNUNET_PEER_Id tpid)
3092 {
3093   struct PendingRequest *pr = cls;
3094   
3095   switch (pr->type)
3096     {
3097     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3098     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3099       /* only one reply expected, done with the request! */
3100       destroy_pending_request (pr);
3101       break;
3102     case GNUNET_BLOCK_TYPE_ANY:
3103     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3104     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3105       break;
3106     default:
3107       GNUNET_break (0);
3108       break;
3109     }
3110 }
3111
3112
3113 /**
3114  * Transmit the given message by copying it to the target buffer
3115  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3116  * for writing in the meantime.  In that case, do nothing
3117  * (the disconnect or shutdown handler will take care of the rest).
3118  * If we were able to transmit messages and there are still more
3119  * pending, ask core again for further calls to this function.
3120  *
3121  * @param cls closure, pointer to the 'struct ClientList*'
3122  * @param size number of bytes available in buf
3123  * @param buf where the callee should write the message
3124  * @return number of bytes written to buf
3125  */
3126 static size_t
3127 transmit_to_client (void *cls,
3128                   size_t size, void *buf)
3129 {
3130   struct ClientList *cl = cls;
3131   char *cbuf = buf;
3132   struct ClientResponseMessage *creply;
3133   size_t msize;
3134   
3135   cl->th = NULL;
3136   if (NULL == buf)
3137     {
3138 #if DEBUG_FS
3139       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3140                   "Not sending reply, client communication problem.\n");
3141 #endif
3142       return 0;
3143     }
3144   msize = 0;
3145   while ( (NULL != (creply = cl->res_head) ) &&
3146           (creply->msize <= size) )
3147     {
3148       memcpy (&cbuf[msize], &creply[1], creply->msize);
3149       msize += creply->msize;
3150       size -= creply->msize;
3151       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3152                                    cl->res_tail,
3153                                    creply);
3154       GNUNET_free (creply);
3155     }
3156   if (NULL != creply)
3157     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3158                                                   creply->msize,
3159                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3160                                                   &transmit_to_client,
3161                                                   cl);
3162 #if DEBUG_FS
3163   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3164               "Transmitted %u bytes to client\n",
3165               (unsigned int) msize);
3166 #endif
3167   return msize;
3168 }
3169
3170
3171 /**
3172  * Closure for "process_reply" function.
3173  */
3174 struct ProcessReplyClosure
3175 {
3176   /**
3177    * The data for the reply.
3178    */
3179   const void *data;
3180
3181   /**
3182    * Who gave us this reply? NULL for local host (or DHT)
3183    */
3184   struct ConnectedPeer *sender;
3185
3186   /**
3187    * When the reply expires.
3188    */
3189   struct GNUNET_TIME_Absolute expiration;
3190
3191   /**
3192    * Size of data.
3193    */
3194   size_t size;
3195
3196   /**
3197    * Type of the block.
3198    */
3199   enum GNUNET_BLOCK_Type type;
3200
3201   /**
3202    * How much was this reply worth to us?
3203    */
3204   uint32_t priority;
3205
3206   /**
3207    * Evaluation result (returned).
3208    */
3209   enum GNUNET_BLOCK_EvaluationResult eval;
3210
3211   /**
3212    * Did we finish processing the associated request?
3213    */ 
3214   int finished;
3215
3216   /**
3217    * Did we find a matching request?
3218    */
3219   int request_found;
3220 };
3221
3222
3223 /**
3224  * We have received a reply; handle it!
3225  *
3226  * @param cls response (struct ProcessReplyClosure)
3227  * @param key our query
3228  * @param value value in the hash map (info about the query)
3229  * @return GNUNET_YES (we should continue to iterate)
3230  */
3231 static int
3232 process_reply (void *cls,
3233                const GNUNET_HashCode * key,
3234                void *value)
3235 {
3236   struct ProcessReplyClosure *prq = cls;
3237   struct PendingRequest *pr = value;
3238   struct PendingMessage *reply;
3239   struct ClientResponseMessage *creply;
3240   struct ClientList *cl;
3241   struct PutMessage *pm;
3242   struct ConnectedPeer *cp;
3243   struct GNUNET_TIME_Relative cur_delay;
3244 #if SUPPORT_DELAYS  
3245 struct GNUNET_TIME_Relative art_delay;
3246 #endif
3247   size_t msize;
3248   unsigned int i;
3249
3250 #if DEBUG_FS
3251   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3252               "Matched result (type %u) for query `%s' with pending request\n",
3253               (unsigned int) prq->type,
3254               GNUNET_h2s (key));
3255 #endif  
3256   GNUNET_STATISTICS_update (stats,
3257                             gettext_noop ("# replies received and matched"),
3258                             1,
3259                             GNUNET_NO);
3260   if (prq->sender != NULL)
3261     {
3262       for (i=0;i<pr->used_targets_off;i++)
3263         if (pr->used_targets[i].pid == prq->sender->pid)
3264           break;
3265       if (i < pr->used_targets_off)
3266         {
3267           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3268           prq->sender->avg_delay.rel_value
3269             = (prq->sender->avg_delay.rel_value * 
3270                (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
3271           prq->sender->avg_priority
3272             = (prq->sender->avg_priority * 
3273                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3274         }
3275       if (pr->cp != NULL)
3276         {
3277           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3278                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3279                                  -1);
3280           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3281           prq->sender->last_p2p_replies
3282             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3283             = pr->cp->pid;
3284         }
3285       else
3286         {
3287           if (NULL != prq->sender->last_client_replies
3288               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3289             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3290                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3291           prq->sender->last_client_replies
3292             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3293             = pr->client_request_list->client_list->client;
3294           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3295         }
3296     }
3297   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3298                                      prq->type,
3299                                      key,
3300                                      &pr->bf,
3301                                      pr->mingle,
3302                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3303                                      prq->data,
3304                                      prq->size);
3305   switch (prq->eval)
3306     {
3307     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3308       break;
3309     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3310       while (NULL != pr->pending_head)
3311         destroy_pending_message_list_entry (pr->pending_head);
3312       if (pr->qe != NULL)
3313         {
3314           if (pr->client_request_list != NULL)
3315             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3316                                         GNUNET_YES);
3317           GNUNET_DATASTORE_cancel (pr->qe);
3318           pr->qe = NULL;
3319         }
3320       pr->do_remove = GNUNET_YES;
3321       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3322         {
3323           GNUNET_SCHEDULER_cancel (pr->task);
3324           pr->task = GNUNET_SCHEDULER_NO_TASK;
3325         }
3326       GNUNET_break (GNUNET_YES ==
3327                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3328                                                           key,
3329                                                           pr));
3330       GNUNET_LOAD_update (rt_entry_lifetime,
3331                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3332       break;
3333     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3334       GNUNET_STATISTICS_update (stats,
3335                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3336                                 1,
3337                                 GNUNET_NO);
3338 #if DEBUG_FS
3339 /*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3340                   "Duplicate response `%s', discarding.\n",
3341                   GNUNET_h2s (&mhash));*/
3342 #endif
3343       return GNUNET_YES; /* duplicate */
3344     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3345       return GNUNET_YES; /* wrong namespace */  
3346     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3347       GNUNET_break (0);
3348       return GNUNET_YES;
3349     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3350       GNUNET_break (0);
3351       return GNUNET_YES;
3352     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3353       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3354                   _("Unsupported block type %u\n"),
3355                   prq->type);
3356       return GNUNET_NO;
3357     }
3358   if (pr->client_request_list != NULL)
3359     {
3360       if (pr->replies_seen_size == pr->replies_seen_off)
3361         GNUNET_array_grow (pr->replies_seen,
3362                            pr->replies_seen_size,
3363                            pr->replies_seen_size * 2 + 4);      
3364       GNUNET_CRYPTO_hash (prq->data,
3365                           prq->size,
3366                           &pr->replies_seen[pr->replies_seen_off++]);         
3367       refresh_bloomfilter (pr);
3368     }
3369   if (NULL == prq->sender)
3370     {
3371 #if DEBUG_FS
3372       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3373                   "Found result for query `%s' in local datastore\n",
3374                   GNUNET_h2s (key));
3375 #endif
3376       GNUNET_STATISTICS_update (stats,
3377                                 gettext_noop ("# results found locally"),
3378                                 1,
3379                                 GNUNET_NO);      
3380     }
3381   prq->priority += pr->remaining_priority;
3382   pr->remaining_priority = 0;
3383   pr->results_found++;
3384   prq->request_found = GNUNET_YES;
3385   if (NULL != pr->client_request_list)
3386     {
3387       GNUNET_STATISTICS_update (stats,
3388                                 gettext_noop ("# replies received for local clients"),
3389                                 1,
3390                                 GNUNET_NO);
3391       cl = pr->client_request_list->client_list;
3392       msize = sizeof (struct PutMessage) + prq->size;
3393       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3394       creply->msize = msize;
3395       creply->client_list = cl;
3396       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3397                                          cl->res_tail,
3398                                          cl->res_tail,
3399                                          creply);      
3400       pm = (struct PutMessage*) &creply[1];
3401       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3402       pm->header.size = htons (msize);
3403       pm->type = htonl (prq->type);
3404       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3405       memcpy (&pm[1], prq->data, prq->size);      
3406       if (NULL == cl->th)
3407         {
3408 #if DEBUG_FS
3409           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3410                       "Transmitting result for query `%s' to client\n",
3411                       GNUNET_h2s (key));
3412 #endif  
3413           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3414                                                         msize,
3415                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3416                                                         &transmit_to_client,
3417                                                         cl);
3418         }
3419       GNUNET_break (cl->th != NULL);
3420       if (pr->do_remove)                
3421         {
3422           prq->finished = GNUNET_YES;
3423           destroy_pending_request (pr);         
3424         }
3425     }
3426   else
3427     {
3428       cp = pr->cp;
3429 #if DEBUG_FS
3430       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3431                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3432                   GNUNET_h2s (key),
3433                   (unsigned int) cp->pid);
3434 #endif  
3435       GNUNET_STATISTICS_update (stats,
3436                                 gettext_noop ("# replies received for other peers"),
3437                                 1,
3438                                 GNUNET_NO);
3439       msize = sizeof (struct PutMessage) + prq->size;
3440       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3441       reply->cont = &transmit_reply_continuation;
3442       reply->cont_cls = pr;
3443 #if SUPPORT_DELAYS
3444       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3445                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3446                                                                            TTL_DECREMENT));
3447       reply->delay_until 
3448         = GNUNET_TIME_relative_to_absolute (art_delay);
3449       GNUNET_STATISTICS_update (stats,
3450                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3451                                 art_delay.abs_value,
3452                                 GNUNET_NO);
3453 #endif
3454       reply->msize = msize;
3455       reply->priority = UINT32_MAX; /* send replies first! */
3456       pm = (struct PutMessage*) &reply[1];
3457       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3458       pm->header.size = htons (msize);
3459       pm->type = htonl (prq->type);
3460       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3461       memcpy (&pm[1], prq->data, prq->size);
3462       add_to_pending_messages_for_peer (cp, reply, pr);
3463     }
3464   return GNUNET_YES;
3465 }
3466
3467
3468 /**
3469  * Iterator called on each result obtained for a DHT
3470  * operation that expects a reply
3471  *
3472  * @param cls closure
3473  * @param exp when will this value expire
3474  * @param key key of the result
3475  * @param get_path NULL-terminated array of pointers
3476  *                 to the peers on reverse GET path (or NULL if not recorded)
3477  * @param put_path NULL-terminated array of pointers
3478  *                 to the peers on the PUT path (or NULL if not recorded)
3479  * @param type type of the result
3480  * @param size number of bytes in data
3481  * @param data pointer to the result data
3482  */
3483 static void
3484 process_dht_reply (void *cls,
3485                    struct GNUNET_TIME_Absolute exp,
3486                    const GNUNET_HashCode * key,
3487                    const struct GNUNET_PeerIdentity * const *get_path,
3488                    const struct GNUNET_PeerIdentity * const *put_path,
3489                    enum GNUNET_BLOCK_Type type,
3490                    size_t size,
3491                    const void *data)
3492 {
3493   struct PendingRequest *pr = cls;
3494   struct ProcessReplyClosure prq;
3495
3496   memset (&prq, 0, sizeof (prq));
3497   prq.data = data;
3498   prq.expiration = exp;
3499   prq.size = size;  
3500   prq.type = type;
3501   process_reply (&prq, key, pr);
3502 }
3503
3504
3505
3506 /**
3507  * Continuation called to notify client about result of the
3508  * operation.
3509  *
3510  * @param cls closure
3511  * @param success GNUNET_SYSERR on failure
3512  * @param msg NULL on success, otherwise an error message
3513  */
3514 static void 
3515 put_migration_continuation (void *cls,
3516                             int success,
3517                             const char *msg)
3518 {
3519   struct GNUNET_TIME_Absolute *start = cls;
3520   struct GNUNET_TIME_Relative delay;
3521   
3522   delay = GNUNET_TIME_absolute_get_duration (*start);
3523   GNUNET_free (start);
3524   GNUNET_LOAD_update (datastore_put_load,
3525                       delay.rel_value);
3526   if (GNUNET_OK == success)
3527     return;
3528   GNUNET_STATISTICS_update (stats,
3529                             gettext_noop ("# datastore 'put' failures"),
3530                             1,
3531                             GNUNET_NO);
3532 }
3533
3534
3535 /**
3536  * Handle P2P "PUT" message.
3537  *
3538  * @param cls closure, always NULL
3539  * @param other the other peer involved (sender or receiver, NULL
3540  *        for loopback messages where we are both sender and receiver)
3541  * @param message the actual message
3542  * @param atsi performance information
3543  * @return GNUNET_OK to keep the connection open,
3544  *         GNUNET_SYSERR to close it (signal serious error)
3545  */
3546 static int
3547 handle_p2p_put (void *cls,
3548                 const struct GNUNET_PeerIdentity *other,
3549                 const struct GNUNET_MessageHeader *message,
3550                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3551 {
3552   const struct PutMessage *put;
3553   uint16_t msize;
3554   size_t dsize;
3555   enum GNUNET_BLOCK_Type type;
3556   struct GNUNET_TIME_Absolute expiration;
3557   GNUNET_HashCode query;
3558   struct ProcessReplyClosure prq;
3559   struct GNUNET_TIME_Absolute *start;
3560   struct GNUNET_TIME_Relative block_time;  
3561   double putl;
3562   struct ConnectedPeer *cp; 
3563   struct PendingMessage *pm;
3564   struct MigrationStopMessage *msm;
3565
3566   msize = ntohs (message->size);
3567   if (msize < sizeof (struct PutMessage))
3568     {
3569       GNUNET_break_op(0);
3570       return GNUNET_SYSERR;
3571     }
3572   put = (const struct PutMessage*) message;
3573   dsize = msize - sizeof (struct PutMessage);
3574   type = ntohl (put->type);
3575   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3576
3577   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3578     return GNUNET_SYSERR;
3579   if (GNUNET_OK !=
3580       GNUNET_BLOCK_get_key (block_ctx,
3581                             type,
3582                             &put[1],
3583                             dsize,
3584                             &query))
3585     {
3586       GNUNET_break_op (0);
3587       return GNUNET_SYSERR;
3588     }
3589 #if DEBUG_FS
3590   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3591               "Received result for query `%s' from peer `%4s'\n",
3592               GNUNET_h2s (&query),
3593               GNUNET_i2s (other));
3594 #endif
3595   GNUNET_STATISTICS_update (stats,
3596                             gettext_noop ("# replies received (overall)"),
3597                             1,
3598                             GNUNET_NO);
3599   /* now, lookup 'query' */
3600   prq.data = (const void*) &put[1];
3601   if (other != NULL)
3602     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3603                                                     &other->hashPubKey);
3604   else
3605     prq.sender = NULL;
3606   prq.size = dsize;
3607   prq.type = type;
3608   prq.expiration = expiration;
3609   prq.priority = 0;
3610   prq.finished = GNUNET_NO;
3611   prq.request_found = GNUNET_NO;
3612   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3613                                               &query,
3614                                               &process_reply,
3615                                               &prq);
3616   if (prq.sender != NULL)
3617     {
3618       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3619       change_host_trust (prq.sender, prq.priority);
3620     }
3621   if ( (GNUNET_YES == active_migration) &&
3622        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3623     {      
3624 #if DEBUG_FS
3625       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3626                   "Replicating result for query `%s' with priority %u\n",
3627                   GNUNET_h2s (&query),
3628                   prq.priority);
3629 #endif
3630       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3631       *start = GNUNET_TIME_absolute_get ();
3632       GNUNET_DATASTORE_put (dsh,
3633                             0, &query, dsize, &put[1],
3634                             type, prq.priority, 1 /* anonymity */, 
3635                             expiration, 
3636                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3637                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3638                             &put_migration_continuation, 
3639                             start);
3640     }
3641   putl = GNUNET_LOAD_get_load (datastore_put_load);
3642   if ( (GNUNET_NO == prq.request_found) &&
3643        ( (GNUNET_YES != active_migration) ||
3644          (putl > 2.5 * (1 + prq.priority)) ) )
3645     {
3646       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3647                                               &other->hashPubKey);
3648       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3649         return GNUNET_OK; /* already blocked */
3650       /* We're too busy; send MigrationStop message! */
3651       if (GNUNET_YES != active_migration) 
3652         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3653       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3654                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3655                                                                                    (unsigned int) (60000 * putl * putl)));
3656       
3657       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3658       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3659                           sizeof (struct MigrationStopMessage));
3660       pm->msize = sizeof (struct MigrationStopMessage);
3661       pm->priority = UINT32_MAX;
3662       msm = (struct MigrationStopMessage*) &pm[1];
3663       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3664       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3665       msm->duration = GNUNET_TIME_relative_hton (block_time);
3666       add_to_pending_messages_for_peer (cp,
3667                                         pm,
3668                                         NULL);
3669     }
3670   return GNUNET_OK;
3671 }
3672
3673
3674 /**
3675  * Handle P2P "MIGRATION_STOP" message.
3676  *
3677  * @param cls closure, always NULL
3678  * @param other the other peer involved (sender or receiver, NULL
3679  *        for loopback messages where we are both sender and receiver)
3680  * @param message the actual message
3681  * @param atsi performance information
3682  * @return GNUNET_OK to keep the connection open,
3683  *         GNUNET_SYSERR to close it (signal serious error)
3684  */
3685 static int
3686 handle_p2p_migration_stop (void *cls,
3687                            const struct GNUNET_PeerIdentity *other,
3688                            const struct GNUNET_MessageHeader *message,
3689                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3690 {
3691   struct ConnectedPeer *cp; 
3692   const struct MigrationStopMessage *msm;
3693
3694   msm = (const struct MigrationStopMessage*) message;
3695   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3696                                           &other->hashPubKey);
3697   if (cp == NULL)
3698     {
3699       GNUNET_break (0);
3700       return GNUNET_OK;
3701     }
3702   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3703   return GNUNET_OK;
3704 }
3705
3706
3707
3708 /* **************************** P2P GET Handling ************************ */
3709
3710
3711 /**
3712  * Closure for 'check_duplicate_request_{peer,client}'.
3713  */
3714 struct CheckDuplicateRequestClosure
3715 {
3716   /**
3717    * The new request we should check if it already exists.
3718    */
3719   const struct PendingRequest *pr;
3720
3721   /**
3722    * Existing request found by the checker, NULL if none.
3723    */
3724   struct PendingRequest *have;
3725 };
3726
3727
3728 /**
3729  * Iterator over entries in the 'query_request_map' that
3730  * tries to see if we have the same request pending from
3731  * the same client already.
3732  *
3733  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3734  * @param key current key code (query, ignored, must match)
3735  * @param value value in the hash map (a 'struct PendingRequest' 
3736  *              that already exists)
3737  * @return GNUNET_YES if we should continue to
3738  *         iterate (no match yet)
3739  *         GNUNET_NO if not (match found).
3740  */
3741 static int
3742 check_duplicate_request_client (void *cls,
3743                                 const GNUNET_HashCode * key,
3744                                 void *value)
3745 {
3746   struct CheckDuplicateRequestClosure *cdc = cls;
3747   struct PendingRequest *have = value;
3748
3749   if (have->client_request_list == NULL)
3750     return GNUNET_YES;
3751   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3752        (cdc->pr != have) )
3753     {
3754       cdc->have = have;
3755       return GNUNET_NO;
3756     }
3757   return GNUNET_YES;
3758 }
3759
3760
3761 /**
3762  * We're processing (local) results for a search request
3763  * from another peer.  Pass applicable results to the
3764  * peer and if we are done either clean up (operation
3765  * complete) or forward to other peers (more results possible).
3766  *
3767  * @param cls our closure (struct LocalGetContext)
3768  * @param key key for the content
3769  * @param size number of bytes in data
3770  * @param data content stored
3771  * @param type type of the content
3772  * @param priority priority of the content
3773  * @param anonymity anonymity-level for the content
3774  * @param expiration expiration time for the content
3775  * @param uid unique identifier for the datum;
3776  *        maybe 0 if no unique identifier is available
3777  */
3778 static void
3779 process_local_reply (void *cls,
3780                      const GNUNET_HashCode * key,
3781                      size_t size,
3782                      const void *data,
3783                      enum GNUNET_BLOCK_Type type,
3784                      uint32_t priority,
3785                      uint32_t anonymity,
3786                      struct GNUNET_TIME_Absolute
3787                      expiration, 
3788                      uint64_t uid)
3789 {
3790   struct PendingRequest *pr = cls;
3791   struct ProcessReplyClosure prq;
3792   struct CheckDuplicateRequestClosure cdrc;
3793   GNUNET_HashCode query;
3794   unsigned int old_rf;
3795   
3796   if (NULL == key)
3797     {
3798 #if DEBUG_FS > 1
3799       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3800                   "Done processing local replies, forwarding request to other peers.\n");
3801 #endif
3802       pr->qe = NULL;
3803       if (pr->client_request_list != NULL)
3804         {
3805           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3806                                       GNUNET_YES);
3807           /* Figure out if this is a duplicate request and possibly
3808              merge 'struct PendingRequest' entries */
3809           cdrc.have = NULL;
3810           cdrc.pr = pr;
3811           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3812                                                       &pr->query,
3813                                                       &check_duplicate_request_client,
3814                                                       &cdrc);
3815           if (cdrc.have != NULL)
3816             {
3817 #if DEBUG_FS
3818               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3819                           "Received request for block `%s' twice from client, will only request once.\n",
3820                           GNUNET_h2s (&pr->query));
3821 #endif
3822               
3823               destroy_pending_request (pr);
3824               return;
3825             }
3826         }
3827       if (pr->local_only == GNUNET_YES)
3828         {
3829           destroy_pending_request (pr);
3830           return;
3831         }
3832       /* no more results */
3833       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3834         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3835                                              pr);      
3836       return;
3837     }
3838 #if DEBUG_FS
3839   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3840               "New local response to `%s' of type %u.\n",
3841               GNUNET_h2s (key),
3842               type);
3843 #endif
3844   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3845     {
3846 #if DEBUG_FS
3847       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3848                   "Found ONDEMAND block, performing on-demand encoding\n");
3849 #endif
3850       GNUNET_STATISTICS_update (stats,
3851                                 gettext_noop ("# on-demand blocks matched requests"),
3852                                 1,
3853                                 GNUNET_NO);
3854       if (GNUNET_OK != 
3855           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3856                                             anonymity, expiration, uid, 
3857                                             &process_local_reply,
3858                                             pr))
3859       if (pr->qe != NULL)
3860         {
3861           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3862         }
3863       return;
3864     }
3865   old_rf = pr->results_found;
3866   memset (&prq, 0, sizeof (prq));
3867   prq.data = data;
3868   prq.expiration = expiration;
3869   prq.size = size;  
3870   if (GNUNET_OK != 
3871       GNUNET_BLOCK_get_key (block_ctx,
3872                             type,
3873                             data,
3874                             size,
3875                             &query))
3876     {
3877       GNUNET_break (0);
3878       GNUNET_DATASTORE_remove (dsh,
3879                                key,
3880                                size, data,
3881                                -1, -1, 
3882                                GNUNET_TIME_UNIT_FOREVER_REL,
3883                                NULL, NULL);
3884       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3885       return;
3886     }
3887   prq.type = type;
3888   prq.priority = priority;  
3889   prq.finished = GNUNET_NO;
3890   prq.request_found = GNUNET_NO;
3891   if ( (old_rf == 0) &&
3892        (pr->results_found == 0) )
3893     update_datastore_delays (pr->start_time);
3894   process_reply (&prq, key, pr);
3895   if (prq.finished == GNUNET_YES)
3896     return;
3897   if (pr->qe == NULL)
3898     return; /* done here */
3899   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3900     {
3901       pr->local_only = GNUNET_YES; /* do not forward */
3902       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3903       return;
3904     }
3905   if ( (pr->client_request_list == NULL) &&
3906        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3907          (pr->results_found > 5 + 2 * pr->priority) ) )
3908     {
3909 #if DEBUG_FS > 2
3910       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3911                   "Load too high, done with request\n");
3912 #endif
3913       GNUNET_STATISTICS_update (stats,
3914                                 gettext_noop ("# processing result set cut short due to load"),
3915                                 1,
3916                                 GNUNET_NO);
3917       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3918       return;
3919     }
3920   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3921 }
3922
3923
3924 /**
3925  * We've received a request with the specified priority.  Bound it
3926  * according to how much we trust the given peer.
3927  * 
3928  * @param prio_in requested priority
3929  * @param cp the peer making the request
3930  * @return effective priority
3931  */
3932 static int32_t
3933 bound_priority (uint32_t prio_in,
3934                 struct ConnectedPeer *cp)
3935 {
3936 #define N ((double)128.0)
3937   uint32_t ret;
3938   double rret;
3939   int ld;
3940
3941   ld = test_get_load_too_high (0);
3942   if (ld == GNUNET_SYSERR)
3943     {
3944       GNUNET_STATISTICS_update (stats,
3945                                 gettext_noop ("# requests done for free (low load)"),
3946                                 1,
3947                                 GNUNET_NO);
3948       return 0; /* excess resources */
3949     }
3950   if (prio_in > INT32_MAX)
3951     prio_in = INT32_MAX;
3952   ret = - change_host_trust (cp, - (int) prio_in);
3953   if (ret > 0)
3954     {
3955       if (ret > current_priorities + N)
3956         rret = current_priorities + N;
3957       else
3958         rret = ret;
3959       current_priorities 
3960         = (current_priorities * (N-1) + rret)/N;
3961     }
3962   if ( (ld == GNUNET_YES) && (ret > 0) )
3963     {
3964       /* try with charging */
3965       ld = test_get_load_too_high (ret);
3966     }
3967   if (ld == GNUNET_YES)
3968     {
3969       GNUNET_STATISTICS_update (stats,
3970                                 gettext_noop ("# request dropped, priority insufficient"),
3971                                 1,
3972                                 GNUNET_NO);
3973       /* undo charge */
3974       change_host_trust (cp, (int) ret);
3975       return -1; /* not enough resources */
3976     }
3977   else
3978     {
3979       GNUNET_STATISTICS_update (stats,
3980                                 gettext_noop ("# requests done for a price (normal load)"),
3981                                 1,
3982                                 GNUNET_NO);
3983     }
3984 #undef N
3985   return ret;
3986 }
3987
3988
3989 /**
3990  * Iterator over entries in the 'query_request_map' that
3991  * tries to see if we have the same request pending from
3992  * the same peer already.
3993  *
3994  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3995  * @param key current key code (query, ignored, must match)
3996  * @param value value in the hash map (a 'struct PendingRequest' 
3997  *              that already exists)
3998  * @return GNUNET_YES if we should continue to
3999  *         iterate (no match yet)
4000  *         GNUNET_NO if not (match found).
4001  */
4002 static int
4003 check_duplicate_request_peer (void *cls,
4004                               const GNUNET_HashCode * key,
4005                               void *value)
4006 {
4007   struct CheckDuplicateRequestClosure *cdc = cls;
4008   struct PendingRequest *have = value;
4009
4010   if (cdc->pr->target_pid == have->target_pid)
4011     {
4012       cdc->have = have;
4013       return GNUNET_NO;
4014     }
4015   return GNUNET_YES;
4016 }
4017
4018
4019 /**
4020  * Handle P2P "GET" request.
4021  *
4022  * @param cls closure, always NULL
4023  * @param other the other peer involved (sender or receiver, NULL
4024  *        for loopback messages where we are both sender and receiver)
4025  * @param message the actual message
4026  * @param atsi performance information
4027  * @return GNUNET_OK to keep the connection open,
4028  *         GNUNET_SYSERR to close it (signal serious error)
4029  */
4030 static int
4031 handle_p2p_get (void *cls,
4032                 const struct GNUNET_PeerIdentity *other,
4033                 const struct GNUNET_MessageHeader *message,
4034                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4035 {
4036   struct PendingRequest *pr;
4037   struct ConnectedPeer *cp;
4038   struct ConnectedPeer *cps;
4039   struct CheckDuplicateRequestClosure cdc;
4040   struct GNUNET_TIME_Relative timeout;
4041   uint16_t msize;
4042   const struct GetMessage *gm;
4043   unsigned int bits;
4044   const GNUNET_HashCode *opt;
4045   uint32_t bm;
4046   size_t bfsize;
4047   uint32_t ttl_decrement;
4048   int32_t priority;
4049   enum GNUNET_BLOCK_Type type;
4050   int have_ns;
4051
4052   msize = ntohs(message->size);
4053   if (msize < sizeof (struct GetMessage))
4054     {
4055       GNUNET_break_op (0);
4056       return GNUNET_SYSERR;
4057     }
4058   gm = (const struct GetMessage*) message;
4059 #if DEBUG_FS
4060   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4061               "Received request for `%s'\n",
4062               GNUNET_h2s (&gm->query));
4063 #endif
4064   type = ntohl (gm->type);
4065   bm = ntohl (gm->hash_bitmap);
4066   bits = 0;
4067   while (bm > 0)
4068     {
4069       if (1 == (bm & 1))
4070         bits++;
4071       bm >>= 1;
4072     }
4073   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4074     {
4075       GNUNET_break_op (0);
4076       return GNUNET_SYSERR;
4077     }  
4078   opt = (const GNUNET_HashCode*) &gm[1];
4079   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4080   /* bfsize must be power of 2, check! */
4081   if (0 != ( (bfsize - 1) & bfsize))
4082     {
4083       GNUNET_break_op (0);
4084       return GNUNET_SYSERR;
4085     }
4086   bm = ntohl (gm->hash_bitmap);
4087   bits = 0;
4088   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4089                                            &other->hashPubKey);
4090   if (NULL == cps)
4091     {
4092       /* peer must have just disconnected */
4093       GNUNET_STATISTICS_update (stats,
4094                                 gettext_noop ("# requests dropped due to initiator not being connected"),
4095                                 1,
4096                                 GNUNET_NO);
4097       return GNUNET_SYSERR;
4098     }
4099   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4100     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4101                                             &opt[bits++]);
4102   else
4103     cp = cps;
4104   if (cp == NULL)
4105     {
4106 #if DEBUG_FS
4107       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4108         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4109                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4110                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4111       
4112       else
4113         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4114                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4115                     GNUNET_i2s (other));
4116 #endif
4117       GNUNET_STATISTICS_update (stats,
4118                                 gettext_noop ("# requests dropped due to missing reverse route"),
4119                                 1,
4120                                 GNUNET_NO);
4121      /* FIXME: try connect? */
4122       return GNUNET_OK;
4123     }
4124   /* note that we can really only check load here since otherwise
4125      peers could find out that we are overloaded by not being
4126      disconnected after sending us a malformed query... */
4127   priority = bound_priority (ntohl (gm->priority), cps);
4128   if (priority < 0)
4129     {
4130 #if DEBUG_FS
4131       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4132                   "Dropping query from `%s', this peer is too busy.\n",
4133                   GNUNET_i2s (other));
4134 #endif
4135       return GNUNET_OK;
4136     }
4137 #if DEBUG_FS 
4138   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4139               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4140               GNUNET_h2s (&gm->query),
4141               (unsigned int) type,
4142               GNUNET_i2s (other),
4143               (unsigned int) bm);
4144 #endif
4145   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4146   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4147                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4148   if (have_ns)
4149     {
4150       pr->namespace = (GNUNET_HashCode*) &pr[1];
4151       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4152     }
4153   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4154        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4155         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4156     {
4157       /* don't have BW to send to peer, or would likely take longer than we have for it,
4158          so at best indirect the query */
4159       priority = 0;
4160       pr->forward_only = GNUNET_YES;
4161     }
4162   pr->type = type;
4163   pr->mingle = ntohl (gm->filter_mutator);
4164   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4165     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4166   pr->anonymity_level = 1;
4167   pr->priority = (uint32_t) priority;
4168   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4169   pr->query = gm->query;
4170   /* decrement ttl (always) */
4171   ttl_decrement = 2 * TTL_DECREMENT +
4172     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4173                               TTL_DECREMENT);
4174   if ( (pr->ttl < 0) &&
4175        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4176     {
4177 #if DEBUG_FS
4178       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4179                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4180                   GNUNET_i2s (other),
4181                   pr->ttl,
4182                   ttl_decrement);
4183 #endif
4184       GNUNET_STATISTICS_update (stats,
4185                                 gettext_noop ("# requests dropped due TTL underflow"),
4186                                 1,
4187                                 GNUNET_NO);
4188       /* integer underflow => drop (should be very rare)! */      
4189       GNUNET_free (pr);
4190       return GNUNET_OK;
4191     } 
4192   pr->ttl -= ttl_decrement;
4193   pr->start_time = GNUNET_TIME_absolute_get ();
4194
4195   /* get bloom filter */
4196   if (bfsize > 0)
4197     {
4198       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4199                                                   bfsize,
4200                                                   BLOOMFILTER_K);
4201       pr->bf_size = bfsize;
4202     }
4203   cdc.have = NULL;
4204   cdc.pr = pr;
4205   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4206                                               &gm->query,
4207                                               &check_duplicate_request_peer,
4208                                               &cdc);
4209   if (cdc.have != NULL)
4210     {
4211       if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4212           pr->start_time.abs_value + pr->ttl)
4213         {
4214           /* existing request has higher TTL, drop new one! */
4215           cdc.have->priority += pr->priority;
4216           destroy_pending_request (pr);
4217 #if DEBUG_FS
4218           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4219                       "Have existing request with higher TTL, dropping new request.\n",
4220                       GNUNET_i2s (other));
4221 #endif
4222           GNUNET_STATISTICS_update (stats,
4223                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4224                                     1,
4225                                     GNUNET_NO);
4226           return GNUNET_OK;
4227         }
4228       else
4229         {
4230           /* existing request has lower TTL, drop old one! */
4231           pr->priority += cdc.have->priority;
4232           /* Possible optimization: if we have applicable pending
4233              replies in 'cdc.have', we might want to move those over
4234              (this is a really rare special-case, so it is not clear
4235              that this would be worth it) */
4236           destroy_pending_request (cdc.have);
4237           /* keep processing 'pr'! */
4238         }
4239     }
4240
4241   pr->cp = cp;
4242   GNUNET_break (GNUNET_OK ==
4243                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4244                                                    &gm->query,
4245                                                    pr,
4246                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4247   GNUNET_break (GNUNET_OK ==
4248                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4249                                                    &other->hashPubKey,
4250                                                    pr,
4251                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4252   
4253   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4254                                             pr,
4255                                             pr->start_time.abs_value + pr->ttl);
4256
4257   GNUNET_STATISTICS_update (stats,
4258                             gettext_noop ("# P2P searches received"),
4259                             1,
4260                             GNUNET_NO);
4261   GNUNET_STATISTICS_update (stats,
4262                             gettext_noop ("# P2P searches active"),
4263                             1,
4264                             GNUNET_NO);
4265
4266   /* calculate change in traffic preference */
4267   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4268   /* process locally */
4269   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4270     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4271   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4272                                            (pr->priority + 1)); 
4273   if (GNUNET_YES != pr->forward_only)
4274     {
4275 #if DEBUG_FS
4276       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4277                   "Handing request for `%s' to datastore\n",
4278                   GNUNET_h2s (&gm->query));
4279 #endif
4280       pr->qe = GNUNET_DATASTORE_get (dsh,
4281                                      &gm->query,
4282                                      type,                             
4283                                      pr->priority + 1,
4284                                      MAX_DATASTORE_QUEUE,                                
4285                                      timeout,
4286                                      &process_local_reply,
4287                                      pr);
4288       if (NULL == pr->qe)
4289         {
4290           GNUNET_STATISTICS_update (stats,
4291                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4292                                     1,
4293                                     GNUNET_NO);
4294         }
4295     }
4296   else
4297     {
4298       GNUNET_STATISTICS_update (stats,
4299                                 gettext_noop ("# requests forwarded due to high load"),
4300                                 1,
4301                                 GNUNET_NO);
4302     }
4303
4304   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4305   switch (pr->type)
4306     {
4307     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4308     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4309       /* only one result, wait for datastore */
4310       if (GNUNET_YES != pr->forward_only)
4311         {
4312           GNUNET_STATISTICS_update (stats,
4313                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4314                                     1,
4315                                     GNUNET_NO);
4316           break;
4317         }
4318     default:
4319       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4320         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4321                                              pr);
4322     }
4323
4324   /* make sure we don't track too many requests */
4325   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4326     {
4327       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4328       GNUNET_assert (pr != NULL);
4329       destroy_pending_request (pr);
4330     }
4331   return GNUNET_OK;
4332 }
4333
4334
4335 /* **************************** CS GET Handling ************************ */
4336
4337
4338 /**
4339  * Handle START_SEARCH-message (search request from client).
4340  *
4341  * @param cls closure
4342  * @param client identification of the client
4343  * @param message the actual message
4344  */
4345 static void
4346 handle_start_search (void *cls,
4347                      struct GNUNET_SERVER_Client *client,
4348                      const struct GNUNET_MessageHeader *message)
4349 {
4350   static GNUNET_HashCode all_zeros;
4351   const struct SearchMessage *sm;
4352   struct ClientList *cl;
4353   struct ClientRequestList *crl;
4354   struct PendingRequest *pr;
4355   uint16_t msize;
4356   unsigned int sc;
4357   enum GNUNET_BLOCK_Type type;
4358
4359   msize = ntohs (message->size);
4360   if ( (msize < sizeof (struct SearchMessage)) ||
4361        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4362     {
4363       GNUNET_break (0);
4364       GNUNET_SERVER_receive_done (client,
4365                                   GNUNET_SYSERR);
4366       return;
4367     }
4368   GNUNET_STATISTICS_update (stats,
4369                             gettext_noop ("# client searches received"),
4370                             1,
4371                             GNUNET_NO);
4372   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4373   sm = (const struct SearchMessage*) message;
4374   type = ntohl (sm->type);
4375 #if DEBUG_FS
4376   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4377               "Received request for `%s' of type %u from local client\n",
4378               GNUNET_h2s (&sm->query),
4379               (unsigned int) type);
4380 #endif
4381   cl = client_list;
4382   while ( (cl != NULL) &&
4383           (cl->client != client) )
4384     cl = cl->next;
4385   if (cl == NULL)
4386     {
4387       cl = GNUNET_malloc (sizeof (struct ClientList));
4388       cl->client = client;
4389       GNUNET_SERVER_client_keep (client);
4390       cl->next = client_list;
4391       client_list = cl;
4392     }
4393   /* detect duplicate KBLOCK requests */
4394   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4395        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4396        (type == GNUNET_BLOCK_TYPE_ANY) )
4397     {
4398       crl = cl->rl_head;
4399       while ( (crl != NULL) &&
4400               ( (0 != memcmp (&crl->req->query,
4401                               &sm->query,
4402                               sizeof (GNUNET_HashCode))) ||
4403                 (crl->req->type != type) ) )
4404         crl = crl->next;
4405       if (crl != NULL)  
4406         { 
4407 #if DEBUG_FS
4408           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4409                       "Have existing request, merging content-seen lists.\n");
4410 #endif
4411           pr = crl->req;
4412           /* Duplicate request (used to send long list of
4413              known/blocked results); merge 'pr->replies_seen'
4414              and update bloom filter */
4415           GNUNET_array_grow (pr->replies_seen,
4416                              pr->replies_seen_size,
4417                              pr->replies_seen_off + sc);
4418           memcpy (&pr->replies_seen[pr->replies_seen_off],
4419                   &sm[1],
4420                   sc * sizeof (GNUNET_HashCode));
4421           pr->replies_seen_off += sc;
4422           refresh_bloomfilter (pr);
4423           GNUNET_STATISTICS_update (stats,
4424                                     gettext_noop ("# client searches updated (merged content seen list)"),
4425                                     1,
4426                                     GNUNET_NO);
4427           GNUNET_SERVER_receive_done (client,
4428                                       GNUNET_OK);
4429           return;
4430         }
4431     }
4432   GNUNET_STATISTICS_update (stats,
4433                             gettext_noop ("# client searches active"),
4434                             1,
4435                             GNUNET_NO);
4436   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4437                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4438   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4439   memset (crl, 0, sizeof (struct ClientRequestList));
4440   crl->client_list = cl;
4441   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4442                                cl->rl_tail,
4443                                crl);  
4444   crl->req = pr;
4445   pr->type = type;
4446   pr->client_request_list = crl;
4447   GNUNET_array_grow (pr->replies_seen,
4448                      pr->replies_seen_size,
4449                      sc);
4450   memcpy (pr->replies_seen,
4451           &sm[1],
4452           sc * sizeof (GNUNET_HashCode));
4453   pr->replies_seen_off = sc;
4454   pr->anonymity_level = ntohl (sm->anonymity_level); 
4455   pr->start_time = GNUNET_TIME_absolute_get ();
4456   refresh_bloomfilter (pr);
4457   pr->query = sm->query;
4458   if (0 == (1 & ntohl (sm->options)))
4459     pr->local_only = GNUNET_NO;
4460   else
4461     pr->local_only = GNUNET_YES;
4462   switch (type)
4463     {
4464     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4465     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4466       if (0 != memcmp (&sm->target,
4467                        &all_zeros,
4468                        sizeof (GNUNET_HashCode)))
4469         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4470       break;
4471     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4472       pr->namespace = (GNUNET_HashCode*) &pr[1];
4473       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4474       break;
4475     default:
4476       break;
4477     }
4478   GNUNET_break (GNUNET_OK ==
4479                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4480                                                    &sm->query,
4481                                                    pr,
4482                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4483   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4484     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4485   pr->qe = GNUNET_DATASTORE_get (dsh,
4486                                  &sm->query,
4487                                  type,
4488                                  -3, -1,
4489                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4490                                  &process_local_reply,
4491                                  pr);
4492 }
4493
4494
4495 /* **************************** Startup ************************ */
4496
4497 /**
4498  * Process fs requests.
4499  *
4500  * @param server the initialized server
4501  * @param c configuration to use
4502  */
4503 static int
4504 main_init (struct GNUNET_SERVER_Handle *server,
4505            const struct GNUNET_CONFIGURATION_Handle *c)
4506 {
4507   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4508     {
4509       { &handle_p2p_get, 
4510         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4511       { &handle_p2p_put, 
4512         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4513       { &handle_p2p_migration_stop, 
4514         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4515         sizeof (struct MigrationStopMessage) },
4516       { NULL, 0, 0 }
4517     };
4518   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4519     {&GNUNET_FS_handle_index_start, NULL, 
4520      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4521     {&GNUNET_FS_handle_index_list_get, NULL, 
4522      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4523     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4524      sizeof (struct UnindexMessage) },
4525     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4526      0 },
4527     {NULL, NULL, 0, 0}
4528   };
4529   unsigned long long enc = 128;
4530
4531   cfg = c;
4532   stats = GNUNET_STATISTICS_create ("fs", cfg);
4533   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4534   if ( (GNUNET_OK !=
4535         GNUNET_CONFIGURATION_get_value_number (cfg,
4536                                                "fs",
4537                                                "MAX_PENDING_REQUESTS",
4538                                                &max_pending_requests)) ||
4539        (GNUNET_OK !=
4540         GNUNET_CONFIGURATION_get_value_number (cfg,
4541                                                "fs",
4542                                                "EXPECTED_NEIGHBOUR_COUNT",
4543                                                &enc)) ||
4544        (GNUNET_OK != 
4545         GNUNET_CONFIGURATION_get_value_time (cfg,
4546                                              "fs",
4547                                              "MIN_MIGRATION_DELAY",
4548                                              &min_migration_delay)) )
4549     {
4550       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4551                   _("Configuration fails to specify certain parameters, assuming default values."));
4552     }
4553   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4554   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4555   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4556   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4557   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4558   core = GNUNET_CORE_connect (cfg,
4559                               1, /* larger? */
4560                               NULL,
4561                               NULL,
4562                               &peer_connect_handler,
4563                               &peer_disconnect_handler,
4564                               &peer_status_handler,
4565                               NULL, GNUNET_NO,
4566                               NULL, GNUNET_NO,
4567                               p2p_handlers);
4568   if (NULL == core)
4569     {
4570       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4571                   _("Failed to connect to `%s' service.\n"),
4572                   "core");
4573       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4574       connected_peers = NULL;
4575       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4576       query_request_map = NULL;
4577       GNUNET_LOAD_value_free (rt_entry_lifetime);
4578       rt_entry_lifetime = NULL;
4579       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4580       requests_by_expiration_heap = NULL;
4581       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4582       peer_request_map = NULL;
4583       if (dsh != NULL)
4584         {
4585           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4586           dsh = NULL;
4587         }
4588       return GNUNET_SYSERR;
4589     }
4590   /* FIXME: distinguish between sending and storing in options? */
4591   if (active_migration) 
4592     {
4593       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4594                   _("Content migration is enabled, will start to gather data\n"));
4595       consider_migration_gathering ();
4596     }
4597   consider_dht_put_gathering (NULL);
4598   GNUNET_SERVER_disconnect_notify (server, 
4599                                    &handle_client_disconnect,
4600                                    NULL);
4601   GNUNET_assert (GNUNET_OK ==
4602                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4603                                                           "fs",
4604                                                           "TRUST",
4605                                                           &trustDirectory));
4606   GNUNET_DISK_directory_create (trustDirectory);
4607   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4608                                       &cron_flush_trust, NULL);
4609
4610
4611   GNUNET_SERVER_add_handlers (server, handlers);
4612   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4613                                 &shutdown_task,
4614                                 NULL);
4615   return GNUNET_OK;
4616 }
4617
4618
4619 /**
4620  * Process fs requests.
4621  *
4622  * @param cls closure
4623  * @param server the initialized server
4624  * @param cfg configuration to use
4625  */
4626 static void
4627 run (void *cls,
4628      struct GNUNET_SERVER_Handle *server,
4629      const struct GNUNET_CONFIGURATION_Handle *cfg)
4630 {
4631   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4632                                                            "FS",
4633                                                            "ACTIVEMIGRATION");
4634   dsh = GNUNET_DATASTORE_connect (cfg);
4635   if (dsh == NULL)
4636     {
4637       GNUNET_SCHEDULER_shutdown ();
4638       return;
4639     }
4640   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4641   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4642   block_cfg = GNUNET_CONFIGURATION_create ();
4643   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4644                                          "block",
4645                                          "PLUGINS",
4646                                          "fs");
4647   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4648   GNUNET_assert (NULL != block_ctx);
4649   dht_handle = GNUNET_DHT_connect (cfg,
4650                                    FS_DHT_HT_SIZE);
4651   if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
4652        (GNUNET_OK != main_init (server, cfg)) )
4653     {    
4654       GNUNET_SCHEDULER_shutdown ();
4655       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4656       dsh = NULL;
4657       GNUNET_DHT_disconnect (dht_handle);
4658       dht_handle = NULL;
4659       GNUNET_BLOCK_context_destroy (block_ctx);
4660       block_ctx = NULL;
4661       GNUNET_CONFIGURATION_destroy (block_cfg);
4662       block_cfg = NULL;
4663       GNUNET_LOAD_value_free (datastore_get_load);
4664       datastore_get_load = NULL;
4665       GNUNET_LOAD_value_free (datastore_put_load);
4666       datastore_put_load = NULL;
4667       return;   
4668     }
4669 }
4670
4671
4672 /**
4673  * The main function for the fs service.
4674  *
4675  * @param argc number of arguments from the command line
4676  * @param argv command line arguments
4677  * @return 0 ok, 1 on error
4678  */
4679 int
4680 main (int argc, char *const *argv)
4681 {
4682   return (GNUNET_OK ==
4683           GNUNET_SERVICE_run (argc,
4684                               argv,
4685                               "fs",
4686                               GNUNET_SERVICE_OPTION_NONE,
4687                               &run, NULL)) ? 0 : 1;
4688 }
4689
4690 /* end of gnunet-service-fs.c */