more testcases, allow location uris in fs_download
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - more statistics
28  */
29 #include "platform.h"
30 #include <float.h>
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "gnunet_dht_service.h"
34 #include "gnunet_datastore_service.h"
35 #include "gnunet_load_lib.h"
36 #include "gnunet_peer_lib.h"
37 #include "gnunet_protocols.h"
38 #include "gnunet_signatures.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet-service-fs_indexing.h"
42 #include "fs.h"
43
44 #define DEBUG_FS GNUNET_NO
45
46 /**
47  * Should we introduce random latency in processing?  Required for proper
48  * implementation of GAP, but can be disabled for performance evaluation of
49  * the basic routing algorithm.
50  *
51  * Note that with delays enabled, performance can be significantly lower
52  * (several orders of magnitude in 2-peer test runs); if you want to
53  * measure throughput of other components, set this to NO.  Also, you
54  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
55  * a rather wasteful mode of operation (that might still get the highest
56  * throughput overall).
57  *
58  * Performance measurements (for 50 MB file, 2 peers):
59  *
60  * - Without delays: 3300 kb/s
61  * - With    delays:  101 kb/s
62  */
63 #define SUPPORT_DELAYS GNUNET_NO
64
65 /**
66  * Size for the hash map for DHT requests from the FS
67  * service.  Should be about the number of concurrent
68  * DHT requests we plan to make.
69  */
70 #define FS_DHT_HT_SIZE 1024
71
72 /**
73  * At what frequency should our datastore load decrease
74  * automatically (since if we don't use it, clearly the
75  * load must be going down).
76  */
77 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
78
79 /**
80  * How often do we flush trust values to disk?
81  */
82 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
83
84 /**
85  * How often do we at most PUT content into the DHT?
86  */
87 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
88
89 /**
90  * Inverse of the probability that we will submit the same query
91  * to the same peer again.  If the same peer already got the query
92  * repeatedly recently, the probability is multiplied by the inverse
93  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
94  * plus MAX_CORK_DELAY (so roughly every 3.5s).
95  *
96  * Note that this factor is a key influence to performance in small
97  * networks (especially test networks of 2 peers) because if there is
98  * only a single peer with the data, this value will determine how
99  * soon we might re-try.  For example, a value of 3 can result in 
100  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
101  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
102  * rather inefficient in larger networks, hence picking 1 is in 
103  * general not the best choice.
104  *
105  * Performance measurements (for 50 MB file, 2 peers, no delays):
106  *
107  * - 1: 3300 kb/s (consistently)
108  * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
109  * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
110  *
111  * Note that this does NOT mean that the value should be 1 since
112  * a 2-peer network is far from representative here (and this fails
113  * to take into consideration bandwidth wasted by repeatedly 
114  * sending queries to peers that don't have the content).  Also,
115  * it is expected that higher values lead to more inconsistent
116  * measurements since this only affects lost messages towards the
117  * end of the download.
118  *
119  * Finally, we should probably consider changing this and making
120  * it dependent on the number of connected peers or a related
121  * metric (bad magic constants...).
122  */
123 #define RETRY_PROBABILITY_INV 1
124
125 /**
126  * What is the maximum delay for a P2P FS message (in our interaction
127  * with core)?  FS-internal delays are another story.  The value is
128  * chosen based on the 32k block size.  Given that peers typcially
129  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
130  * transmit one message even to the lowest-bandwidth peers.
131  */
132 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
133
134 /**
135  * Maximum number of requests (from other peers, overall) that we're
136  * willing to have pending at any given point in time.  Can be changed
137  * via the configuration file (32k is just the default).
138  */
139 static unsigned long long max_pending_requests = (32 * 1024);
140
141
142 /**
143  * Information we keep for each pending reply.  The
144  * actual message follows at the end of this struct.
145  */
146 struct PendingMessage;
147
148 /**
149  * Function called upon completion of a transmission.
150  *
151  * @param cls closure
152  * @param pid ID of receiving peer, 0 on transmission error
153  */
154 typedef void (*TransmissionContinuation)(void * cls, 
155                                          GNUNET_PEER_Id tpid);
156
157
158 /**
159  * Information we keep for each pending message (GET/PUT).  The
160  * actual message follows at the end of this struct.
161  */
162 struct PendingMessage
163 {
164   /**
165    * This is a doubly-linked list of messages to the same peer.
166    */
167   struct PendingMessage *next;
168
169   /**
170    * This is a doubly-linked list of messages to the same peer.
171    */
172   struct PendingMessage *prev;
173
174   /**
175    * Entry in pending message list for this pending message.
176    */ 
177   struct PendingMessageList *pml;  
178
179   /**
180    * Function to call immediately once we have transmitted this
181    * message.
182    */
183   TransmissionContinuation cont;
184
185   /**
186    * Closure for cont.
187    */
188   void *cont_cls;
189
190   /**
191    * Do not transmit this pending message until this deadline.
192    */
193   struct GNUNET_TIME_Absolute delay_until;
194
195   /**
196    * Size of the reply; actual reply message follows
197    * at the end of this struct.
198    */
199   size_t msize;
200   
201   /**
202    * How important is this message for us?
203    */
204   uint32_t priority;
205  
206 };
207
208
209 /**
210  * Information about a peer that we are connected to.
211  * We track data that is useful for determining which
212  * peers should receive our requests.  We also keep
213  * a list of messages to transmit to this peer.
214  */
215 struct ConnectedPeer
216 {
217
218   /**
219    * List of the last clients for which this peer successfully
220    * answered a query.
221    */
222   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
223
224   /**
225    * List of the last PIDs for which
226    * this peer successfully answered a query;
227    * We use 0 to indicate no successful reply.
228    */
229   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
230
231   /**
232    * Average delay between sending the peer a request and
233    * getting a reply (only calculated over the requests for
234    * which we actually got a reply).   Calculated
235    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
236    */ 
237   struct GNUNET_TIME_Relative avg_delay;
238
239   /**
240    * Point in time until which this peer does not want us to migrate content
241    * to it.
242    */
243   struct GNUNET_TIME_Absolute migration_blocked;
244
245   /**
246    * Time until when we blocked this peer from migrating
247    * data to us.
248    */
249   struct GNUNET_TIME_Absolute last_migration_block;
250
251   /**
252    * Transmission times for the last MAX_QUEUE_PER_PEER
253    * requests for this peer.  Used as a ring buffer, current
254    * offset is stored in 'last_request_times_off'.  If the
255    * oldest entry is more recent than the 'avg_delay', we should
256    * not send any more requests right now.
257    */
258   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
259
260   /**
261    * Handle for an active request for transmission to this
262    * peer, or NULL.
263    */
264   struct GNUNET_CORE_TransmitHandle *cth;
265
266   /**
267    * Messages (replies, queries, content migration) we would like to
268    * send to this peer in the near future.  Sorted by priority, head.
269    */
270   struct PendingMessage *pending_messages_head;
271
272   /**
273    * Messages (replies, queries, content migration) we would like to
274    * send to this peer in the near future.  Sorted by priority, tail.
275    */
276   struct PendingMessage *pending_messages_tail;
277
278   /**
279    * How long does it typically take for us to transmit a message
280    * to this peer?  (delay between the request being issued and
281    * the callback being invoked).
282    */
283   struct GNUNET_LOAD_Value *transmission_delay;
284
285   /**
286    * Time when the last transmission request was issued.
287    */
288   struct GNUNET_TIME_Absolute last_transmission_request_start;
289
290   /**
291    * ID of delay task for scheduling transmission.
292    */
293   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
294
295   /**
296    * Average priority of successful replies.  Calculated
297    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
298    */
299   double avg_priority;
300
301   /**
302    * Increase in traffic preference still to be submitted
303    * to the core service for this peer.
304    */
305   uint64_t inc_preference;
306
307   /**
308    * Trust rating for this peer
309    */
310   uint32_t trust;
311
312   /**
313    * Trust rating for this peer on disk.
314    */
315   uint32_t disk_trust;
316
317   /**
318    * The peer's identity.
319    */
320   GNUNET_PEER_Id pid;  
321
322   /**
323    * Size of the linked list of 'pending_messages'.
324    */
325   unsigned int pending_requests;
326
327   /**
328    * Which offset in "last_p2p_replies" will be updated next?
329    * (we go round-robin).
330    */
331   unsigned int last_p2p_replies_woff;
332
333   /**
334    * Which offset in "last_client_replies" will be updated next?
335    * (we go round-robin).
336    */
337   unsigned int last_client_replies_woff;
338
339   /**
340    * Current offset into 'last_request_times' ring buffer.
341    */
342   unsigned int last_request_times_off;
343
344 };
345
346
347 /**
348  * Information we keep for each pending request.  We should try to
349  * keep this struct as small as possible since its memory consumption
350  * is key to how many requests we can have pending at once.
351  */
352 struct PendingRequest;
353
354
355 /**
356  * Doubly-linked list of requests we are performing
357  * on behalf of the same client.
358  */
359 struct ClientRequestList
360 {
361
362   /**
363    * This is a doubly-linked list.
364    */
365   struct ClientRequestList *next;
366
367   /**
368    * This is a doubly-linked list.
369    */
370   struct ClientRequestList *prev;
371
372   /**
373    * Request this entry represents.
374    */
375   struct PendingRequest *req;
376
377   /**
378    * Client list this request belongs to.
379    */
380   struct ClientList *client_list;
381
382 };
383
384
385 /**
386  * Replies to be transmitted to the client.  The actual
387  * response message is allocated after this struct.
388  */
389 struct ClientResponseMessage
390 {
391   /**
392    * This is a doubly-linked list.
393    */
394   struct ClientResponseMessage *next;
395
396   /**
397    * This is a doubly-linked list.
398    */
399   struct ClientResponseMessage *prev;
400
401   /**
402    * Client list entry this response belongs to.
403    */
404   struct ClientList *client_list;
405
406   /**
407    * Number of bytes in the response.
408    */
409   size_t msize;
410 };
411
412
413 /**
414  * Linked list of clients we are performing requests
415  * for right now.
416  */
417 struct ClientList
418 {
419   /**
420    * This is a linked list.
421    */
422   struct ClientList *next;
423
424   /**
425    * ID of a client making a request, NULL if this entry is for a
426    * peer.
427    */
428   struct GNUNET_SERVER_Client *client;
429
430   /**
431    * Head of list of requests performed on behalf
432    * of this client right now.
433    */
434   struct ClientRequestList *rl_head;
435
436   /**
437    * Tail of list of requests performed on behalf
438    * of this client right now.
439    */
440   struct ClientRequestList *rl_tail;
441
442   /**
443    * Head of linked list of responses.
444    */
445   struct ClientResponseMessage *res_head;
446
447   /**
448    * Tail of linked list of responses.
449    */
450   struct ClientResponseMessage *res_tail;
451
452   /**
453    * Context for sending replies.
454    */
455   struct GNUNET_CONNECTION_TransmitHandle *th;
456
457 };
458
459
460 /**
461  * Information about a peer that we have forwarded this
462  * request to already.  
463  */
464 struct UsedTargetEntry
465 {
466   /**
467    * What was the last time we have transmitted this request to this
468    * peer?
469    */
470   struct GNUNET_TIME_Absolute last_request_time;
471
472   /**
473    * How often have we transmitted this request to this peer?
474    */
475   unsigned int num_requests;
476
477   /**
478    * PID of the target peer.
479    */
480   GNUNET_PEER_Id pid;
481
482 };
483
484
485
486
487
488 /**
489  * Doubly-linked list of messages we are performing
490  * due to a pending request.
491  */
492 struct PendingMessageList
493 {
494
495   /**
496    * This is a doubly-linked list of messages on behalf of the same request.
497    */
498   struct PendingMessageList *next;
499
500   /**
501    * This is a doubly-linked list of messages on behalf of the same request.
502    */
503   struct PendingMessageList *prev;
504
505   /**
506    * Message this entry represents.
507    */
508   struct PendingMessage *pm;
509
510   /**
511    * Request this entry belongs to.
512    */
513   struct PendingRequest *req;
514
515   /**
516    * Peer this message is targeted for.
517    */
518   struct ConnectedPeer *target;
519
520 };
521
522
523 /**
524  * Information we keep for each pending request.  We should try to
525  * keep this struct as small as possible since its memory consumption
526  * is key to how many requests we can have pending at once.
527  */
528 struct PendingRequest
529 {
530
531   /**
532    * If this request was made by a client, this is our entry in the
533    * client request list; otherwise NULL.
534    */
535   struct ClientRequestList *client_request_list;
536
537   /**
538    * Entry of peer responsible for this entry (if this request
539    * was made by a peer).
540    */
541   struct ConnectedPeer *cp;
542
543   /**
544    * If this is a namespace query, pointer to the hash of the public
545    * key of the namespace; otherwise NULL.  Pointer will be to the 
546    * end of this struct (so no need to free it).
547    */
548   const GNUNET_HashCode *namespace;
549
550   /**
551    * Bloomfilter we use to filter out replies that we don't care about
552    * (anymore).  NULL as long as we are interested in all replies.
553    */
554   struct GNUNET_CONTAINER_BloomFilter *bf;
555
556   /**
557    * Context of our GNUNET_CORE_peer_change_preference call.
558    */
559   struct GNUNET_CORE_InformationRequestContext *irc;
560
561   /**
562    * Reference to DHT get operation for this request (or NULL).
563    */
564   struct GNUNET_DHT_GetHandle *dht_get;
565
566   /**
567    * Hash code of all replies that we have seen so far (only valid
568    * if client is not NULL since we only track replies like this for
569    * our own clients).
570    */
571   GNUNET_HashCode *replies_seen;
572
573   /**
574    * Node in the heap representing this entry; NULL
575    * if we have no heap node.
576    */
577   struct GNUNET_CONTAINER_HeapNode *hnode;
578
579   /**
580    * Head of list of messages being performed on behalf of this
581    * request.
582    */
583   struct PendingMessageList *pending_head;
584
585   /**
586    * Tail of list of messages being performed on behalf of this
587    * request.
588    */
589   struct PendingMessageList *pending_tail;
590
591   /**
592    * When did we first see this request (form this peer), or, if our
593    * client is initiating, when did we last initiate a search?
594    */
595   struct GNUNET_TIME_Absolute start_time;
596
597   /**
598    * The query that this request is for.
599    */
600   GNUNET_HashCode query;
601
602   /**
603    * The task responsible for transmitting queries
604    * for this request.
605    */
606   GNUNET_SCHEDULER_TaskIdentifier task;
607
608   /**
609    * (Interned) Peer identifier that identifies a preferred target
610    * for requests.
611    */
612   GNUNET_PEER_Id target_pid;
613
614   /**
615    * (Interned) Peer identifiers of peers that have already
616    * received our query for this content.
617    */
618   struct UsedTargetEntry *used_targets;
619   
620   /**
621    * Our entry in the queue (non-NULL while we wait for our
622    * turn to interact with the local database).
623    */
624   struct GNUNET_DATASTORE_QueueEntry *qe;
625
626   /**
627    * Size of the 'bf' (in bytes).
628    */
629   size_t bf_size;
630
631   /**
632    * Desired anonymity level; only valid for requests from a local client.
633    */
634   uint32_t anonymity_level;
635
636   /**
637    * How many entries in "used_targets" are actually valid?
638    */
639   unsigned int used_targets_off;
640
641   /**
642    * How long is the "used_targets" array?
643    */
644   unsigned int used_targets_size;
645
646   /**
647    * Number of results found for this request.
648    */
649   unsigned int results_found;
650
651   /**
652    * How many entries in "replies_seen" are actually valid?
653    */
654   unsigned int replies_seen_off;
655
656   /**
657    * How long is the "replies_seen" array?
658    */
659   unsigned int replies_seen_size;
660   
661   /**
662    * Priority with which this request was made.  If one of our clients
663    * made the request, then this is the current priority that we are
664    * using when initiating the request.  This value is used when
665    * we decide to reward other peers with trust for providing a reply.
666    */
667   uint32_t priority;
668
669   /**
670    * Priority points left for us to spend when forwarding this request
671    * to other peers.
672    */
673   uint32_t remaining_priority;
674
675   /**
676    * Number to mingle hashes for bloom-filter tests with.
677    */
678   int32_t mingle;
679
680   /**
681    * TTL with which we saw this request (or, if we initiated, TTL that
682    * we used for the request).
683    */
684   int32_t ttl;
685   
686   /**
687    * Type of the content that this request is for.
688    */
689   enum GNUNET_BLOCK_Type type;
690
691   /**
692    * Remove this request after transmission of the current response.
693    */
694   int8_t do_remove;
695
696   /**
697    * GNUNET_YES if we should not forward this request to other peers.
698    */
699   int8_t local_only;
700
701   /**
702    * GNUNET_YES if we should not forward this request to other peers.
703    */
704   int8_t forward_only;
705
706 };
707
708
709 /**
710  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
711  */
712 struct MigrationReadyBlock
713 {
714
715   /**
716    * This is a doubly-linked list.
717    */
718   struct MigrationReadyBlock *next;
719
720   /**
721    * This is a doubly-linked list.
722    */
723   struct MigrationReadyBlock *prev;
724
725   /**
726    * Query for the block.
727    */
728   GNUNET_HashCode query;
729
730   /**
731    * When does this block expire? 
732    */
733   struct GNUNET_TIME_Absolute expiration;
734
735   /**
736    * Peers we would consider forwarding this
737    * block to.  Zero for empty entries.
738    */
739   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
740
741   /**
742    * Size of the block.
743    */
744   size_t size;
745
746   /**
747    *  Number of targets already used.
748    */
749   unsigned int used_targets;
750
751   /**
752    * Type of the block.
753    */
754   enum GNUNET_BLOCK_Type type;
755 };
756
757
758 /**
759  * Our connection to the datastore.
760  */
761 static struct GNUNET_DATASTORE_Handle *dsh;
762
763 /**
764  * Our block context.
765  */
766 static struct GNUNET_BLOCK_Context *block_ctx;
767
768 /**
769  * Our block configuration.
770  */
771 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
772
773 /**
774  * Our scheduler.
775  */
776 static struct GNUNET_SCHEDULER_Handle *sched;
777
778 /**
779  * Our configuration.
780  */
781 static const struct GNUNET_CONFIGURATION_Handle *cfg;
782
783 /**
784  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
785  */
786 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
787
788 /**
789  * Map of peer identifiers to "struct PendingRequest" (for that peer).
790  */
791 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
792
793 /**
794  * Map of query identifiers to "struct PendingRequest" (for that query).
795  */
796 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
797
798 /**
799  * Heap with the request that will expire next at the top.  Contains
800  * pointers of type "struct PendingRequest*"; these will *also* be
801  * aliased from the "requests_by_peer" data structures and the
802  * "requests_by_query" table.  Note that requests from our clients
803  * don't expire and are thus NOT in the "requests_by_expiration"
804  * (or the "requests_by_peer" tables).
805  */
806 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
807
808 /**
809  * Handle for reporting statistics.
810  */
811 static struct GNUNET_STATISTICS_Handle *stats;
812
813 /**
814  * Linked list of clients we are currently processing requests for.
815  */
816 static struct ClientList *client_list;
817
818 /**
819  * Pointer to handle to the core service (points to NULL until we've
820  * connected to it).
821  */
822 static struct GNUNET_CORE_Handle *core;
823
824 /**
825  * Head of linked list of blocks that can be migrated.
826  */
827 static struct MigrationReadyBlock *mig_head;
828
829 /**
830  * Tail of linked list of blocks that can be migrated.
831  */
832 static struct MigrationReadyBlock *mig_tail;
833
834 /**
835  * Request to datastore for migration (or NULL).
836  */
837 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
838
839 /**
840  * Request to datastore for DHT PUTs (or NULL).
841  */
842 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
843
844 /**
845  * Type we will request for the next DHT PUT round from the datastore.
846  */
847 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
848
849 /**
850  * Where do we store trust information?
851  */
852 static char *trustDirectory;
853
854 /**
855  * ID of task that collects blocks for migration.
856  */
857 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
858
859 /**
860  * ID of task that collects blocks for DHT PUTs.
861  */
862 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
863
864 /**
865  * What is the maximum frequency at which we are allowed to
866  * poll the datastore for migration content?
867  */
868 static struct GNUNET_TIME_Relative min_migration_delay;
869
870 /**
871  * Handle for DHT operations.
872  */
873 static struct GNUNET_DHT_Handle *dht_handle;
874
875 /**
876  * Size of the doubly-linked list of migration blocks.
877  */
878 static unsigned int mig_size;
879
880 /**
881  * Are we allowed to migrate content to this peer.
882  */
883 static int active_migration;
884
885 /**
886  * How many entires with zero anonymity do we currently estimate
887  * to have in the database?
888  */
889 static unsigned int zero_anonymity_count_estimate;
890
891 /**
892  * Typical priorities we're seeing from other peers right now.  Since
893  * most priorities will be zero, this value is the weighted average of
894  * non-zero priorities seen "recently".  In order to ensure that new
895  * values do not dramatically change the ratio, values are first
896  * "capped" to a reasonable range (+N of the current value) and then
897  * averaged into the existing value by a ratio of 1:N.  Hence
898  * receiving the largest possible priority can still only raise our
899  * "current_priorities" by at most 1.
900  */
901 static double current_priorities;
902
903 /**
904  * Datastore 'GET' load tracking.
905  */
906 static struct GNUNET_LOAD_Value *datastore_get_load;
907
908 /**
909  * Datastore 'PUT' load tracking.
910  */
911 static struct GNUNET_LOAD_Value *datastore_put_load;
912
913 /**
914  * How long do requests typically stay in the routing table?
915  */
916 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
917
918 /**
919  * We've just now completed a datastore request.  Update our
920  * datastore load calculations.
921  *
922  * @param start time when the datastore request was issued
923  */
924 static void
925 update_datastore_delays (struct GNUNET_TIME_Absolute start)
926 {
927   struct GNUNET_TIME_Relative delay;
928
929   delay = GNUNET_TIME_absolute_get_duration (start);
930   GNUNET_LOAD_update (datastore_get_load,
931                       delay.value);
932 }
933
934
935 /**
936  * Get the filename under which we would store the GNUNET_HELLO_Message
937  * for the given host and protocol.
938  * @return filename of the form DIRECTORY/HOSTID
939  */
940 static char *
941 get_trust_filename (const struct GNUNET_PeerIdentity *id)
942 {
943   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
944   char *fn;
945
946   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
947   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
948   return fn;
949 }
950
951
952
953 /**
954  * Transmit messages by copying it to the target buffer
955  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
956  * for writing in the meantime.  In that case, do nothing
957  * (the disconnect or shutdown handler will take care of the rest).
958  * If we were able to transmit messages and there are still more
959  * pending, ask core again for further calls to this function.
960  *
961  * @param cls closure, pointer to the 'struct ConnectedPeer*'
962  * @param size number of bytes available in buf
963  * @param buf where the callee should write the message
964  * @return number of bytes written to buf
965  */
966 static size_t
967 transmit_to_peer (void *cls,
968                   size_t size, void *buf);
969
970
971 /* ******************* clean up functions ************************ */
972
973 /**
974  * Delete the given migration block.
975  *
976  * @param mb block to delete
977  */
978 static void
979 delete_migration_block (struct MigrationReadyBlock *mb)
980 {
981   GNUNET_CONTAINER_DLL_remove (mig_head,
982                                mig_tail,
983                                mb);
984   GNUNET_PEER_decrement_rcs (mb->target_list,
985                              MIGRATION_LIST_SIZE);
986   mig_size--;
987   GNUNET_free (mb);
988 }
989
990
991 /**
992  * Compare the distance of two peers to a key.
993  *
994  * @param key key
995  * @param p1 first peer
996  * @param p2 second peer
997  * @return GNUNET_YES if P1 is closer to key than P2
998  */
999 static int
1000 is_closer (const GNUNET_HashCode *key,
1001            const struct GNUNET_PeerIdentity *p1,
1002            const struct GNUNET_PeerIdentity *p2)
1003 {
1004   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1005                                     &p2->hashPubKey,
1006                                     key);
1007 }
1008
1009
1010 /**
1011  * Consider migrating content to a given peer.
1012  *
1013  * @param cls 'struct MigrationReadyBlock*' to select
1014  *            targets for (or NULL for none)
1015  * @param key ID of the peer 
1016  * @param value 'struct ConnectedPeer' of the peer
1017  * @return GNUNET_YES (always continue iteration)
1018  */
1019 static int
1020 consider_migration (void *cls,
1021                     const GNUNET_HashCode *key,
1022                     void *value)
1023 {
1024   struct MigrationReadyBlock *mb = cls;
1025   struct ConnectedPeer *cp = value;
1026   struct MigrationReadyBlock *pos;
1027   struct GNUNET_PeerIdentity cppid;
1028   struct GNUNET_PeerIdentity otherpid;
1029   struct GNUNET_PeerIdentity worstpid;
1030   size_t msize;
1031   unsigned int i;
1032   unsigned int repl;
1033   
1034   /* consider 'cp' as a migration target for mb */
1035   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
1036     return GNUNET_YES; /* peer has requested no migration! */
1037   if (mb != NULL)
1038     {
1039       GNUNET_PEER_resolve (cp->pid,
1040                            &cppid);
1041       repl = MIGRATION_LIST_SIZE;
1042       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1043         {
1044           if (mb->target_list[i] == 0)
1045             {
1046               mb->target_list[i] = cp->pid;
1047               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1048               repl = MIGRATION_LIST_SIZE;
1049               break;
1050             }
1051           GNUNET_PEER_resolve (mb->target_list[i],
1052                                &otherpid);
1053           if ( (repl == MIGRATION_LIST_SIZE) &&
1054                is_closer (&mb->query,
1055                           &cppid,
1056                           &otherpid)) 
1057             {
1058               repl = i;
1059               worstpid = otherpid;
1060             }
1061           else if ( (repl != MIGRATION_LIST_SIZE) &&
1062                     (is_closer (&mb->query,
1063                                 &worstpid,
1064                                 &otherpid) ) )
1065             {
1066               repl = i;
1067               worstpid = otherpid;
1068             }       
1069         }
1070       if (repl != MIGRATION_LIST_SIZE) 
1071         {
1072           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1073           mb->target_list[repl] = cp->pid;
1074           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1075         }
1076     }
1077
1078   /* consider scheduling transmission to cp for content migration */
1079   if (cp->cth != NULL)        
1080     return GNUNET_YES; 
1081   msize = 0;
1082   pos = mig_head;
1083   while (pos != NULL)
1084     {
1085       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1086         {
1087           if (cp->pid == pos->target_list[i])
1088             {
1089               if (msize == 0)
1090                 msize = pos->size;
1091               else
1092                 msize = GNUNET_MIN (msize,
1093                                     pos->size);
1094               break;
1095             }
1096         }
1097       pos = pos->next;
1098     }
1099   if (msize == 0)
1100     return GNUNET_YES; /* no content available */
1101 #if DEBUG_FS
1102   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103               "Trying to migrate at least %u bytes to peer `%s'\n",
1104               msize,
1105               GNUNET_h2s (key));
1106 #endif
1107   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1108     {
1109       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1110       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1111     }
1112   cp->cth 
1113     = GNUNET_CORE_notify_transmit_ready (core,
1114                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1115                                          (const struct GNUNET_PeerIdentity*) key,
1116                                          msize + sizeof (struct PutMessage),
1117                                          &transmit_to_peer,
1118                                          cp);
1119   return GNUNET_YES;
1120 }
1121
1122
1123 /**
1124  * Task that is run periodically to obtain blocks for content
1125  * migration
1126  * 
1127  * @param cls unused
1128  * @param tc scheduler context (also unused)
1129  */
1130 static void
1131 gather_migration_blocks (void *cls,
1132                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1133
1134
1135
1136
1137 /**
1138  * Task that is run periodically to obtain blocks for DHT PUTs.
1139  * 
1140  * @param cls type of blocks to gather
1141  * @param tc scheduler context (unused)
1142  */
1143 static void
1144 gather_dht_put_blocks (void *cls,
1145                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1146
1147
1148 /**
1149  * If the migration task is not currently running, consider
1150  * (re)scheduling it with the appropriate delay.
1151  */
1152 static void
1153 consider_migration_gathering ()
1154 {
1155   struct GNUNET_TIME_Relative delay;
1156
1157   if (dsh == NULL)
1158     return;
1159   if (mig_qe != NULL)
1160     return;
1161   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1162     return;
1163   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1164                                          mig_size);
1165   delay = GNUNET_TIME_relative_divide (delay,
1166                                        MAX_MIGRATION_QUEUE);
1167   delay = GNUNET_TIME_relative_max (delay,
1168                                     min_migration_delay);
1169   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1170                                            delay,
1171                                            &gather_migration_blocks,
1172                                            NULL);
1173 }
1174
1175
1176 /**
1177  * If the DHT PUT gathering task is not currently running, consider
1178  * (re)scheduling it with the appropriate delay.
1179  */
1180 static void
1181 consider_dht_put_gathering (void *cls)
1182 {
1183   struct GNUNET_TIME_Relative delay;
1184
1185   if (dsh == NULL)
1186     return;
1187   if (dht_qe != NULL)
1188     return;
1189   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1190     return;
1191   if (zero_anonymity_count_estimate > 0)
1192     {
1193       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1194                                            zero_anonymity_count_estimate);
1195       delay = GNUNET_TIME_relative_min (delay,
1196                                         MAX_DHT_PUT_FREQ);
1197     }
1198   else
1199     {
1200       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1201          (hopefully) appear */
1202       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1203     }
1204   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1205                                            delay,
1206                                            &gather_dht_put_blocks,
1207                                            cls);
1208 }
1209
1210
1211 /**
1212  * Process content offered for migration.
1213  *
1214  * @param cls closure
1215  * @param key key for the content
1216  * @param size number of bytes in data
1217  * @param data content stored
1218  * @param type type of the content
1219  * @param priority priority of the content
1220  * @param anonymity anonymity-level for the content
1221  * @param expiration expiration time for the content
1222  * @param uid unique identifier for the datum;
1223  *        maybe 0 if no unique identifier is available
1224  */
1225 static void
1226 process_migration_content (void *cls,
1227                            const GNUNET_HashCode * key,
1228                            size_t size,
1229                            const void *data,
1230                            enum GNUNET_BLOCK_Type type,
1231                            uint32_t priority,
1232                            uint32_t anonymity,
1233                            struct GNUNET_TIME_Absolute
1234                            expiration, uint64_t uid)
1235 {
1236   struct MigrationReadyBlock *mb;
1237   
1238   if (key == NULL)
1239     {
1240       mig_qe = NULL;
1241       if (mig_size < MAX_MIGRATION_QUEUE)  
1242         consider_migration_gathering ();
1243       return;
1244     }
1245   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1246     {
1247       if (GNUNET_OK !=
1248           GNUNET_FS_handle_on_demand_block (key, size, data,
1249                                             type, priority, anonymity,
1250                                             expiration, uid, 
1251                                             &process_migration_content,
1252                                             NULL))
1253         {
1254           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1255         }
1256       return;
1257     }
1258 #if DEBUG_FS
1259   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1260               "Retrieved block `%s' of type %u for migration\n",
1261               GNUNET_h2s (key),
1262               type);
1263 #endif
1264   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1265   mb->query = *key;
1266   mb->expiration = expiration;
1267   mb->size = size;
1268   mb->type = type;
1269   memcpy (&mb[1], data, size);
1270   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1271                                      mig_tail,
1272                                      mig_tail,
1273                                      mb);
1274   mig_size++;
1275   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1276                                          &consider_migration,
1277                                          mb);
1278   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1279 }
1280
1281
1282 /**
1283  * Function called upon completion of the DHT PUT operation.
1284  */
1285 static void
1286 dht_put_continuation (void *cls,
1287                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1288 {
1289   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1290 }
1291
1292
1293 /**
1294  * Store content in DHT.
1295  *
1296  * @param cls closure
1297  * @param key key for the content
1298  * @param size number of bytes in data
1299  * @param data content stored
1300  * @param type type of the content
1301  * @param priority priority of the content
1302  * @param anonymity anonymity-level for the content
1303  * @param expiration expiration time for the content
1304  * @param uid unique identifier for the datum;
1305  *        maybe 0 if no unique identifier is available
1306  */
1307 static void
1308 process_dht_put_content (void *cls,
1309                          const GNUNET_HashCode * key,
1310                          size_t size,
1311                          const void *data,
1312                          enum GNUNET_BLOCK_Type type,
1313                          uint32_t priority,
1314                          uint32_t anonymity,
1315                          struct GNUNET_TIME_Absolute
1316                          expiration, uint64_t uid)
1317
1318   static unsigned int counter;
1319   static GNUNET_HashCode last_vhash;
1320   static GNUNET_HashCode vhash;
1321
1322   if (key == NULL)
1323     {
1324       dht_qe = NULL;
1325       consider_dht_put_gathering (cls);
1326       return;
1327     }
1328   /* slightly funky code to estimate the total number of values with zero
1329      anonymity from the maximum observed length of a monotonically increasing 
1330      sequence of hashes over the contents */
1331   GNUNET_CRYPTO_hash (data, size, &vhash);
1332   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1333     {
1334       if (zero_anonymity_count_estimate > 0)
1335         zero_anonymity_count_estimate /= 2;
1336       counter = 0;
1337     }
1338   last_vhash = vhash;
1339   if (counter < 31)
1340     counter++;
1341   if (zero_anonymity_count_estimate < (1 << counter))
1342     zero_anonymity_count_estimate = (1 << counter);
1343 #if DEBUG_FS
1344   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1345               "Retrieved block `%s' of type %u for DHT PUT\n",
1346               GNUNET_h2s (key),
1347               type);
1348 #endif
1349   GNUNET_DHT_put (dht_handle,
1350                   key,
1351                   GNUNET_DHT_RO_NONE,
1352                   type,
1353                   size,
1354                   data,
1355                   expiration,
1356                   GNUNET_TIME_UNIT_FOREVER_REL,
1357                   &dht_put_continuation,
1358                   cls);
1359 }
1360
1361
1362 /**
1363  * Task that is run periodically to obtain blocks for content
1364  * migration
1365  * 
1366  * @param cls unused
1367  * @param tc scheduler context (also unused)
1368  */
1369 static void
1370 gather_migration_blocks (void *cls,
1371                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1372 {
1373   mig_task = GNUNET_SCHEDULER_NO_TASK;
1374   if (dsh != NULL)
1375     {
1376       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1377                                             GNUNET_TIME_UNIT_FOREVER_REL,
1378                                             &process_migration_content, NULL);
1379       GNUNET_assert (mig_qe != NULL);
1380     }
1381 }
1382
1383
1384 /**
1385  * Task that is run periodically to obtain blocks for DHT PUTs.
1386  * 
1387  * @param cls type of blocks to gather
1388  * @param tc scheduler context (unused)
1389  */
1390 static void
1391 gather_dht_put_blocks (void *cls,
1392                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1393 {
1394   dht_task = GNUNET_SCHEDULER_NO_TASK;
1395   if (dsh != NULL)
1396     {
1397       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1398         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1399       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1400                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1401                                                     dht_put_type++,
1402                                                     &process_dht_put_content, NULL);
1403       GNUNET_assert (dht_qe != NULL);
1404     }
1405 }
1406
1407
1408 /**
1409  * We're done with a particular message list entry.
1410  * Free all associated resources.
1411  * 
1412  * @param pml entry to destroy
1413  */
1414 static void
1415 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1416 {
1417   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1418                                pml->req->pending_tail,
1419                                pml);
1420   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1421                                pml->target->pending_messages_tail,
1422                                pml->pm);
1423   pml->target->pending_requests--;
1424   GNUNET_free (pml->pm);
1425   GNUNET_free (pml);
1426 }
1427
1428
1429 /**
1430  * Destroy the given pending message (and call the respective
1431  * continuation).
1432  *
1433  * @param pm message to destroy
1434  * @param tpid id of peer that the message was delivered to, or 0 for none
1435  */
1436 static void
1437 destroy_pending_message (struct PendingMessage *pm,
1438                          GNUNET_PEER_Id tpid)
1439 {
1440   struct PendingMessageList *pml = pm->pml;
1441   TransmissionContinuation cont;
1442   void *cont_cls;
1443
1444   cont = pm->cont;
1445   cont_cls = pm->cont_cls;
1446   if (pml != NULL)
1447     {
1448       GNUNET_assert (pml->pm == pm);
1449       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1450       destroy_pending_message_list_entry (pml);
1451     }
1452   else
1453     {
1454       GNUNET_free (pm);
1455     }
1456   if (cont != NULL)
1457     cont (cont_cls, tpid);  
1458 }
1459
1460
1461 /**
1462  * We're done processing a particular request.
1463  * Free all associated resources.
1464  *
1465  * @param pr request to destroy
1466  */
1467 static void
1468 destroy_pending_request (struct PendingRequest *pr)
1469 {
1470   struct GNUNET_PeerIdentity pid;
1471   unsigned int i;
1472
1473   if (pr->hnode != NULL)
1474     {
1475       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1476                                          pr->hnode);
1477       pr->hnode = NULL;
1478     }
1479   if (NULL == pr->client_request_list)
1480     {
1481       GNUNET_STATISTICS_update (stats,
1482                                 gettext_noop ("# P2P searches active"),
1483                                 -1,
1484                                 GNUNET_NO);
1485     }
1486   else
1487     {
1488       GNUNET_STATISTICS_update (stats,
1489                                 gettext_noop ("# client searches active"),
1490                                 -1,
1491                                 GNUNET_NO);
1492     }
1493   if (GNUNET_YES == 
1494       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1495                                             &pr->query,
1496                                             pr))
1497     {
1498       GNUNET_LOAD_update (rt_entry_lifetime,
1499                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1500     }
1501   if (pr->qe != NULL)
1502      {
1503       GNUNET_DATASTORE_cancel (pr->qe);
1504       pr->qe = NULL;
1505     }
1506   if (pr->dht_get != NULL)
1507     {
1508       GNUNET_DHT_get_stop (pr->dht_get);
1509       pr->dht_get = NULL;
1510     }
1511   if (pr->client_request_list != NULL)
1512     {
1513       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1514                                    pr->client_request_list->client_list->rl_tail,
1515                                    pr->client_request_list);
1516       GNUNET_free (pr->client_request_list);
1517       pr->client_request_list = NULL;
1518     }
1519   if (pr->cp != NULL)
1520     {
1521       GNUNET_PEER_resolve (pr->cp->pid,
1522                            &pid);
1523       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1524                                                    &pid.hashPubKey,
1525                                                    pr);
1526       pr->cp = NULL;
1527     }
1528   if (pr->bf != NULL)
1529     {
1530       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1531       pr->bf = NULL;
1532     }
1533   if (pr->irc != NULL)
1534     {
1535       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1536       pr->irc = NULL;
1537     }
1538   if (pr->replies_seen != NULL)
1539     {
1540       GNUNET_free (pr->replies_seen);
1541       pr->replies_seen = NULL;
1542     }
1543   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1544     {
1545       GNUNET_SCHEDULER_cancel (sched,
1546                                pr->task);
1547       pr->task = GNUNET_SCHEDULER_NO_TASK;
1548     }
1549   while (NULL != pr->pending_head)    
1550     destroy_pending_message_list_entry (pr->pending_head);
1551   GNUNET_PEER_change_rc (pr->target_pid, -1);
1552   if (pr->used_targets != NULL)
1553     {
1554       for (i=0;i<pr->used_targets_off;i++)
1555         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1556       GNUNET_free (pr->used_targets);
1557       pr->used_targets_off = 0;
1558       pr->used_targets_size = 0;
1559       pr->used_targets = NULL;
1560     }
1561   GNUNET_free (pr);
1562 }
1563
1564
1565 /**
1566  * Method called whenever a given peer connects.
1567  *
1568  * @param cls closure, not used
1569  * @param peer peer identity this notification is about
1570  * @param latency reported latency of the connection with 'other'
1571  * @param distance reported distance (DV) to 'other' 
1572  */
1573 static void 
1574 peer_connect_handler (void *cls,
1575                       const struct
1576                       GNUNET_PeerIdentity * peer,
1577                       struct GNUNET_TIME_Relative latency,
1578                       uint32_t distance)
1579 {
1580   struct ConnectedPeer *cp;
1581   struct MigrationReadyBlock *pos;
1582   char *fn;
1583   uint32_t trust;
1584   
1585   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1586   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1587   cp->pid = GNUNET_PEER_intern (peer);
1588
1589   fn = get_trust_filename (peer);
1590   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1591       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1592     cp->disk_trust = cp->trust = ntohl (trust);
1593   GNUNET_free (fn);
1594
1595   GNUNET_break (GNUNET_OK ==
1596                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1597                                                    &peer->hashPubKey,
1598                                                    cp,
1599                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1600
1601   pos = mig_head;
1602   while (NULL != pos)
1603     {
1604       (void) consider_migration (pos, &peer->hashPubKey, cp);
1605       pos = pos->next;
1606     }
1607 }
1608
1609
1610 /**
1611  * Method called whenever a given peer has a status change.
1612  *
1613  * @param cls closure
1614  * @param peer peer identity this notification is about
1615  * @param latency reported latency of the connection with 'other'
1616  * @param distance reported distance (DV) to 'other' 
1617  * @param bandwidth_in available amount of inbound bandwidth
1618  * @param bandwidth_out available amount of outbound bandwidth
1619  * @param timeout absolute time when this peer will time out
1620  *        unless we see some further activity from it
1621  */
1622 static void
1623 peer_status_handler (void *cls,
1624                      const struct
1625                      GNUNET_PeerIdentity * peer,
1626                      struct GNUNET_TIME_Relative latency,
1627                      uint32_t distance,
1628                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1629                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1630                      struct GNUNET_TIME_Absolute timeout)
1631 {
1632   struct ConnectedPeer *cp;
1633
1634   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1635                                           &peer->hashPubKey);
1636   GNUNET_assert (cp != NULL);
1637   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1638                                  latency);  
1639 }
1640
1641
1642
1643 /**
1644  * Increase the host credit by a value.
1645  *
1646  * @param host which peer to change the trust value on
1647  * @param value is the int value by which the
1648  *  host credit is to be increased or decreased
1649  * @returns the actual change in trust (positive or negative)
1650  */
1651 static int
1652 change_host_trust (struct ConnectedPeer *host, int value)
1653 {
1654   unsigned int old_trust;
1655
1656   if (value == 0)
1657     return 0;
1658   GNUNET_assert (host != NULL);
1659   old_trust = host->trust;
1660   if (value > 0)
1661     {
1662       if (host->trust + value < host->trust)
1663         {
1664           value = UINT32_MAX - host->trust;
1665           host->trust = UINT32_MAX;
1666         }
1667       else
1668         host->trust += value;
1669     }
1670   else
1671     {
1672       if (host->trust < -value)
1673         {
1674           value = -host->trust;
1675           host->trust = 0;
1676         }
1677       else
1678         host->trust += value;
1679     }
1680   return value;
1681 }
1682
1683
1684 /**
1685  * Write host-trust information to a file - flush the buffer entry!
1686  */
1687 static int
1688 flush_trust (void *cls,
1689              const GNUNET_HashCode *key,
1690              void *value)
1691 {
1692   struct ConnectedPeer *host = value;
1693   char *fn;
1694   uint32_t trust;
1695   struct GNUNET_PeerIdentity pid;
1696
1697   if (host->trust == host->disk_trust)
1698     return GNUNET_OK;                     /* unchanged */
1699   GNUNET_PEER_resolve (host->pid,
1700                        &pid);
1701   fn = get_trust_filename (&pid);
1702   if (host->trust == 0)
1703     {
1704       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1705         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1706                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1707     }
1708   else
1709     {
1710       trust = htonl (host->trust);
1711       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1712                                                     sizeof(uint32_t),
1713                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1714                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1715         host->disk_trust = host->trust;
1716     }
1717   GNUNET_free (fn);
1718   return GNUNET_OK;
1719 }
1720
1721 /**
1722  * Call this method periodically to scan data/hosts for new hosts.
1723  */
1724 static void
1725 cron_flush_trust (void *cls,
1726                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1727 {
1728
1729   if (NULL == connected_peers)
1730     return;
1731   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1732                                          &flush_trust,
1733                                          NULL);
1734   if (NULL == tc)
1735     return;
1736   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1737     return;
1738   GNUNET_SCHEDULER_add_delayed (tc->sched,
1739                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1740 }
1741
1742
1743 /**
1744  * Free (each) request made by the peer.
1745  *
1746  * @param cls closure, points to peer that the request belongs to
1747  * @param key current key code
1748  * @param value value in the hash map
1749  * @return GNUNET_YES (we should continue to iterate)
1750  */
1751 static int
1752 destroy_request (void *cls,
1753                  const GNUNET_HashCode * key,
1754                  void *value)
1755 {
1756   const struct GNUNET_PeerIdentity * peer = cls;
1757   struct PendingRequest *pr = value;
1758   
1759   GNUNET_break (GNUNET_YES ==
1760                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1761                                                       &peer->hashPubKey,
1762                                                       pr));
1763   destroy_pending_request (pr);
1764   return GNUNET_YES;
1765 }
1766
1767
1768 /**
1769  * Method called whenever a peer disconnects.
1770  *
1771  * @param cls closure, not used
1772  * @param peer peer identity this notification is about
1773  */
1774 static void
1775 peer_disconnect_handler (void *cls,
1776                          const struct
1777                          GNUNET_PeerIdentity * peer)
1778 {
1779   struct ConnectedPeer *cp;
1780   struct PendingMessage *pm;
1781   unsigned int i;
1782   struct MigrationReadyBlock *pos;
1783   struct MigrationReadyBlock *next;
1784
1785   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1786                                               &peer->hashPubKey,
1787                                               &destroy_request,
1788                                               (void*) peer);
1789   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1790                                           &peer->hashPubKey);
1791   if (cp == NULL)
1792     return;
1793   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1794     {
1795       if (NULL != cp->last_client_replies[i])
1796         {
1797           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1798           cp->last_client_replies[i] = NULL;
1799         }
1800     }
1801   GNUNET_break (GNUNET_YES ==
1802                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1803                                                       &peer->hashPubKey,
1804                                                       cp));
1805   /* remove this peer from migration considerations; schedule
1806      alternatives */
1807   next = mig_head;
1808   while (NULL != (pos = next))
1809     {
1810       next = pos->next;
1811       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1812         {
1813           if (pos->target_list[i] == cp->pid)
1814             {
1815               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1816               pos->target_list[i] = 0;
1817             }
1818          }
1819       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1820         {
1821           delete_migration_block (pos);
1822           consider_migration_gathering ();
1823           continue;
1824         }
1825       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1826                                              &consider_migration,
1827                                              pos);
1828     }
1829   GNUNET_PEER_change_rc (cp->pid, -1);
1830   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1831   if (NULL != cp->cth)
1832     {
1833       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1834       cp->cth = NULL;
1835     }
1836   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1837     {
1838       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1839       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1840     }
1841   while (NULL != (pm = cp->pending_messages_head))
1842     destroy_pending_message (pm, 0 /* delivery failed */);
1843   GNUNET_LOAD_value_free (cp->transmission_delay);
1844   GNUNET_break (0 == cp->pending_requests);
1845   GNUNET_free (cp);
1846 }
1847
1848
1849 /**
1850  * Iterator over hash map entries that removes all occurences
1851  * of the given 'client' from the 'last_client_replies' of the
1852  * given connected peer.
1853  *
1854  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1855  * @param key current key code (unused)
1856  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1857  * @return GNUNET_YES (we should continue to iterate)
1858  */
1859 static int
1860 remove_client_from_last_client_replies (void *cls,
1861                                         const GNUNET_HashCode * key,
1862                                         void *value)
1863 {
1864   struct GNUNET_SERVER_Client *client = cls;
1865   struct ConnectedPeer *cp = value;
1866   unsigned int i;
1867
1868   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1869     {
1870       if (cp->last_client_replies[i] == client)
1871         {
1872           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1873           cp->last_client_replies[i] = NULL;
1874         }
1875     }  
1876   return GNUNET_YES;
1877 }
1878
1879
1880 /**
1881  * A client disconnected.  Remove all of its pending queries.
1882  *
1883  * @param cls closure, NULL
1884  * @param client identification of the client
1885  */
1886 static void
1887 handle_client_disconnect (void *cls,
1888                           struct GNUNET_SERVER_Client
1889                           * client)
1890 {
1891   struct ClientList *pos;
1892   struct ClientList *prev;
1893   struct ClientRequestList *rcl;
1894   struct ClientResponseMessage *creply;
1895
1896   if (client == NULL)
1897     return;
1898   prev = NULL;
1899   pos = client_list;
1900   while ( (NULL != pos) &&
1901           (pos->client != client) )
1902     {
1903       prev = pos;
1904       pos = pos->next;
1905     }
1906   if (pos == NULL)
1907     return; /* no requests pending for this client */
1908   while (NULL != (rcl = pos->rl_head))
1909     {
1910       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1911                   "Destroying pending request `%s' on disconnect\n",
1912                   GNUNET_h2s (&rcl->req->query));
1913       destroy_pending_request (rcl->req);
1914     }
1915   if (prev == NULL)
1916     client_list = pos->next;
1917   else
1918     prev->next = pos->next;
1919   if (pos->th != NULL)
1920     {
1921       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1922       pos->th = NULL;
1923     }
1924   while (NULL != (creply = pos->res_head))
1925     {
1926       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1927                                    pos->res_tail,
1928                                    creply);
1929       GNUNET_free (creply);
1930     }    
1931   GNUNET_SERVER_client_drop (pos->client);
1932   GNUNET_free (pos);
1933   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1934                                          &remove_client_from_last_client_replies,
1935                                          client);
1936 }
1937
1938
1939 /**
1940  * Iterator to free peer entries.
1941  *
1942  * @param cls closure, unused
1943  * @param key current key code
1944  * @param value value in the hash map (peer entry)
1945  * @return GNUNET_YES (we should continue to iterate)
1946  */
1947 static int 
1948 clean_peer (void *cls,
1949             const GNUNET_HashCode * key,
1950             void *value)
1951 {
1952   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1953   return GNUNET_YES;
1954 }
1955
1956
1957 /**
1958  * Task run during shutdown.
1959  *
1960  * @param cls unused
1961  * @param tc unused
1962  */
1963 static void
1964 shutdown_task (void *cls,
1965                const struct GNUNET_SCHEDULER_TaskContext *tc)
1966 {
1967   if (mig_qe != NULL)
1968     {
1969       GNUNET_DATASTORE_cancel (mig_qe);
1970       mig_qe = NULL;
1971     }
1972   if (dht_qe != NULL)
1973     {
1974       GNUNET_DATASTORE_cancel (dht_qe);
1975       dht_qe = NULL;
1976     }
1977   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1978     {
1979       GNUNET_SCHEDULER_cancel (sched, mig_task);
1980       mig_task = GNUNET_SCHEDULER_NO_TASK;
1981     }
1982   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1983     {
1984       GNUNET_SCHEDULER_cancel (sched, dht_task);
1985       dht_task = GNUNET_SCHEDULER_NO_TASK;
1986     }
1987   while (client_list != NULL)
1988     handle_client_disconnect (NULL,
1989                               client_list->client);
1990   cron_flush_trust (NULL, NULL);
1991   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1992                                          &clean_peer,
1993                                          NULL);
1994   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1995   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1996   requests_by_expiration_heap = 0;
1997   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1998   connected_peers = NULL;
1999   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2000   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2001   query_request_map = NULL;
2002   GNUNET_LOAD_value_free (rt_entry_lifetime);
2003   rt_entry_lifetime = NULL;
2004   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2005   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2006   peer_request_map = NULL;
2007   GNUNET_assert (NULL != core);
2008   GNUNET_CORE_disconnect (core);
2009   core = NULL;
2010   if (stats != NULL)
2011     {
2012       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2013       stats = NULL;
2014     }
2015   if (dsh != NULL)
2016     {
2017       GNUNET_DATASTORE_disconnect (dsh,
2018                                    GNUNET_NO);
2019       dsh = NULL;
2020     }
2021   while (mig_head != NULL)
2022     delete_migration_block (mig_head);
2023   GNUNET_assert (0 == mig_size);
2024   GNUNET_DHT_disconnect (dht_handle);
2025   dht_handle = NULL;
2026   GNUNET_LOAD_value_free (datastore_get_load);
2027   datastore_get_load = NULL;
2028   GNUNET_LOAD_value_free (datastore_put_load);
2029   datastore_put_load = NULL;
2030   GNUNET_BLOCK_context_destroy (block_ctx);
2031   block_ctx = NULL;
2032   GNUNET_CONFIGURATION_destroy (block_cfg);
2033   block_cfg = NULL;
2034   sched = NULL;
2035   cfg = NULL;  
2036   GNUNET_free_non_null (trustDirectory);
2037   trustDirectory = NULL;
2038 }
2039
2040
2041 /* ******************* Utility functions  ******************** */
2042
2043
2044 /**
2045  * We've had to delay a request for transmission to core, but now
2046  * we should be ready.  Run it.
2047  *
2048  * @param cls the 'struct ConnectedPeer' for which a request was delayed
2049  * @param tc task context (unused)
2050  */
2051 static void
2052 delayed_transmission_request (void *cls,
2053                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2054 {
2055   struct ConnectedPeer *cp = cls;
2056   struct GNUNET_PeerIdentity pid;
2057   struct PendingMessage *pm;
2058
2059   pm = cp->pending_messages_head;
2060   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2061   GNUNET_assert (cp->cth == NULL);
2062   if (pm == NULL)
2063     return;
2064   GNUNET_PEER_resolve (cp->pid,
2065                        &pid);
2066   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2067   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2068                                                pm->priority,
2069                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2070                                                &pid,
2071                                                pm->msize,
2072                                                &transmit_to_peer,
2073                                                cp);
2074 }
2075
2076
2077 /**
2078  * Transmit messages by copying it to the target buffer
2079  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2080  * for writing in the meantime.  In that case, do nothing
2081  * (the disconnect or shutdown handler will take care of the rest).
2082  * If we were able to transmit messages and there are still more
2083  * pending, ask core again for further calls to this function.
2084  *
2085  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2086  * @param size number of bytes available in buf
2087  * @param buf where the callee should write the message
2088  * @return number of bytes written to buf
2089  */
2090 static size_t
2091 transmit_to_peer (void *cls,
2092                   size_t size, void *buf)
2093 {
2094   struct ConnectedPeer *cp = cls;
2095   char *cbuf = buf;
2096   struct PendingMessage *pm;
2097   struct PendingMessage *next_pm;
2098   struct GNUNET_TIME_Absolute now;
2099   struct GNUNET_TIME_Relative min_delay;
2100   struct MigrationReadyBlock *mb;
2101   struct MigrationReadyBlock *next;
2102   struct PutMessage migm;
2103   size_t msize;
2104   unsigned int i;
2105   struct GNUNET_PeerIdentity pid;
2106  
2107   cp->cth = NULL;
2108   if (NULL == buf)
2109     {
2110 #if DEBUG_FS
2111       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2112                   "Dropping message, core too busy.\n");
2113 #endif
2114       GNUNET_LOAD_update (cp->transmission_delay,
2115                           UINT64_MAX);
2116       return 0;
2117     }  
2118   GNUNET_LOAD_update (cp->transmission_delay,
2119                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2120   now = GNUNET_TIME_absolute_get ();
2121   msize = 0;
2122   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2123   next_pm = cp->pending_messages_head;
2124   while ( (NULL != (pm = next_pm) ) &&
2125           (pm->msize <= size) )
2126     {
2127       next_pm = pm->next;
2128       if (pm->delay_until.value > now.value)
2129         {
2130           min_delay = GNUNET_TIME_relative_min (min_delay,
2131                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2132           continue;
2133         }
2134       memcpy (&cbuf[msize], &pm[1], pm->msize);
2135       msize += pm->msize;
2136       size -= pm->msize;
2137       if (NULL == pm->pml)
2138         {
2139           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2140                                        cp->pending_messages_tail,
2141                                        pm);
2142           cp->pending_requests--;
2143         }
2144       destroy_pending_message (pm, cp->pid);
2145     }
2146   if (pm != NULL)
2147     min_delay = GNUNET_TIME_UNIT_ZERO;
2148   if (NULL != cp->pending_messages_head)
2149     {     
2150       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2151       cp->delayed_transmission_request_task
2152         = GNUNET_SCHEDULER_add_delayed (sched,
2153                                         min_delay,
2154                                         &delayed_transmission_request,
2155                                         cp);
2156     }
2157   if (pm == NULL)
2158     {      
2159       GNUNET_PEER_resolve (cp->pid,
2160                            &pid);
2161       next = mig_head;
2162       while (NULL != (mb = next))
2163         {
2164           next = mb->next;
2165           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2166             {
2167               if ( (cp->pid == mb->target_list[i]) &&
2168                    (mb->size + sizeof (migm) <= size) )
2169                 {
2170                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2171                   mb->target_list[i] = 0;
2172                   mb->used_targets++;
2173                   memset (&migm, 0, sizeof (migm));
2174                   migm.header.size = htons (sizeof (migm) + mb->size);
2175                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2176                   migm.type = htonl (mb->type);
2177                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2178                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2179                   msize += sizeof (migm);
2180                   size -= sizeof (migm);
2181                   memcpy (&cbuf[msize], &mb[1], mb->size);
2182                   msize += mb->size;
2183                   size -= mb->size;
2184 #if DEBUG_FS
2185                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2186                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2187                               GNUNET_h2s (&mb->query),
2188                               (unsigned int) mb->size,
2189                               GNUNET_i2s (&pid));
2190 #endif    
2191                   break;
2192                 }
2193               else
2194                 {
2195 #if DEBUG_FS
2196                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2197                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2198                               GNUNET_h2s (&mb->query),
2199                               (unsigned int) mb->size,
2200                               GNUNET_i2s (&pid));
2201 #endif    
2202                 }
2203             }
2204           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2205                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2206             {
2207               delete_migration_block (mb);
2208               consider_migration_gathering ();
2209             }
2210         }
2211       consider_migration (NULL, 
2212                           &pid.hashPubKey,
2213                           cp);
2214     }
2215 #if DEBUG_FS
2216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2217               "Transmitting %u bytes to peer with PID %u\n",
2218               (unsigned int) msize,
2219               (unsigned int) cp->pid);
2220 #endif
2221   return msize;
2222 }
2223
2224
2225 /**
2226  * Add a message to the set of pending messages for the given peer.
2227  *
2228  * @param cp peer to send message to
2229  * @param pm message to queue
2230  * @param pr request on which behalf this message is being queued
2231  */
2232 static void
2233 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2234                                   struct PendingMessage *pm,
2235                                   struct PendingRequest *pr)
2236 {
2237   struct PendingMessage *pos;
2238   struct PendingMessageList *pml;
2239   struct GNUNET_PeerIdentity pid;
2240
2241   GNUNET_assert (pm->next == NULL);
2242   GNUNET_assert (pm->pml == NULL);    
2243   if (pr != NULL)
2244     {
2245       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2246       pml->req = pr;
2247       pml->target = cp;
2248       pml->pm = pm;
2249       pm->pml = pml;  
2250       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2251                                    pr->pending_tail,
2252                                    pml);
2253     }
2254   pos = cp->pending_messages_head;
2255   while ( (pos != NULL) &&
2256           (pm->priority < pos->priority) )
2257     pos = pos->next;    
2258   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2259                                      cp->pending_messages_tail,
2260                                      pos,
2261                                      pm);
2262   cp->pending_requests++;
2263   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2264     {
2265       GNUNET_STATISTICS_update (stats,
2266                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2267                                 1,
2268                                 GNUNET_NO);
2269       destroy_pending_message (cp->pending_messages_tail, 0);  
2270     }
2271   GNUNET_PEER_resolve (cp->pid, &pid);
2272   if (NULL != cp->cth)
2273     {
2274       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2275       cp->cth = NULL;
2276     }
2277   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2278     {
2279       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2280       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2281     }
2282   /* need to schedule transmission */
2283   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2284   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2285                                                cp->pending_messages_head->priority,
2286                                                MAX_TRANSMIT_DELAY,
2287                                                &pid,
2288                                                cp->pending_messages_head->msize,
2289                                                &transmit_to_peer,
2290                                                cp);
2291   if (cp->cth == NULL)
2292     {
2293 #if DEBUG_FS
2294       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2295                   "Failed to schedule transmission with core!\n");
2296 #endif
2297       GNUNET_STATISTICS_update (stats,
2298                                 gettext_noop ("# CORE transmission failures"),
2299                                 1,
2300                                 GNUNET_NO);
2301     }
2302 }
2303
2304
2305 /**
2306  * Test if the DATABASE (GET) load on this peer is too high
2307  * to even consider processing the query at
2308  * all.  
2309  * 
2310  * @return GNUNET_YES if the load is too high to do anything (load high)
2311  *         GNUNET_NO to process normally (load normal)
2312  *         GNUNET_SYSERR to process for free (load low)
2313  */
2314 static int
2315 test_get_load_too_high (uint32_t priority)
2316 {
2317   double ld;
2318
2319   ld = GNUNET_LOAD_get_load (datastore_get_load);
2320   if (ld < 1)
2321     return GNUNET_SYSERR;    
2322   if (ld <= priority)    
2323     return GNUNET_NO;    
2324   return GNUNET_YES;
2325 }
2326
2327
2328
2329
2330 /**
2331  * Test if the DATABASE (PUT) load on this peer is too high
2332  * to even consider processing the query at
2333  * all.  
2334  * 
2335  * @return GNUNET_YES if the load is too high to do anything (load high)
2336  *         GNUNET_NO to process normally (load normal or low)
2337  */
2338 static int
2339 test_put_load_too_high (uint32_t priority)
2340 {
2341   double ld;
2342
2343   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2344     return GNUNET_NO; /* very fast */
2345   ld = GNUNET_LOAD_get_load (datastore_put_load);
2346   if (ld < 2.0 * (1 + priority))
2347     return GNUNET_NO;
2348   GNUNET_STATISTICS_update (stats,
2349                             gettext_noop ("# storage requests dropped due to high load"),
2350                             1,
2351                             GNUNET_NO);
2352   return GNUNET_YES;
2353 }
2354
2355
2356 /* ******************* Pending Request Refresh Task ******************** */
2357
2358
2359
2360 /**
2361  * We use a random delay to make the timing of requests less
2362  * predictable.  This function returns such a random delay.  We add a base
2363  * delay of MAX_CORK_DELAY (1s).
2364  *
2365  * FIXME: make schedule dependent on the specifics of the request?
2366  * Or bandwidth and number of connected peers and load?
2367  *
2368  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2369  */
2370 static struct GNUNET_TIME_Relative
2371 get_processing_delay ()
2372 {
2373   return 
2374     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2375                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2376                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2377                                                                                        TTL_DECREMENT)));
2378 }
2379
2380
2381 /**
2382  * We're processing a GET request from another peer and have decided
2383  * to forward it to other peers.  This function is called periodically
2384  * and should forward the request to other peers until we have all
2385  * possible replies.  If we have transmitted the *only* reply to
2386  * the initiator we should destroy the pending request.  If we have
2387  * many replies in the queue to the initiator, we should delay sending
2388  * out more queries until the reply queue has shrunk some.
2389  *
2390  * @param cls our "struct ProcessGetContext *"
2391  * @param tc unused
2392  */
2393 static void
2394 forward_request_task (void *cls,
2395                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2396
2397
2398 /**
2399  * Function called after we either failed or succeeded
2400  * at transmitting a query to a peer.  
2401  *
2402  * @param cls the requests "struct PendingRequest*"
2403  * @param tpid ID of receiving peer, 0 on transmission error
2404  */
2405 static void
2406 transmit_query_continuation (void *cls,
2407                              GNUNET_PEER_Id tpid)
2408 {
2409   struct PendingRequest *pr = cls;
2410   unsigned int i;
2411
2412   GNUNET_STATISTICS_update (stats,
2413                             gettext_noop ("# queries scheduled for forwarding"),
2414                             -1,
2415                             GNUNET_NO);
2416   if (tpid == 0)   
2417     {
2418 #if DEBUG_FS
2419       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2420                   "Transmission of request failed, will try again later.\n");
2421 #endif
2422       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2423         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2424                                                  get_processing_delay (),
2425                                                  &forward_request_task,
2426                                                  pr); 
2427       return;    
2428     }
2429 #if DEBUG_FS
2430   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2431               "Transmitted query `%s'\n",
2432               GNUNET_h2s (&pr->query));
2433 #endif
2434   GNUNET_STATISTICS_update (stats,
2435                             gettext_noop ("# queries forwarded"),
2436                             1,
2437                             GNUNET_NO);
2438   for (i=0;i<pr->used_targets_off;i++)
2439     if (pr->used_targets[i].pid == tpid)
2440       break; /* found match! */    
2441   if (i == pr->used_targets_off)
2442     {
2443       /* need to create new entry */
2444       if (pr->used_targets_off == pr->used_targets_size)
2445         GNUNET_array_grow (pr->used_targets,
2446                            pr->used_targets_size,
2447                            pr->used_targets_size * 2 + 2);
2448       GNUNET_PEER_change_rc (tpid, 1);
2449       pr->used_targets[pr->used_targets_off].pid = tpid;
2450       pr->used_targets[pr->used_targets_off].num_requests = 0;
2451       i = pr->used_targets_off++;
2452     }
2453   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2454   pr->used_targets[i].num_requests++;
2455   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2456     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2457                                              get_processing_delay (),
2458                                              &forward_request_task,
2459                                              pr);
2460 }
2461
2462
2463 /**
2464  * How many bytes should a bloomfilter be if we have already seen
2465  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2466  * of bits set per entry.  Furthermore, we should not re-size the
2467  * filter too often (to keep it cheap).
2468  *
2469  * Since other peers will also add entries but not resize the filter,
2470  * we should generally pick a slightly larger size than what the
2471  * strict math would suggest.
2472  *
2473  * @return must be a power of two and smaller or equal to 2^15.
2474  */
2475 static size_t
2476 compute_bloomfilter_size (unsigned int entry_count)
2477 {
2478   size_t size;
2479   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2480   uint16_t max = 1 << 15;
2481
2482   if (entry_count > max)
2483     return max;
2484   size = 8;
2485   while ((size < max) && (size < ideal))
2486     size *= 2;
2487   if (size > max)
2488     return max;
2489   return size;
2490 }
2491
2492
2493 /**
2494  * Recalculate our bloom filter for filtering replies.  This function
2495  * will create a new bloom filter from scratch, so it should only be
2496  * called if we have no bloomfilter at all (and hence can create a
2497  * fresh one of minimal size without problems) OR if our peer is the
2498  * initiator (in which case we may resize to larger than mimimum size).
2499  *
2500  * @param pr request for which the BF is to be recomputed
2501  */
2502 static void
2503 refresh_bloomfilter (struct PendingRequest *pr)
2504 {
2505   unsigned int i;
2506   size_t nsize;
2507   GNUNET_HashCode mhash;
2508
2509   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2510   if (nsize == pr->bf_size)
2511     return; /* size not changed */
2512   if (pr->bf != NULL)
2513     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2514   pr->bf_size = nsize;
2515   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2516   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2517                                               pr->bf_size,
2518                                               BLOOMFILTER_K);
2519   for (i=0;i<pr->replies_seen_off;i++)
2520     {
2521       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2522                                 pr->mingle,
2523                                 &mhash);
2524       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2525     }
2526 }
2527
2528
2529 /**
2530  * Function called after we've tried to reserve a certain amount of
2531  * bandwidth for a reply.  Check if we succeeded and if so send our
2532  * query.
2533  *
2534  * @param cls the requests "struct PendingRequest*"
2535  * @param peer identifies the peer
2536  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2537  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2538  * @param amount set to the amount that was actually reserved or unreserved
2539  * @param preference current traffic preference for the given peer
2540  */
2541 static void
2542 target_reservation_cb (void *cls,
2543                        const struct
2544                        GNUNET_PeerIdentity * peer,
2545                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2546                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2547                        int amount,
2548                        uint64_t preference)
2549 {
2550   struct PendingRequest *pr = cls;
2551   struct ConnectedPeer *cp;
2552   struct PendingMessage *pm;
2553   struct GetMessage *gm;
2554   GNUNET_HashCode *ext;
2555   char *bfdata;
2556   size_t msize;
2557   unsigned int k;
2558   int no_route;
2559   uint32_t bm;
2560   unsigned int i;
2561
2562   pr->irc = NULL;
2563   if (peer == NULL)
2564     {
2565       /* error in communication with core, try again later */
2566       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2567         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2568                                                  get_processing_delay (),
2569                                                  &forward_request_task,
2570                                                  pr);
2571       return;
2572     }
2573   /* (3) transmit, update ttl/priority */
2574   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2575                                           &peer->hashPubKey);
2576   if (cp == NULL)
2577     {
2578       /* Peer must have just left */
2579 #if DEBUG_FS
2580       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2581                   "Selected peer disconnected!\n");
2582 #endif
2583       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2584         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2585                                                  get_processing_delay (),
2586                                                  &forward_request_task,
2587                                                  pr);
2588       return;
2589     }
2590   no_route = GNUNET_NO;
2591   if (amount == 0)
2592     {
2593       if (pr->cp == NULL)
2594         {
2595 #if DEBUG_FS > 1
2596           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2597                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2598                       amount,
2599                       DBLOCK_SIZE);
2600 #endif
2601           GNUNET_STATISTICS_update (stats,
2602                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2603                                     1,
2604                                     GNUNET_NO);
2605           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2606             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2607                                                      get_processing_delay (),
2608                                                      &forward_request_task,
2609                                                      pr);
2610           return;  /* this target round failed */
2611         }
2612       no_route = GNUNET_YES;
2613     }
2614   
2615   GNUNET_STATISTICS_update (stats,
2616                             gettext_noop ("# queries scheduled for forwarding"),
2617                             1,
2618                             GNUNET_NO);
2619   for (i=0;i<pr->used_targets_off;i++)
2620     if (pr->used_targets[i].pid == cp->pid) 
2621       {
2622         GNUNET_STATISTICS_update (stats,
2623                                   gettext_noop ("# queries retransmitted to same target"),
2624                                   1,
2625                                   GNUNET_NO);
2626         break;
2627       } 
2628
2629   /* build message and insert message into priority queue */
2630 #if DEBUG_FS
2631   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2632               "Forwarding request `%s' to `%4s'!\n",
2633               GNUNET_h2s (&pr->query),
2634               GNUNET_i2s (peer));
2635 #endif
2636   k = 0;
2637   bm = 0;
2638   if (GNUNET_YES == no_route)
2639     {
2640       bm |= GET_MESSAGE_BIT_RETURN_TO;
2641       k++;      
2642     }
2643   if (pr->namespace != NULL)
2644     {
2645       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2646       k++;
2647     }
2648   if (pr->target_pid != 0)
2649     {
2650       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2651       k++;
2652     }
2653   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2654   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2655   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2656   pm->msize = msize;
2657   gm = (struct GetMessage*) &pm[1];
2658   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2659   gm->header.size = htons (msize);
2660   gm->type = htonl (pr->type);
2661   pr->remaining_priority /= 2;
2662   gm->priority = htonl (pr->remaining_priority);
2663   gm->ttl = htonl (pr->ttl);
2664   gm->filter_mutator = htonl(pr->mingle); 
2665   gm->hash_bitmap = htonl (bm);
2666   gm->query = pr->query;
2667   ext = (GNUNET_HashCode*) &gm[1];
2668   k = 0;
2669   if (GNUNET_YES == no_route)
2670     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2671   if (pr->namespace != NULL)
2672     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2673   if (pr->target_pid != 0)
2674     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2675   bfdata = (char *) &ext[k];
2676   if (pr->bf != NULL)
2677     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2678                                                bfdata,
2679                                                pr->bf_size);
2680   pm->cont = &transmit_query_continuation;
2681   pm->cont_cls = pr;
2682   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2683   add_to_pending_messages_for_peer (cp, pm, pr);
2684 }
2685
2686
2687 /**
2688  * Closure used for "target_peer_select_cb".
2689  */
2690 struct PeerSelectionContext 
2691 {
2692   /**
2693    * The request for which we are selecting
2694    * peers.
2695    */
2696   struct PendingRequest *pr;
2697
2698   /**
2699    * Current "prime" target.
2700    */
2701   struct GNUNET_PeerIdentity target;
2702
2703   /**
2704    * How much do we like this target?
2705    */
2706   double target_score;
2707
2708 };
2709
2710
2711 /**
2712  * Function called for each connected peer to determine
2713  * which one(s) would make good targets for forwarding.
2714  *
2715  * @param cls closure (struct PeerSelectionContext)
2716  * @param key current key code (peer identity)
2717  * @param value value in the hash map (struct ConnectedPeer)
2718  * @return GNUNET_YES if we should continue to
2719  *         iterate,
2720  *         GNUNET_NO if not.
2721  */
2722 static int
2723 target_peer_select_cb (void *cls,
2724                        const GNUNET_HashCode * key,
2725                        void *value)
2726 {
2727   struct PeerSelectionContext *psc = cls;
2728   struct ConnectedPeer *cp = value;
2729   struct PendingRequest *pr = psc->pr;
2730   struct GNUNET_TIME_Relative delay;
2731   double score;
2732   unsigned int i;
2733   unsigned int pc;
2734
2735   /* 1) check that this peer is not the initiator */
2736   if (cp == pr->cp)
2737     {
2738 #if DEBUG_FS
2739       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2740                   "Skipping initiator in forwarding selection\n");
2741 #endif
2742       return GNUNET_YES; /* skip */        
2743     }
2744
2745   /* 2) check if we have already (recently) forwarded to this peer */
2746   /* 2a) this particular request */
2747   pc = 0;
2748   for (i=0;i<pr->used_targets_off;i++)
2749     if (pr->used_targets[i].pid == cp->pid) 
2750       {
2751         pc = pr->used_targets[i].num_requests;
2752         GNUNET_assert (pc > 0);
2753         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2754                                            RETRY_PROBABILITY_INV * pc))
2755           {
2756 #if DEBUG_FS
2757             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2758                         "NOT re-trying query that was previously transmitted %u times\n",
2759                         (unsigned int) pc);
2760 #endif
2761             return GNUNET_YES; /* skip */
2762           }
2763         break;
2764       }
2765 #if DEBUG_FS
2766   if (0 < pc)
2767     {
2768       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2769                   "Re-trying query that was previously transmitted %u times to this peer\n",
2770                   (unsigned int) pc);
2771     }
2772 #endif
2773   /* 2b) many other requests to this peer */
2774   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2775   if (delay.value <= cp->avg_delay.value)
2776     {
2777 #if DEBUG_FS
2778       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2779                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2780                   MAX_QUEUE_PER_PEER,
2781                   cp->avg_delay.value);
2782 #endif
2783       return GNUNET_YES; /* skip */      
2784     }
2785
2786   /* 3) calculate how much we'd like to forward to this peer,
2787      starting with a random value that is strong enough
2788      to at least give any peer a chance sometimes 
2789      (compared to the other factors that come later) */
2790   /* 3a) count successful (recent) routes from cp for same source */
2791   if (pr->cp != NULL)
2792     {
2793       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2794                                         P2P_SUCCESS_LIST_SIZE);
2795       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2796         if (cp->last_p2p_replies[i] == pr->cp->pid)
2797           score += 1.0; /* likely successful based on hot path */
2798     }
2799   else
2800     {
2801       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2802                                         CS2P_SUCCESS_LIST_SIZE);
2803       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2804         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2805           score += 1.0; /* likely successful based on hot path */
2806     }
2807   /* 3b) include latency */
2808   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2809     score += 1.0; /* likely fast based on latency */
2810   /* 3c) include priorities */
2811   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2812     score += 1.0; /* likely successful based on priorities */
2813   /* 3d) penalize for queue size */  
2814   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2815   /* 3e) include peer proximity */
2816   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2817                                                     &pr->query)) / (double) UINT32_MAX);
2818   /* 4) super-bonus for being the known target */
2819   if (pr->target_pid == cp->pid)
2820     score += 100.0;
2821   /* store best-fit in closure */
2822 #if DEBUG_FS
2823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2824               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2825               GNUNET_h2s (key),
2826               score,
2827               psc->target_score);
2828 #endif  
2829   score++; /* avoid zero */
2830   if (score > psc->target_score)
2831     {
2832       psc->target_score = score;
2833       psc->target.hashPubKey = *key; 
2834     }
2835   return GNUNET_YES;
2836 }
2837   
2838
2839 /**
2840  * The priority level imposes a bound on the maximum
2841  * value for the ttl that can be requested.
2842  *
2843  * @param ttl_in requested ttl
2844  * @param prio given priority
2845  * @return ttl_in if ttl_in is below the limit,
2846  *         otherwise the ttl-limit for the given priority
2847  */
2848 static int32_t
2849 bound_ttl (int32_t ttl_in, uint32_t prio)
2850 {
2851   unsigned long long allowed;
2852
2853   if (ttl_in <= 0)
2854     return ttl_in;
2855   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2856   if (ttl_in > allowed)      
2857     {
2858       if (allowed >= (1 << 30))
2859         return 1 << 30;
2860       return allowed;
2861     }
2862   return ttl_in;
2863 }
2864
2865
2866 /**
2867  * Iterator called on each result obtained for a DHT
2868  * operation that expects a reply
2869  *
2870  * @param cls closure
2871  * @param exp when will this value expire
2872  * @param key key of the result
2873  * @param get_path NULL-terminated array of pointers
2874  *                 to the peers on reverse GET path (or NULL if not recorded)
2875  * @param put_path NULL-terminated array of pointers
2876  *                 to the peers on the PUT path (or NULL if not recorded)
2877  * @param type type of the result
2878  * @param size number of bytes in data
2879  * @param data pointer to the result data
2880  */
2881 static void
2882 process_dht_reply (void *cls,
2883                    struct GNUNET_TIME_Absolute exp,
2884                    const GNUNET_HashCode * key,
2885                    const struct GNUNET_PeerIdentity * const *get_path,
2886                    const struct GNUNET_PeerIdentity * const *put_path,
2887                    enum GNUNET_BLOCK_Type type,
2888                    size_t size,
2889                    const void *data);
2890
2891
2892 /**
2893  * We're processing a GET request and have decided
2894  * to forward it to other peers.  This function is called periodically
2895  * and should forward the request to other peers until we have all
2896  * possible replies.  If we have transmitted the *only* reply to
2897  * the initiator we should destroy the pending request.  If we have
2898  * many replies in the queue to the initiator, we should delay sending
2899  * out more queries until the reply queue has shrunk some.
2900  *
2901  * @param cls our "struct ProcessGetContext *"
2902  * @param tc unused
2903  */
2904 static void
2905 forward_request_task (void *cls,
2906                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2907 {
2908   struct PendingRequest *pr = cls;
2909   struct PeerSelectionContext psc;
2910   struct ConnectedPeer *cp; 
2911   struct GNUNET_TIME_Relative delay;
2912
2913   pr->task = GNUNET_SCHEDULER_NO_TASK;
2914   if (pr->irc != NULL)
2915     {
2916 #if DEBUG_FS
2917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2918                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2919                   GNUNET_h2s (&pr->query));
2920 #endif
2921       return; /* already pending */
2922     }
2923   if (GNUNET_YES == pr->local_only)
2924     return; /* configured to not do P2P search */
2925   /* (0) try DHT */
2926   if ( (0 == pr->anonymity_level) &&
2927        (GNUNET_YES != pr->forward_only) &&
2928        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2929        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2930     {
2931       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2932                                           GNUNET_TIME_UNIT_FOREVER_REL,
2933                                           pr->type,
2934                                           &pr->query,
2935                                           GNUNET_DHT_RO_NONE,
2936                                           pr->bf,
2937                                           pr->mingle,
2938                                           pr->namespace,
2939                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2940                                           &process_dht_reply,
2941                                           pr);
2942     }
2943   /* (1) select target */
2944   psc.pr = pr;
2945   psc.target_score = -DBL_MAX;
2946   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2947                                          &target_peer_select_cb,
2948                                          &psc);  
2949   if (psc.target_score == -DBL_MAX)
2950     {
2951       delay = get_processing_delay ();
2952 #if DEBUG_FS 
2953       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2954                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2955                   GNUNET_h2s (&pr->query),
2956                   delay.value);
2957 #endif
2958       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2959                                                delay,
2960                                                &forward_request_task,
2961                                                pr);
2962       return; /* nobody selected */
2963     }
2964   /* (3) update TTL/priority */
2965   if (pr->client_request_list != NULL)
2966     {
2967       /* FIXME: use better algorithm!? */
2968       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2969                                          4))
2970         pr->priority++;
2971       /* bound priority we use by priorities we see from other peers
2972          rounded up (must round up so that we can see non-zero
2973          priorities, but round up as little as possible to make it
2974          plausible that we forwarded another peers request) */
2975       if (pr->priority > current_priorities + 1.0)
2976         pr->priority = (uint32_t) current_priorities + 1.0;
2977       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2978                            pr->priority);
2979 #if DEBUG_FS
2980       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2981                   "Trying query `%s' with priority %u and TTL %d.\n",
2982                   GNUNET_h2s (&pr->query),
2983                   pr->priority,
2984                   pr->ttl);
2985 #endif
2986     }
2987
2988   /* (3) reserve reply bandwidth */
2989   if (GNUNET_NO == pr->forward_only)
2990     {
2991       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2992                                               &psc.target.hashPubKey);
2993       GNUNET_assert (NULL != cp);
2994       pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2995                                                     &psc.target,
2996                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2997                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2998                                                     DBLOCK_SIZE * 2, 
2999                                                     cp->inc_preference,
3000                                                     &target_reservation_cb,
3001                                                     pr);
3002       cp->inc_preference = 0;
3003     }
3004   else
3005     {
3006       /* force forwarding */
3007       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3008       target_reservation_cb (pr, &psc.target,
3009                              zerobw, zerobw, 0, 0.0);
3010     }
3011 }
3012
3013
3014 /* **************************** P2P PUT Handling ************************ */
3015
3016
3017 /**
3018  * Function called after we either failed or succeeded
3019  * at transmitting a reply to a peer.  
3020  *
3021  * @param cls the requests "struct PendingRequest*"
3022  * @param tpid ID of receiving peer, 0 on transmission error
3023  */
3024 static void
3025 transmit_reply_continuation (void *cls,
3026                              GNUNET_PEER_Id tpid)
3027 {
3028   struct PendingRequest *pr = cls;
3029   
3030   switch (pr->type)
3031     {
3032     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3033     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3034       /* only one reply expected, done with the request! */
3035       destroy_pending_request (pr);
3036       break;
3037     case GNUNET_BLOCK_TYPE_ANY:
3038     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3039     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3040       break;
3041     default:
3042       GNUNET_break (0);
3043       break;
3044     }
3045 }
3046
3047
3048 /**
3049  * Transmit the given message by copying it to the target buffer
3050  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3051  * for writing in the meantime.  In that case, do nothing
3052  * (the disconnect or shutdown handler will take care of the rest).
3053  * If we were able to transmit messages and there are still more
3054  * pending, ask core again for further calls to this function.
3055  *
3056  * @param cls closure, pointer to the 'struct ClientList*'
3057  * @param size number of bytes available in buf
3058  * @param buf where the callee should write the message
3059  * @return number of bytes written to buf
3060  */
3061 static size_t
3062 transmit_to_client (void *cls,
3063                   size_t size, void *buf)
3064 {
3065   struct ClientList *cl = cls;
3066   char *cbuf = buf;
3067   struct ClientResponseMessage *creply;
3068   size_t msize;
3069   
3070   cl->th = NULL;
3071   if (NULL == buf)
3072     {
3073 #if DEBUG_FS
3074       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3075                   "Not sending reply, client communication problem.\n");
3076 #endif
3077       return 0;
3078     }
3079   msize = 0;
3080   while ( (NULL != (creply = cl->res_head) ) &&
3081           (creply->msize <= size) )
3082     {
3083       memcpy (&cbuf[msize], &creply[1], creply->msize);
3084       msize += creply->msize;
3085       size -= creply->msize;
3086       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3087                                    cl->res_tail,
3088                                    creply);
3089       GNUNET_free (creply);
3090     }
3091   if (NULL != creply)
3092     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3093                                                   creply->msize,
3094                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3095                                                   &transmit_to_client,
3096                                                   cl);
3097 #if DEBUG_FS
3098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3099               "Transmitted %u bytes to client\n",
3100               (unsigned int) msize);
3101 #endif
3102   return msize;
3103 }
3104
3105
3106 /**
3107  * Closure for "process_reply" function.
3108  */
3109 struct ProcessReplyClosure
3110 {
3111   /**
3112    * The data for the reply.
3113    */
3114   const void *data;
3115
3116   /**
3117    * Who gave us this reply? NULL for local host (or DHT)
3118    */
3119   struct ConnectedPeer *sender;
3120
3121   /**
3122    * When the reply expires.
3123    */
3124   struct GNUNET_TIME_Absolute expiration;
3125
3126   /**
3127    * Size of data.
3128    */
3129   size_t size;
3130
3131   /**
3132    * Type of the block.
3133    */
3134   enum GNUNET_BLOCK_Type type;
3135
3136   /**
3137    * How much was this reply worth to us?
3138    */
3139   uint32_t priority;
3140
3141   /**
3142    * Evaluation result (returned).
3143    */
3144   enum GNUNET_BLOCK_EvaluationResult eval;
3145
3146   /**
3147    * Did we finish processing the associated request?
3148    */ 
3149   int finished;
3150
3151   /**
3152    * Did we find a matching request?
3153    */
3154   int request_found;
3155 };
3156
3157
3158 /**
3159  * We have received a reply; handle it!
3160  *
3161  * @param cls response (struct ProcessReplyClosure)
3162  * @param key our query
3163  * @param value value in the hash map (info about the query)
3164  * @return GNUNET_YES (we should continue to iterate)
3165  */
3166 static int
3167 process_reply (void *cls,
3168                const GNUNET_HashCode * key,
3169                void *value)
3170 {
3171   struct ProcessReplyClosure *prq = cls;
3172   struct PendingRequest *pr = value;
3173   struct PendingMessage *reply;
3174   struct ClientResponseMessage *creply;
3175   struct ClientList *cl;
3176   struct PutMessage *pm;
3177   struct ConnectedPeer *cp;
3178   struct GNUNET_TIME_Relative cur_delay;
3179 #if SUPPORT_DELAYS  
3180 struct GNUNET_TIME_Relative art_delay;
3181 #endif
3182   size_t msize;
3183   unsigned int i;
3184
3185 #if DEBUG_FS
3186   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3187               "Matched result (type %u) for query `%s' with pending request\n",
3188               (unsigned int) prq->type,
3189               GNUNET_h2s (key));
3190 #endif  
3191   GNUNET_STATISTICS_update (stats,
3192                             gettext_noop ("# replies received and matched"),
3193                             1,
3194                             GNUNET_NO);
3195   if (prq->sender != NULL)
3196     {
3197       for (i=0;i<pr->used_targets_off;i++)
3198         if (pr->used_targets[i].pid == prq->sender->pid)
3199           break;
3200       if (i < pr->used_targets_off)
3201         {
3202           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3203           prq->sender->avg_delay.value
3204             = (prq->sender->avg_delay.value * 
3205                (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
3206           prq->sender->avg_priority
3207             = (prq->sender->avg_priority * 
3208                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3209         }
3210       if (pr->cp != NULL)
3211         {
3212           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3213                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3214                                  -1);
3215           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3216           prq->sender->last_p2p_replies
3217             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3218             = pr->cp->pid;
3219         }
3220       else
3221         {
3222           if (NULL != prq->sender->last_client_replies
3223               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3224             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3225                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3226           prq->sender->last_client_replies
3227             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3228             = pr->client_request_list->client_list->client;
3229           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3230         }
3231     }
3232   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3233                                      prq->type,
3234                                      key,
3235                                      &pr->bf,
3236                                      pr->mingle,
3237                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3238                                      prq->data,
3239                                      prq->size);
3240   switch (prq->eval)
3241     {
3242     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3243       break;
3244     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3245       while (NULL != pr->pending_head)
3246         destroy_pending_message_list_entry (pr->pending_head);
3247       if (pr->qe != NULL)
3248         {
3249           if (pr->client_request_list != NULL)
3250             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3251                                         GNUNET_YES);
3252           GNUNET_DATASTORE_cancel (pr->qe);
3253           pr->qe = NULL;
3254         }
3255       pr->do_remove = GNUNET_YES;
3256       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3257         {
3258           GNUNET_SCHEDULER_cancel (sched,
3259                                    pr->task);
3260           pr->task = GNUNET_SCHEDULER_NO_TASK;
3261         }
3262       GNUNET_break (GNUNET_YES ==
3263                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3264                                                           key,
3265                                                           pr));
3266       GNUNET_LOAD_update (rt_entry_lifetime,
3267                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3268       break;
3269     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3270       GNUNET_STATISTICS_update (stats,
3271                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3272                                 1,
3273                                 GNUNET_NO);
3274 #if DEBUG_FS
3275       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3276                   "Duplicate response `%s', discarding.\n",
3277                   GNUNET_h2s (&mhash));
3278 #endif
3279       return GNUNET_YES; /* duplicate */
3280     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3281       return GNUNET_YES; /* wrong namespace */  
3282     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3283       GNUNET_break (0);
3284       return GNUNET_YES;
3285     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3286       GNUNET_break (0);
3287       return GNUNET_YES;
3288     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3289       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3290                   _("Unsupported block type %u\n"),
3291                   prq->type);
3292       return GNUNET_NO;
3293     }
3294   if (pr->client_request_list != NULL)
3295     {
3296       if (pr->replies_seen_size == pr->replies_seen_off)
3297         GNUNET_array_grow (pr->replies_seen,
3298                            pr->replies_seen_size,
3299                            pr->replies_seen_size * 2 + 4);      
3300       GNUNET_CRYPTO_hash (prq->data,
3301                           prq->size,
3302                           &pr->replies_seen[pr->replies_seen_off++]);         
3303       refresh_bloomfilter (pr);
3304     }
3305   if (NULL == prq->sender)
3306     {
3307 #if DEBUG_FS
3308       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3309                   "Found result for query `%s' in local datastore\n",
3310                   GNUNET_h2s (key));
3311 #endif
3312       GNUNET_STATISTICS_update (stats,
3313                                 gettext_noop ("# results found locally"),
3314                                 1,
3315                                 GNUNET_NO);      
3316     }
3317   prq->priority += pr->remaining_priority;
3318   pr->remaining_priority = 0;
3319   pr->results_found++;
3320   prq->request_found = GNUNET_YES;
3321   if (NULL != pr->client_request_list)
3322     {
3323       GNUNET_STATISTICS_update (stats,
3324                                 gettext_noop ("# replies received for local clients"),
3325                                 1,
3326                                 GNUNET_NO);
3327       cl = pr->client_request_list->client_list;
3328       msize = sizeof (struct PutMessage) + prq->size;
3329       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3330       creply->msize = msize;
3331       creply->client_list = cl;
3332       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3333                                          cl->res_tail,
3334                                          cl->res_tail,
3335                                          creply);      
3336       pm = (struct PutMessage*) &creply[1];
3337       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3338       pm->header.size = htons (msize);
3339       pm->type = htonl (prq->type);
3340       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3341       memcpy (&pm[1], prq->data, prq->size);      
3342       if (NULL == cl->th)
3343         {
3344 #if DEBUG_FS
3345           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3346                       "Transmitting result for query `%s' to client\n",
3347                       GNUNET_h2s (key));
3348 #endif  
3349           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3350                                                         msize,
3351                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3352                                                         &transmit_to_client,
3353                                                         cl);
3354         }
3355       GNUNET_break (cl->th != NULL);
3356       if (pr->do_remove)                
3357         {
3358           prq->finished = GNUNET_YES;
3359           destroy_pending_request (pr);         
3360         }
3361     }
3362   else
3363     {
3364       cp = pr->cp;
3365 #if DEBUG_FS
3366       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3367                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3368                   GNUNET_h2s (key),
3369                   (unsigned int) cp->pid);
3370 #endif  
3371       GNUNET_STATISTICS_update (stats,
3372                                 gettext_noop ("# replies received for other peers"),
3373                                 1,
3374                                 GNUNET_NO);
3375       msize = sizeof (struct PutMessage) + prq->size;
3376       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3377       reply->cont = &transmit_reply_continuation;
3378       reply->cont_cls = pr;
3379 #if SUPPORT_DELAYS
3380       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3381                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3382                                                                            TTL_DECREMENT));
3383       reply->delay_until 
3384         = GNUNET_TIME_relative_to_absolute (art_delay);
3385       GNUNET_STATISTICS_update (stats,
3386                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3387                                 art_delay.value,
3388                                 GNUNET_NO);
3389 #endif
3390       reply->msize = msize;
3391       reply->priority = UINT32_MAX; /* send replies first! */
3392       pm = (struct PutMessage*) &reply[1];
3393       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3394       pm->header.size = htons (msize);
3395       pm->type = htonl (prq->type);
3396       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3397       memcpy (&pm[1], prq->data, prq->size);
3398       add_to_pending_messages_for_peer (cp, reply, pr);
3399     }
3400   return GNUNET_YES;
3401 }
3402
3403
3404 /**
3405  * Iterator called on each result obtained for a DHT
3406  * operation that expects a reply
3407  *
3408  * @param cls closure
3409  * @param exp when will this value expire
3410  * @param key key of the result
3411  * @param get_path NULL-terminated array of pointers
3412  *                 to the peers on reverse GET path (or NULL if not recorded)
3413  * @param put_path NULL-terminated array of pointers
3414  *                 to the peers on the PUT path (or NULL if not recorded)
3415  * @param type type of the result
3416  * @param size number of bytes in data
3417  * @param data pointer to the result data
3418  */
3419 static void
3420 process_dht_reply (void *cls,
3421                    struct GNUNET_TIME_Absolute exp,
3422                    const GNUNET_HashCode * key,
3423                    const struct GNUNET_PeerIdentity * const *get_path,
3424                    const struct GNUNET_PeerIdentity * const *put_path,
3425                    enum GNUNET_BLOCK_Type type,
3426                    size_t size,
3427                    const void *data)
3428 {
3429   struct PendingRequest *pr = cls;
3430   struct ProcessReplyClosure prq;
3431
3432   memset (&prq, 0, sizeof (prq));
3433   prq.data = data;
3434   prq.expiration = exp;
3435   prq.size = size;  
3436   prq.type = type;
3437   process_reply (&prq, key, pr);
3438 }
3439
3440
3441
3442 /**
3443  * Continuation called to notify client about result of the
3444  * operation.
3445  *
3446  * @param cls closure
3447  * @param success GNUNET_SYSERR on failure
3448  * @param msg NULL on success, otherwise an error message
3449  */
3450 static void 
3451 put_migration_continuation (void *cls,
3452                             int success,
3453                             const char *msg)
3454 {
3455   struct GNUNET_TIME_Absolute *start = cls;
3456   struct GNUNET_TIME_Relative delay;
3457   
3458   delay = GNUNET_TIME_absolute_get_duration (*start);
3459   GNUNET_free (start);
3460   GNUNET_LOAD_update (datastore_put_load,
3461                       delay.value);
3462   if (GNUNET_OK == success)
3463     return;
3464   GNUNET_STATISTICS_update (stats,
3465                             gettext_noop ("# datastore 'put' failures"),
3466                             1,
3467                             GNUNET_NO);
3468 }
3469
3470
3471 /**
3472  * Handle P2P "PUT" message.
3473  *
3474  * @param cls closure, always NULL
3475  * @param other the other peer involved (sender or receiver, NULL
3476  *        for loopback messages where we are both sender and receiver)
3477  * @param message the actual message
3478  * @param latency reported latency of the connection with 'other'
3479  * @param distance reported distance (DV) to 'other' 
3480  * @return GNUNET_OK to keep the connection open,
3481  *         GNUNET_SYSERR to close it (signal serious error)
3482  */
3483 static int
3484 handle_p2p_put (void *cls,
3485                 const struct GNUNET_PeerIdentity *other,
3486                 const struct GNUNET_MessageHeader *message,
3487                 struct GNUNET_TIME_Relative latency,
3488                 uint32_t distance)
3489 {
3490   const struct PutMessage *put;
3491   uint16_t msize;
3492   size_t dsize;
3493   enum GNUNET_BLOCK_Type type;
3494   struct GNUNET_TIME_Absolute expiration;
3495   GNUNET_HashCode query;
3496   struct ProcessReplyClosure prq;
3497   struct GNUNET_TIME_Absolute *start;
3498   struct GNUNET_TIME_Relative block_time;  
3499   double putl;
3500   struct ConnectedPeer *cp; 
3501   struct PendingMessage *pm;
3502   struct MigrationStopMessage *msm;
3503
3504   msize = ntohs (message->size);
3505   if (msize < sizeof (struct PutMessage))
3506     {
3507       GNUNET_break_op(0);
3508       return GNUNET_SYSERR;
3509     }
3510   put = (const struct PutMessage*) message;
3511   dsize = msize - sizeof (struct PutMessage);
3512   type = ntohl (put->type);
3513   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3514
3515   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3516     return GNUNET_SYSERR;
3517   if (GNUNET_OK !=
3518       GNUNET_BLOCK_get_key (block_ctx,
3519                             type,
3520                             &put[1],
3521                             dsize,
3522                             &query))
3523     {
3524       GNUNET_break_op (0);
3525       return GNUNET_SYSERR;
3526     }
3527 #if DEBUG_FS
3528   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3529               "Received result for query `%s' from peer `%4s'\n",
3530               GNUNET_h2s (&query),
3531               GNUNET_i2s (other));
3532 #endif
3533   GNUNET_STATISTICS_update (stats,
3534                             gettext_noop ("# replies received (overall)"),
3535                             1,
3536                             GNUNET_NO);
3537   /* now, lookup 'query' */
3538   prq.data = (const void*) &put[1];
3539   if (other != NULL)
3540     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3541                                                     &other->hashPubKey);
3542   else
3543     prq.sender = NULL;
3544   prq.size = dsize;
3545   prq.type = type;
3546   prq.expiration = expiration;
3547   prq.priority = 0;
3548   prq.finished = GNUNET_NO;
3549   prq.request_found = GNUNET_NO;
3550   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3551                                               &query,
3552                                               &process_reply,
3553                                               &prq);
3554   if (prq.sender != NULL)
3555     {
3556       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3557       prq.sender->trust += prq.priority;
3558     }
3559   if ( (GNUNET_YES == active_migration) &&
3560        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3561     {      
3562 #if DEBUG_FS
3563       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3564                   "Replicating result for query `%s' with priority %u\n",
3565                   GNUNET_h2s (&query),
3566                   prq.priority);
3567 #endif
3568       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3569       *start = GNUNET_TIME_absolute_get ();
3570       GNUNET_DATASTORE_put (dsh,
3571                             0, &query, dsize, &put[1],
3572                             type, prq.priority, 1 /* anonymity */, 
3573                             expiration, 
3574                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3575                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3576                             &put_migration_continuation, 
3577                             start);
3578     }
3579   putl = GNUNET_LOAD_get_load (datastore_put_load);
3580   if ( (GNUNET_NO == prq.request_found) &&
3581        ( (GNUNET_YES != active_migration) ||
3582          (putl > 2.5 * (1 + prq.priority)) ) )
3583     {
3584       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3585                                               &other->hashPubKey);
3586       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3587         return GNUNET_OK; /* already blocked */
3588       /* We're too busy; send MigrationStop message! */
3589       if (GNUNET_YES != active_migration) 
3590         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3591       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3592                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3593                                                                                    (unsigned int) (60000 * putl * putl)));
3594       
3595       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3596       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3597                           sizeof (struct MigrationStopMessage));
3598       pm->msize = sizeof (struct MigrationStopMessage);
3599       pm->priority = UINT32_MAX;
3600       msm = (struct MigrationStopMessage*) &pm[1];
3601       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3602       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3603       msm->duration = GNUNET_TIME_relative_hton (block_time);
3604       add_to_pending_messages_for_peer (cp,
3605                                         pm,
3606                                         NULL);
3607     }
3608   return GNUNET_OK;
3609 }
3610
3611
3612 /**
3613  * Handle P2P "MIGRATION_STOP" message.
3614  *
3615  * @param cls closure, always NULL
3616  * @param other the other peer involved (sender or receiver, NULL
3617  *        for loopback messages where we are both sender and receiver)
3618  * @param message the actual message
3619  * @param latency reported latency of the connection with 'other'
3620  * @param distance reported distance (DV) to 'other' 
3621  * @return GNUNET_OK to keep the connection open,
3622  *         GNUNET_SYSERR to close it (signal serious error)
3623  */
3624 static int
3625 handle_p2p_migration_stop (void *cls,
3626                            const struct GNUNET_PeerIdentity *other,
3627                            const struct GNUNET_MessageHeader *message,
3628                            struct GNUNET_TIME_Relative latency,
3629                            uint32_t distance)
3630 {
3631   struct ConnectedPeer *cp; 
3632   const struct MigrationStopMessage *msm;
3633
3634   msm = (const struct MigrationStopMessage*) message;
3635   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3636                                           &other->hashPubKey);
3637   if (cp == NULL)
3638     {
3639       GNUNET_break (0);
3640       return GNUNET_OK;
3641     }
3642   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3643   return GNUNET_OK;
3644 }
3645
3646
3647
3648 /* **************************** P2P GET Handling ************************ */
3649
3650
3651 /**
3652  * Closure for 'check_duplicate_request_{peer,client}'.
3653  */
3654 struct CheckDuplicateRequestClosure
3655 {
3656   /**
3657    * The new request we should check if it already exists.
3658    */
3659   const struct PendingRequest *pr;
3660
3661   /**
3662    * Existing request found by the checker, NULL if none.
3663    */
3664   struct PendingRequest *have;
3665 };
3666
3667
3668 /**
3669  * Iterator over entries in the 'query_request_map' that
3670  * tries to see if we have the same request pending from
3671  * the same client already.
3672  *
3673  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3674  * @param key current key code (query, ignored, must match)
3675  * @param value value in the hash map (a 'struct PendingRequest' 
3676  *              that already exists)
3677  * @return GNUNET_YES if we should continue to
3678  *         iterate (no match yet)
3679  *         GNUNET_NO if not (match found).
3680  */
3681 static int
3682 check_duplicate_request_client (void *cls,
3683                                 const GNUNET_HashCode * key,
3684                                 void *value)
3685 {
3686   struct CheckDuplicateRequestClosure *cdc = cls;
3687   struct PendingRequest *have = value;
3688
3689   if (have->client_request_list == NULL)
3690     return GNUNET_YES;
3691   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3692        (cdc->pr != have) )
3693     {
3694       cdc->have = have;
3695       return GNUNET_NO;
3696     }
3697   return GNUNET_YES;
3698 }
3699
3700
3701 /**
3702  * We're processing (local) results for a search request
3703  * from another peer.  Pass applicable results to the
3704  * peer and if we are done either clean up (operation
3705  * complete) or forward to other peers (more results possible).
3706  *
3707  * @param cls our closure (struct LocalGetContext)
3708  * @param key key for the content
3709  * @param size number of bytes in data
3710  * @param data content stored
3711  * @param type type of the content
3712  * @param priority priority of the content
3713  * @param anonymity anonymity-level for the content
3714  * @param expiration expiration time for the content
3715  * @param uid unique identifier for the datum;
3716  *        maybe 0 if no unique identifier is available
3717  */
3718 static void
3719 process_local_reply (void *cls,
3720                      const GNUNET_HashCode * key,
3721                      size_t size,
3722                      const void *data,
3723                      enum GNUNET_BLOCK_Type type,
3724                      uint32_t priority,
3725                      uint32_t anonymity,
3726                      struct GNUNET_TIME_Absolute
3727                      expiration, 
3728                      uint64_t uid)
3729 {
3730   struct PendingRequest *pr = cls;
3731   struct ProcessReplyClosure prq;
3732   struct CheckDuplicateRequestClosure cdrc;
3733   GNUNET_HashCode query;
3734   unsigned int old_rf;
3735   
3736   if (NULL == key)
3737     {
3738 #if DEBUG_FS > 1
3739       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3740                   "Done processing local replies, forwarding request to other peers.\n");
3741 #endif
3742       pr->qe = NULL;
3743       if (pr->client_request_list != NULL)
3744         {
3745           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3746                                       GNUNET_YES);
3747           /* Figure out if this is a duplicate request and possibly
3748              merge 'struct PendingRequest' entries */
3749           cdrc.have = NULL;
3750           cdrc.pr = pr;
3751           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3752                                                       &pr->query,
3753                                                       &check_duplicate_request_client,
3754                                                       &cdrc);
3755           if (cdrc.have != NULL)
3756             {
3757 #if DEBUG_FS
3758               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3759                           "Received request for block `%s' twice from client, will only request once.\n",
3760                           GNUNET_h2s (&pr->query));
3761 #endif
3762               
3763               destroy_pending_request (pr);
3764               return;
3765             }
3766         }
3767       if (pr->local_only == GNUNET_YES)
3768         {
3769           destroy_pending_request (pr);
3770           return;
3771         }
3772       /* no more results */
3773       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3774         pr->task = GNUNET_SCHEDULER_add_now (sched,
3775                                              &forward_request_task,
3776                                              pr);      
3777       return;
3778     }
3779 #if DEBUG_FS
3780   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3781               "New local response to `%s' of type %u.\n",
3782               GNUNET_h2s (key),
3783               type);
3784 #endif
3785   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3786     {
3787 #if DEBUG_FS
3788       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3789                   "Found ONDEMAND block, performing on-demand encoding\n");
3790 #endif
3791       GNUNET_STATISTICS_update (stats,
3792                                 gettext_noop ("# on-demand blocks matched requests"),
3793                                 1,
3794                                 GNUNET_NO);
3795       if (GNUNET_OK != 
3796           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3797                                             anonymity, expiration, uid, 
3798                                             &process_local_reply,
3799                                             pr))
3800       if (pr->qe != NULL)
3801         {
3802           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3803         }
3804       return;
3805     }
3806   old_rf = pr->results_found;
3807   memset (&prq, 0, sizeof (prq));
3808   prq.data = data;
3809   prq.expiration = expiration;
3810   prq.size = size;  
3811   if (GNUNET_OK != 
3812       GNUNET_BLOCK_get_key (block_ctx,
3813                             type,
3814                             data,
3815                             size,
3816                             &query))
3817     {
3818       GNUNET_break (0);
3819       GNUNET_DATASTORE_remove (dsh,
3820                                key,
3821                                size, data,
3822                                -1, -1, 
3823                                GNUNET_TIME_UNIT_FOREVER_REL,
3824                                NULL, NULL);
3825       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3826       return;
3827     }
3828   prq.type = type;
3829   prq.priority = priority;  
3830   prq.finished = GNUNET_NO;
3831   prq.request_found = GNUNET_NO;
3832   if ( (old_rf == 0) &&
3833        (pr->results_found == 0) )
3834     update_datastore_delays (pr->start_time);
3835   process_reply (&prq, key, pr);
3836   if (prq.finished == GNUNET_YES)
3837     return;
3838   if (pr->qe == NULL)
3839     return; /* done here */
3840   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3841     {
3842       pr->local_only = GNUNET_YES; /* do not forward */
3843       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3844       return;
3845     }
3846   if ( (pr->client_request_list == NULL) &&
3847        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3848          (pr->results_found > 5 + 2 * pr->priority) ) )
3849     {
3850 #if DEBUG_FS > 2
3851       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3852                   "Load too high, done with request\n");
3853 #endif
3854       GNUNET_STATISTICS_update (stats,
3855                                 gettext_noop ("# processing result set cut short due to load"),
3856                                 1,
3857                                 GNUNET_NO);
3858       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3859       return;
3860     }
3861   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3862 }
3863
3864
3865 /**
3866  * We've received a request with the specified priority.  Bound it
3867  * according to how much we trust the given peer.
3868  * 
3869  * @param prio_in requested priority
3870  * @param cp the peer making the request
3871  * @return effective priority
3872  */
3873 static int32_t
3874 bound_priority (uint32_t prio_in,
3875                 struct ConnectedPeer *cp)
3876 {
3877 #define N ((double)128.0)
3878   uint32_t ret;
3879   double rret;
3880   int ld;
3881
3882   ld = test_get_load_too_high (0);
3883   if (ld == GNUNET_SYSERR)
3884     {
3885       GNUNET_STATISTICS_update (stats,
3886                                 gettext_noop ("# requests done for free (low load)"),
3887                                 1,
3888                                 GNUNET_NO);
3889       return 0; /* excess resources */
3890     }
3891   ret = change_host_trust (cp, prio_in);
3892   if (ret > 0)
3893     {
3894       if (ret > current_priorities + N)
3895         rret = current_priorities + N;
3896       else
3897         rret = ret;
3898       current_priorities 
3899         = (current_priorities * (N-1) + rret)/N;
3900     }
3901   if ( (ld == GNUNET_YES) && (ret > 0) )
3902     {
3903       /* try with charging */
3904       ld = test_get_load_too_high (ret);
3905     }
3906   if (ld == GNUNET_YES)
3907     {
3908       GNUNET_STATISTICS_update (stats,
3909                                 gettext_noop ("# request dropped, priority insufficient"),
3910                                 1,
3911                                 GNUNET_NO);
3912       /* undo charge */
3913       if (ret != 0)
3914         change_host_trust (cp, -ret);
3915       return -1; /* not enough resources */
3916     }
3917   else
3918     {
3919       GNUNET_STATISTICS_update (stats,
3920                                 gettext_noop ("# requests done for a price (normal load)"),
3921                                 1,
3922                                 GNUNET_NO);
3923     }
3924 #undef N
3925   return ret;
3926 }
3927
3928
3929 /**
3930  * Iterator over entries in the 'query_request_map' that
3931  * tries to see if we have the same request pending from
3932  * the same peer already.
3933  *
3934  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3935  * @param key current key code (query, ignored, must match)
3936  * @param value value in the hash map (a 'struct PendingRequest' 
3937  *              that already exists)
3938  * @return GNUNET_YES if we should continue to
3939  *         iterate (no match yet)
3940  *         GNUNET_NO if not (match found).
3941  */
3942 static int
3943 check_duplicate_request_peer (void *cls,
3944                               const GNUNET_HashCode * key,
3945                               void *value)
3946 {
3947   struct CheckDuplicateRequestClosure *cdc = cls;
3948   struct PendingRequest *have = value;
3949
3950   if (cdc->pr->target_pid == have->target_pid)
3951     {
3952       cdc->have = have;
3953       return GNUNET_NO;
3954     }
3955   return GNUNET_YES;
3956 }
3957
3958
3959 /**
3960  * Handle P2P "GET" request.
3961  *
3962  * @param cls closure, always NULL
3963  * @param other the other peer involved (sender or receiver, NULL
3964  *        for loopback messages where we are both sender and receiver)
3965  * @param message the actual message
3966  * @param latency reported latency of the connection with 'other'
3967  * @param distance reported distance (DV) to 'other' 
3968  * @return GNUNET_OK to keep the connection open,
3969  *         GNUNET_SYSERR to close it (signal serious error)
3970  */
3971 static int
3972 handle_p2p_get (void *cls,
3973                 const struct GNUNET_PeerIdentity *other,
3974                 const struct GNUNET_MessageHeader *message,
3975                 struct GNUNET_TIME_Relative latency,
3976                 uint32_t distance)
3977 {
3978   struct PendingRequest *pr;
3979   struct ConnectedPeer *cp;
3980   struct ConnectedPeer *cps;
3981   struct CheckDuplicateRequestClosure cdc;
3982   struct GNUNET_TIME_Relative timeout;
3983   uint16_t msize;
3984   const struct GetMessage *gm;
3985   unsigned int bits;
3986   const GNUNET_HashCode *opt;
3987   uint32_t bm;
3988   size_t bfsize;
3989   uint32_t ttl_decrement;
3990   int32_t priority;
3991   enum GNUNET_BLOCK_Type type;
3992   int have_ns;
3993
3994   msize = ntohs(message->size);
3995   if (msize < sizeof (struct GetMessage))
3996     {
3997       GNUNET_break_op (0);
3998       return GNUNET_SYSERR;
3999     }
4000   gm = (const struct GetMessage*) message;
4001 #if DEBUG_FS
4002   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4003               "Received request for `%s'\n",
4004               GNUNET_h2s (&gm->query));
4005 #endif
4006   type = ntohl (gm->type);
4007   bm = ntohl (gm->hash_bitmap);
4008   bits = 0;
4009   while (bm > 0)
4010     {
4011       if (1 == (bm & 1))
4012         bits++;
4013       bm >>= 1;
4014     }
4015   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4016     {
4017       GNUNET_break_op (0);
4018       return GNUNET_SYSERR;
4019     }  
4020   opt = (const GNUNET_HashCode*) &gm[1];
4021   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
4022   bm = ntohl (gm->hash_bitmap);
4023   bits = 0;
4024   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4025                                            &other->hashPubKey);
4026   if (NULL == cps)
4027     {
4028       /* peer must have just disconnected */
4029       GNUNET_STATISTICS_update (stats,
4030                                 gettext_noop ("# requests dropped due to initiator not being connected"),
4031                                 1,
4032                                 GNUNET_NO);
4033       return GNUNET_SYSERR;
4034     }
4035   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4036     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4037                                             &opt[bits++]);
4038   else
4039     cp = cps;
4040   if (cp == NULL)
4041     {
4042 #if DEBUG_FS
4043       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4044         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4045                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4046                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4047       
4048       else
4049         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4050                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4051                     GNUNET_i2s (other));
4052 #endif
4053       GNUNET_STATISTICS_update (stats,
4054                                 gettext_noop ("# requests dropped due to missing reverse route"),
4055                                 1,
4056                                 GNUNET_NO);
4057      /* FIXME: try connect? */
4058       return GNUNET_OK;
4059     }
4060   /* note that we can really only check load here since otherwise
4061      peers could find out that we are overloaded by not being
4062      disconnected after sending us a malformed query... */
4063   priority = bound_priority (ntohl (gm->priority), cps);
4064   if (priority < 0)
4065     {
4066 #if DEBUG_FS
4067       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4068                   "Dropping query from `%s', this peer is too busy.\n",
4069                   GNUNET_i2s (other));
4070 #endif
4071       return GNUNET_OK;
4072     }
4073 #if DEBUG_FS 
4074   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4075               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4076               GNUNET_h2s (&gm->query),
4077               (unsigned int) type,
4078               GNUNET_i2s (other),
4079               (unsigned int) bm);
4080 #endif
4081   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4082   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4083                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4084   if (have_ns)
4085     {
4086       pr->namespace = (GNUNET_HashCode*) &pr[1];
4087       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4088     }
4089   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4090        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4091         GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4092     {
4093       /* don't have BW to send to peer, or would likely take longer than we have for it,
4094          so at best indirect the query */
4095       priority = 0;
4096       pr->forward_only = GNUNET_YES;
4097     }
4098   pr->type = type;
4099   pr->mingle = ntohl (gm->filter_mutator);
4100   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4101     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4102   pr->anonymity_level = 1;
4103   pr->priority = (uint32_t) priority;
4104   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4105   pr->query = gm->query;
4106   /* decrement ttl (always) */
4107   ttl_decrement = 2 * TTL_DECREMENT +
4108     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4109                               TTL_DECREMENT);
4110   if ( (pr->ttl < 0) &&
4111        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4112     {
4113 #if DEBUG_FS
4114       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4115                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4116                   GNUNET_i2s (other),
4117                   pr->ttl,
4118                   ttl_decrement);
4119 #endif
4120       GNUNET_STATISTICS_update (stats,
4121                                 gettext_noop ("# requests dropped due TTL underflow"),
4122                                 1,
4123                                 GNUNET_NO);
4124       /* integer underflow => drop (should be very rare)! */      
4125       GNUNET_free (pr);
4126       return GNUNET_OK;
4127     } 
4128   pr->ttl -= ttl_decrement;
4129   pr->start_time = GNUNET_TIME_absolute_get ();
4130
4131   /* get bloom filter */
4132   if (bfsize > 0)
4133     {
4134       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4135                                                   bfsize,
4136                                                   BLOOMFILTER_K);
4137       pr->bf_size = bfsize;
4138     }
4139   cdc.have = NULL;
4140   cdc.pr = pr;
4141   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4142                                               &gm->query,
4143                                               &check_duplicate_request_peer,
4144                                               &cdc);
4145   if (cdc.have != NULL)
4146     {
4147       if (cdc.have->start_time.value + cdc.have->ttl >=
4148           pr->start_time.value + pr->ttl)
4149         {
4150           /* existing request has higher TTL, drop new one! */
4151           cdc.have->priority += pr->priority;
4152           destroy_pending_request (pr);
4153 #if DEBUG_FS
4154           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4155                       "Have existing request with higher TTL, dropping new request.\n",
4156                       GNUNET_i2s (other));
4157 #endif
4158           GNUNET_STATISTICS_update (stats,
4159                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4160                                     1,
4161                                     GNUNET_NO);
4162           return GNUNET_OK;
4163         }
4164       else
4165         {
4166           /* existing request has lower TTL, drop old one! */
4167           pr->priority += cdc.have->priority;
4168           /* Possible optimization: if we have applicable pending
4169              replies in 'cdc.have', we might want to move those over
4170              (this is a really rare special-case, so it is not clear
4171              that this would be worth it) */
4172           destroy_pending_request (cdc.have);
4173           /* keep processing 'pr'! */
4174         }
4175     }
4176
4177   pr->cp = cp;
4178   GNUNET_break (GNUNET_OK ==
4179                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4180                                                    &gm->query,
4181                                                    pr,
4182                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4183   GNUNET_break (GNUNET_OK ==
4184                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4185                                                    &other->hashPubKey,
4186                                                    pr,
4187                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4188   
4189   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4190                                             pr,
4191                                             pr->start_time.value + pr->ttl);
4192
4193   GNUNET_STATISTICS_update (stats,
4194                             gettext_noop ("# P2P searches received"),
4195                             1,
4196                             GNUNET_NO);
4197   GNUNET_STATISTICS_update (stats,
4198                             gettext_noop ("# P2P searches active"),
4199                             1,
4200                             GNUNET_NO);
4201
4202   /* calculate change in traffic preference */
4203   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4204   /* process locally */
4205   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4206     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4207   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4208                                            (pr->priority + 1)); 
4209   if (GNUNET_YES != pr->forward_only)
4210     {
4211 #if DEBUG_FS
4212       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4213                   "Handing request for `%s' to datastore\n",
4214                   GNUNET_h2s (&gm->query));
4215 #endif
4216       pr->qe = GNUNET_DATASTORE_get (dsh,
4217                                      &gm->query,
4218                                      type,                             
4219                                      pr->priority + 1,
4220                                      MAX_DATASTORE_QUEUE,                                
4221                                      timeout,
4222                                      &process_local_reply,
4223                                      pr);
4224       if (NULL == pr->qe)
4225         {
4226           GNUNET_STATISTICS_update (stats,
4227                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4228                                     1,
4229                                     GNUNET_NO);
4230         }
4231     }
4232   else
4233     {
4234       GNUNET_STATISTICS_update (stats,
4235                                 gettext_noop ("# requests forwarded due to high load"),
4236                                 1,
4237                                 GNUNET_NO);
4238     }
4239
4240   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4241   switch (pr->type)
4242     {
4243     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4244     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4245       /* only one result, wait for datastore */
4246       if (GNUNET_YES != pr->forward_only)
4247         {
4248           GNUNET_STATISTICS_update (stats,
4249                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4250                                     1,
4251                                     GNUNET_NO);
4252           break;
4253         }
4254     default:
4255       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4256         pr->task = GNUNET_SCHEDULER_add_now (sched,
4257                                              &forward_request_task,
4258                                              pr);
4259     }
4260
4261   /* make sure we don't track too many requests */
4262   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4263     {
4264       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4265       GNUNET_assert (pr != NULL);
4266       destroy_pending_request (pr);
4267     }
4268   return GNUNET_OK;
4269 }
4270
4271
4272 /* **************************** CS GET Handling ************************ */
4273
4274
4275 /**
4276  * Handle START_SEARCH-message (search request from client).
4277  *
4278  * @param cls closure
4279  * @param client identification of the client
4280  * @param message the actual message
4281  */
4282 static void
4283 handle_start_search (void *cls,
4284                      struct GNUNET_SERVER_Client *client,
4285                      const struct GNUNET_MessageHeader *message)
4286 {
4287   static GNUNET_HashCode all_zeros;
4288   const struct SearchMessage *sm;
4289   struct ClientList *cl;
4290   struct ClientRequestList *crl;
4291   struct PendingRequest *pr;
4292   uint16_t msize;
4293   unsigned int sc;
4294   enum GNUNET_BLOCK_Type type;
4295
4296   msize = ntohs (message->size);
4297   if ( (msize < sizeof (struct SearchMessage)) ||
4298        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4299     {
4300       GNUNET_break (0);
4301       GNUNET_SERVER_receive_done (client,
4302                                   GNUNET_SYSERR);
4303       return;
4304     }
4305   GNUNET_STATISTICS_update (stats,
4306                             gettext_noop ("# client searches received"),
4307                             1,
4308                             GNUNET_NO);
4309   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4310   sm = (const struct SearchMessage*) message;
4311   type = ntohl (sm->type);
4312 #if DEBUG_FS
4313   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4314               "Received request for `%s' of type %u from local client\n",
4315               GNUNET_h2s (&sm->query),
4316               (unsigned int) type);
4317 #endif
4318   cl = client_list;
4319   while ( (cl != NULL) &&
4320           (cl->client != client) )
4321     cl = cl->next;
4322   if (cl == NULL)
4323     {
4324       cl = GNUNET_malloc (sizeof (struct ClientList));
4325       cl->client = client;
4326       GNUNET_SERVER_client_keep (client);
4327       cl->next = client_list;
4328       client_list = cl;
4329     }
4330   /* detect duplicate KBLOCK requests */
4331   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4332        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4333        (type == GNUNET_BLOCK_TYPE_ANY) )
4334     {
4335       crl = cl->rl_head;
4336       while ( (crl != NULL) &&
4337               ( (0 != memcmp (&crl->req->query,
4338                               &sm->query,
4339                               sizeof (GNUNET_HashCode))) ||
4340                 (crl->req->type != type) ) )
4341         crl = crl->next;
4342       if (crl != NULL)  
4343         { 
4344 #if DEBUG_FS
4345           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4346                       "Have existing request, merging content-seen lists.\n");
4347 #endif
4348           pr = crl->req;
4349           /* Duplicate request (used to send long list of
4350              known/blocked results); merge 'pr->replies_seen'
4351              and update bloom filter */
4352           GNUNET_array_grow (pr->replies_seen,
4353                              pr->replies_seen_size,
4354                              pr->replies_seen_off + sc);
4355           memcpy (&pr->replies_seen[pr->replies_seen_off],
4356                   &sm[1],
4357                   sc * sizeof (GNUNET_HashCode));
4358           pr->replies_seen_off += sc;
4359           refresh_bloomfilter (pr);
4360           GNUNET_STATISTICS_update (stats,
4361                                     gettext_noop ("# client searches updated (merged content seen list)"),
4362                                     1,
4363                                     GNUNET_NO);
4364           GNUNET_SERVER_receive_done (client,
4365                                       GNUNET_OK);
4366           return;
4367         }
4368     }
4369   GNUNET_STATISTICS_update (stats,
4370                             gettext_noop ("# client searches active"),
4371                             1,
4372                             GNUNET_NO);
4373   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4374                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4375   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4376   memset (crl, 0, sizeof (struct ClientRequestList));
4377   crl->client_list = cl;
4378   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4379                                cl->rl_tail,
4380                                crl);  
4381   crl->req = pr;
4382   pr->type = type;
4383   pr->client_request_list = crl;
4384   GNUNET_array_grow (pr->replies_seen,
4385                      pr->replies_seen_size,
4386                      sc);
4387   memcpy (pr->replies_seen,
4388           &sm[1],
4389           sc * sizeof (GNUNET_HashCode));
4390   pr->replies_seen_off = sc;
4391   pr->anonymity_level = ntohl (sm->anonymity_level); 
4392   pr->start_time = GNUNET_TIME_absolute_get ();
4393   refresh_bloomfilter (pr);
4394   pr->query = sm->query;
4395   if (0 == (1 & ntohl (sm->options)))
4396     pr->local_only = GNUNET_NO;
4397   else
4398     pr->local_only = GNUNET_YES;
4399   switch (type)
4400     {
4401     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4402     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4403       if (0 != memcmp (&sm->target,
4404                        &all_zeros,
4405                        sizeof (GNUNET_HashCode)))
4406         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4407       break;
4408     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4409       pr->namespace = (GNUNET_HashCode*) &pr[1];
4410       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4411       break;
4412     default:
4413       break;
4414     }
4415   GNUNET_break (GNUNET_OK ==
4416                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4417                                                    &sm->query,
4418                                                    pr,
4419                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4420   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4421     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4422   pr->qe = GNUNET_DATASTORE_get (dsh,
4423                                  &sm->query,
4424                                  type,
4425                                  -3, -1,
4426                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4427                                  &process_local_reply,
4428                                  pr);
4429 }
4430
4431
4432 /* **************************** Startup ************************ */
4433
4434 /**
4435  * Process fs requests.
4436  *
4437  * @param s scheduler to use
4438  * @param server the initialized server
4439  * @param c configuration to use
4440  */
4441 static int
4442 main_init (struct GNUNET_SCHEDULER_Handle *s,
4443            struct GNUNET_SERVER_Handle *server,
4444            const struct GNUNET_CONFIGURATION_Handle *c)
4445 {
4446   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4447     {
4448       { &handle_p2p_get, 
4449         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4450       { &handle_p2p_put, 
4451         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4452       { &handle_p2p_migration_stop, 
4453         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4454         sizeof (struct MigrationStopMessage) },
4455       { NULL, 0, 0 }
4456     };
4457   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4458     {&GNUNET_FS_handle_index_start, NULL, 
4459      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4460     {&GNUNET_FS_handle_index_list_get, NULL, 
4461      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4462     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4463      sizeof (struct UnindexMessage) },
4464     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4465      0 },
4466     {NULL, NULL, 0, 0}
4467   };
4468   unsigned long long enc = 128;
4469
4470   sched = s;
4471   cfg = c;
4472   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4473   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4474   if ( (GNUNET_OK !=
4475         GNUNET_CONFIGURATION_get_value_number (cfg,
4476                                                "fs",
4477                                                "MAX_PENDING_REQUESTS",
4478                                                &max_pending_requests)) ||
4479        (GNUNET_OK !=
4480         GNUNET_CONFIGURATION_get_value_number (cfg,
4481                                                "fs",
4482                                                "EXPECTED_NEIGHBOUR_COUNT",
4483                                                &enc)) ||
4484        (GNUNET_OK != 
4485         GNUNET_CONFIGURATION_get_value_time (cfg,
4486                                              "fs",
4487                                              "MIN_MIGRATION_DELAY",
4488                                              &min_migration_delay)) )
4489     {
4490       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4491                   _("Configuration fails to specify certain parameters, assuming default values."));
4492     }
4493   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4494   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4495   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4496   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4497   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4498   core = GNUNET_CORE_connect (sched,
4499                               cfg,
4500                               GNUNET_TIME_UNIT_FOREVER_REL,
4501                               NULL,
4502                               NULL,
4503                               &peer_connect_handler,
4504                               &peer_disconnect_handler,
4505                               &peer_status_handler,
4506                               NULL, GNUNET_NO,
4507                               NULL, GNUNET_NO,
4508                               p2p_handlers);
4509   if (NULL == core)
4510     {
4511       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4512                   _("Failed to connect to `%s' service.\n"),
4513                   "core");
4514       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4515       connected_peers = NULL;
4516       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4517       query_request_map = NULL;
4518       GNUNET_LOAD_value_free (rt_entry_lifetime);
4519       rt_entry_lifetime = NULL;
4520       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4521       requests_by_expiration_heap = NULL;
4522       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4523       peer_request_map = NULL;
4524       if (dsh != NULL)
4525         {
4526           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4527           dsh = NULL;
4528         }
4529       return GNUNET_SYSERR;
4530     }
4531   /* FIXME: distinguish between sending and storing in options? */
4532   if (active_migration) 
4533     {
4534       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4535                   _("Content migration is enabled, will start to gather data\n"));
4536       consider_migration_gathering ();
4537     }
4538   consider_dht_put_gathering (NULL);
4539   GNUNET_SERVER_disconnect_notify (server, 
4540                                    &handle_client_disconnect,
4541                                    NULL);
4542   GNUNET_assert (GNUNET_OK ==
4543                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4544                                                           "fs",
4545                                                           "TRUST",
4546                                                           &trustDirectory));
4547   GNUNET_DISK_directory_create (trustDirectory);
4548   GNUNET_SCHEDULER_add_with_priority (sched,
4549                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4550                                       &cron_flush_trust, NULL);
4551
4552
4553   GNUNET_SERVER_add_handlers (server, handlers);
4554   GNUNET_SCHEDULER_add_delayed (sched,
4555                                 GNUNET_TIME_UNIT_FOREVER_REL,
4556                                 &shutdown_task,
4557                                 NULL);
4558   return GNUNET_OK;
4559 }
4560
4561
4562 /**
4563  * Process fs requests.
4564  *
4565  * @param cls closure
4566  * @param sched scheduler to use
4567  * @param server the initialized server
4568  * @param cfg configuration to use
4569  */
4570 static void
4571 run (void *cls,
4572      struct GNUNET_SCHEDULER_Handle *sched,
4573      struct GNUNET_SERVER_Handle *server,
4574      const struct GNUNET_CONFIGURATION_Handle *cfg)
4575 {
4576   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4577                                                            "FS",
4578                                                            "ACTIVEMIGRATION");
4579   dsh = GNUNET_DATASTORE_connect (cfg,
4580                                   sched);
4581   if (dsh == NULL)
4582     {
4583       GNUNET_SCHEDULER_shutdown (sched);
4584       return;
4585     }
4586   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4587   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4588   block_cfg = GNUNET_CONFIGURATION_create ();
4589   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4590                                          "block",
4591                                          "PLUGINS",
4592                                          "fs");
4593   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4594   GNUNET_assert (NULL != block_ctx);
4595   dht_handle = GNUNET_DHT_connect (sched,
4596                                    cfg,
4597                                    FS_DHT_HT_SIZE);
4598   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4599        (GNUNET_OK != main_init (sched, server, cfg)) )
4600     {    
4601       GNUNET_SCHEDULER_shutdown (sched);
4602       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4603       dsh = NULL;
4604       GNUNET_DHT_disconnect (dht_handle);
4605       dht_handle = NULL;
4606       GNUNET_BLOCK_context_destroy (block_ctx);
4607       block_ctx = NULL;
4608       GNUNET_CONFIGURATION_destroy (block_cfg);
4609       block_cfg = NULL;
4610       GNUNET_LOAD_value_free (datastore_get_load);
4611       datastore_get_load = NULL;
4612       GNUNET_LOAD_value_free (datastore_put_load);
4613       datastore_put_load = NULL;
4614       return;   
4615     }
4616 }
4617
4618
4619 /**
4620  * The main function for the fs service.
4621  *
4622  * @param argc number of arguments from the command line
4623  * @param argv command line arguments
4624  * @return 0 ok, 1 on error
4625  */
4626 int
4627 main (int argc, char *const *argv)
4628 {
4629   return (GNUNET_OK ==
4630           GNUNET_SERVICE_run (argc,
4631                               argv,
4632                               "fs",
4633                               GNUNET_SERVICE_OPTION_NONE,
4634                               &run, NULL)) ? 0 : 1;
4635 }
4636
4637 /* end of gnunet-service-fs.c */