(no commit message)
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - collect traffic data for anonymity levels > 1
28  * - implement transmission restrictions for anonymity level > 1
29  * - more statistics
30  */
31 #include "platform.h"
32 #include <float.h>
33 #include "gnunet_constants.h"
34 #include "gnunet_core_service.h"
35 #include "gnunet_dht_service.h"
36 #include "gnunet_datastore_service.h"
37 #include "gnunet_load_lib.h"
38 #include "gnunet_peer_lib.h"
39 #include "gnunet_protocols.h"
40 #include "gnunet_signatures.h"
41 #include "gnunet_statistics_service.h"
42 #include "gnunet_transport_service.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet-service-fs_indexing.h"
45 #include "fs.h"
46
47 #define DEBUG_FS GNUNET_NO
48
49 /**
50  * Should we introduce random latency in processing?  Required for proper
51  * implementation of GAP, but can be disabled for performance evaluation of
52  * the basic routing algorithm.
53  *
54  * Note that with delays enabled, performance can be significantly lower
55  * (several orders of magnitude in 2-peer test runs); if you want to
56  * measure throughput of other components, set this to NO.  Also, you
57  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
58  * a rather wasteful mode of operation (that might still get the highest
59  * throughput overall).
60  *
61  * Performance measurements (for 50 MB file, 2 peers):
62  *
63  * - Without delays: 3300 kb/s
64  * - With    delays:  101 kb/s
65  */
66 #define SUPPORT_DELAYS GNUNET_NO
67
68 /**
69  * Size for the hash map for DHT requests from the FS
70  * service.  Should be about the number of concurrent
71  * DHT requests we plan to make.
72  */
73 #define FS_DHT_HT_SIZE 1024
74
75 /**
76  * At what frequency should our datastore load decrease
77  * automatically (since if we don't use it, clearly the
78  * load must be going down).
79  */
80 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
81
82 /**
83  * How often do we flush trust values to disk?
84  */
85 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
86
87 /**
88  * How often do we at most PUT content into the DHT?
89  */
90 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
91
92 /**
93  * Inverse of the probability that we will submit the same query
94  * to the same peer again.  If the same peer already got the query
95  * repeatedly recently, the probability is multiplied by the inverse
96  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
97  * plus MAX_CORK_DELAY (so roughly every 3.5s).
98  *
99  * Note that this factor is a key influence to performance in small
100  * networks (especially test networks of 2 peers) because if there is
101  * only a single peer with the data, this value will determine how
102  * soon we might re-try.  For example, a value of 3 can result in 
103  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
104  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
105  * rather inefficient in larger networks, hence picking 1 is in 
106  * general not the best choice.
107  *
108  * Performance measurements (for 50 MB file, 2 peers, no delays):
109  *
110  * - 1: 3300 kb/s (consistently)
111  * - 3: 2046 kb/s, 754 kb/s, 3490 kb/s
112  * - 5:  759 kb/s, 968 kb/s, 1160 kb/s
113  *
114  * Note that this does NOT mean that the value should be 1 since
115  * a 2-peer network is far from representative here (and this fails
116  * to take into consideration bandwidth wasted by repeatedly 
117  * sending queries to peers that don't have the content).  Also,
118  * it is expected that higher values lead to more inconsistent
119  * measurements since this only affects lost messages towards the
120  * end of the download.
121  *
122  * Finally, we should probably consider changing this and making
123  * it dependent on the number of connected peers or a related
124  * metric (bad magic constants...).
125  */
126 #define RETRY_PROBABILITY_INV 1
127
128 /**
129  * What is the maximum delay for a P2P FS message (in our interaction
130  * with core)?  FS-internal delays are another story.  The value is
131  * chosen based on the 32k block size.  Given that peers typcially
132  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
133  * transmit one message even to the lowest-bandwidth peers.
134  */
135 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
136
137 /**
138  * Maximum number of requests (from other peers, overall) that we're
139  * willing to have pending at any given point in time.  Can be changed
140  * via the configuration file (32k is just the default).
141  */
142 static unsigned long long max_pending_requests = (32 * 1024);
143
144
145 /**
146  * Information we keep for each pending reply.  The
147  * actual message follows at the end of this struct.
148  */
149 struct PendingMessage;
150
151 /**
152  * Function called upon completion of a transmission.
153  *
154  * @param cls closure
155  * @param pid ID of receiving peer, 0 on transmission error
156  */
157 typedef void (*TransmissionContinuation)(void * cls, 
158                                          GNUNET_PEER_Id tpid);
159
160
161 /**
162  * Information we keep for each pending message (GET/PUT).  The
163  * actual message follows at the end of this struct.
164  */
165 struct PendingMessage
166 {
167   /**
168    * This is a doubly-linked list of messages to the same peer.
169    */
170   struct PendingMessage *next;
171
172   /**
173    * This is a doubly-linked list of messages to the same peer.
174    */
175   struct PendingMessage *prev;
176
177   /**
178    * Entry in pending message list for this pending message.
179    */ 
180   struct PendingMessageList *pml;  
181
182   /**
183    * Function to call immediately once we have transmitted this
184    * message.
185    */
186   TransmissionContinuation cont;
187
188   /**
189    * Closure for cont.
190    */
191   void *cont_cls;
192
193   /**
194    * Do not transmit this pending message until this deadline.
195    */
196   struct GNUNET_TIME_Absolute delay_until;
197
198   /**
199    * Size of the reply; actual reply message follows
200    * at the end of this struct.
201    */
202   size_t msize;
203   
204   /**
205    * How important is this message for us?
206    */
207   uint32_t priority;
208  
209 };
210
211
212 /**
213  * Information about a peer that we are connected to.
214  * We track data that is useful for determining which
215  * peers should receive our requests.  We also keep
216  * a list of messages to transmit to this peer.
217  */
218 struct ConnectedPeer
219 {
220
221   /**
222    * List of the last clients for which this peer successfully
223    * answered a query.
224    */
225   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
226
227   /**
228    * List of the last PIDs for which
229    * this peer successfully answered a query;
230    * We use 0 to indicate no successful reply.
231    */
232   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
233
234   /**
235    * Average delay between sending the peer a request and
236    * getting a reply (only calculated over the requests for
237    * which we actually got a reply).   Calculated
238    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
239    */ 
240   struct GNUNET_TIME_Relative avg_delay;
241
242   /**
243    * Point in time until which this peer does not want us to migrate content
244    * to it.
245    */
246   struct GNUNET_TIME_Absolute migration_blocked;
247
248   /**
249    * Time until when we blocked this peer from migrating
250    * data to us.
251    */
252   struct GNUNET_TIME_Absolute last_migration_block;
253
254   /**
255    * Transmission times for the last MAX_QUEUE_PER_PEER
256    * requests for this peer.  Used as a ring buffer, current
257    * offset is stored in 'last_request_times_off'.  If the
258    * oldest entry is more recent than the 'avg_delay', we should
259    * not send any more requests right now.
260    */
261   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
262
263   /**
264    * Handle for an active request for transmission to this
265    * peer, or NULL.
266    */
267   struct GNUNET_CORE_TransmitHandle *cth;
268
269   /**
270    * Messages (replies, queries, content migration) we would like to
271    * send to this peer in the near future.  Sorted by priority, head.
272    */
273   struct PendingMessage *pending_messages_head;
274
275   /**
276    * Messages (replies, queries, content migration) we would like to
277    * send to this peer in the near future.  Sorted by priority, tail.
278    */
279   struct PendingMessage *pending_messages_tail;
280
281   /**
282    * How long does it typically take for us to transmit a message
283    * to this peer?  (delay between the request being issued and
284    * the callback being invoked).
285    */
286   struct GNUNET_LOAD_Value *transmission_delay;
287
288   /**
289    * Context of our GNUNET_CORE_peer_change_preference call (or NULL).
290    */
291   struct GNUNET_CORE_InformationRequestContext *irc;
292
293   /**
294    * Request for which 'irc' is currently active (or NULL).
295    */
296   struct PendingRequest *pr;
297
298   /**
299    * Time when the last transmission request was issued.
300    */
301   struct GNUNET_TIME_Absolute last_transmission_request_start;
302
303   /**
304    * ID of delay task for scheduling transmission.
305    */
306   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
307
308   /**
309    * Average priority of successful replies.  Calculated
310    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
311    */
312   double avg_priority;
313
314   /**
315    * Increase in traffic preference still to be submitted
316    * to the core service for this peer.
317    */
318   uint64_t inc_preference;
319
320   /**
321    * Trust rating for this peer
322    */
323   uint32_t trust;
324
325   /**
326    * Trust rating for this peer on disk.
327    */
328   uint32_t disk_trust;
329
330   /**
331    * The peer's identity.
332    */
333   GNUNET_PEER_Id pid;  
334
335   /**
336    * Size of the linked list of 'pending_messages'.
337    */
338   unsigned int pending_requests;
339
340   /**
341    * Which offset in "last_p2p_replies" will be updated next?
342    * (we go round-robin).
343    */
344   unsigned int last_p2p_replies_woff;
345
346   /**
347    * Which offset in "last_client_replies" will be updated next?
348    * (we go round-robin).
349    */
350   unsigned int last_client_replies_woff;
351
352   /**
353    * Current offset into 'last_request_times' ring buffer.
354    */
355   unsigned int last_request_times_off;
356
357 };
358
359
360 /**
361  * Information we keep for each pending request.  We should try to
362  * keep this struct as small as possible since its memory consumption
363  * is key to how many requests we can have pending at once.
364  */
365 struct PendingRequest;
366
367
368 /**
369  * Doubly-linked list of requests we are performing
370  * on behalf of the same client.
371  */
372 struct ClientRequestList
373 {
374
375   /**
376    * This is a doubly-linked list.
377    */
378   struct ClientRequestList *next;
379
380   /**
381    * This is a doubly-linked list.
382    */
383   struct ClientRequestList *prev;
384
385   /**
386    * Request this entry represents.
387    */
388   struct PendingRequest *req;
389
390   /**
391    * Client list this request belongs to.
392    */
393   struct ClientList *client_list;
394
395 };
396
397
398 /**
399  * Replies to be transmitted to the client.  The actual
400  * response message is allocated after this struct.
401  */
402 struct ClientResponseMessage
403 {
404   /**
405    * This is a doubly-linked list.
406    */
407   struct ClientResponseMessage *next;
408
409   /**
410    * This is a doubly-linked list.
411    */
412   struct ClientResponseMessage *prev;
413
414   /**
415    * Client list entry this response belongs to.
416    */
417   struct ClientList *client_list;
418
419   /**
420    * Number of bytes in the response.
421    */
422   size_t msize;
423 };
424
425
426 /**
427  * Linked list of clients we are performing requests
428  * for right now.
429  */
430 struct ClientList
431 {
432   /**
433    * This is a linked list.
434    */
435   struct ClientList *next;
436
437   /**
438    * ID of a client making a request, NULL if this entry is for a
439    * peer.
440    */
441   struct GNUNET_SERVER_Client *client;
442
443   /**
444    * Head of list of requests performed on behalf
445    * of this client right now.
446    */
447   struct ClientRequestList *rl_head;
448
449   /**
450    * Tail of list of requests performed on behalf
451    * of this client right now.
452    */
453   struct ClientRequestList *rl_tail;
454
455   /**
456    * Head of linked list of responses.
457    */
458   struct ClientResponseMessage *res_head;
459
460   /**
461    * Tail of linked list of responses.
462    */
463   struct ClientResponseMessage *res_tail;
464
465   /**
466    * Context for sending replies.
467    */
468   struct GNUNET_CONNECTION_TransmitHandle *th;
469
470 };
471
472
473 /**
474  * Information about a peer that we have forwarded this
475  * request to already.  
476  */
477 struct UsedTargetEntry
478 {
479   /**
480    * What was the last time we have transmitted this request to this
481    * peer?
482    */
483   struct GNUNET_TIME_Absolute last_request_time;
484
485   /**
486    * How often have we transmitted this request to this peer?
487    */
488   unsigned int num_requests;
489
490   /**
491    * PID of the target peer.
492    */
493   GNUNET_PEER_Id pid;
494
495 };
496
497
498
499
500
501 /**
502  * Doubly-linked list of messages we are performing
503  * due to a pending request.
504  */
505 struct PendingMessageList
506 {
507
508   /**
509    * This is a doubly-linked list of messages on behalf of the same request.
510    */
511   struct PendingMessageList *next;
512
513   /**
514    * This is a doubly-linked list of messages on behalf of the same request.
515    */
516   struct PendingMessageList *prev;
517
518   /**
519    * Message this entry represents.
520    */
521   struct PendingMessage *pm;
522
523   /**
524    * Request this entry belongs to.
525    */
526   struct PendingRequest *req;
527
528   /**
529    * Peer this message is targeted for.
530    */
531   struct ConnectedPeer *target;
532
533 };
534
535
536 /**
537  * Information we keep for each pending request.  We should try to
538  * keep this struct as small as possible since its memory consumption
539  * is key to how many requests we can have pending at once.
540  */
541 struct PendingRequest
542 {
543
544   /**
545    * If this request was made by a client, this is our entry in the
546    * client request list; otherwise NULL.
547    */
548   struct ClientRequestList *client_request_list;
549
550   /**
551    * Entry of peer responsible for this entry (if this request
552    * was made by a peer).
553    */
554   struct ConnectedPeer *cp;
555
556   /**
557    * If this is a namespace query, pointer to the hash of the public
558    * key of the namespace; otherwise NULL.  Pointer will be to the 
559    * end of this struct (so no need to free it).
560    */
561   const GNUNET_HashCode *namespace;
562
563   /**
564    * Bloomfilter we use to filter out replies that we don't care about
565    * (anymore).  NULL as long as we are interested in all replies.
566    */
567   struct GNUNET_CONTAINER_BloomFilter *bf;
568
569   /**
570    * Reference to DHT get operation for this request (or NULL).
571    */
572   struct GNUNET_DHT_GetHandle *dht_get;
573
574   /**
575    * Context of our GNUNET_CORE_peer_change_preference call.
576    */
577   struct ConnectedPeer *pirc;
578
579   /**
580    * Hash code of all replies that we have seen so far (only valid
581    * if client is not NULL since we only track replies like this for
582    * our own clients).
583    */
584   GNUNET_HashCode *replies_seen;
585
586   /**
587    * Node in the heap representing this entry; NULL
588    * if we have no heap node.
589    */
590   struct GNUNET_CONTAINER_HeapNode *hnode;
591
592   /**
593    * Head of list of messages being performed on behalf of this
594    * request.
595    */
596   struct PendingMessageList *pending_head;
597
598   /**
599    * Tail of list of messages being performed on behalf of this
600    * request.
601    */
602   struct PendingMessageList *pending_tail;
603
604   /**
605    * When did we first see this request (form this peer), or, if our
606    * client is initiating, when did we last initiate a search?
607    */
608   struct GNUNET_TIME_Absolute start_time;
609
610   /**
611    * The query that this request is for.
612    */
613   GNUNET_HashCode query;
614
615   /**
616    * The task responsible for transmitting queries
617    * for this request.
618    */
619   GNUNET_SCHEDULER_TaskIdentifier task;
620
621   /**
622    * (Interned) Peer identifier that identifies a preferred target
623    * for requests.
624    */
625   GNUNET_PEER_Id target_pid;
626
627   /**
628    * (Interned) Peer identifiers of peers that have already
629    * received our query for this content.
630    */
631   struct UsedTargetEntry *used_targets;
632   
633   /**
634    * Our entry in the queue (non-NULL while we wait for our
635    * turn to interact with the local database).
636    */
637   struct GNUNET_DATASTORE_QueueEntry *qe;
638
639   /**
640    * Size of the 'bf' (in bytes).
641    */
642   size_t bf_size;
643
644   /**
645    * Desired anonymity level; only valid for requests from a local client.
646    */
647   uint32_t anonymity_level;
648
649   /**
650    * How many entries in "used_targets" are actually valid?
651    */
652   unsigned int used_targets_off;
653
654   /**
655    * How long is the "used_targets" array?
656    */
657   unsigned int used_targets_size;
658
659   /**
660    * Number of results found for this request.
661    */
662   unsigned int results_found;
663
664   /**
665    * How many entries in "replies_seen" are actually valid?
666    */
667   unsigned int replies_seen_off;
668
669   /**
670    * How long is the "replies_seen" array?
671    */
672   unsigned int replies_seen_size;
673   
674   /**
675    * Priority with which this request was made.  If one of our clients
676    * made the request, then this is the current priority that we are
677    * using when initiating the request.  This value is used when
678    * we decide to reward other peers with trust for providing a reply.
679    */
680   uint32_t priority;
681
682   /**
683    * Priority points left for us to spend when forwarding this request
684    * to other peers.
685    */
686   uint32_t remaining_priority;
687
688   /**
689    * Number to mingle hashes for bloom-filter tests with.
690    */
691   int32_t mingle;
692
693   /**
694    * TTL with which we saw this request (or, if we initiated, TTL that
695    * we used for the request).
696    */
697   int32_t ttl;
698   
699   /**
700    * Type of the content that this request is for.
701    */
702   enum GNUNET_BLOCK_Type type;
703
704   /**
705    * Remove this request after transmission of the current response.
706    */
707   int8_t do_remove;
708
709   /**
710    * GNUNET_YES if we should not forward this request to other peers.
711    */
712   int8_t local_only;
713
714   /**
715    * GNUNET_YES if we should not forward this request to other peers.
716    */
717   int8_t forward_only;
718
719 };
720
721
722 /**
723  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
724  */
725 struct MigrationReadyBlock
726 {
727
728   /**
729    * This is a doubly-linked list.
730    */
731   struct MigrationReadyBlock *next;
732
733   /**
734    * This is a doubly-linked list.
735    */
736   struct MigrationReadyBlock *prev;
737
738   /**
739    * Query for the block.
740    */
741   GNUNET_HashCode query;
742
743   /**
744    * When does this block expire? 
745    */
746   struct GNUNET_TIME_Absolute expiration;
747
748   /**
749    * Peers we would consider forwarding this
750    * block to.  Zero for empty entries.
751    */
752   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
753
754   /**
755    * Size of the block.
756    */
757   size_t size;
758
759   /**
760    *  Number of targets already used.
761    */
762   unsigned int used_targets;
763
764   /**
765    * Type of the block.
766    */
767   enum GNUNET_BLOCK_Type type;
768 };
769
770
771 /**
772  * Our connection to the datastore.
773  */
774 static struct GNUNET_DATASTORE_Handle *dsh;
775
776 /**
777  * Our block context.
778  */
779 static struct GNUNET_BLOCK_Context *block_ctx;
780
781 /**
782  * Our block configuration.
783  */
784 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
785
786 /**
787  * Our configuration.
788  */
789 static const struct GNUNET_CONFIGURATION_Handle *cfg;
790
791 /**
792  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
793  */
794 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
795
796 /**
797  * Map of peer identifiers to "struct PendingRequest" (for that peer).
798  */
799 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
800
801 /**
802  * Map of query identifiers to "struct PendingRequest" (for that query).
803  */
804 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
805
806 /**
807  * Heap with the request that will expire next at the top.  Contains
808  * pointers of type "struct PendingRequest*"; these will *also* be
809  * aliased from the "requests_by_peer" data structures and the
810  * "requests_by_query" table.  Note that requests from our clients
811  * don't expire and are thus NOT in the "requests_by_expiration"
812  * (or the "requests_by_peer" tables).
813  */
814 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
815
816 /**
817  * Handle for reporting statistics.
818  */
819 static struct GNUNET_STATISTICS_Handle *stats;
820
821 /**
822  * Linked list of clients we are currently processing requests for.
823  */
824 static struct ClientList *client_list;
825
826 /**
827  * Pointer to handle to the core service (points to NULL until we've
828  * connected to it).
829  */
830 static struct GNUNET_CORE_Handle *core;
831
832 /**
833  * Head of linked list of blocks that can be migrated.
834  */
835 static struct MigrationReadyBlock *mig_head;
836
837 /**
838  * Tail of linked list of blocks that can be migrated.
839  */
840 static struct MigrationReadyBlock *mig_tail;
841
842 /**
843  * Request to datastore for migration (or NULL).
844  */
845 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
846
847 /**
848  * Request to datastore for DHT PUTs (or NULL).
849  */
850 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
851
852 /**
853  * Type we will request for the next DHT PUT round from the datastore.
854  */
855 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
856
857 /**
858  * Where do we store trust information?
859  */
860 static char *trustDirectory;
861
862 /**
863  * ID of task that collects blocks for migration.
864  */
865 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
866
867 /**
868  * ID of task that collects blocks for DHT PUTs.
869  */
870 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
871
872 /**
873  * What is the maximum frequency at which we are allowed to
874  * poll the datastore for migration content?
875  */
876 static struct GNUNET_TIME_Relative min_migration_delay;
877
878 /**
879  * Handle for DHT operations.
880  */
881 static struct GNUNET_DHT_Handle *dht_handle;
882
883 /**
884  * Size of the doubly-linked list of migration blocks.
885  */
886 static unsigned int mig_size;
887
888 /**
889  * Are we allowed to migrate content to this peer.
890  */
891 static int active_to_migration;
892
893 /**
894  * Are we allowed to push out content from this peer.
895  */
896 static int active_from_migration;
897
898 /**
899  * How many entires with zero anonymity do we currently estimate
900  * to have in the database?
901  */
902 static unsigned int zero_anonymity_count_estimate;
903
904 /**
905  * Typical priorities we're seeing from other peers right now.  Since
906  * most priorities will be zero, this value is the weighted average of
907  * non-zero priorities seen "recently".  In order to ensure that new
908  * values do not dramatically change the ratio, values are first
909  * "capped" to a reasonable range (+N of the current value) and then
910  * averaged into the existing value by a ratio of 1:N.  Hence
911  * receiving the largest possible priority can still only raise our
912  * "current_priorities" by at most 1.
913  */
914 static double current_priorities;
915
916 /**
917  * Datastore 'GET' load tracking.
918  */
919 static struct GNUNET_LOAD_Value *datastore_get_load;
920
921 /**
922  * Datastore 'PUT' load tracking.
923  */
924 static struct GNUNET_LOAD_Value *datastore_put_load;
925
926 /**
927  * How long do requests typically stay in the routing table?
928  */
929 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
930
931 /**
932  * We've just now completed a datastore request.  Update our
933  * datastore load calculations.
934  *
935  * @param start time when the datastore request was issued
936  */
937 static void
938 update_datastore_delays (struct GNUNET_TIME_Absolute start)
939 {
940   struct GNUNET_TIME_Relative delay;
941
942   delay = GNUNET_TIME_absolute_get_duration (start);
943   GNUNET_LOAD_update (datastore_get_load,
944                       delay.rel_value);
945 }
946
947
948 /**
949  * Get the filename under which we would store the GNUNET_HELLO_Message
950  * for the given host and protocol.
951  * @return filename of the form DIRECTORY/HOSTID
952  */
953 static char *
954 get_trust_filename (const struct GNUNET_PeerIdentity *id)
955 {
956   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
957   char *fn;
958
959   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
960   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
961   return fn;
962 }
963
964
965
966 /**
967  * Transmit messages by copying it to the target buffer
968  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
969  * for writing in the meantime.  In that case, do nothing
970  * (the disconnect or shutdown handler will take care of the rest).
971  * If we were able to transmit messages and there are still more
972  * pending, ask core again for further calls to this function.
973  *
974  * @param cls closure, pointer to the 'struct ConnectedPeer*'
975  * @param size number of bytes available in buf
976  * @param buf where the callee should write the message
977  * @return number of bytes written to buf
978  */
979 static size_t
980 transmit_to_peer (void *cls,
981                   size_t size, void *buf);
982
983
984 /* ******************* clean up functions ************************ */
985
986 /**
987  * Delete the given migration block.
988  *
989  * @param mb block to delete
990  */
991 static void
992 delete_migration_block (struct MigrationReadyBlock *mb)
993 {
994   GNUNET_CONTAINER_DLL_remove (mig_head,
995                                mig_tail,
996                                mb);
997   GNUNET_PEER_decrement_rcs (mb->target_list,
998                              MIGRATION_LIST_SIZE);
999   mig_size--;
1000   GNUNET_free (mb);
1001 }
1002
1003
1004 /**
1005  * Compare the distance of two peers to a key.
1006  *
1007  * @param key key
1008  * @param p1 first peer
1009  * @param p2 second peer
1010  * @return GNUNET_YES if P1 is closer to key than P2
1011  */
1012 static int
1013 is_closer (const GNUNET_HashCode *key,
1014            const struct GNUNET_PeerIdentity *p1,
1015            const struct GNUNET_PeerIdentity *p2)
1016 {
1017   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
1018                                     &p2->hashPubKey,
1019                                     key);
1020 }
1021
1022
1023 /**
1024  * Consider migrating content to a given peer.
1025  *
1026  * @param cls 'struct MigrationReadyBlock*' to select
1027  *            targets for (or NULL for none)
1028  * @param key ID of the peer 
1029  * @param value 'struct ConnectedPeer' of the peer
1030  * @return GNUNET_YES (always continue iteration)
1031  */
1032 static int
1033 consider_migration (void *cls,
1034                     const GNUNET_HashCode *key,
1035                     void *value)
1036 {
1037   struct MigrationReadyBlock *mb = cls;
1038   struct ConnectedPeer *cp = value;
1039   struct MigrationReadyBlock *pos;
1040   struct GNUNET_PeerIdentity cppid;
1041   struct GNUNET_PeerIdentity otherpid;
1042   struct GNUNET_PeerIdentity worstpid;
1043   size_t msize;
1044   unsigned int i;
1045   unsigned int repl;
1046   
1047   /* consider 'cp' as a migration target for mb */
1048   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).rel_value > 0)
1049     return GNUNET_YES; /* peer has requested no migration! */
1050   if (mb != NULL)
1051     {
1052       GNUNET_PEER_resolve (cp->pid,
1053                            &cppid);
1054       repl = MIGRATION_LIST_SIZE;
1055       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1056         {
1057           if (mb->target_list[i] == 0)
1058             {
1059               mb->target_list[i] = cp->pid;
1060               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1061               repl = MIGRATION_LIST_SIZE;
1062               break;
1063             }
1064           GNUNET_PEER_resolve (mb->target_list[i],
1065                                &otherpid);
1066           if ( (repl == MIGRATION_LIST_SIZE) &&
1067                is_closer (&mb->query,
1068                           &cppid,
1069                           &otherpid)) 
1070             {
1071               repl = i;
1072               worstpid = otherpid;
1073             }
1074           else if ( (repl != MIGRATION_LIST_SIZE) &&
1075                     (is_closer (&mb->query,
1076                                 &worstpid,
1077                                 &otherpid) ) )
1078             {
1079               repl = i;
1080               worstpid = otherpid;
1081             }       
1082         }
1083       if (repl != MIGRATION_LIST_SIZE) 
1084         {
1085           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1086           mb->target_list[repl] = cp->pid;
1087           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1088         }
1089     }
1090
1091   /* consider scheduling transmission to cp for content migration */
1092   if (cp->cth != NULL)        
1093     return GNUNET_YES; 
1094   msize = 0;
1095   pos = mig_head;
1096   while (pos != NULL)
1097     {
1098       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1099         {
1100           if (cp->pid == pos->target_list[i])
1101             {
1102               if (msize == 0)
1103                 msize = pos->size;
1104               else
1105                 msize = GNUNET_MIN (msize,
1106                                     pos->size);
1107               break;
1108             }
1109         }
1110       pos = pos->next;
1111     }
1112   if (msize == 0)
1113     return GNUNET_YES; /* no content available */
1114 #if DEBUG_FS
1115   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1116               "Trying to migrate at least %u bytes to peer `%s'\n",
1117               msize,
1118               GNUNET_h2s (key));
1119 #endif
1120   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1121     {
1122       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1123       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1124     }
1125   cp->cth 
1126     = GNUNET_CORE_notify_transmit_ready (core,
1127                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1128                                          (const struct GNUNET_PeerIdentity*) key,
1129                                          msize + sizeof (struct PutMessage),
1130                                          &transmit_to_peer,
1131                                          cp);
1132   return GNUNET_YES;
1133 }
1134
1135
1136 /**
1137  * Task that is run periodically to obtain blocks for content
1138  * migration
1139  * 
1140  * @param cls unused
1141  * @param tc scheduler context (also unused)
1142  */
1143 static void
1144 gather_migration_blocks (void *cls,
1145                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1146
1147
1148
1149
1150 /**
1151  * Task that is run periodically to obtain blocks for DHT PUTs.
1152  * 
1153  * @param cls type of blocks to gather
1154  * @param tc scheduler context (unused)
1155  */
1156 static void
1157 gather_dht_put_blocks (void *cls,
1158                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1159
1160
1161 /**
1162  * If the migration task is not currently running, consider
1163  * (re)scheduling it with the appropriate delay.
1164  */
1165 static void
1166 consider_migration_gathering ()
1167 {
1168   struct GNUNET_TIME_Relative delay;
1169
1170   if (dsh == NULL)
1171     return;
1172   if (mig_qe != NULL)
1173     return;
1174   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1175     return;
1176   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1177                                          mig_size);
1178   delay = GNUNET_TIME_relative_divide (delay,
1179                                        MAX_MIGRATION_QUEUE);
1180   delay = GNUNET_TIME_relative_max (delay,
1181                                     min_migration_delay);
1182   mig_task = GNUNET_SCHEDULER_add_delayed (delay,
1183                                            &gather_migration_blocks,
1184                                            NULL);
1185 }
1186
1187
1188 /**
1189  * If the DHT PUT gathering task is not currently running, consider
1190  * (re)scheduling it with the appropriate delay.
1191  */
1192 static void
1193 consider_dht_put_gathering (void *cls)
1194 {
1195   struct GNUNET_TIME_Relative delay;
1196
1197   if (dsh == NULL)
1198     return;
1199   if (dht_qe != NULL)
1200     return;
1201   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1202     return;
1203   if (zero_anonymity_count_estimate > 0)
1204     {
1205       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1206                                            zero_anonymity_count_estimate);
1207       delay = GNUNET_TIME_relative_min (delay,
1208                                         MAX_DHT_PUT_FREQ);
1209     }
1210   else
1211     {
1212       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1213          (hopefully) appear */
1214       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1215     }
1216   dht_task = GNUNET_SCHEDULER_add_delayed (delay,
1217                                            &gather_dht_put_blocks,
1218                                            cls);
1219 }
1220
1221
1222 /**
1223  * Process content offered for migration.
1224  *
1225  * @param cls closure
1226  * @param key key for the content
1227  * @param size number of bytes in data
1228  * @param data content stored
1229  * @param type type of the content
1230  * @param priority priority of the content
1231  * @param anonymity anonymity-level for the content
1232  * @param expiration expiration time for the content
1233  * @param uid unique identifier for the datum;
1234  *        maybe 0 if no unique identifier is available
1235  */
1236 static void
1237 process_migration_content (void *cls,
1238                            const GNUNET_HashCode * key,
1239                            size_t size,
1240                            const void *data,
1241                            enum GNUNET_BLOCK_Type type,
1242                            uint32_t priority,
1243                            uint32_t anonymity,
1244                            struct GNUNET_TIME_Absolute
1245                            expiration, uint64_t uid)
1246 {
1247   struct MigrationReadyBlock *mb;
1248   
1249   if (key == NULL)
1250     {
1251       mig_qe = NULL;
1252       if (mig_size < MAX_MIGRATION_QUEUE)  
1253         consider_migration_gathering ();
1254       return;
1255     }
1256   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1257     {
1258       if (GNUNET_OK !=
1259           GNUNET_FS_handle_on_demand_block (key, size, data,
1260                                             type, priority, anonymity,
1261                                             expiration, uid, 
1262                                             &process_migration_content,
1263                                             NULL))
1264         {
1265           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1266         }
1267       return;
1268     }
1269 #if DEBUG_FS
1270   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1271               "Retrieved block `%s' of type %u for migration\n",
1272               GNUNET_h2s (key),
1273               type);
1274 #endif
1275   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1276   mb->query = *key;
1277   mb->expiration = expiration;
1278   mb->size = size;
1279   mb->type = type;
1280   memcpy (&mb[1], data, size);
1281   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1282                                      mig_tail,
1283                                      mig_tail,
1284                                      mb);
1285   mig_size++;
1286   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1287                                          &consider_migration,
1288                                          mb);
1289   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1290 }
1291
1292
1293 /**
1294  * Function called upon completion of the DHT PUT operation.
1295  */
1296 static void
1297 dht_put_continuation (void *cls,
1298                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1299 {
1300   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1301 }
1302
1303
1304 /**
1305  * Store content in DHT.
1306  *
1307  * @param cls closure
1308  * @param key key for the content
1309  * @param size number of bytes in data
1310  * @param data content stored
1311  * @param type type of the content
1312  * @param priority priority of the content
1313  * @param anonymity anonymity-level for the content
1314  * @param expiration expiration time for the content
1315  * @param uid unique identifier for the datum;
1316  *        maybe 0 if no unique identifier is available
1317  */
1318 static void
1319 process_dht_put_content (void *cls,
1320                          const GNUNET_HashCode * key,
1321                          size_t size,
1322                          const void *data,
1323                          enum GNUNET_BLOCK_Type type,
1324                          uint32_t priority,
1325                          uint32_t anonymity,
1326                          struct GNUNET_TIME_Absolute
1327                          expiration, uint64_t uid)
1328
1329   static unsigned int counter;
1330   static GNUNET_HashCode last_vhash;
1331   static GNUNET_HashCode vhash;
1332
1333   if (key == NULL)
1334     {
1335       dht_qe = NULL;
1336       consider_dht_put_gathering (cls);
1337       return;
1338     }
1339   /* slightly funky code to estimate the total number of values with zero
1340      anonymity from the maximum observed length of a monotonically increasing 
1341      sequence of hashes over the contents */
1342   GNUNET_CRYPTO_hash (data, size, &vhash);
1343   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1344     {
1345       if (zero_anonymity_count_estimate > 0)
1346         zero_anonymity_count_estimate /= 2;
1347       counter = 0;
1348     }
1349   last_vhash = vhash;
1350   if (counter < 31)
1351     counter++;
1352   if (zero_anonymity_count_estimate < (1 << counter))
1353     zero_anonymity_count_estimate = (1 << counter);
1354 #if DEBUG_FS
1355   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1356               "Retrieved block `%s' of type %u for DHT PUT\n",
1357               GNUNET_h2s (key),
1358               type);
1359 #endif
1360   GNUNET_DHT_put (dht_handle,
1361                   key,
1362                   DEFAULT_PUT_REPLICATION,
1363                   GNUNET_DHT_RO_NONE,
1364                   type,
1365                   size,
1366                   data,
1367                   expiration,
1368                   GNUNET_TIME_UNIT_FOREVER_REL,
1369                   &dht_put_continuation,
1370                   cls);
1371 }
1372
1373
1374 /**
1375  * Task that is run periodically to obtain blocks for content
1376  * migration
1377  * 
1378  * @param cls unused
1379  * @param tc scheduler context (also unused)
1380  */
1381 static void
1382 gather_migration_blocks (void *cls,
1383                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1384 {
1385   mig_task = GNUNET_SCHEDULER_NO_TASK;
1386   if (dsh != NULL)
1387     {
1388       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1389                                             GNUNET_TIME_UNIT_FOREVER_REL,
1390                                             &process_migration_content, NULL);
1391       GNUNET_assert (mig_qe != NULL);
1392     }
1393 }
1394
1395
1396 /**
1397  * Task that is run periodically to obtain blocks for DHT PUTs.
1398  * 
1399  * @param cls type of blocks to gather
1400  * @param tc scheduler context (unused)
1401  */
1402 static void
1403 gather_dht_put_blocks (void *cls,
1404                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1405 {
1406   dht_task = GNUNET_SCHEDULER_NO_TASK;
1407   if (dsh != NULL)
1408     {
1409       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1410         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1411       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1412                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1413                                                     dht_put_type++,
1414                                                     &process_dht_put_content, NULL);
1415       GNUNET_assert (dht_qe != NULL);
1416     }
1417 }
1418
1419
1420 /**
1421  * We're done with a particular message list entry.
1422  * Free all associated resources.
1423  * 
1424  * @param pml entry to destroy
1425  */
1426 static void
1427 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1428 {
1429   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1430                                pml->req->pending_tail,
1431                                pml);
1432   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1433                                pml->target->pending_messages_tail,
1434                                pml->pm);
1435   pml->target->pending_requests--;
1436   GNUNET_free (pml->pm);
1437   GNUNET_free (pml);
1438 }
1439
1440
1441 /**
1442  * Destroy the given pending message (and call the respective
1443  * continuation).
1444  *
1445  * @param pm message to destroy
1446  * @param tpid id of peer that the message was delivered to, or 0 for none
1447  */
1448 static void
1449 destroy_pending_message (struct PendingMessage *pm,
1450                          GNUNET_PEER_Id tpid)
1451 {
1452   struct PendingMessageList *pml = pm->pml;
1453   TransmissionContinuation cont;
1454   void *cont_cls;
1455
1456   cont = pm->cont;
1457   cont_cls = pm->cont_cls;
1458   if (pml != NULL)
1459     {
1460       GNUNET_assert (pml->pm == pm);
1461       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1462       destroy_pending_message_list_entry (pml);
1463     }
1464   else
1465     {
1466       GNUNET_free (pm);
1467     }
1468   if (cont != NULL)
1469     cont (cont_cls, tpid);  
1470 }
1471
1472
1473 /**
1474  * We're done processing a particular request.
1475  * Free all associated resources.
1476  *
1477  * @param pr request to destroy
1478  */
1479 static void
1480 destroy_pending_request (struct PendingRequest *pr)
1481 {
1482   struct GNUNET_PeerIdentity pid;
1483   unsigned int i;
1484
1485   if (pr->hnode != NULL)
1486     {
1487       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1488                                          pr->hnode);
1489       pr->hnode = NULL;
1490     }
1491   if (NULL == pr->client_request_list)
1492     {
1493       GNUNET_STATISTICS_update (stats,
1494                                 gettext_noop ("# P2P searches active"),
1495                                 -1,
1496                                 GNUNET_NO);
1497     }
1498   else
1499     {
1500       GNUNET_STATISTICS_update (stats,
1501                                 gettext_noop ("# client searches active"),
1502                                 -1,
1503                                 GNUNET_NO);
1504     }
1505   if (GNUNET_YES == 
1506       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1507                                             &pr->query,
1508                                             pr))
1509     {
1510       GNUNET_LOAD_update (rt_entry_lifetime,
1511                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
1512     }
1513   if (pr->qe != NULL)
1514      {
1515       GNUNET_DATASTORE_cancel (pr->qe);
1516       pr->qe = NULL;
1517     }
1518   if (pr->dht_get != NULL)
1519     {
1520       GNUNET_DHT_get_stop (pr->dht_get);
1521       pr->dht_get = NULL;
1522     }
1523   if (pr->client_request_list != NULL)
1524     {
1525       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1526                                    pr->client_request_list->client_list->rl_tail,
1527                                    pr->client_request_list);
1528       GNUNET_free (pr->client_request_list);
1529       pr->client_request_list = NULL;
1530     }
1531   if (pr->cp != NULL)
1532     {
1533       GNUNET_PEER_resolve (pr->cp->pid,
1534                            &pid);
1535       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1536                                                    &pid.hashPubKey,
1537                                                    pr);
1538       pr->cp = NULL;
1539     }
1540   if (pr->bf != NULL)
1541     {
1542       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1543       pr->bf = NULL;
1544     }
1545   if (pr->pirc != NULL)
1546     {
1547       GNUNET_CORE_peer_change_preference_cancel (pr->pirc->irc);
1548       pr->pirc->irc = NULL;
1549       pr->pirc = NULL;
1550     }
1551   if (pr->replies_seen != NULL)
1552     {
1553       GNUNET_free (pr->replies_seen);
1554       pr->replies_seen = NULL;
1555     }
1556   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1557     {
1558       GNUNET_SCHEDULER_cancel (pr->task);
1559       pr->task = GNUNET_SCHEDULER_NO_TASK;
1560     }
1561   while (NULL != pr->pending_head)    
1562     destroy_pending_message_list_entry (pr->pending_head);
1563   GNUNET_PEER_change_rc (pr->target_pid, -1);
1564   if (pr->used_targets != NULL)
1565     {
1566       for (i=0;i<pr->used_targets_off;i++)
1567         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1568       GNUNET_free (pr->used_targets);
1569       pr->used_targets_off = 0;
1570       pr->used_targets_size = 0;
1571       pr->used_targets = NULL;
1572     }
1573   GNUNET_free (pr);
1574 }
1575
1576
1577 /**
1578  * Find latency information in 'atsi'.
1579  *
1580  * @param atsi performance data
1581  * @return connection latency
1582  */
1583 static struct GNUNET_TIME_Relative
1584 get_latency (const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1585 {
1586   while ( (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) &&
1587           (ntohl (atsi->type) != GNUNET_TRANSPORT_ATS_QUALITY_NET_DELAY) )
1588     atsi++;
1589   if (ntohl (atsi->type) == GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR) 
1590     {
1591       GNUNET_break (0);
1592       /* how can we not have latency data? */
1593       return GNUNET_TIME_UNIT_SECONDS;
1594     }
1595   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1596                                         ntohl (atsi->value));
1597 }
1598
1599
1600 /**
1601  * Method called whenever a given peer connects.
1602  *
1603  * @param cls closure, not used
1604  * @param peer peer identity this notification is about
1605  * @param atsi performance information
1606  */
1607 static void 
1608 peer_connect_handler (void *cls,
1609                       const struct
1610                       GNUNET_PeerIdentity * peer,
1611                       const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1612 {
1613   struct ConnectedPeer *cp;
1614   struct MigrationReadyBlock *pos;
1615   char *fn;
1616   uint32_t trust;
1617   struct GNUNET_TIME_Relative latency;
1618
1619   latency = get_latency (atsi);
1620   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1621                                           &peer->hashPubKey);
1622   if (NULL != cp)
1623     {
1624       GNUNET_break (0);
1625       return;
1626     }
1627   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1628   cp->transmission_delay = GNUNET_LOAD_value_init (latency);
1629   cp->pid = GNUNET_PEER_intern (peer);
1630
1631   fn = get_trust_filename (peer);
1632   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1633       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1634     cp->disk_trust = cp->trust = ntohl (trust);
1635   GNUNET_free (fn);
1636
1637   GNUNET_break (GNUNET_OK ==
1638                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1639                                                    &peer->hashPubKey,
1640                                                    cp,
1641                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1642
1643   pos = mig_head;
1644   while (NULL != pos)
1645     {
1646       (void) consider_migration (pos, &peer->hashPubKey, cp);
1647       pos = pos->next;
1648     }
1649 }
1650
1651
1652 /**
1653  * Method called whenever a given peer has a status change.
1654  *
1655  * @param cls closure
1656  * @param peer peer identity this notification is about
1657  * @param bandwidth_in available amount of inbound bandwidth
1658  * @param bandwidth_out available amount of outbound bandwidth
1659  * @param timeout absolute time when this peer will time out
1660  *        unless we see some further activity from it
1661  * @param atsi status information
1662  */
1663 static void
1664 peer_status_handler (void *cls,
1665                      const struct
1666                      GNUNET_PeerIdentity * peer,
1667                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in,
1668                      struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out,
1669                      struct GNUNET_TIME_Absolute timeout,
1670                      const struct GNUNET_TRANSPORT_ATS_Information *atsi)
1671 {
1672   struct ConnectedPeer *cp;
1673   struct GNUNET_TIME_Relative latency;
1674
1675   latency = get_latency (atsi);
1676   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1677                                           &peer->hashPubKey);
1678   if (cp == NULL)
1679     {
1680       GNUNET_break (0);
1681       return;
1682     }
1683   GNUNET_LOAD_value_set_decline (cp->transmission_delay,
1684                                  latency);  
1685 }
1686
1687
1688
1689 /**
1690  * Increase the host credit by a value.
1691  *
1692  * @param host which peer to change the trust value on
1693  * @param value is the int value by which the
1694  *  host credit is to be increased or decreased
1695  * @returns the actual change in trust (positive or negative)
1696  */
1697 static int
1698 change_host_trust (struct ConnectedPeer *host, int value)
1699 {
1700   if (value == 0)
1701     return 0;
1702   GNUNET_assert (host != NULL);
1703   if (value > 0)
1704     {
1705       if (host->trust + value < host->trust)
1706         {
1707           value = UINT32_MAX - host->trust;
1708           host->trust = UINT32_MAX;
1709         }
1710       else
1711         host->trust += value;
1712     }
1713   else
1714     {
1715       if (host->trust < -value)
1716         {
1717           value = -host->trust;
1718           host->trust = 0;
1719         }
1720       else
1721         host->trust += value;
1722     }
1723   return value;
1724 }
1725
1726
1727 /**
1728  * Write host-trust information to a file - flush the buffer entry!
1729  */
1730 static int
1731 flush_trust (void *cls,
1732              const GNUNET_HashCode *key,
1733              void *value)
1734 {
1735   struct ConnectedPeer *host = value;
1736   char *fn;
1737   uint32_t trust;
1738   struct GNUNET_PeerIdentity pid;
1739
1740   if (host->trust == host->disk_trust)
1741     return GNUNET_OK;                     /* unchanged */
1742   GNUNET_PEER_resolve (host->pid,
1743                        &pid);
1744   fn = get_trust_filename (&pid);
1745   if (host->trust == 0)
1746     {
1747       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1748         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1749                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1750     }
1751   else
1752     {
1753       trust = htonl (host->trust);
1754       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1755                                                     sizeof(uint32_t),
1756                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1757                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1758         host->disk_trust = host->trust;
1759     }
1760   GNUNET_free (fn);
1761   return GNUNET_OK;
1762 }
1763
1764 /**
1765  * Call this method periodically to scan data/hosts for new hosts.
1766  */
1767 static void
1768 cron_flush_trust (void *cls,
1769                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1770 {
1771
1772   if (NULL == connected_peers)
1773     return;
1774   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1775                                          &flush_trust,
1776                                          NULL);
1777   if (NULL == tc)
1778     return;
1779   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1780     return;
1781   GNUNET_SCHEDULER_add_delayed (TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1782 }
1783
1784
1785 /**
1786  * Free (each) request made by the peer.
1787  *
1788  * @param cls closure, points to peer that the request belongs to
1789  * @param key current key code
1790  * @param value value in the hash map
1791  * @return GNUNET_YES (we should continue to iterate)
1792  */
1793 static int
1794 destroy_request (void *cls,
1795                  const GNUNET_HashCode * key,
1796                  void *value)
1797 {
1798   const struct GNUNET_PeerIdentity * peer = cls;
1799   struct PendingRequest *pr = value;
1800   
1801   GNUNET_break (GNUNET_YES ==
1802                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1803                                                       &peer->hashPubKey,
1804                                                       pr));
1805   destroy_pending_request (pr);
1806   return GNUNET_YES;
1807 }
1808
1809
1810 /**
1811  * Method called whenever a peer disconnects.
1812  *
1813  * @param cls closure, not used
1814  * @param peer peer identity this notification is about
1815  */
1816 static void
1817 peer_disconnect_handler (void *cls,
1818                          const struct
1819                          GNUNET_PeerIdentity * peer)
1820 {
1821   struct ConnectedPeer *cp;
1822   struct PendingMessage *pm;
1823   unsigned int i;
1824   struct MigrationReadyBlock *pos;
1825   struct MigrationReadyBlock *next;
1826
1827   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1828                                               &peer->hashPubKey,
1829                                               &destroy_request,
1830                                               (void*) peer);
1831   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1832                                           &peer->hashPubKey);
1833   if (cp == NULL)
1834     return;
1835   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1836     {
1837       if (NULL != cp->last_client_replies[i])
1838         {
1839           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1840           cp->last_client_replies[i] = NULL;
1841         }
1842     }
1843   GNUNET_break (GNUNET_YES ==
1844                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1845                                                       &peer->hashPubKey,
1846                                                       cp));
1847   if (cp->irc != NULL)
1848     {
1849       GNUNET_CORE_peer_change_preference_cancel (cp->irc);
1850       cp->irc = NULL;
1851       cp->pr->pirc = NULL;
1852       cp->pr = NULL;
1853     }
1854
1855   /* remove this peer from migration considerations; schedule
1856      alternatives */
1857   next = mig_head;
1858   while (NULL != (pos = next))
1859     {
1860       next = pos->next;
1861       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1862         {
1863           if (pos->target_list[i] == cp->pid)
1864             {
1865               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1866               pos->target_list[i] = 0;
1867             }
1868          }
1869       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1870         {
1871           delete_migration_block (pos);
1872           consider_migration_gathering ();
1873           continue;
1874         }
1875       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1876                                              &consider_migration,
1877                                              pos);
1878     }
1879   GNUNET_PEER_change_rc (cp->pid, -1);
1880   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1881   if (NULL != cp->cth)
1882     {
1883       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1884       cp->cth = NULL;
1885     }
1886   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1887     {
1888       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
1889       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1890     }
1891   while (NULL != (pm = cp->pending_messages_head))
1892     destroy_pending_message (pm, 0 /* delivery failed */);
1893   GNUNET_LOAD_value_free (cp->transmission_delay);
1894   GNUNET_break (0 == cp->pending_requests);
1895   GNUNET_free (cp);
1896 }
1897
1898
1899 /**
1900  * Iterator over hash map entries that removes all occurences
1901  * of the given 'client' from the 'last_client_replies' of the
1902  * given connected peer.
1903  *
1904  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1905  * @param key current key code (unused)
1906  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1907  * @return GNUNET_YES (we should continue to iterate)
1908  */
1909 static int
1910 remove_client_from_last_client_replies (void *cls,
1911                                         const GNUNET_HashCode * key,
1912                                         void *value)
1913 {
1914   struct GNUNET_SERVER_Client *client = cls;
1915   struct ConnectedPeer *cp = value;
1916   unsigned int i;
1917
1918   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1919     {
1920       if (cp->last_client_replies[i] == client)
1921         {
1922           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1923           cp->last_client_replies[i] = NULL;
1924         }
1925     }  
1926   return GNUNET_YES;
1927 }
1928
1929
1930 /**
1931  * A client disconnected.  Remove all of its pending queries.
1932  *
1933  * @param cls closure, NULL
1934  * @param client identification of the client
1935  */
1936 static void
1937 handle_client_disconnect (void *cls,
1938                           struct GNUNET_SERVER_Client
1939                           * client)
1940 {
1941   struct ClientList *pos;
1942   struct ClientList *prev;
1943   struct ClientRequestList *rcl;
1944   struct ClientResponseMessage *creply;
1945
1946   if (client == NULL)
1947     return;
1948   prev = NULL;
1949   pos = client_list;
1950   while ( (NULL != pos) &&
1951           (pos->client != client) )
1952     {
1953       prev = pos;
1954       pos = pos->next;
1955     }
1956   if (pos == NULL)
1957     return; /* no requests pending for this client */
1958   while (NULL != (rcl = pos->rl_head))
1959     {
1960       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1961                   "Destroying pending request `%s' on disconnect\n",
1962                   GNUNET_h2s (&rcl->req->query));
1963       destroy_pending_request (rcl->req);
1964     }
1965   if (prev == NULL)
1966     client_list = pos->next;
1967   else
1968     prev->next = pos->next;
1969   if (pos->th != NULL)
1970     {
1971       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1972       pos->th = NULL;
1973     }
1974   while (NULL != (creply = pos->res_head))
1975     {
1976       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1977                                    pos->res_tail,
1978                                    creply);
1979       GNUNET_free (creply);
1980     }    
1981   GNUNET_SERVER_client_drop (pos->client);
1982   GNUNET_free (pos);
1983   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1984                                          &remove_client_from_last_client_replies,
1985                                          client);
1986 }
1987
1988
1989 /**
1990  * Iterator to free peer entries.
1991  *
1992  * @param cls closure, unused
1993  * @param key current key code
1994  * @param value value in the hash map (peer entry)
1995  * @return GNUNET_YES (we should continue to iterate)
1996  */
1997 static int 
1998 clean_peer (void *cls,
1999             const GNUNET_HashCode * key,
2000             void *value)
2001 {
2002   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
2003   return GNUNET_YES;
2004 }
2005
2006
2007 /**
2008  * Task run during shutdown.
2009  *
2010  * @param cls unused
2011  * @param tc unused
2012  */
2013 static void
2014 shutdown_task (void *cls,
2015                const struct GNUNET_SCHEDULER_TaskContext *tc)
2016 {
2017   if (mig_qe != NULL)
2018     {
2019       GNUNET_DATASTORE_cancel (mig_qe);
2020       mig_qe = NULL;
2021     }
2022   if (dht_qe != NULL)
2023     {
2024       GNUNET_DATASTORE_cancel (dht_qe);
2025       dht_qe = NULL;
2026     }
2027   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
2028     {
2029       GNUNET_SCHEDULER_cancel (mig_task);
2030       mig_task = GNUNET_SCHEDULER_NO_TASK;
2031     }
2032   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
2033     {
2034       GNUNET_SCHEDULER_cancel (dht_task);
2035       dht_task = GNUNET_SCHEDULER_NO_TASK;
2036     }
2037   while (client_list != NULL)
2038     handle_client_disconnect (NULL,
2039                               client_list->client);
2040   cron_flush_trust (NULL, NULL);
2041   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2042                                          &clean_peer,
2043                                          NULL);
2044   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
2045   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2046   requests_by_expiration_heap = 0;
2047   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2048   connected_peers = NULL;
2049   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
2050   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2051   query_request_map = NULL;
2052   GNUNET_LOAD_value_free (rt_entry_lifetime);
2053   rt_entry_lifetime = NULL;
2054   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
2055   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2056   peer_request_map = NULL;
2057   GNUNET_assert (NULL != core);
2058   GNUNET_CORE_disconnect (core);
2059   core = NULL;
2060   if (stats != NULL)
2061     {
2062       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
2063       stats = NULL;
2064     }
2065   if (dsh != NULL)
2066     {
2067       GNUNET_DATASTORE_disconnect (dsh,
2068                                    GNUNET_NO);
2069       dsh = NULL;
2070     }
2071   while (mig_head != NULL)
2072     delete_migration_block (mig_head);
2073   GNUNET_assert (0 == mig_size);
2074   GNUNET_DHT_disconnect (dht_handle);
2075   dht_handle = NULL;
2076   GNUNET_LOAD_value_free (datastore_get_load);
2077   datastore_get_load = NULL;
2078   GNUNET_LOAD_value_free (datastore_put_load);
2079   datastore_put_load = NULL;
2080   GNUNET_BLOCK_context_destroy (block_ctx);
2081   block_ctx = NULL;
2082   GNUNET_CONFIGURATION_destroy (block_cfg);
2083   block_cfg = NULL;
2084   cfg = NULL;  
2085   GNUNET_free_non_null (trustDirectory);
2086   trustDirectory = NULL;
2087 }
2088
2089
2090 /* ******************* Utility functions  ******************** */
2091
2092
2093 /**
2094  * We've had to delay a request for transmission to core, but now
2095  * we should be ready.  Run it.
2096  *
2097  * @param cls the 'struct ConnectedPeer' for which a request was delayed
2098  * @param tc task context (unused)
2099  */
2100 static void
2101 delayed_transmission_request (void *cls,
2102                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2103 {
2104   struct ConnectedPeer *cp = cls;
2105   struct GNUNET_PeerIdentity pid;
2106   struct PendingMessage *pm;
2107
2108   pm = cp->pending_messages_head;
2109   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2110   GNUNET_assert (cp->cth == NULL);
2111   if (pm == NULL)
2112     return;
2113   GNUNET_PEER_resolve (cp->pid,
2114                        &pid);
2115   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2116   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2117                                                pm->priority,
2118                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2119                                                &pid,
2120                                                pm->msize,
2121                                                &transmit_to_peer,
2122                                                cp);
2123 }
2124
2125
2126 /**
2127  * Transmit messages by copying it to the target buffer
2128  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2129  * for writing in the meantime.  In that case, do nothing
2130  * (the disconnect or shutdown handler will take care of the rest).
2131  * If we were able to transmit messages and there are still more
2132  * pending, ask core again for further calls to this function.
2133  *
2134  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2135  * @param size number of bytes available in buf
2136  * @param buf where the callee should write the message
2137  * @return number of bytes written to buf
2138  */
2139 static size_t
2140 transmit_to_peer (void *cls,
2141                   size_t size, void *buf)
2142 {
2143   struct ConnectedPeer *cp = cls;
2144   char *cbuf = buf;
2145   struct PendingMessage *pm;
2146   struct PendingMessage *next_pm;
2147   struct GNUNET_TIME_Absolute now;
2148   struct GNUNET_TIME_Relative min_delay;
2149   struct MigrationReadyBlock *mb;
2150   struct MigrationReadyBlock *next;
2151   struct PutMessage migm;
2152   size_t msize;
2153   unsigned int i;
2154   struct GNUNET_PeerIdentity pid;
2155  
2156   cp->cth = NULL;
2157   if (NULL == buf)
2158     {
2159 #if DEBUG_FS
2160       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2161                   "Dropping message, core too busy.\n");
2162 #endif
2163       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2164                   "Dropping message, core too busy.\n");
2165       GNUNET_LOAD_update (cp->transmission_delay,
2166                           UINT64_MAX);
2167       
2168       if (NULL != (pm = cp->pending_messages_head))
2169         {
2170           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2171                                        cp->pending_messages_tail,
2172                                        pm);
2173           cp->pending_requests--;    
2174           destroy_pending_message (pm, 0);
2175         }
2176       if (NULL != (pm = cp->pending_messages_head))
2177         {
2178           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2179           min_delay = GNUNET_TIME_absolute_get_remaining (pm->delay_until);
2180           cp->delayed_transmission_request_task
2181             = GNUNET_SCHEDULER_add_delayed (min_delay,
2182                                             &delayed_transmission_request,
2183                                             cp);
2184         }
2185       return 0;
2186     }  
2187   GNUNET_LOAD_update (cp->transmission_delay,
2188                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).rel_value);
2189   now = GNUNET_TIME_absolute_get ();
2190   msize = 0;
2191   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2192   next_pm = cp->pending_messages_head;
2193   while ( (NULL != (pm = next_pm) ) &&
2194           (pm->msize <= size) )
2195     {
2196       next_pm = pm->next;
2197       if (pm->delay_until.abs_value > now.abs_value)
2198         {
2199           min_delay = GNUNET_TIME_relative_min (min_delay,
2200                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2201           continue;
2202         }
2203       memcpy (&cbuf[msize], &pm[1], pm->msize);
2204       msize += pm->msize;
2205       size -= pm->msize;
2206       if (NULL == pm->pml)
2207         {
2208           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2209                                        cp->pending_messages_tail,
2210                                        pm);
2211           cp->pending_requests--;
2212         }
2213       destroy_pending_message (pm, cp->pid);
2214     }
2215   if (pm != NULL)
2216     min_delay = GNUNET_TIME_UNIT_ZERO;
2217   if (NULL != cp->pending_messages_head)
2218     {     
2219       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2220       cp->delayed_transmission_request_task
2221         = GNUNET_SCHEDULER_add_delayed (min_delay,
2222                                         &delayed_transmission_request,
2223                                         cp);
2224     }
2225   if (pm == NULL)
2226     {      
2227       GNUNET_PEER_resolve (cp->pid,
2228                            &pid);
2229       next = mig_head;
2230       while (NULL != (mb = next))
2231         {
2232           next = mb->next;
2233           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2234             {
2235               if ( (cp->pid == mb->target_list[i]) &&
2236                    (mb->size + sizeof (migm) <= size) )
2237                 {
2238                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2239                   mb->target_list[i] = 0;
2240                   mb->used_targets++;
2241                   memset (&migm, 0, sizeof (migm));
2242                   migm.header.size = htons (sizeof (migm) + mb->size);
2243                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2244                   migm.type = htonl (mb->type);
2245                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2246                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2247                   msize += sizeof (migm);
2248                   size -= sizeof (migm);
2249                   memcpy (&cbuf[msize], &mb[1], mb->size);
2250                   msize += mb->size;
2251                   size -= mb->size;
2252 #if DEBUG_FS
2253                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2255                               GNUNET_h2s (&mb->query),
2256                               (unsigned int) mb->size,
2257                               GNUNET_i2s (&pid));
2258 #endif    
2259                   break;
2260                 }
2261               else
2262                 {
2263 #if DEBUG_FS
2264                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2265                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2266                               GNUNET_h2s (&mb->query),
2267                               (unsigned int) mb->size,
2268                               GNUNET_i2s (&pid));
2269 #endif    
2270                 }
2271             }
2272           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2273                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2274             {
2275               delete_migration_block (mb);
2276               consider_migration_gathering ();
2277             }
2278         }
2279       consider_migration (NULL, 
2280                           &pid.hashPubKey,
2281                           cp);
2282     }
2283 #if DEBUG_FS
2284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2285               "Transmitting %u bytes to peer with PID %u\n",
2286               (unsigned int) msize,
2287               (unsigned int) cp->pid);
2288 #endif
2289   return msize;
2290 }
2291
2292
2293 /**
2294  * Add a message to the set of pending messages for the given peer.
2295  *
2296  * @param cp peer to send message to
2297  * @param pm message to queue
2298  * @param pr request on which behalf this message is being queued
2299  */
2300 static void
2301 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2302                                   struct PendingMessage *pm,
2303                                   struct PendingRequest *pr)
2304 {
2305   struct PendingMessage *pos;
2306   struct PendingMessageList *pml;
2307   struct GNUNET_PeerIdentity pid;
2308
2309   GNUNET_assert (pm->next == NULL);
2310   GNUNET_assert (pm->pml == NULL);    
2311   if (pr != NULL)
2312     {
2313       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2314       pml->req = pr;
2315       pml->target = cp;
2316       pml->pm = pm;
2317       pm->pml = pml;  
2318       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2319                                    pr->pending_tail,
2320                                    pml);
2321     }
2322   pos = cp->pending_messages_head;
2323   while ( (pos != NULL) &&
2324           (pm->priority < pos->priority) )
2325     pos = pos->next;    
2326   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2327                                      cp->pending_messages_tail,
2328                                      pos,
2329                                      pm);
2330   cp->pending_requests++;
2331   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2332     {
2333       GNUNET_STATISTICS_update (stats,
2334                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2335                                 1,
2336                                 GNUNET_NO);
2337       destroy_pending_message (cp->pending_messages_tail, 0);  
2338     }
2339   GNUNET_PEER_resolve (cp->pid, &pid);
2340   if (NULL != cp->cth)
2341     {
2342       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2343       cp->cth = NULL;
2344     }
2345   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2346     {
2347       GNUNET_SCHEDULER_cancel (cp->delayed_transmission_request_task);
2348       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2349     }
2350   /* need to schedule transmission */
2351   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2352   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2353                                                cp->pending_messages_head->priority,
2354                                                MAX_TRANSMIT_DELAY,
2355                                                &pid,
2356                                                cp->pending_messages_head->msize,
2357                                                &transmit_to_peer,
2358                                                cp);
2359   if (cp->cth == NULL)
2360     {
2361 #if DEBUG_FS
2362       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2363                   "Failed to schedule transmission with core!\n");
2364 #endif
2365       GNUNET_STATISTICS_update (stats,
2366                                 gettext_noop ("# CORE transmission failures"),
2367                                 1,
2368                                 GNUNET_NO);
2369     }
2370 }
2371
2372
2373 /**
2374  * Test if the DATABASE (GET) load on this peer is too high
2375  * to even consider processing the query at
2376  * all.  
2377  * 
2378  * @return GNUNET_YES if the load is too high to do anything (load high)
2379  *         GNUNET_NO to process normally (load normal)
2380  *         GNUNET_SYSERR to process for free (load low)
2381  */
2382 static int
2383 test_get_load_too_high (uint32_t priority)
2384 {
2385   double ld;
2386
2387   ld = GNUNET_LOAD_get_load (datastore_get_load);
2388   if (ld < 1)
2389     return GNUNET_SYSERR;    
2390   if (ld <= priority)    
2391     return GNUNET_NO;    
2392   return GNUNET_YES;
2393 }
2394
2395
2396
2397
2398 /**
2399  * Test if the DATABASE (PUT) load on this peer is too high
2400  * to even consider processing the query at
2401  * all.  
2402  * 
2403  * @return GNUNET_YES if the load is too high to do anything (load high)
2404  *         GNUNET_NO to process normally (load normal or low)
2405  */
2406 static int
2407 test_put_load_too_high (uint32_t priority)
2408 {
2409   double ld;
2410
2411   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2412     return GNUNET_NO; /* very fast */
2413   ld = GNUNET_LOAD_get_load (datastore_put_load);
2414   if (ld < 2.0 * (1 + priority))
2415     return GNUNET_NO;
2416   GNUNET_STATISTICS_update (stats,
2417                             gettext_noop ("# storage requests dropped due to high load"),
2418                             1,
2419                             GNUNET_NO);
2420   return GNUNET_YES;
2421 }
2422
2423
2424 /* ******************* Pending Request Refresh Task ******************** */
2425
2426
2427
2428 /**
2429  * We use a random delay to make the timing of requests less
2430  * predictable.  This function returns such a random delay.  We add a base
2431  * delay of MAX_CORK_DELAY (1s).
2432  *
2433  * FIXME: make schedule dependent on the specifics of the request?
2434  * Or bandwidth and number of connected peers and load?
2435  *
2436  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2437  */
2438 static struct GNUNET_TIME_Relative
2439 get_processing_delay ()
2440 {
2441   return 
2442     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2443                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2444                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2445                                                                                        TTL_DECREMENT)));
2446 }
2447
2448
2449 /**
2450  * We're processing a GET request from another peer and have decided
2451  * to forward it to other peers.  This function is called periodically
2452  * and should forward the request to other peers until we have all
2453  * possible replies.  If we have transmitted the *only* reply to
2454  * the initiator we should destroy the pending request.  If we have
2455  * many replies in the queue to the initiator, we should delay sending
2456  * out more queries until the reply queue has shrunk some.
2457  *
2458  * @param cls our "struct ProcessGetContext *"
2459  * @param tc unused
2460  */
2461 static void
2462 forward_request_task (void *cls,
2463                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2464
2465
2466 /**
2467  * Function called after we either failed or succeeded
2468  * at transmitting a query to a peer.  
2469  *
2470  * @param cls the requests "struct PendingRequest*"
2471  * @param tpid ID of receiving peer, 0 on transmission error
2472  */
2473 static void
2474 transmit_query_continuation (void *cls,
2475                              GNUNET_PEER_Id tpid)
2476 {
2477   struct PendingRequest *pr = cls;
2478   unsigned int i;
2479
2480   GNUNET_STATISTICS_update (stats,
2481                             gettext_noop ("# queries scheduled for forwarding"),
2482                             -1,
2483                             GNUNET_NO);
2484   if (tpid == 0)   
2485     {
2486 #if DEBUG_FS
2487       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2488                   "Transmission of request failed, will try again later.\n");
2489 #endif
2490       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2491         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2492                                                  &forward_request_task,
2493                                                  pr); 
2494       return;    
2495     }
2496 #if DEBUG_FS
2497   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2498               "Transmitted query `%s'\n",
2499               GNUNET_h2s (&pr->query));
2500 #endif
2501   GNUNET_STATISTICS_update (stats,
2502                             gettext_noop ("# queries forwarded"),
2503                             1,
2504                             GNUNET_NO);
2505   for (i=0;i<pr->used_targets_off;i++)
2506     if (pr->used_targets[i].pid == tpid)
2507       break; /* found match! */    
2508   if (i == pr->used_targets_off)
2509     {
2510       /* need to create new entry */
2511       if (pr->used_targets_off == pr->used_targets_size)
2512         GNUNET_array_grow (pr->used_targets,
2513                            pr->used_targets_size,
2514                            pr->used_targets_size * 2 + 2);
2515       GNUNET_PEER_change_rc (tpid, 1);
2516       pr->used_targets[pr->used_targets_off].pid = tpid;
2517       pr->used_targets[pr->used_targets_off].num_requests = 0;
2518       i = pr->used_targets_off++;
2519     }
2520   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2521   pr->used_targets[i].num_requests++;
2522   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2523     pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2524                                              &forward_request_task,
2525                                              pr);
2526 }
2527
2528
2529 /**
2530  * How many bytes should a bloomfilter be if we have already seen
2531  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2532  * of bits set per entry.  Furthermore, we should not re-size the
2533  * filter too often (to keep it cheap).
2534  *
2535  * Since other peers will also add entries but not resize the filter,
2536  * we should generally pick a slightly larger size than what the
2537  * strict math would suggest.
2538  *
2539  * @return must be a power of two and smaller or equal to 2^15.
2540  */
2541 static size_t
2542 compute_bloomfilter_size (unsigned int entry_count)
2543 {
2544   size_t size;
2545   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2546   uint16_t max = 1 << 15;
2547
2548   if (entry_count > max)
2549     return max;
2550   size = 8;
2551   while ((size < max) && (size < ideal))
2552     size *= 2;
2553   if (size > max)
2554     return max;
2555   return size;
2556 }
2557
2558
2559 /**
2560  * Recalculate our bloom filter for filtering replies.  This function
2561  * will create a new bloom filter from scratch, so it should only be
2562  * called if we have no bloomfilter at all (and hence can create a
2563  * fresh one of minimal size without problems) OR if our peer is the
2564  * initiator (in which case we may resize to larger than mimimum size).
2565  *
2566  * @param pr request for which the BF is to be recomputed
2567  */
2568 static void
2569 refresh_bloomfilter (struct PendingRequest *pr)
2570 {
2571   unsigned int i;
2572   size_t nsize;
2573   GNUNET_HashCode mhash;
2574
2575   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2576   if (nsize == pr->bf_size)
2577     return; /* size not changed */
2578   if (pr->bf != NULL)
2579     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2580   pr->bf_size = nsize;
2581   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2582   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2583                                               pr->bf_size,
2584                                               BLOOMFILTER_K);
2585   for (i=0;i<pr->replies_seen_off;i++)
2586     {
2587       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2588                                 pr->mingle,
2589                                 &mhash);
2590       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2591     }
2592 }
2593
2594
2595 /**
2596  * Function called after we've tried to reserve a certain amount of
2597  * bandwidth for a reply.  Check if we succeeded and if so send our
2598  * query.
2599  *
2600  * @param cls the requests "struct PendingRequest*"
2601  * @param peer identifies the peer
2602  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2603  * @param amount set to the amount that was actually reserved or unreserved
2604  * @param preference current traffic preference for the given peer
2605  */
2606 static void
2607 target_reservation_cb (void *cls,
2608                        const struct
2609                        GNUNET_PeerIdentity * peer,
2610                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2611                        int amount,
2612                        uint64_t preference)
2613 {
2614   struct PendingRequest *pr = cls;
2615   struct ConnectedPeer *cp;
2616   struct PendingMessage *pm;
2617   struct GetMessage *gm;
2618   GNUNET_HashCode *ext;
2619   char *bfdata;
2620   size_t msize;
2621   unsigned int k;
2622   int no_route;
2623   uint32_t bm;
2624   unsigned int i;
2625
2626   /* (3) transmit, update ttl/priority */
2627   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2628                                           &peer->hashPubKey);
2629   if (cp == NULL)
2630     {
2631       /* Peer must have just left */
2632 #if DEBUG_FS
2633       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2634                   "Selected peer disconnected!\n");
2635 #endif
2636       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2637         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2638                                                  &forward_request_task,
2639                                                  pr);
2640       return;
2641     }
2642   cp->irc = NULL;
2643   pr->pirc = NULL;
2644   if (peer == NULL)
2645     {
2646       /* error in communication with core, try again later */
2647       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2648         pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2649                                                  &forward_request_task,
2650                                                  pr);
2651       return;
2652     }
2653   no_route = GNUNET_NO;
2654   if (amount == 0)
2655     {
2656       if (pr->cp == NULL)
2657         {
2658 #if DEBUG_FS > 1
2659           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2660                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2661                       amount,
2662                       DBLOCK_SIZE);
2663 #endif
2664           GNUNET_STATISTICS_update (stats,
2665                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2666                                     1,
2667                                     GNUNET_NO);
2668           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2669             pr->task = GNUNET_SCHEDULER_add_delayed (get_processing_delay (),
2670                                                      &forward_request_task,
2671                                                      pr);
2672           return;  /* this target round failed */
2673         }
2674       no_route = GNUNET_YES;
2675     }
2676   
2677   GNUNET_STATISTICS_update (stats,
2678                             gettext_noop ("# queries scheduled for forwarding"),
2679                             1,
2680                             GNUNET_NO);
2681   for (i=0;i<pr->used_targets_off;i++)
2682     if (pr->used_targets[i].pid == cp->pid) 
2683       {
2684         GNUNET_STATISTICS_update (stats,
2685                                   gettext_noop ("# queries retransmitted to same target"),
2686                                   1,
2687                                   GNUNET_NO);
2688         break;
2689       } 
2690
2691   /* build message and insert message into priority queue */
2692 #if DEBUG_FS
2693   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2694               "Forwarding request `%s' to `%4s'!\n",
2695               GNUNET_h2s (&pr->query),
2696               GNUNET_i2s (peer));
2697 #endif
2698   k = 0;
2699   bm = 0;
2700   if (GNUNET_YES == no_route)
2701     {
2702       bm |= GET_MESSAGE_BIT_RETURN_TO;
2703       k++;      
2704     }
2705   if (pr->namespace != NULL)
2706     {
2707       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2708       k++;
2709     }
2710   if (pr->target_pid != 0)
2711     {
2712       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2713       k++;
2714     }
2715   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2716   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2717   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2718   pm->msize = msize;
2719   gm = (struct GetMessage*) &pm[1];
2720   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2721   gm->header.size = htons (msize);
2722   gm->type = htonl (pr->type);
2723   pr->remaining_priority /= 2;
2724   gm->priority = htonl (pr->remaining_priority);
2725   gm->ttl = htonl (pr->ttl);
2726   gm->filter_mutator = htonl(pr->mingle); 
2727   gm->hash_bitmap = htonl (bm);
2728   gm->query = pr->query;
2729   ext = (GNUNET_HashCode*) &gm[1];
2730   k = 0;
2731   if (GNUNET_YES == no_route)
2732     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2733   if (pr->namespace != NULL)
2734     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2735   if (pr->target_pid != 0)
2736     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2737   bfdata = (char *) &ext[k];
2738   if (pr->bf != NULL)
2739     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2740                                                bfdata,
2741                                                pr->bf_size);
2742   pm->cont = &transmit_query_continuation;
2743   pm->cont_cls = pr;
2744   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2745   add_to_pending_messages_for_peer (cp, pm, pr);
2746 }
2747
2748
2749 /**
2750  * Closure used for "target_peer_select_cb".
2751  */
2752 struct PeerSelectionContext 
2753 {
2754   /**
2755    * The request for which we are selecting
2756    * peers.
2757    */
2758   struct PendingRequest *pr;
2759
2760   /**
2761    * Current "prime" target.
2762    */
2763   struct GNUNET_PeerIdentity target;
2764
2765   /**
2766    * How much do we like this target?
2767    */
2768   double target_score;
2769
2770   /**
2771    * Does it make sense to we re-try quickly again?
2772    */
2773   int fast_retry;
2774
2775 };
2776
2777
2778 /**
2779  * Function called for each connected peer to determine
2780  * which one(s) would make good targets for forwarding.
2781  *
2782  * @param cls closure (struct PeerSelectionContext)
2783  * @param key current key code (peer identity)
2784  * @param value value in the hash map (struct ConnectedPeer)
2785  * @return GNUNET_YES if we should continue to
2786  *         iterate,
2787  *         GNUNET_NO if not.
2788  */
2789 static int
2790 target_peer_select_cb (void *cls,
2791                        const GNUNET_HashCode * key,
2792                        void *value)
2793 {
2794   struct PeerSelectionContext *psc = cls;
2795   struct ConnectedPeer *cp = value;
2796   struct PendingRequest *pr = psc->pr;
2797   struct GNUNET_TIME_Relative delay;
2798   double score;
2799   unsigned int i;
2800   unsigned int pc;
2801
2802   /* 1) check that this peer is not the initiator */
2803   if (cp == pr->cp)     
2804     {
2805 #if DEBUG_FS
2806       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2807                   "Skipping initiator in forwarding selection\n");
2808 #endif
2809       return GNUNET_YES; /* skip */        
2810     }
2811   if (cp->irc != NULL)
2812     {
2813       psc->fast_retry = GNUNET_YES;
2814       return GNUNET_YES; /* skip: already querying core about this peer for other reasons */
2815     }
2816
2817   /* 2) check if we have already (recently) forwarded to this peer */
2818   /* 2a) this particular request */
2819   pc = 0;
2820   for (i=0;i<pr->used_targets_off;i++)
2821     if (pr->used_targets[i].pid == cp->pid) 
2822       {
2823         pc = pr->used_targets[i].num_requests;
2824         GNUNET_assert (pc > 0);
2825         /* FIXME: make re-enabling a peer independent of how often
2826            this function is called??? */
2827         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2828                                            RETRY_PROBABILITY_INV * pc))
2829           {
2830 #if DEBUG_FS
2831             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2832                         "NOT re-trying query that was previously transmitted %u times\n",
2833                         (unsigned int) pc);
2834 #endif
2835             return GNUNET_YES; /* skip */
2836           }
2837         break;
2838       }
2839 #if DEBUG_FS
2840   if (0 < pc)
2841     {
2842       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2843                   "Re-trying query that was previously transmitted %u times to this peer\n",
2844                   (unsigned int) pc);
2845     }
2846 #endif
2847   /* 2b) many other requests to this peer */
2848   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2849   if (delay.rel_value <= cp->avg_delay.rel_value)
2850     {
2851 #if DEBUG_FS
2852       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2853                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2854                   MAX_QUEUE_PER_PEER,
2855                   cp->avg_delay.rel_value);
2856 #endif
2857       return GNUNET_YES; /* skip */      
2858     }
2859
2860   /* 3) calculate how much we'd like to forward to this peer,
2861      starting with a random value that is strong enough
2862      to at least give any peer a chance sometimes 
2863      (compared to the other factors that come later) */
2864   /* 3a) count successful (recent) routes from cp for same source */
2865   if (pr->cp != NULL)
2866     {
2867       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2868                                         P2P_SUCCESS_LIST_SIZE);
2869       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2870         if (cp->last_p2p_replies[i] == pr->cp->pid)
2871           score += 1.0; /* likely successful based on hot path */
2872     }
2873   else
2874     {
2875       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2876                                         CS2P_SUCCESS_LIST_SIZE);
2877       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2878         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2879           score += 1.0; /* likely successful based on hot path */
2880     }
2881   /* 3b) include latency */
2882   if (cp->avg_delay.rel_value < 4 * TTL_DECREMENT)
2883     score += 1.0; /* likely fast based on latency */
2884   /* 3c) include priorities */
2885   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2886     score += 1.0; /* likely successful based on priorities */
2887   /* 3d) penalize for queue size */  
2888   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2889   /* 3e) include peer proximity */
2890   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2891                                                     &pr->query)) / (double) UINT32_MAX);
2892   /* 4) super-bonus for being the known target */
2893   if (pr->target_pid == cp->pid)
2894     score += 100.0;
2895   /* store best-fit in closure */
2896   score++; /* avoid zero */
2897   if (score > psc->target_score)
2898     {
2899       psc->target_score = score;
2900       psc->target.hashPubKey = *key; 
2901     }
2902 #if DEBUG_FS
2903   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2904               "Peer `%s' gets score %f for forwarding query, max is %8f\n",
2905               GNUNET_h2s (key),
2906               score,
2907               psc->target_score);
2908 #endif
2909   return GNUNET_YES;
2910 }
2911   
2912
2913 /**
2914  * The priority level imposes a bound on the maximum
2915  * value for the ttl that can be requested.
2916  *
2917  * @param ttl_in requested ttl
2918  * @param prio given priority
2919  * @return ttl_in if ttl_in is below the limit,
2920  *         otherwise the ttl-limit for the given priority
2921  */
2922 static int32_t
2923 bound_ttl (int32_t ttl_in, uint32_t prio)
2924 {
2925   unsigned long long allowed;
2926
2927   if (ttl_in <= 0)
2928     return ttl_in;
2929   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2930   if (ttl_in > allowed)      
2931     {
2932       if (allowed >= (1 << 30))
2933         return 1 << 30;
2934       return allowed;
2935     }
2936   return ttl_in;
2937 }
2938
2939
2940 /**
2941  * Iterator called on each result obtained for a DHT
2942  * operation that expects a reply
2943  *
2944  * @param cls closure
2945  * @param exp when will this value expire
2946  * @param key key of the result
2947  * @param get_path NULL-terminated array of pointers
2948  *                 to the peers on reverse GET path (or NULL if not recorded)
2949  * @param put_path NULL-terminated array of pointers
2950  *                 to the peers on the PUT path (or NULL if not recorded)
2951  * @param type type of the result
2952  * @param size number of bytes in data
2953  * @param data pointer to the result data
2954  */
2955 static void
2956 process_dht_reply (void *cls,
2957                    struct GNUNET_TIME_Absolute exp,
2958                    const GNUNET_HashCode * key,
2959                    const struct GNUNET_PeerIdentity * const *get_path,
2960                    const struct GNUNET_PeerIdentity * const *put_path,
2961                    enum GNUNET_BLOCK_Type type,
2962                    size_t size,
2963                    const void *data);
2964
2965
2966 /**
2967  * We're processing a GET request and have decided
2968  * to forward it to other peers.  This function is called periodically
2969  * and should forward the request to other peers until we have all
2970  * possible replies.  If we have transmitted the *only* reply to
2971  * the initiator we should destroy the pending request.  If we have
2972  * many replies in the queue to the initiator, we should delay sending
2973  * out more queries until the reply queue has shrunk some.
2974  *
2975  * @param cls our "struct ProcessGetContext *"
2976  * @param tc unused
2977  */
2978 static void
2979 forward_request_task (void *cls,
2980                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2981 {
2982   struct PendingRequest *pr = cls;
2983   struct PeerSelectionContext psc;
2984   struct ConnectedPeer *cp; 
2985   struct GNUNET_TIME_Relative delay;
2986
2987   pr->task = GNUNET_SCHEDULER_NO_TASK;
2988   if (pr->pirc != NULL)
2989     {
2990 #if DEBUG_FS
2991       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2992                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2993                   GNUNET_h2s (&pr->query));
2994 #endif
2995       return; /* already pending */
2996     }
2997   if (GNUNET_YES == pr->local_only)
2998     return; /* configured to not do P2P search */
2999   /* (0) try DHT */
3000   if ( (0 == pr->anonymity_level) &&
3001        (GNUNET_YES != pr->forward_only) &&
3002        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
3003        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
3004     {
3005       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
3006                                           GNUNET_TIME_UNIT_FOREVER_REL,
3007                                           pr->type,
3008                                           &pr->query,
3009                                           DEFAULT_GET_REPLICATION,
3010                                           GNUNET_DHT_RO_NONE,
3011                                           pr->bf,
3012                                           pr->mingle,
3013                                           pr->namespace,
3014                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3015                                           &process_dht_reply,
3016                                           pr);
3017     }
3018   /* (1) select target */
3019   psc.pr = pr;
3020   psc.target_score = -DBL_MAX;
3021   psc.fast_retry = GNUNET_NO;
3022   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
3023                                          &target_peer_select_cb,
3024                                          &psc);  
3025   if (psc.target_score == -DBL_MAX)
3026     {
3027       if (psc.fast_retry == GNUNET_YES)
3028         delay = GNUNET_TIME_UNIT_MILLISECONDS; /* FIXME: store adaptive fast-retry value in 'pr' */
3029       else
3030         delay = get_processing_delay ();
3031 #if DEBUG_FS 
3032       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3033                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
3034                   GNUNET_h2s (&pr->query),
3035                   delay.rel_value);
3036 #endif
3037       pr->task = GNUNET_SCHEDULER_add_delayed (delay,
3038                                                &forward_request_task,
3039                                                pr);
3040       return; /* nobody selected */
3041     }
3042   /* (3) update TTL/priority */
3043   if (pr->client_request_list != NULL)
3044     {
3045       /* FIXME: use better algorithm!? */
3046       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3047                                          4))
3048         pr->priority++;
3049       /* bound priority we use by priorities we see from other peers
3050          rounded up (must round up so that we can see non-zero
3051          priorities, but round up as little as possible to make it
3052          plausible that we forwarded another peers request) */
3053       if (pr->priority > current_priorities + 1.0)
3054         pr->priority = (uint32_t) current_priorities + 1.0;
3055       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
3056                            pr->priority);
3057 #if DEBUG_FS
3058       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3059                   "Trying query `%s' with priority %u and TTL %d.\n",
3060                   GNUNET_h2s (&pr->query),
3061                   pr->priority,
3062                   pr->ttl);
3063 #endif
3064     }
3065
3066   /* (3) reserve reply bandwidth */
3067   if (GNUNET_NO == pr->forward_only)
3068     {
3069       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3070                                               &psc.target.hashPubKey);
3071       GNUNET_assert (NULL != cp);
3072       GNUNET_assert (cp->irc == NULL);
3073       pr->pirc = cp;
3074       cp->pr = pr;
3075       cp->irc = GNUNET_CORE_peer_change_preference (core,
3076                                                     &psc.target,
3077                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
3078                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
3079                                                     DBLOCK_SIZE * 2, 
3080                                                     cp->inc_preference,
3081                                                     &target_reservation_cb,
3082                                                     pr);
3083       GNUNET_assert (cp->irc != NULL);
3084       cp->inc_preference = 0;
3085     }
3086   else
3087     {
3088       /* force forwarding */
3089       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
3090       target_reservation_cb (pr, &psc.target,
3091                              zerobw, 0, 0.0);
3092     }
3093 }
3094
3095
3096 /* **************************** P2P PUT Handling ************************ */
3097
3098
3099 /**
3100  * Function called after we either failed or succeeded
3101  * at transmitting a reply to a peer.  
3102  *
3103  * @param cls the requests "struct PendingRequest*"
3104  * @param tpid ID of receiving peer, 0 on transmission error
3105  */
3106 static void
3107 transmit_reply_continuation (void *cls,
3108                              GNUNET_PEER_Id tpid)
3109 {
3110   struct PendingRequest *pr = cls;
3111   
3112   switch (pr->type)
3113     {
3114     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3115     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3116       /* only one reply expected, done with the request! */
3117       destroy_pending_request (pr);
3118       break;
3119     case GNUNET_BLOCK_TYPE_ANY:
3120     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
3121     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3122       break;
3123     default:
3124       GNUNET_break (0);
3125       break;
3126     }
3127 }
3128
3129
3130 /**
3131  * Transmit the given message by copying it to the target buffer
3132  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3133  * for writing in the meantime.  In that case, do nothing
3134  * (the disconnect or shutdown handler will take care of the rest).
3135  * If we were able to transmit messages and there are still more
3136  * pending, ask core again for further calls to this function.
3137  *
3138  * @param cls closure, pointer to the 'struct ClientList*'
3139  * @param size number of bytes available in buf
3140  * @param buf where the callee should write the message
3141  * @return number of bytes written to buf
3142  */
3143 static size_t
3144 transmit_to_client (void *cls,
3145                   size_t size, void *buf)
3146 {
3147   struct ClientList *cl = cls;
3148   char *cbuf = buf;
3149   struct ClientResponseMessage *creply;
3150   size_t msize;
3151   
3152   cl->th = NULL;
3153   if (NULL == buf)
3154     {
3155 #if DEBUG_FS
3156       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3157                   "Not sending reply, client communication problem.\n");
3158 #endif
3159       return 0;
3160     }
3161   msize = 0;
3162   while ( (NULL != (creply = cl->res_head) ) &&
3163           (creply->msize <= size) )
3164     {
3165       memcpy (&cbuf[msize], &creply[1], creply->msize);
3166       msize += creply->msize;
3167       size -= creply->msize;
3168       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3169                                    cl->res_tail,
3170                                    creply);
3171       GNUNET_free (creply);
3172     }
3173   if (NULL != creply)
3174     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3175                                                   creply->msize,
3176                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3177                                                   &transmit_to_client,
3178                                                   cl);
3179 #if DEBUG_FS
3180   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3181               "Transmitted %u bytes to client\n",
3182               (unsigned int) msize);
3183 #endif
3184   return msize;
3185 }
3186
3187
3188 /**
3189  * Closure for "process_reply" function.
3190  */
3191 struct ProcessReplyClosure
3192 {
3193   /**
3194    * The data for the reply.
3195    */
3196   const void *data;
3197
3198   /**
3199    * Who gave us this reply? NULL for local host (or DHT)
3200    */
3201   struct ConnectedPeer *sender;
3202
3203   /**
3204    * When the reply expires.
3205    */
3206   struct GNUNET_TIME_Absolute expiration;
3207
3208   /**
3209    * Size of data.
3210    */
3211   size_t size;
3212
3213   /**
3214    * Type of the block.
3215    */
3216   enum GNUNET_BLOCK_Type type;
3217
3218   /**
3219    * How much was this reply worth to us?
3220    */
3221   uint32_t priority;
3222
3223   /**
3224    * Evaluation result (returned).
3225    */
3226   enum GNUNET_BLOCK_EvaluationResult eval;
3227
3228   /**
3229    * Did we finish processing the associated request?
3230    */ 
3231   int finished;
3232
3233   /**
3234    * Did we find a matching request?
3235    */
3236   int request_found;
3237 };
3238
3239
3240 /**
3241  * We have received a reply; handle it!
3242  *
3243  * @param cls response (struct ProcessReplyClosure)
3244  * @param key our query
3245  * @param value value in the hash map (info about the query)
3246  * @return GNUNET_YES (we should continue to iterate)
3247  */
3248 static int
3249 process_reply (void *cls,
3250                const GNUNET_HashCode * key,
3251                void *value)
3252 {
3253   struct ProcessReplyClosure *prq = cls;
3254   struct PendingRequest *pr = value;
3255   struct PendingMessage *reply;
3256   struct ClientResponseMessage *creply;
3257   struct ClientList *cl;
3258   struct PutMessage *pm;
3259   struct ConnectedPeer *cp;
3260   struct GNUNET_TIME_Relative cur_delay;
3261 #if SUPPORT_DELAYS  
3262 struct GNUNET_TIME_Relative art_delay;
3263 #endif
3264   size_t msize;
3265   unsigned int i;
3266
3267 #if DEBUG_FS
3268   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3269               "Matched result (type %u) for query `%s' with pending request\n",
3270               (unsigned int) prq->type,
3271               GNUNET_h2s (key));
3272 #endif  
3273   GNUNET_STATISTICS_update (stats,
3274                             gettext_noop ("# replies received and matched"),
3275                             1,
3276                             GNUNET_NO);
3277   if (prq->sender != NULL)
3278     {
3279       for (i=0;i<pr->used_targets_off;i++)
3280         if (pr->used_targets[i].pid == prq->sender->pid)
3281           break;
3282       if (i < pr->used_targets_off)
3283         {
3284           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3285           prq->sender->avg_delay.rel_value
3286             = (prq->sender->avg_delay.rel_value * 
3287                (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
3288           prq->sender->avg_priority
3289             = (prq->sender->avg_priority * 
3290                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3291         }
3292       if (pr->cp != NULL)
3293         {
3294           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3295                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3296                                  -1);
3297           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3298           prq->sender->last_p2p_replies
3299             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3300             = pr->cp->pid;
3301         }
3302       else
3303         {
3304           if (NULL != prq->sender->last_client_replies
3305               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3306             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3307                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3308           prq->sender->last_client_replies
3309             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3310             = pr->client_request_list->client_list->client;
3311           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3312         }
3313     }
3314   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3315                                      prq->type,
3316                                      key,
3317                                      &pr->bf,
3318                                      pr->mingle,
3319                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3320                                      prq->data,
3321                                      prq->size);
3322   switch (prq->eval)
3323     {
3324     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3325       break;
3326     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3327       while (NULL != pr->pending_head)
3328         destroy_pending_message_list_entry (pr->pending_head);
3329       if (pr->qe != NULL)
3330         {
3331           if (pr->client_request_list != NULL)
3332             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3333                                         GNUNET_YES);
3334           GNUNET_DATASTORE_cancel (pr->qe);
3335           pr->qe = NULL;
3336         }
3337       pr->do_remove = GNUNET_YES;
3338       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3339         {
3340           GNUNET_SCHEDULER_cancel (pr->task);
3341           pr->task = GNUNET_SCHEDULER_NO_TASK;
3342         }
3343       GNUNET_break (GNUNET_YES ==
3344                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3345                                                           key,
3346                                                           pr));
3347       GNUNET_LOAD_update (rt_entry_lifetime,
3348                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
3349       break;
3350     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3351       GNUNET_STATISTICS_update (stats,
3352                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3353                                 1,
3354                                 GNUNET_NO);
3355 #if DEBUG_FS
3356 /*      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3357                   "Duplicate response `%s', discarding.\n",
3358                   GNUNET_h2s (&mhash));*/
3359 #endif
3360       return GNUNET_YES; /* duplicate */
3361     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3362       return GNUNET_YES; /* wrong namespace */  
3363     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3364       GNUNET_break (0);
3365       return GNUNET_YES;
3366     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3367       GNUNET_break (0);
3368       return GNUNET_YES;
3369     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3370       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3371                   _("Unsupported block type %u\n"),
3372                   prq->type);
3373       return GNUNET_NO;
3374     }
3375   if (pr->client_request_list != NULL)
3376     {
3377       if (pr->replies_seen_size == pr->replies_seen_off)
3378         GNUNET_array_grow (pr->replies_seen,
3379                            pr->replies_seen_size,
3380                            pr->replies_seen_size * 2 + 4);      
3381       GNUNET_CRYPTO_hash (prq->data,
3382                           prq->size,
3383                           &pr->replies_seen[pr->replies_seen_off++]);         
3384       refresh_bloomfilter (pr);
3385     }
3386   if (NULL == prq->sender)
3387     {
3388 #if DEBUG_FS
3389       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3390                   "Found result for query `%s' in local datastore\n",
3391                   GNUNET_h2s (key));
3392 #endif
3393       GNUNET_STATISTICS_update (stats,
3394                                 gettext_noop ("# results found locally"),
3395                                 1,
3396                                 GNUNET_NO);      
3397     }
3398   prq->priority += pr->remaining_priority;
3399   pr->remaining_priority = 0;
3400   pr->results_found++;
3401   prq->request_found = GNUNET_YES;
3402   if (NULL != pr->client_request_list)
3403     {
3404       GNUNET_STATISTICS_update (stats,
3405                                 gettext_noop ("# replies received for local clients"),
3406                                 1,
3407                                 GNUNET_NO);
3408       cl = pr->client_request_list->client_list;
3409       msize = sizeof (struct PutMessage) + prq->size;
3410       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3411       creply->msize = msize;
3412       creply->client_list = cl;
3413       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3414                                          cl->res_tail,
3415                                          cl->res_tail,
3416                                          creply);      
3417       pm = (struct PutMessage*) &creply[1];
3418       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3419       pm->header.size = htons (msize);
3420       pm->type = htonl (prq->type);
3421       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3422       memcpy (&pm[1], prq->data, prq->size);      
3423       if (NULL == cl->th)
3424         {
3425 #if DEBUG_FS
3426           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3427                       "Transmitting result for query `%s' to client\n",
3428                       GNUNET_h2s (key));
3429 #endif  
3430           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3431                                                         msize,
3432                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3433                                                         &transmit_to_client,
3434                                                         cl);
3435         }
3436       GNUNET_break (cl->th != NULL);
3437       if (pr->do_remove)                
3438         {
3439           prq->finished = GNUNET_YES;
3440           destroy_pending_request (pr);         
3441         }
3442     }
3443   else
3444     {
3445       cp = pr->cp;
3446 #if DEBUG_FS
3447       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3448                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3449                   GNUNET_h2s (key),
3450                   (unsigned int) cp->pid);
3451 #endif  
3452       GNUNET_STATISTICS_update (stats,
3453                                 gettext_noop ("# replies received for other peers"),
3454                                 1,
3455                                 GNUNET_NO);
3456       msize = sizeof (struct PutMessage) + prq->size;
3457       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3458       reply->cont = &transmit_reply_continuation;
3459       reply->cont_cls = pr;
3460 #if SUPPORT_DELAYS
3461       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3462                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3463                                                                            TTL_DECREMENT));
3464       reply->delay_until 
3465         = GNUNET_TIME_relative_to_absolute (art_delay);
3466       GNUNET_STATISTICS_update (stats,
3467                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3468                                 art_delay.abs_value,
3469                                 GNUNET_NO);
3470 #endif
3471       reply->msize = msize;
3472       reply->priority = UINT32_MAX; /* send replies first! */
3473       pm = (struct PutMessage*) &reply[1];
3474       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3475       pm->header.size = htons (msize);
3476       pm->type = htonl (prq->type);
3477       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3478       memcpy (&pm[1], prq->data, prq->size);
3479       add_to_pending_messages_for_peer (cp, reply, pr);
3480     }
3481   return GNUNET_YES;
3482 }
3483
3484
3485 /**
3486  * Iterator called on each result obtained for a DHT
3487  * operation that expects a reply
3488  *
3489  * @param cls closure
3490  * @param exp when will this value expire
3491  * @param key key of the result
3492  * @param get_path NULL-terminated array of pointers
3493  *                 to the peers on reverse GET path (or NULL if not recorded)
3494  * @param put_path NULL-terminated array of pointers
3495  *                 to the peers on the PUT path (or NULL if not recorded)
3496  * @param type type of the result
3497  * @param size number of bytes in data
3498  * @param data pointer to the result data
3499  */
3500 static void
3501 process_dht_reply (void *cls,
3502                    struct GNUNET_TIME_Absolute exp,
3503                    const GNUNET_HashCode * key,
3504                    const struct GNUNET_PeerIdentity * const *get_path,
3505                    const struct GNUNET_PeerIdentity * const *put_path,
3506                    enum GNUNET_BLOCK_Type type,
3507                    size_t size,
3508                    const void *data)
3509 {
3510   struct PendingRequest *pr = cls;
3511   struct ProcessReplyClosure prq;
3512
3513   memset (&prq, 0, sizeof (prq));
3514   prq.data = data;
3515   prq.expiration = exp;
3516   prq.size = size;  
3517   prq.type = type;
3518   process_reply (&prq, key, pr);
3519 }
3520
3521
3522
3523 /**
3524  * Continuation called to notify client about result of the
3525  * operation.
3526  *
3527  * @param cls closure
3528  * @param success GNUNET_SYSERR on failure
3529  * @param msg NULL on success, otherwise an error message
3530  */
3531 static void 
3532 put_migration_continuation (void *cls,
3533                             int success,
3534                             const char *msg)
3535 {
3536   struct GNUNET_TIME_Absolute *start = cls;
3537   struct GNUNET_TIME_Relative delay;
3538   
3539   delay = GNUNET_TIME_absolute_get_duration (*start);
3540   GNUNET_free (start);
3541   GNUNET_LOAD_update (datastore_put_load,
3542                       delay.rel_value);
3543   if (GNUNET_OK == success)
3544     return;
3545   GNUNET_STATISTICS_update (stats,
3546                             gettext_noop ("# datastore 'put' failures"),
3547                             1,
3548                             GNUNET_NO);
3549 }
3550
3551
3552 /**
3553  * Handle P2P "PUT" message.
3554  *
3555  * @param cls closure, always NULL
3556  * @param other the other peer involved (sender or receiver, NULL
3557  *        for loopback messages where we are both sender and receiver)
3558  * @param message the actual message
3559  * @param atsi performance information
3560  * @return GNUNET_OK to keep the connection open,
3561  *         GNUNET_SYSERR to close it (signal serious error)
3562  */
3563 static int
3564 handle_p2p_put (void *cls,
3565                 const struct GNUNET_PeerIdentity *other,
3566                 const struct GNUNET_MessageHeader *message,
3567                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3568 {
3569   const struct PutMessage *put;
3570   uint16_t msize;
3571   size_t dsize;
3572   enum GNUNET_BLOCK_Type type;
3573   struct GNUNET_TIME_Absolute expiration;
3574   GNUNET_HashCode query;
3575   struct ProcessReplyClosure prq;
3576   struct GNUNET_TIME_Absolute *start;
3577   struct GNUNET_TIME_Relative block_time;  
3578   double putl;
3579   struct ConnectedPeer *cp; 
3580   struct PendingMessage *pm;
3581   struct MigrationStopMessage *msm;
3582
3583   msize = ntohs (message->size);
3584   if (msize < sizeof (struct PutMessage))
3585     {
3586       GNUNET_break_op(0);
3587       return GNUNET_SYSERR;
3588     }
3589   put = (const struct PutMessage*) message;
3590   dsize = msize - sizeof (struct PutMessage);
3591   type = ntohl (put->type);
3592   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3593
3594   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3595     return GNUNET_SYSERR;
3596   if (GNUNET_OK !=
3597       GNUNET_BLOCK_get_key (block_ctx,
3598                             type,
3599                             &put[1],
3600                             dsize,
3601                             &query))
3602     {
3603       GNUNET_break_op (0);
3604       return GNUNET_SYSERR;
3605     }
3606 #if DEBUG_FS
3607   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3608               "Received result for query `%s' from peer `%4s'\n",
3609               GNUNET_h2s (&query),
3610               GNUNET_i2s (other));
3611 #endif
3612   GNUNET_STATISTICS_update (stats,
3613                             gettext_noop ("# replies received (overall)"),
3614                             1,
3615                             GNUNET_NO);
3616   /* now, lookup 'query' */
3617   prq.data = (const void*) &put[1];
3618   if (other != NULL)
3619     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3620                                                     &other->hashPubKey);
3621   else
3622     prq.sender = NULL;
3623   prq.size = dsize;
3624   prq.type = type;
3625   prq.expiration = expiration;
3626   prq.priority = 0;
3627   prq.finished = GNUNET_NO;
3628   prq.request_found = GNUNET_NO;
3629   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3630                                               &query,
3631                                               &process_reply,
3632                                               &prq);
3633   if (prq.sender != NULL)
3634     {
3635       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3636       change_host_trust (prq.sender, prq.priority);
3637     }
3638   if ( (GNUNET_YES == active_to_migration) &&
3639        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3640     {      
3641 #if DEBUG_FS
3642       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3643                   "Replicating result for query `%s' with priority %u\n",
3644                   GNUNET_h2s (&query),
3645                   prq.priority);
3646 #endif
3647       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3648       *start = GNUNET_TIME_absolute_get ();
3649       GNUNET_DATASTORE_put (dsh,
3650                             0, &query, dsize, &put[1],
3651                             type, prq.priority, 1 /* anonymity */, 
3652                             expiration, 
3653                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3654                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3655                             &put_migration_continuation, 
3656                             start);
3657     }
3658   putl = GNUNET_LOAD_get_load (datastore_put_load);
3659   if ( (GNUNET_NO == prq.request_found) &&
3660        ( (GNUNET_YES != active_to_migration) ||
3661          (putl > 2.5 * (1 + prq.priority)) ) )
3662     {
3663       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3664                                               &other->hashPubKey);
3665       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).rel_value < 5000)
3666         return GNUNET_OK; /* already blocked */
3667       /* We're too busy; send MigrationStop message! */
3668       if (GNUNET_YES != active_to_migration) 
3669         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3670       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3671                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3672                                                                                    (unsigned int) (60000 * putl * putl)));
3673       
3674       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3675       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3676                           sizeof (struct MigrationStopMessage));
3677       pm->msize = sizeof (struct MigrationStopMessage);
3678       pm->priority = UINT32_MAX;
3679       msm = (struct MigrationStopMessage*) &pm[1];
3680       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3681       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3682       msm->duration = GNUNET_TIME_relative_hton (block_time);
3683       add_to_pending_messages_for_peer (cp,
3684                                         pm,
3685                                         NULL);
3686     }
3687   return GNUNET_OK;
3688 }
3689
3690
3691 /**
3692  * Handle P2P "MIGRATION_STOP" message.
3693  *
3694  * @param cls closure, always NULL
3695  * @param other the other peer involved (sender or receiver, NULL
3696  *        for loopback messages where we are both sender and receiver)
3697  * @param message the actual message
3698  * @param atsi performance information
3699  * @return GNUNET_OK to keep the connection open,
3700  *         GNUNET_SYSERR to close it (signal serious error)
3701  */
3702 static int
3703 handle_p2p_migration_stop (void *cls,
3704                            const struct GNUNET_PeerIdentity *other,
3705                            const struct GNUNET_MessageHeader *message,
3706                            const struct GNUNET_TRANSPORT_ATS_Information *atsi)
3707 {
3708   struct ConnectedPeer *cp; 
3709   const struct MigrationStopMessage *msm;
3710
3711   msm = (const struct MigrationStopMessage*) message;
3712   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3713                                           &other->hashPubKey);
3714   if (cp == NULL)
3715     {
3716       GNUNET_break (0);
3717       return GNUNET_OK;
3718     }
3719   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3720   return GNUNET_OK;
3721 }
3722
3723
3724
3725 /* **************************** P2P GET Handling ************************ */
3726
3727
3728 /**
3729  * Closure for 'check_duplicate_request_{peer,client}'.
3730  */
3731 struct CheckDuplicateRequestClosure
3732 {
3733   /**
3734    * The new request we should check if it already exists.
3735    */
3736   const struct PendingRequest *pr;
3737
3738   /**
3739    * Existing request found by the checker, NULL if none.
3740    */
3741   struct PendingRequest *have;
3742 };
3743
3744
3745 /**
3746  * Iterator over entries in the 'query_request_map' that
3747  * tries to see if we have the same request pending from
3748  * the same client already.
3749  *
3750  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3751  * @param key current key code (query, ignored, must match)
3752  * @param value value in the hash map (a 'struct PendingRequest' 
3753  *              that already exists)
3754  * @return GNUNET_YES if we should continue to
3755  *         iterate (no match yet)
3756  *         GNUNET_NO if not (match found).
3757  */
3758 static int
3759 check_duplicate_request_client (void *cls,
3760                                 const GNUNET_HashCode * key,
3761                                 void *value)
3762 {
3763   struct CheckDuplicateRequestClosure *cdc = cls;
3764   struct PendingRequest *have = value;
3765
3766   if (have->client_request_list == NULL)
3767     return GNUNET_YES;
3768   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3769        (cdc->pr != have) )
3770     {
3771       cdc->have = have;
3772       return GNUNET_NO;
3773     }
3774   return GNUNET_YES;
3775 }
3776
3777
3778 /**
3779  * We're processing (local) results for a search request
3780  * from another peer.  Pass applicable results to the
3781  * peer and if we are done either clean up (operation
3782  * complete) or forward to other peers (more results possible).
3783  *
3784  * @param cls our closure (struct LocalGetContext)
3785  * @param key key for the content
3786  * @param size number of bytes in data
3787  * @param data content stored
3788  * @param type type of the content
3789  * @param priority priority of the content
3790  * @param anonymity anonymity-level for the content
3791  * @param expiration expiration time for the content
3792  * @param uid unique identifier for the datum;
3793  *        maybe 0 if no unique identifier is available
3794  */
3795 static void
3796 process_local_reply (void *cls,
3797                      const GNUNET_HashCode * key,
3798                      size_t size,
3799                      const void *data,
3800                      enum GNUNET_BLOCK_Type type,
3801                      uint32_t priority,
3802                      uint32_t anonymity,
3803                      struct GNUNET_TIME_Absolute
3804                      expiration, 
3805                      uint64_t uid)
3806 {
3807   struct PendingRequest *pr = cls;
3808   struct ProcessReplyClosure prq;
3809   struct CheckDuplicateRequestClosure cdrc;
3810   GNUNET_HashCode query;
3811   unsigned int old_rf;
3812   
3813   if (NULL == key)
3814     {
3815 #if DEBUG_FS > 1
3816       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3817                   "Done processing local replies, forwarding request to other peers.\n");
3818 #endif
3819       pr->qe = NULL;
3820       if (pr->client_request_list != NULL)
3821         {
3822           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3823                                       GNUNET_YES);
3824           /* Figure out if this is a duplicate request and possibly
3825              merge 'struct PendingRequest' entries */
3826           cdrc.have = NULL;
3827           cdrc.pr = pr;
3828           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3829                                                       &pr->query,
3830                                                       &check_duplicate_request_client,
3831                                                       &cdrc);
3832           if (cdrc.have != NULL)
3833             {
3834 #if DEBUG_FS
3835               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3836                           "Received request for block `%s' twice from client, will only request once.\n",
3837                           GNUNET_h2s (&pr->query));
3838 #endif
3839               
3840               destroy_pending_request (pr);
3841               return;
3842             }
3843         }
3844       if (pr->local_only == GNUNET_YES)
3845         {
3846           destroy_pending_request (pr);
3847           return;
3848         }
3849       /* no more results */
3850       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3851         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
3852                                              pr);      
3853       return;
3854     }
3855 #if DEBUG_FS
3856   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3857               "New local response to `%s' of type %u.\n",
3858               GNUNET_h2s (key),
3859               type);
3860 #endif
3861   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3862     {
3863 #if DEBUG_FS
3864       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3865                   "Found ONDEMAND block, performing on-demand encoding\n");
3866 #endif
3867       GNUNET_STATISTICS_update (stats,
3868                                 gettext_noop ("# on-demand blocks matched requests"),
3869                                 1,
3870                                 GNUNET_NO);
3871       if (GNUNET_OK != 
3872           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3873                                             anonymity, expiration, uid, 
3874                                             &process_local_reply,
3875                                             pr))
3876       if (pr->qe != NULL)
3877         {
3878           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3879         }
3880       return;
3881     }
3882   old_rf = pr->results_found;
3883   memset (&prq, 0, sizeof (prq));
3884   prq.data = data;
3885   prq.expiration = expiration;
3886   prq.size = size;  
3887   if (GNUNET_OK != 
3888       GNUNET_BLOCK_get_key (block_ctx,
3889                             type,
3890                             data,
3891                             size,
3892                             &query))
3893     {
3894       GNUNET_break (0);
3895       GNUNET_DATASTORE_remove (dsh,
3896                                key,
3897                                size, data,
3898                                -1, -1, 
3899                                GNUNET_TIME_UNIT_FOREVER_REL,
3900                                NULL, NULL);
3901       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3902       return;
3903     }
3904   prq.type = type;
3905   prq.priority = priority;  
3906   prq.finished = GNUNET_NO;
3907   prq.request_found = GNUNET_NO;
3908   if ( (old_rf == 0) &&
3909        (pr->results_found == 0) )
3910     update_datastore_delays (pr->start_time);
3911   process_reply (&prq, key, pr);
3912   if (prq.finished == GNUNET_YES)
3913     return;
3914   if (pr->qe == NULL)
3915     return; /* done here */
3916   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3917     {
3918       pr->local_only = GNUNET_YES; /* do not forward */
3919       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3920       return;
3921     }
3922   if ( (pr->client_request_list == NULL) &&
3923        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3924          (pr->results_found > 5 + 2 * pr->priority) ) )
3925     {
3926 #if DEBUG_FS > 2
3927       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3928                   "Load too high, done with request\n");
3929 #endif
3930       GNUNET_STATISTICS_update (stats,
3931                                 gettext_noop ("# processing result set cut short due to load"),
3932                                 1,
3933                                 GNUNET_NO);
3934       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3935       return;
3936     }
3937   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3938 }
3939
3940
3941 /**
3942  * We've received a request with the specified priority.  Bound it
3943  * according to how much we trust the given peer.
3944  * 
3945  * @param prio_in requested priority
3946  * @param cp the peer making the request
3947  * @return effective priority
3948  */
3949 static int32_t
3950 bound_priority (uint32_t prio_in,
3951                 struct ConnectedPeer *cp)
3952 {
3953 #define N ((double)128.0)
3954   uint32_t ret;
3955   double rret;
3956   int ld;
3957
3958   ld = test_get_load_too_high (0);
3959   if (ld == GNUNET_SYSERR)
3960     {
3961       GNUNET_STATISTICS_update (stats,
3962                                 gettext_noop ("# requests done for free (low load)"),
3963                                 1,
3964                                 GNUNET_NO);
3965       return 0; /* excess resources */
3966     }
3967   if (prio_in > INT32_MAX)
3968     prio_in = INT32_MAX;
3969   ret = - change_host_trust (cp, - (int) prio_in);
3970   if (ret > 0)
3971     {
3972       if (ret > current_priorities + N)
3973         rret = current_priorities + N;
3974       else
3975         rret = ret;
3976       current_priorities 
3977         = (current_priorities * (N-1) + rret)/N;
3978     }
3979   if ( (ld == GNUNET_YES) && (ret > 0) )
3980     {
3981       /* try with charging */
3982       ld = test_get_load_too_high (ret);
3983     }
3984   if (ld == GNUNET_YES)
3985     {
3986       GNUNET_STATISTICS_update (stats,
3987                                 gettext_noop ("# request dropped, priority insufficient"),
3988                                 1,
3989                                 GNUNET_NO);
3990       /* undo charge */
3991       change_host_trust (cp, (int) ret);
3992       return -1; /* not enough resources */
3993     }
3994   else
3995     {
3996       GNUNET_STATISTICS_update (stats,
3997                                 gettext_noop ("# requests done for a price (normal load)"),
3998                                 1,
3999                                 GNUNET_NO);
4000     }
4001 #undef N
4002   return ret;
4003 }
4004
4005
4006 /**
4007  * Iterator over entries in the 'query_request_map' that
4008  * tries to see if we have the same request pending from
4009  * the same peer already.
4010  *
4011  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
4012  * @param key current key code (query, ignored, must match)
4013  * @param value value in the hash map (a 'struct PendingRequest' 
4014  *              that already exists)
4015  * @return GNUNET_YES if we should continue to
4016  *         iterate (no match yet)
4017  *         GNUNET_NO if not (match found).
4018  */
4019 static int
4020 check_duplicate_request_peer (void *cls,
4021                               const GNUNET_HashCode * key,
4022                               void *value)
4023 {
4024   struct CheckDuplicateRequestClosure *cdc = cls;
4025   struct PendingRequest *have = value;
4026
4027   if (cdc->pr->target_pid == have->target_pid)
4028     {
4029       cdc->have = have;
4030       return GNUNET_NO;
4031     }
4032   return GNUNET_YES;
4033 }
4034
4035
4036 /**
4037  * Handle P2P "GET" request.
4038  *
4039  * @param cls closure, always NULL
4040  * @param other the other peer involved (sender or receiver, NULL
4041  *        for loopback messages where we are both sender and receiver)
4042  * @param message the actual message
4043  * @param atsi performance information
4044  * @return GNUNET_OK to keep the connection open,
4045  *         GNUNET_SYSERR to close it (signal serious error)
4046  */
4047 static int
4048 handle_p2p_get (void *cls,
4049                 const struct GNUNET_PeerIdentity *other,
4050                 const struct GNUNET_MessageHeader *message,
4051                 const struct GNUNET_TRANSPORT_ATS_Information *atsi)
4052 {
4053   struct PendingRequest *pr;
4054   struct ConnectedPeer *cp;
4055   struct ConnectedPeer *cps;
4056   struct CheckDuplicateRequestClosure cdc;
4057   struct GNUNET_TIME_Relative timeout;
4058   uint16_t msize;
4059   const struct GetMessage *gm;
4060   unsigned int bits;
4061   const GNUNET_HashCode *opt;
4062   uint32_t bm;
4063   size_t bfsize;
4064   uint32_t ttl_decrement;
4065   int32_t priority;
4066   enum GNUNET_BLOCK_Type type;
4067   int have_ns;
4068
4069   msize = ntohs(message->size);
4070   if (msize < sizeof (struct GetMessage))
4071     {
4072       GNUNET_break_op (0);
4073       return GNUNET_SYSERR;
4074     }
4075   gm = (const struct GetMessage*) message;
4076 #if DEBUG_FS
4077   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4078               "Received request for `%s'\n",
4079               GNUNET_h2s (&gm->query));
4080 #endif
4081   type = ntohl (gm->type);
4082   bm = ntohl (gm->hash_bitmap);
4083   bits = 0;
4084   while (bm > 0)
4085     {
4086       if (1 == (bm & 1))
4087         bits++;
4088       bm >>= 1;
4089     }
4090   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
4091     {
4092       GNUNET_break_op (0);
4093       return GNUNET_SYSERR;
4094     }  
4095   opt = (const GNUNET_HashCode*) &gm[1];
4096   bfsize = msize - sizeof (struct GetMessage) - bits * sizeof (GNUNET_HashCode);
4097   /* bfsize must be power of 2, check! */
4098   if (0 != ( (bfsize - 1) & bfsize))
4099     {
4100       GNUNET_break_op (0);
4101       return GNUNET_SYSERR;
4102     }
4103   bm = ntohl (gm->hash_bitmap);
4104   bits = 0;
4105   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4106                                            &other->hashPubKey);
4107   if (NULL == cps)
4108     {
4109       /* peer must have just disconnected */
4110       GNUNET_STATISTICS_update (stats,
4111                                 gettext_noop ("# requests dropped due to initiator not being connected"),
4112                                 1,
4113                                 GNUNET_NO);
4114       return GNUNET_SYSERR;
4115     }
4116   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4117     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
4118                                             &opt[bits++]);
4119   else
4120     cp = cps;
4121   if (cp == NULL)
4122     {
4123 #if DEBUG_FS
4124       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
4125         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4126                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
4127                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4128       
4129       else
4130         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4131                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4132                     GNUNET_i2s (other));
4133 #endif
4134       GNUNET_STATISTICS_update (stats,
4135                                 gettext_noop ("# requests dropped due to missing reverse route"),
4136                                 1,
4137                                 GNUNET_NO);
4138      /* FIXME: try connect? */
4139       return GNUNET_OK;
4140     }
4141   /* note that we can really only check load here since otherwise
4142      peers could find out that we are overloaded by not being
4143      disconnected after sending us a malformed query... */
4144   priority = bound_priority (ntohl (gm->priority), cps);
4145   if (priority < 0)
4146     {
4147 #if DEBUG_FS
4148       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4149                   "Dropping query from `%s', this peer is too busy.\n",
4150                   GNUNET_i2s (other));
4151 #endif
4152       return GNUNET_OK;
4153     }
4154 #if DEBUG_FS 
4155   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4156               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4157               GNUNET_h2s (&gm->query),
4158               (unsigned int) type,
4159               GNUNET_i2s (other),
4160               (unsigned int) bm);
4161 #endif
4162   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4163   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4164                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4165   if (have_ns)
4166     {
4167       pr->namespace = (GNUNET_HashCode*) &pr[1];
4168       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4169     }
4170   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4171        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4172         GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4173     {
4174       /* don't have BW to send to peer, or would likely take longer than we have for it,
4175          so at best indirect the query */
4176       priority = 0;
4177       pr->forward_only = GNUNET_YES;
4178     }
4179   pr->type = type;
4180   pr->mingle = ntohl (gm->filter_mutator);
4181   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4182     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4183   pr->anonymity_level = 1;
4184   pr->priority = (uint32_t) priority;
4185   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4186   pr->query = gm->query;
4187   /* decrement ttl (always) */
4188   ttl_decrement = 2 * TTL_DECREMENT +
4189     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4190                               TTL_DECREMENT);
4191   if ( (pr->ttl < 0) &&
4192        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4193     {
4194 #if DEBUG_FS
4195       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4196                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4197                   GNUNET_i2s (other),
4198                   pr->ttl,
4199                   ttl_decrement);
4200 #endif
4201       GNUNET_STATISTICS_update (stats,
4202                                 gettext_noop ("# requests dropped due TTL underflow"),
4203                                 1,
4204                                 GNUNET_NO);
4205       /* integer underflow => drop (should be very rare)! */      
4206       GNUNET_free (pr);
4207       return GNUNET_OK;
4208     } 
4209   pr->ttl -= ttl_decrement;
4210   pr->start_time = GNUNET_TIME_absolute_get ();
4211
4212   /* get bloom filter */
4213   if (bfsize > 0)
4214     {
4215       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4216                                                   bfsize,
4217                                                   BLOOMFILTER_K);
4218       pr->bf_size = bfsize;
4219     }
4220   cdc.have = NULL;
4221   cdc.pr = pr;
4222   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4223                                               &gm->query,
4224                                               &check_duplicate_request_peer,
4225                                               &cdc);
4226   if (cdc.have != NULL)
4227     {
4228       if (cdc.have->start_time.abs_value + cdc.have->ttl >=
4229           pr->start_time.abs_value + pr->ttl)
4230         {
4231           /* existing request has higher TTL, drop new one! */
4232           cdc.have->priority += pr->priority;
4233           destroy_pending_request (pr);
4234 #if DEBUG_FS
4235           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4236                       "Have existing request with higher TTL, dropping new request.\n",
4237                       GNUNET_i2s (other));
4238 #endif
4239           GNUNET_STATISTICS_update (stats,
4240                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4241                                     1,
4242                                     GNUNET_NO);
4243           return GNUNET_OK;
4244         }
4245       else
4246         {
4247           /* existing request has lower TTL, drop old one! */
4248           pr->priority += cdc.have->priority;
4249           /* Possible optimization: if we have applicable pending
4250              replies in 'cdc.have', we might want to move those over
4251              (this is a really rare special-case, so it is not clear
4252              that this would be worth it) */
4253           destroy_pending_request (cdc.have);
4254           /* keep processing 'pr'! */
4255         }
4256     }
4257
4258   pr->cp = cp;
4259   GNUNET_break (GNUNET_OK ==
4260                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4261                                                    &gm->query,
4262                                                    pr,
4263                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4264   GNUNET_break (GNUNET_OK ==
4265                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4266                                                    &other->hashPubKey,
4267                                                    pr,
4268                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4269   
4270   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4271                                             pr,
4272                                             pr->start_time.abs_value + pr->ttl);
4273
4274   GNUNET_STATISTICS_update (stats,
4275                             gettext_noop ("# P2P searches received"),
4276                             1,
4277                             GNUNET_NO);
4278   GNUNET_STATISTICS_update (stats,
4279                             gettext_noop ("# P2P searches active"),
4280                             1,
4281                             GNUNET_NO);
4282
4283   /* calculate change in traffic preference */
4284   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4285   /* process locally */
4286   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4287     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4288   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4289                                            (pr->priority + 1)); 
4290   if (GNUNET_YES != pr->forward_only)
4291     {
4292 #if DEBUG_FS
4293       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4294                   "Handing request for `%s' to datastore\n",
4295                   GNUNET_h2s (&gm->query));
4296 #endif
4297       pr->qe = GNUNET_DATASTORE_get (dsh,
4298                                      &gm->query,
4299                                      type,                             
4300                                      pr->priority + 1,
4301                                      MAX_DATASTORE_QUEUE,                                
4302                                      timeout,
4303                                      &process_local_reply,
4304                                      pr);
4305       if (NULL == pr->qe)
4306         {
4307           GNUNET_STATISTICS_update (stats,
4308                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4309                                     1,
4310                                     GNUNET_NO);
4311         }
4312     }
4313   else
4314     {
4315       GNUNET_STATISTICS_update (stats,
4316                                 gettext_noop ("# requests forwarded due to high load"),
4317                                 1,
4318                                 GNUNET_NO);
4319     }
4320
4321   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4322   switch (pr->type)
4323     {
4324     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4325     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4326       /* only one result, wait for datastore */
4327       if (GNUNET_YES != pr->forward_only)
4328         {
4329           GNUNET_STATISTICS_update (stats,
4330                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4331                                     1,
4332                                     GNUNET_NO);
4333           break;
4334         }
4335     default:
4336       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4337         pr->task = GNUNET_SCHEDULER_add_now (&forward_request_task,
4338                                              pr);
4339     }
4340
4341   /* make sure we don't track too many requests */
4342   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4343     {
4344       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4345       GNUNET_assert (pr != NULL);
4346       destroy_pending_request (pr);
4347     }
4348   return GNUNET_OK;
4349 }
4350
4351
4352 /* **************************** CS GET Handling ************************ */
4353
4354
4355 /**
4356  * Handle START_SEARCH-message (search request from client).
4357  *
4358  * @param cls closure
4359  * @param client identification of the client
4360  * @param message the actual message
4361  */
4362 static void
4363 handle_start_search (void *cls,
4364                      struct GNUNET_SERVER_Client *client,
4365                      const struct GNUNET_MessageHeader *message)
4366 {
4367   static GNUNET_HashCode all_zeros;
4368   const struct SearchMessage *sm;
4369   struct ClientList *cl;
4370   struct ClientRequestList *crl;
4371   struct PendingRequest *pr;
4372   uint16_t msize;
4373   unsigned int sc;
4374   enum GNUNET_BLOCK_Type type;
4375
4376   msize = ntohs (message->size);
4377   if ( (msize < sizeof (struct SearchMessage)) ||
4378        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4379     {
4380       GNUNET_break (0);
4381       GNUNET_SERVER_receive_done (client,
4382                                   GNUNET_SYSERR);
4383       return;
4384     }
4385   GNUNET_STATISTICS_update (stats,
4386                             gettext_noop ("# client searches received"),
4387                             1,
4388                             GNUNET_NO);
4389   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4390   sm = (const struct SearchMessage*) message;
4391   type = ntohl (sm->type);
4392 #if DEBUG_FS
4393   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4394               "Received request for `%s' of type %u from local client\n",
4395               GNUNET_h2s (&sm->query),
4396               (unsigned int) type);
4397 #endif
4398   cl = client_list;
4399   while ( (cl != NULL) &&
4400           (cl->client != client) )
4401     cl = cl->next;
4402   if (cl == NULL)
4403     {
4404       cl = GNUNET_malloc (sizeof (struct ClientList));
4405       cl->client = client;
4406       GNUNET_SERVER_client_keep (client);
4407       cl->next = client_list;
4408       client_list = cl;
4409     }
4410   /* detect duplicate KBLOCK requests */
4411   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4412        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4413        (type == GNUNET_BLOCK_TYPE_ANY) )
4414     {
4415       crl = cl->rl_head;
4416       while ( (crl != NULL) &&
4417               ( (0 != memcmp (&crl->req->query,
4418                               &sm->query,
4419                               sizeof (GNUNET_HashCode))) ||
4420                 (crl->req->type != type) ) )
4421         crl = crl->next;
4422       if (crl != NULL)  
4423         { 
4424 #if DEBUG_FS
4425           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4426                       "Have existing request, merging content-seen lists.\n");
4427 #endif
4428           pr = crl->req;
4429           /* Duplicate request (used to send long list of
4430              known/blocked results); merge 'pr->replies_seen'
4431              and update bloom filter */
4432           GNUNET_array_grow (pr->replies_seen,
4433                              pr->replies_seen_size,
4434                              pr->replies_seen_off + sc);
4435           memcpy (&pr->replies_seen[pr->replies_seen_off],
4436                   &sm[1],
4437                   sc * sizeof (GNUNET_HashCode));
4438           pr->replies_seen_off += sc;
4439           refresh_bloomfilter (pr);
4440           GNUNET_STATISTICS_update (stats,
4441                                     gettext_noop ("# client searches updated (merged content seen list)"),
4442                                     1,
4443                                     GNUNET_NO);
4444           GNUNET_SERVER_receive_done (client,
4445                                       GNUNET_OK);
4446           return;
4447         }
4448     }
4449   GNUNET_STATISTICS_update (stats,
4450                             gettext_noop ("# client searches active"),
4451                             1,
4452                             GNUNET_NO);
4453   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4454                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4455   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4456   memset (crl, 0, sizeof (struct ClientRequestList));
4457   crl->client_list = cl;
4458   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4459                                cl->rl_tail,
4460                                crl);  
4461   crl->req = pr;
4462   pr->type = type;
4463   pr->client_request_list = crl;
4464   GNUNET_array_grow (pr->replies_seen,
4465                      pr->replies_seen_size,
4466                      sc);
4467   memcpy (pr->replies_seen,
4468           &sm[1],
4469           sc * sizeof (GNUNET_HashCode));
4470   pr->replies_seen_off = sc;
4471   pr->anonymity_level = ntohl (sm->anonymity_level); 
4472   pr->start_time = GNUNET_TIME_absolute_get ();
4473   refresh_bloomfilter (pr);
4474   pr->query = sm->query;
4475   if (0 == (1 & ntohl (sm->options)))
4476     pr->local_only = GNUNET_NO;
4477   else
4478     pr->local_only = GNUNET_YES;
4479   switch (type)
4480     {
4481     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4482     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4483       if (0 != memcmp (&sm->target,
4484                        &all_zeros,
4485                        sizeof (GNUNET_HashCode)))
4486         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4487       break;
4488     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4489       pr->namespace = (GNUNET_HashCode*) &pr[1];
4490       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4491       break;
4492     default:
4493       break;
4494     }
4495   GNUNET_break (GNUNET_OK ==
4496                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4497                                                    &sm->query,
4498                                                    pr,
4499                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4500   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4501     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4502   pr->qe = GNUNET_DATASTORE_get (dsh,
4503                                  &sm->query,
4504                                  type,
4505                                  -3, -1,
4506                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4507                                  &process_local_reply,
4508                                  pr);
4509 }
4510
4511
4512 /* **************************** Startup ************************ */
4513
4514 /**
4515  * Process fs requests.
4516  *
4517  * @param server the initialized server
4518  * @param c configuration to use
4519  */
4520 static int
4521 main_init (struct GNUNET_SERVER_Handle *server,
4522            const struct GNUNET_CONFIGURATION_Handle *c)
4523 {
4524   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4525     {
4526       { &handle_p2p_get, 
4527         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4528       { &handle_p2p_put, 
4529         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4530       { &handle_p2p_migration_stop, 
4531         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4532         sizeof (struct MigrationStopMessage) },
4533       { NULL, 0, 0 }
4534     };
4535   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4536     {&GNUNET_FS_handle_index_start, NULL, 
4537      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4538     {&GNUNET_FS_handle_index_list_get, NULL, 
4539      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4540     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4541      sizeof (struct UnindexMessage) },
4542     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4543      0 },
4544     {NULL, NULL, 0, 0}
4545   };
4546   unsigned long long enc = 128;
4547
4548   cfg = c;
4549   stats = GNUNET_STATISTICS_create ("fs", cfg);
4550   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4551   if ( (GNUNET_OK !=
4552         GNUNET_CONFIGURATION_get_value_number (cfg,
4553                                                "fs",
4554                                                "MAX_PENDING_REQUESTS",
4555                                                &max_pending_requests)) ||
4556        (GNUNET_OK !=
4557         GNUNET_CONFIGURATION_get_value_number (cfg,
4558                                                "fs",
4559                                                "EXPECTED_NEIGHBOUR_COUNT",
4560                                                &enc)) ||
4561        (GNUNET_OK != 
4562         GNUNET_CONFIGURATION_get_value_time (cfg,
4563                                              "fs",
4564                                              "MIN_MIGRATION_DELAY",
4565                                              &min_migration_delay)) )
4566     {
4567       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4568                   _("Configuration fails to specify certain parameters, assuming default values."));
4569     }
4570   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4571   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4572   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_FOREVER_REL);
4573   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4574   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4575   core = GNUNET_CORE_connect (cfg,
4576                               1, /* larger? */
4577                               NULL,
4578                               NULL,
4579                               &peer_connect_handler,
4580                               &peer_disconnect_handler,
4581                               &peer_status_handler,
4582                               NULL, GNUNET_NO,
4583                               NULL, GNUNET_NO,
4584                               p2p_handlers);
4585   if (NULL == core)
4586     {
4587       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4588                   _("Failed to connect to `%s' service.\n"),
4589                   "core");
4590       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4591       connected_peers = NULL;
4592       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4593       query_request_map = NULL;
4594       GNUNET_LOAD_value_free (rt_entry_lifetime);
4595       rt_entry_lifetime = NULL;
4596       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4597       requests_by_expiration_heap = NULL;
4598       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4599       peer_request_map = NULL;
4600       if (dsh != NULL)
4601         {
4602           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4603           dsh = NULL;
4604         }
4605       return GNUNET_SYSERR;
4606     }
4607   if (active_from_migration) 
4608     {
4609       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4610                   _("Content migration is enabled, will start to gather data\n"));
4611       consider_migration_gathering ();
4612     }
4613   consider_dht_put_gathering (NULL);
4614   GNUNET_SERVER_disconnect_notify (server, 
4615                                    &handle_client_disconnect,
4616                                    NULL);
4617   GNUNET_assert (GNUNET_OK ==
4618                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4619                                                           "fs",
4620                                                           "TRUST",
4621                                                           &trustDirectory));
4622   GNUNET_DISK_directory_create (trustDirectory);
4623   GNUNET_SCHEDULER_add_with_priority (GNUNET_SCHEDULER_PRIORITY_HIGH,
4624                                       &cron_flush_trust, NULL);
4625
4626
4627   GNUNET_SERVER_add_handlers (server, handlers);
4628   GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
4629                                 &shutdown_task,
4630                                 NULL);
4631   return GNUNET_OK;
4632 }
4633
4634
4635 /**
4636  * Process fs requests.
4637  *
4638  * @param cls closure
4639  * @param server the initialized server
4640  * @param cfg configuration to use
4641  */
4642 static void
4643 run (void *cls,
4644      struct GNUNET_SERVER_Handle *server,
4645      const struct GNUNET_CONFIGURATION_Handle *cfg)
4646 {
4647   active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4648                                                               "FS",
4649                                                               "CONTENT_CACHING");
4650   active_from_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4651                                                                 "FS",
4652                                                                 "CONTENT_PUSHING");
4653   dsh = GNUNET_DATASTORE_connect (cfg);
4654   if (dsh == NULL)
4655     {
4656       GNUNET_SCHEDULER_shutdown ();
4657       return;
4658     }
4659   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4660   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4661   block_cfg = GNUNET_CONFIGURATION_create ();
4662   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4663                                          "block",
4664                                          "PLUGINS",
4665                                          "fs");
4666   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4667   GNUNET_assert (NULL != block_ctx);
4668   dht_handle = GNUNET_DHT_connect (cfg,
4669                                    FS_DHT_HT_SIZE);
4670   if ( (GNUNET_OK != GNUNET_FS_indexing_init (cfg, dsh)) ||
4671        (GNUNET_OK != main_init (server, cfg)) )
4672     {    
4673       GNUNET_SCHEDULER_shutdown ();
4674       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4675       dsh = NULL;
4676       GNUNET_DHT_disconnect (dht_handle);
4677       dht_handle = NULL;
4678       GNUNET_BLOCK_context_destroy (block_ctx);
4679       block_ctx = NULL;
4680       GNUNET_CONFIGURATION_destroy (block_cfg);
4681       block_cfg = NULL;
4682       GNUNET_LOAD_value_free (datastore_get_load);
4683       datastore_get_load = NULL;
4684       GNUNET_LOAD_value_free (datastore_put_load);
4685       datastore_put_load = NULL;
4686       return;   
4687     }
4688 }
4689
4690
4691 /**
4692  * The main function for the fs service.
4693  *
4694  * @param argc number of arguments from the command line
4695  * @param argv command line arguments
4696  * @return 0 ok, 1 on error
4697  */
4698 int
4699 main (int argc, char *const *argv)
4700 {
4701   return (GNUNET_OK ==
4702           GNUNET_SERVICE_run (argc,
4703                               argv,
4704                               "fs",
4705                               GNUNET_SERVICE_OPTION_NONE,
4706                               &run, NULL)) ? 0 : 1;
4707 }
4708
4709 /* end of gnunet-service-fs.c */