cleaner stats
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - more statistics
28  */
29 #include "platform.h"
30 #include <float.h>
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "gnunet_dht_service.h"
34 #include "gnunet_datastore_service.h"
35 #include "gnunet_load_lib.h"
36 #include "gnunet_peer_lib.h"
37 #include "gnunet_protocols.h"
38 #include "gnunet_signatures.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet-service-fs_indexing.h"
42 #include "fs.h"
43
44 #define DEBUG_FS GNUNET_NO
45
46 /**
47  * Should we introduce random latency in processing?  Required for proper
48  * implementation of GAP, but can be disabled for performance evaluation of
49  * the basic routing algorithm.
50  *
51  * Note that with delays enabled, performance can be significantly lower
52  * (several orders of magnitude in 2-peer test runs); if you want to
53  * measure throughput of other components, set this to NO.  Also, you
54  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
55  * a rather wasteful mode of operation (that might still get the highest
56  * throughput overall).
57  */
58 #define SUPPORT_DELAYS GNUNET_NO
59
60 /**
61  * Currently experimental code...
62  */
63 #define ENABLE_LOAD_MGMT GNUNET_YES
64
65 /**
66  * Size for the hash map for DHT requests from the FS
67  * service.  Should be about the number of concurrent
68  * DHT requests we plan to make.
69  */
70 #define FS_DHT_HT_SIZE 1024
71
72 /**
73  * At what frequency should our datastore load decrease
74  * automatically (since if we don't use it, clearly the
75  * load must be going down).
76  */
77 #define DATASTORE_LOAD_AUTODECLINE GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 250)
78
79 /**
80  * How often do we flush trust values to disk?
81  */
82 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
83
84 /**
85  * How often do we at most PUT content into the DHT?
86  */
87 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
88
89 /**
90  * Inverse of the probability that we will submit the same query
91  * to the same peer again.  If the same peer already got the query
92  * repeatedly recently, the probability is multiplied by the inverse
93  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
94  * plus MAX_CORK_DELAY (so roughly every 3.5s).
95  *
96  * Note that this factor is a key influence to performance in small
97  * networks (especially test networks of 2 peers) because if there is
98  * only a single peer with the data, this value will determine how
99  * soon we might re-try.  For example, a value of 3 can result in 
100  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
101  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
102  * rather inefficient in larger networks, hence picking 1 is in 
103  * general not the best choice.
104  */
105 #define RETRY_PROBABILITY_INV 1
106
107 /**
108  * What is the maximum delay for a P2P FS message (in our interaction
109  * with core)?  FS-internal delays are another story.  The value is
110  * chosen based on the 32k block size.  Given that peers typcially
111  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
112  * transmit one message even to the lowest-bandwidth peers.
113  */
114 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
115
116 /**
117  * Maximum number of requests (from other peers, overall) that we're
118  * willing to have pending at any given point in time.  Can be changed
119  * via the configuration file (32k is just the default).
120  */
121 static unsigned long long max_pending_requests = (32 * 1024);
122
123
124 /**
125  * Information we keep for each pending reply.  The
126  * actual message follows at the end of this struct.
127  */
128 struct PendingMessage;
129
130 /**
131  * Function called upon completion of a transmission.
132  *
133  * @param cls closure
134  * @param pid ID of receiving peer, 0 on transmission error
135  */
136 typedef void (*TransmissionContinuation)(void * cls, 
137                                          GNUNET_PEER_Id tpid);
138
139
140 /**
141  * Information we keep for each pending message (GET/PUT).  The
142  * actual message follows at the end of this struct.
143  */
144 struct PendingMessage
145 {
146   /**
147    * This is a doubly-linked list of messages to the same peer.
148    */
149   struct PendingMessage *next;
150
151   /**
152    * This is a doubly-linked list of messages to the same peer.
153    */
154   struct PendingMessage *prev;
155
156   /**
157    * Entry in pending message list for this pending message.
158    */ 
159   struct PendingMessageList *pml;  
160
161   /**
162    * Function to call immediately once we have transmitted this
163    * message.
164    */
165   TransmissionContinuation cont;
166
167   /**
168    * Closure for cont.
169    */
170   void *cont_cls;
171
172   /**
173    * Do not transmit this pending message until this deadline.
174    */
175   struct GNUNET_TIME_Absolute delay_until;
176
177   /**
178    * Size of the reply; actual reply message follows
179    * at the end of this struct.
180    */
181   size_t msize;
182   
183   /**
184    * How important is this message for us?
185    */
186   uint32_t priority;
187  
188 };
189
190
191 /**
192  * Information about a peer that we are connected to.
193  * We track data that is useful for determining which
194  * peers should receive our requests.  We also keep
195  * a list of messages to transmit to this peer.
196  */
197 struct ConnectedPeer
198 {
199
200   /**
201    * List of the last clients for which this peer successfully
202    * answered a query.
203    */
204   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
205
206   /**
207    * List of the last PIDs for which
208    * this peer successfully answered a query;
209    * We use 0 to indicate no successful reply.
210    */
211   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
212
213   /**
214    * Average delay between sending the peer a request and
215    * getting a reply (only calculated over the requests for
216    * which we actually got a reply).   Calculated
217    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
218    */ 
219   struct GNUNET_TIME_Relative avg_delay;
220
221   /**
222    * Point in time until which this peer does not want us to migrate content
223    * to it.
224    */
225   struct GNUNET_TIME_Absolute migration_blocked;
226
227   /**
228    * Time until when we blocked this peer from migrating
229    * data to us.
230    */
231   struct GNUNET_TIME_Absolute last_migration_block;
232
233   /**
234    * Transmission times for the last MAX_QUEUE_PER_PEER
235    * requests for this peer.  Used as a ring buffer, current
236    * offset is stored in 'last_request_times_off'.  If the
237    * oldest entry is more recent than the 'avg_delay', we should
238    * not send any more requests right now.
239    */
240   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
241
242   /**
243    * Handle for an active request for transmission to this
244    * peer, or NULL.
245    */
246   struct GNUNET_CORE_TransmitHandle *cth;
247
248   /**
249    * Messages (replies, queries, content migration) we would like to
250    * send to this peer in the near future.  Sorted by priority, head.
251    */
252   struct PendingMessage *pending_messages_head;
253
254   /**
255    * Messages (replies, queries, content migration) we would like to
256    * send to this peer in the near future.  Sorted by priority, tail.
257    */
258   struct PendingMessage *pending_messages_tail;
259
260   /**
261    * How long does it typically take for us to transmit a message
262    * to this peer?  (delay between the request being issued and
263    * the callback being invoked).
264    */
265   struct GNUNET_LOAD_Value *transmission_delay;
266
267   /**
268    * Time when the last transmission request was issued.
269    */
270   struct GNUNET_TIME_Absolute last_transmission_request_start;
271
272   /**
273    * ID of delay task for scheduling transmission.
274    */
275   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
276
277   /**
278    * Average priority of successful replies.  Calculated
279    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
280    */
281   double avg_priority;
282
283   /**
284    * Increase in traffic preference still to be submitted
285    * to the core service for this peer.
286    */
287   uint64_t inc_preference;
288
289   /**
290    * Trust rating for this peer
291    */
292   uint32_t trust;
293
294   /**
295    * Trust rating for this peer on disk.
296    */
297   uint32_t disk_trust;
298
299   /**
300    * The peer's identity.
301    */
302   GNUNET_PEER_Id pid;  
303
304   /**
305    * Size of the linked list of 'pending_messages'.
306    */
307   unsigned int pending_requests;
308
309   /**
310    * Which offset in "last_p2p_replies" will be updated next?
311    * (we go round-robin).
312    */
313   unsigned int last_p2p_replies_woff;
314
315   /**
316    * Which offset in "last_client_replies" will be updated next?
317    * (we go round-robin).
318    */
319   unsigned int last_client_replies_woff;
320
321   /**
322    * Current offset into 'last_request_times' ring buffer.
323    */
324   unsigned int last_request_times_off;
325
326 };
327
328
329 /**
330  * Information we keep for each pending request.  We should try to
331  * keep this struct as small as possible since its memory consumption
332  * is key to how many requests we can have pending at once.
333  */
334 struct PendingRequest;
335
336
337 /**
338  * Doubly-linked list of requests we are performing
339  * on behalf of the same client.
340  */
341 struct ClientRequestList
342 {
343
344   /**
345    * This is a doubly-linked list.
346    */
347   struct ClientRequestList *next;
348
349   /**
350    * This is a doubly-linked list.
351    */
352   struct ClientRequestList *prev;
353
354   /**
355    * Request this entry represents.
356    */
357   struct PendingRequest *req;
358
359   /**
360    * Client list this request belongs to.
361    */
362   struct ClientList *client_list;
363
364 };
365
366
367 /**
368  * Replies to be transmitted to the client.  The actual
369  * response message is allocated after this struct.
370  */
371 struct ClientResponseMessage
372 {
373   /**
374    * This is a doubly-linked list.
375    */
376   struct ClientResponseMessage *next;
377
378   /**
379    * This is a doubly-linked list.
380    */
381   struct ClientResponseMessage *prev;
382
383   /**
384    * Client list entry this response belongs to.
385    */
386   struct ClientList *client_list;
387
388   /**
389    * Number of bytes in the response.
390    */
391   size_t msize;
392 };
393
394
395 /**
396  * Linked list of clients we are performing requests
397  * for right now.
398  */
399 struct ClientList
400 {
401   /**
402    * This is a linked list.
403    */
404   struct ClientList *next;
405
406   /**
407    * ID of a client making a request, NULL if this entry is for a
408    * peer.
409    */
410   struct GNUNET_SERVER_Client *client;
411
412   /**
413    * Head of list of requests performed on behalf
414    * of this client right now.
415    */
416   struct ClientRequestList *rl_head;
417
418   /**
419    * Tail of list of requests performed on behalf
420    * of this client right now.
421    */
422   struct ClientRequestList *rl_tail;
423
424   /**
425    * Head of linked list of responses.
426    */
427   struct ClientResponseMessage *res_head;
428
429   /**
430    * Tail of linked list of responses.
431    */
432   struct ClientResponseMessage *res_tail;
433
434   /**
435    * Context for sending replies.
436    */
437   struct GNUNET_CONNECTION_TransmitHandle *th;
438
439 };
440
441
442 /**
443  * Information about a peer that we have forwarded this
444  * request to already.  
445  */
446 struct UsedTargetEntry
447 {
448   /**
449    * What was the last time we have transmitted this request to this
450    * peer?
451    */
452   struct GNUNET_TIME_Absolute last_request_time;
453
454   /**
455    * How often have we transmitted this request to this peer?
456    */
457   unsigned int num_requests;
458
459   /**
460    * PID of the target peer.
461    */
462   GNUNET_PEER_Id pid;
463
464 };
465
466
467
468
469
470 /**
471  * Doubly-linked list of messages we are performing
472  * due to a pending request.
473  */
474 struct PendingMessageList
475 {
476
477   /**
478    * This is a doubly-linked list of messages on behalf of the same request.
479    */
480   struct PendingMessageList *next;
481
482   /**
483    * This is a doubly-linked list of messages on behalf of the same request.
484    */
485   struct PendingMessageList *prev;
486
487   /**
488    * Message this entry represents.
489    */
490   struct PendingMessage *pm;
491
492   /**
493    * Request this entry belongs to.
494    */
495   struct PendingRequest *req;
496
497   /**
498    * Peer this message is targeted for.
499    */
500   struct ConnectedPeer *target;
501
502 };
503
504
505 /**
506  * Information we keep for each pending request.  We should try to
507  * keep this struct as small as possible since its memory consumption
508  * is key to how many requests we can have pending at once.
509  */
510 struct PendingRequest
511 {
512
513   /**
514    * If this request was made by a client, this is our entry in the
515    * client request list; otherwise NULL.
516    */
517   struct ClientRequestList *client_request_list;
518
519   /**
520    * Entry of peer responsible for this entry (if this request
521    * was made by a peer).
522    */
523   struct ConnectedPeer *cp;
524
525   /**
526    * If this is a namespace query, pointer to the hash of the public
527    * key of the namespace; otherwise NULL.  Pointer will be to the 
528    * end of this struct (so no need to free it).
529    */
530   const GNUNET_HashCode *namespace;
531
532   /**
533    * Bloomfilter we use to filter out replies that we don't care about
534    * (anymore).  NULL as long as we are interested in all replies.
535    */
536   struct GNUNET_CONTAINER_BloomFilter *bf;
537
538   /**
539    * Context of our GNUNET_CORE_peer_change_preference call.
540    */
541   struct GNUNET_CORE_InformationRequestContext *irc;
542
543   /**
544    * Reference to DHT get operation for this request (or NULL).
545    */
546   struct GNUNET_DHT_GetHandle *dht_get;
547
548   /**
549    * Hash code of all replies that we have seen so far (only valid
550    * if client is not NULL since we only track replies like this for
551    * our own clients).
552    */
553   GNUNET_HashCode *replies_seen;
554
555   /**
556    * Node in the heap representing this entry; NULL
557    * if we have no heap node.
558    */
559   struct GNUNET_CONTAINER_HeapNode *hnode;
560
561   /**
562    * Head of list of messages being performed on behalf of this
563    * request.
564    */
565   struct PendingMessageList *pending_head;
566
567   /**
568    * Tail of list of messages being performed on behalf of this
569    * request.
570    */
571   struct PendingMessageList *pending_tail;
572
573   /**
574    * When did we first see this request (form this peer), or, if our
575    * client is initiating, when did we last initiate a search?
576    */
577   struct GNUNET_TIME_Absolute start_time;
578
579   /**
580    * The query that this request is for.
581    */
582   GNUNET_HashCode query;
583
584   /**
585    * The task responsible for transmitting queries
586    * for this request.
587    */
588   GNUNET_SCHEDULER_TaskIdentifier task;
589
590   /**
591    * (Interned) Peer identifier that identifies a preferred target
592    * for requests.
593    */
594   GNUNET_PEER_Id target_pid;
595
596   /**
597    * (Interned) Peer identifiers of peers that have already
598    * received our query for this content.
599    */
600   struct UsedTargetEntry *used_targets;
601   
602   /**
603    * Our entry in the queue (non-NULL while we wait for our
604    * turn to interact with the local database).
605    */
606   struct GNUNET_DATASTORE_QueueEntry *qe;
607
608   /**
609    * Size of the 'bf' (in bytes).
610    */
611   size_t bf_size;
612
613   /**
614    * Desired anonymity level; only valid for requests from a local client.
615    */
616   uint32_t anonymity_level;
617
618   /**
619    * How many entries in "used_targets" are actually valid?
620    */
621   unsigned int used_targets_off;
622
623   /**
624    * How long is the "used_targets" array?
625    */
626   unsigned int used_targets_size;
627
628   /**
629    * Number of results found for this request.
630    */
631   unsigned int results_found;
632
633   /**
634    * How many entries in "replies_seen" are actually valid?
635    */
636   unsigned int replies_seen_off;
637
638   /**
639    * How long is the "replies_seen" array?
640    */
641   unsigned int replies_seen_size;
642   
643   /**
644    * Priority with which this request was made.  If one of our clients
645    * made the request, then this is the current priority that we are
646    * using when initiating the request.  This value is used when
647    * we decide to reward other peers with trust for providing a reply.
648    */
649   uint32_t priority;
650
651   /**
652    * Priority points left for us to spend when forwarding this request
653    * to other peers.
654    */
655   uint32_t remaining_priority;
656
657   /**
658    * Number to mingle hashes for bloom-filter tests with.
659    */
660   int32_t mingle;
661
662   /**
663    * TTL with which we saw this request (or, if we initiated, TTL that
664    * we used for the request).
665    */
666   int32_t ttl;
667   
668   /**
669    * Type of the content that this request is for.
670    */
671   enum GNUNET_BLOCK_Type type;
672
673   /**
674    * Remove this request after transmission of the current response.
675    */
676   int8_t do_remove;
677
678   /**
679    * GNUNET_YES if we should not forward this request to other peers.
680    */
681   int8_t local_only;
682
683   /**
684    * GNUNET_YES if we should not forward this request to other peers.
685    */
686   int8_t forward_only;
687
688 };
689
690
691 /**
692  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
693  */
694 struct MigrationReadyBlock
695 {
696
697   /**
698    * This is a doubly-linked list.
699    */
700   struct MigrationReadyBlock *next;
701
702   /**
703    * This is a doubly-linked list.
704    */
705   struct MigrationReadyBlock *prev;
706
707   /**
708    * Query for the block.
709    */
710   GNUNET_HashCode query;
711
712   /**
713    * When does this block expire? 
714    */
715   struct GNUNET_TIME_Absolute expiration;
716
717   /**
718    * Peers we would consider forwarding this
719    * block to.  Zero for empty entries.
720    */
721   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
722
723   /**
724    * Size of the block.
725    */
726   size_t size;
727
728   /**
729    *  Number of targets already used.
730    */
731   unsigned int used_targets;
732
733   /**
734    * Type of the block.
735    */
736   enum GNUNET_BLOCK_Type type;
737 };
738
739
740 /**
741  * Our connection to the datastore.
742  */
743 static struct GNUNET_DATASTORE_Handle *dsh;
744
745 /**
746  * Our block context.
747  */
748 static struct GNUNET_BLOCK_Context *block_ctx;
749
750 /**
751  * Our block configuration.
752  */
753 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
754
755 /**
756  * Our scheduler.
757  */
758 static struct GNUNET_SCHEDULER_Handle *sched;
759
760 /**
761  * Our configuration.
762  */
763 static const struct GNUNET_CONFIGURATION_Handle *cfg;
764
765 /**
766  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
767  */
768 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
769
770 /**
771  * Map of peer identifiers to "struct PendingRequest" (for that peer).
772  */
773 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
774
775 /**
776  * Map of query identifiers to "struct PendingRequest" (for that query).
777  */
778 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
779
780 /**
781  * Heap with the request that will expire next at the top.  Contains
782  * pointers of type "struct PendingRequest*"; these will *also* be
783  * aliased from the "requests_by_peer" data structures and the
784  * "requests_by_query" table.  Note that requests from our clients
785  * don't expire and are thus NOT in the "requests_by_expiration"
786  * (or the "requests_by_peer" tables).
787  */
788 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
789
790 /**
791  * Handle for reporting statistics.
792  */
793 static struct GNUNET_STATISTICS_Handle *stats;
794
795 /**
796  * Linked list of clients we are currently processing requests for.
797  */
798 static struct ClientList *client_list;
799
800 /**
801  * Pointer to handle to the core service (points to NULL until we've
802  * connected to it).
803  */
804 static struct GNUNET_CORE_Handle *core;
805
806 /**
807  * Head of linked list of blocks that can be migrated.
808  */
809 static struct MigrationReadyBlock *mig_head;
810
811 /**
812  * Tail of linked list of blocks that can be migrated.
813  */
814 static struct MigrationReadyBlock *mig_tail;
815
816 /**
817  * Request to datastore for migration (or NULL).
818  */
819 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
820
821 /**
822  * Request to datastore for DHT PUTs (or NULL).
823  */
824 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
825
826 /**
827  * Type we will request for the next DHT PUT round from the datastore.
828  */
829 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
830
831 /**
832  * Where do we store trust information?
833  */
834 static char *trustDirectory;
835
836 /**
837  * ID of task that collects blocks for migration.
838  */
839 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
840
841 /**
842  * ID of task that collects blocks for DHT PUTs.
843  */
844 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
845
846 /**
847  * What is the maximum frequency at which we are allowed to
848  * poll the datastore for migration content?
849  */
850 static struct GNUNET_TIME_Relative min_migration_delay;
851
852 /**
853  * Handle for DHT operations.
854  */
855 static struct GNUNET_DHT_Handle *dht_handle;
856
857 /**
858  * Size of the doubly-linked list of migration blocks.
859  */
860 static unsigned int mig_size;
861
862 /**
863  * Are we allowed to migrate content to this peer.
864  */
865 static int active_migration;
866
867 /**
868  * How many entires with zero anonymity do we currently estimate
869  * to have in the database?
870  */
871 static unsigned int zero_anonymity_count_estimate;
872
873 /**
874  * Typical priorities we're seeing from other peers right now.  Since
875  * most priorities will be zero, this value is the weighted average of
876  * non-zero priorities seen "recently".  In order to ensure that new
877  * values do not dramatically change the ratio, values are first
878  * "capped" to a reasonable range (+N of the current value) and then
879  * averaged into the existing value by a ratio of 1:N.  Hence
880  * receiving the largest possible priority can still only raise our
881  * "current_priorities" by at most 1.
882  */
883 static double current_priorities;
884
885 /**
886  * Datastore 'GET' load tracking.
887  */
888 static struct GNUNET_LOAD_Value *datastore_get_load;
889
890 /**
891  * Datastore 'PUT' load tracking.
892  */
893 static struct GNUNET_LOAD_Value *datastore_put_load;
894
895 /**
896  * How long do requests typically stay in the routing table?
897  */
898 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
899
900 /**
901  * We've just now completed a datastore request.  Update our
902  * datastore load calculations.
903  *
904  * @param start time when the datastore request was issued
905  */
906 static void
907 update_datastore_delays (struct GNUNET_TIME_Absolute start)
908 {
909   struct GNUNET_TIME_Relative delay;
910
911   delay = GNUNET_TIME_absolute_get_duration (start);
912   GNUNET_LOAD_update (datastore_get_load,
913                       delay.value);
914 }
915
916
917 /**
918  * Get the filename under which we would store the GNUNET_HELLO_Message
919  * for the given host and protocol.
920  * @return filename of the form DIRECTORY/HOSTID
921  */
922 static char *
923 get_trust_filename (const struct GNUNET_PeerIdentity *id)
924 {
925   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
926   char *fn;
927
928   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
929   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
930   return fn;
931 }
932
933
934
935 /**
936  * Transmit messages by copying it to the target buffer
937  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
938  * for writing in the meantime.  In that case, do nothing
939  * (the disconnect or shutdown handler will take care of the rest).
940  * If we were able to transmit messages and there are still more
941  * pending, ask core again for further calls to this function.
942  *
943  * @param cls closure, pointer to the 'struct ConnectedPeer*'
944  * @param size number of bytes available in buf
945  * @param buf where the callee should write the message
946  * @return number of bytes written to buf
947  */
948 static size_t
949 transmit_to_peer (void *cls,
950                   size_t size, void *buf);
951
952
953 /* ******************* clean up functions ************************ */
954
955 /**
956  * Delete the given migration block.
957  *
958  * @param mb block to delete
959  */
960 static void
961 delete_migration_block (struct MigrationReadyBlock *mb)
962 {
963   GNUNET_CONTAINER_DLL_remove (mig_head,
964                                mig_tail,
965                                mb);
966   GNUNET_PEER_decrement_rcs (mb->target_list,
967                              MIGRATION_LIST_SIZE);
968   mig_size--;
969   GNUNET_free (mb);
970 }
971
972
973 /**
974  * Compare the distance of two peers to a key.
975  *
976  * @param key key
977  * @param p1 first peer
978  * @param p2 second peer
979  * @return GNUNET_YES if P1 is closer to key than P2
980  */
981 static int
982 is_closer (const GNUNET_HashCode *key,
983            const struct GNUNET_PeerIdentity *p1,
984            const struct GNUNET_PeerIdentity *p2)
985 {
986   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
987                                     &p2->hashPubKey,
988                                     key);
989 }
990
991
992 /**
993  * Consider migrating content to a given peer.
994  *
995  * @param cls 'struct MigrationReadyBlock*' to select
996  *            targets for (or NULL for none)
997  * @param key ID of the peer 
998  * @param value 'struct ConnectedPeer' of the peer
999  * @return GNUNET_YES (always continue iteration)
1000  */
1001 static int
1002 consider_migration (void *cls,
1003                     const GNUNET_HashCode *key,
1004                     void *value)
1005 {
1006   struct MigrationReadyBlock *mb = cls;
1007   struct ConnectedPeer *cp = value;
1008   struct MigrationReadyBlock *pos;
1009   struct GNUNET_PeerIdentity cppid;
1010   struct GNUNET_PeerIdentity otherpid;
1011   struct GNUNET_PeerIdentity worstpid;
1012   size_t msize;
1013   unsigned int i;
1014   unsigned int repl;
1015   
1016   /* consider 'cp' as a migration target for mb */
1017   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
1018     return GNUNET_YES; /* peer has requested no migration! */
1019   if (mb != NULL)
1020     {
1021       GNUNET_PEER_resolve (cp->pid,
1022                            &cppid);
1023       repl = MIGRATION_LIST_SIZE;
1024       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1025         {
1026           if (mb->target_list[i] == 0)
1027             {
1028               mb->target_list[i] = cp->pid;
1029               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1030               repl = MIGRATION_LIST_SIZE;
1031               break;
1032             }
1033           GNUNET_PEER_resolve (mb->target_list[i],
1034                                &otherpid);
1035           if ( (repl == MIGRATION_LIST_SIZE) &&
1036                is_closer (&mb->query,
1037                           &cppid,
1038                           &otherpid)) 
1039             {
1040               repl = i;
1041               worstpid = otherpid;
1042             }
1043           else if ( (repl != MIGRATION_LIST_SIZE) &&
1044                     (is_closer (&mb->query,
1045                                 &worstpid,
1046                                 &otherpid) ) )
1047             {
1048               repl = i;
1049               worstpid = otherpid;
1050             }       
1051         }
1052       if (repl != MIGRATION_LIST_SIZE) 
1053         {
1054           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1055           mb->target_list[repl] = cp->pid;
1056           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1057         }
1058     }
1059
1060   /* consider scheduling transmission to cp for content migration */
1061   if (cp->cth != NULL)        
1062     return GNUNET_YES; 
1063   msize = 0;
1064   pos = mig_head;
1065   while (pos != NULL)
1066     {
1067       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1068         {
1069           if (cp->pid == pos->target_list[i])
1070             {
1071               if (msize == 0)
1072                 msize = pos->size;
1073               else
1074                 msize = GNUNET_MIN (msize,
1075                                     pos->size);
1076               break;
1077             }
1078         }
1079       pos = pos->next;
1080     }
1081   if (msize == 0)
1082     return GNUNET_YES; /* no content available */
1083 #if DEBUG_FS
1084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1085               "Trying to migrate at least %u bytes to peer `%s'\n",
1086               msize,
1087               GNUNET_h2s (key));
1088 #endif
1089   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1090     {
1091       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1092       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1093     }
1094   cp->cth 
1095     = GNUNET_CORE_notify_transmit_ready (core,
1096                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1097                                          (const struct GNUNET_PeerIdentity*) key,
1098                                          msize + sizeof (struct PutMessage),
1099                                          &transmit_to_peer,
1100                                          cp);
1101   return GNUNET_YES;
1102 }
1103
1104
1105 /**
1106  * Task that is run periodically to obtain blocks for content
1107  * migration
1108  * 
1109  * @param cls unused
1110  * @param tc scheduler context (also unused)
1111  */
1112 static void
1113 gather_migration_blocks (void *cls,
1114                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1115
1116
1117
1118
1119 /**
1120  * Task that is run periodically to obtain blocks for DHT PUTs.
1121  * 
1122  * @param cls type of blocks to gather
1123  * @param tc scheduler context (unused)
1124  */
1125 static void
1126 gather_dht_put_blocks (void *cls,
1127                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1128
1129
1130 /**
1131  * If the migration task is not currently running, consider
1132  * (re)scheduling it with the appropriate delay.
1133  */
1134 static void
1135 consider_migration_gathering ()
1136 {
1137   struct GNUNET_TIME_Relative delay;
1138
1139   if (dsh == NULL)
1140     return;
1141   if (mig_qe != NULL)
1142     return;
1143   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1144     return;
1145   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1146                                          mig_size);
1147   delay = GNUNET_TIME_relative_divide (delay,
1148                                        MAX_MIGRATION_QUEUE);
1149   delay = GNUNET_TIME_relative_max (delay,
1150                                     min_migration_delay);
1151   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1152                                            delay,
1153                                            &gather_migration_blocks,
1154                                            NULL);
1155 }
1156
1157
1158 /**
1159  * If the DHT PUT gathering task is not currently running, consider
1160  * (re)scheduling it with the appropriate delay.
1161  */
1162 static void
1163 consider_dht_put_gathering (void *cls)
1164 {
1165   struct GNUNET_TIME_Relative delay;
1166
1167   if (dsh == NULL)
1168     return;
1169   if (dht_qe != NULL)
1170     return;
1171   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1172     return;
1173   if (zero_anonymity_count_estimate > 0)
1174     {
1175       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1176                                            zero_anonymity_count_estimate);
1177       delay = GNUNET_TIME_relative_min (delay,
1178                                         MAX_DHT_PUT_FREQ);
1179     }
1180   else
1181     {
1182       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1183          (hopefully) appear */
1184       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1185     }
1186   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1187                                            delay,
1188                                            &gather_dht_put_blocks,
1189                                            cls);
1190 }
1191
1192
1193 /**
1194  * Process content offered for migration.
1195  *
1196  * @param cls closure
1197  * @param key key for the content
1198  * @param size number of bytes in data
1199  * @param data content stored
1200  * @param type type of the content
1201  * @param priority priority of the content
1202  * @param anonymity anonymity-level for the content
1203  * @param expiration expiration time for the content
1204  * @param uid unique identifier for the datum;
1205  *        maybe 0 if no unique identifier is available
1206  */
1207 static void
1208 process_migration_content (void *cls,
1209                            const GNUNET_HashCode * key,
1210                            size_t size,
1211                            const void *data,
1212                            enum GNUNET_BLOCK_Type type,
1213                            uint32_t priority,
1214                            uint32_t anonymity,
1215                            struct GNUNET_TIME_Absolute
1216                            expiration, uint64_t uid)
1217 {
1218   struct MigrationReadyBlock *mb;
1219   
1220   if (key == NULL)
1221     {
1222       mig_qe = NULL;
1223       if (mig_size < MAX_MIGRATION_QUEUE)  
1224         consider_migration_gathering ();
1225       return;
1226     }
1227   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1228     {
1229       if (GNUNET_OK !=
1230           GNUNET_FS_handle_on_demand_block (key, size, data,
1231                                             type, priority, anonymity,
1232                                             expiration, uid, 
1233                                             &process_migration_content,
1234                                             NULL))
1235         {
1236           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1237         }
1238       return;
1239     }
1240 #if DEBUG_FS
1241   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1242               "Retrieved block `%s' of type %u for migration\n",
1243               GNUNET_h2s (key),
1244               type);
1245 #endif
1246   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1247   mb->query = *key;
1248   mb->expiration = expiration;
1249   mb->size = size;
1250   mb->type = type;
1251   memcpy (&mb[1], data, size);
1252   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1253                                      mig_tail,
1254                                      mig_tail,
1255                                      mb);
1256   mig_size++;
1257   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1258                                          &consider_migration,
1259                                          mb);
1260   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1261 }
1262
1263
1264 /**
1265  * Function called upon completion of the DHT PUT operation.
1266  */
1267 static void
1268 dht_put_continuation (void *cls,
1269                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1270 {
1271   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1272 }
1273
1274
1275 /**
1276  * Store content in DHT.
1277  *
1278  * @param cls closure
1279  * @param key key for the content
1280  * @param size number of bytes in data
1281  * @param data content stored
1282  * @param type type of the content
1283  * @param priority priority of the content
1284  * @param anonymity anonymity-level for the content
1285  * @param expiration expiration time for the content
1286  * @param uid unique identifier for the datum;
1287  *        maybe 0 if no unique identifier is available
1288  */
1289 static void
1290 process_dht_put_content (void *cls,
1291                          const GNUNET_HashCode * key,
1292                          size_t size,
1293                          const void *data,
1294                          enum GNUNET_BLOCK_Type type,
1295                          uint32_t priority,
1296                          uint32_t anonymity,
1297                          struct GNUNET_TIME_Absolute
1298                          expiration, uint64_t uid)
1299
1300   static unsigned int counter;
1301   static GNUNET_HashCode last_vhash;
1302   static GNUNET_HashCode vhash;
1303
1304   if (key == NULL)
1305     {
1306       dht_qe = NULL;
1307       consider_dht_put_gathering (cls);
1308       return;
1309     }
1310   /* slightly funky code to estimate the total number of values with zero
1311      anonymity from the maximum observed length of a monotonically increasing 
1312      sequence of hashes over the contents */
1313   GNUNET_CRYPTO_hash (data, size, &vhash);
1314   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1315     {
1316       if (zero_anonymity_count_estimate > 0)
1317         zero_anonymity_count_estimate /= 2;
1318       counter = 0;
1319     }
1320   last_vhash = vhash;
1321   if (counter < 31)
1322     counter++;
1323   if (zero_anonymity_count_estimate < (1 << counter))
1324     zero_anonymity_count_estimate = (1 << counter);
1325 #if DEBUG_FS
1326   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1327               "Retrieved block `%s' of type %u for DHT PUT\n",
1328               GNUNET_h2s (key),
1329               type);
1330 #endif
1331   GNUNET_DHT_put (dht_handle,
1332                   key,
1333                   GNUNET_DHT_RO_NONE,
1334                   type,
1335                   size,
1336                   data,
1337                   expiration,
1338                   GNUNET_TIME_UNIT_FOREVER_REL,
1339                   &dht_put_continuation,
1340                   cls);
1341 }
1342
1343
1344 /**
1345  * Task that is run periodically to obtain blocks for content
1346  * migration
1347  * 
1348  * @param cls unused
1349  * @param tc scheduler context (also unused)
1350  */
1351 static void
1352 gather_migration_blocks (void *cls,
1353                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1354 {
1355   mig_task = GNUNET_SCHEDULER_NO_TASK;
1356   if (dsh != NULL)
1357     {
1358       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1359                                             GNUNET_TIME_UNIT_FOREVER_REL,
1360                                             &process_migration_content, NULL);
1361       GNUNET_assert (mig_qe != NULL);
1362     }
1363 }
1364
1365
1366 /**
1367  * Task that is run periodically to obtain blocks for DHT PUTs.
1368  * 
1369  * @param cls type of blocks to gather
1370  * @param tc scheduler context (unused)
1371  */
1372 static void
1373 gather_dht_put_blocks (void *cls,
1374                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1375 {
1376   dht_task = GNUNET_SCHEDULER_NO_TASK;
1377   if (dsh != NULL)
1378     {
1379       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1380         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1381       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1382                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1383                                                     dht_put_type++,
1384                                                     &process_dht_put_content, NULL);
1385       GNUNET_assert (dht_qe != NULL);
1386     }
1387 }
1388
1389
1390 /**
1391  * We're done with a particular message list entry.
1392  * Free all associated resources.
1393  * 
1394  * @param pml entry to destroy
1395  */
1396 static void
1397 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1398 {
1399   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1400                                pml->req->pending_tail,
1401                                pml);
1402   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1403                                pml->target->pending_messages_tail,
1404                                pml->pm);
1405   pml->target->pending_requests--;
1406   GNUNET_free (pml->pm);
1407   GNUNET_free (pml);
1408 }
1409
1410
1411 /**
1412  * Destroy the given pending message (and call the respective
1413  * continuation).
1414  *
1415  * @param pm message to destroy
1416  * @param tpid id of peer that the message was delivered to, or 0 for none
1417  */
1418 static void
1419 destroy_pending_message (struct PendingMessage *pm,
1420                          GNUNET_PEER_Id tpid)
1421 {
1422   struct PendingMessageList *pml = pm->pml;
1423   TransmissionContinuation cont;
1424   void *cont_cls;
1425
1426   cont = pm->cont;
1427   cont_cls = pm->cont_cls;
1428   if (pml != NULL)
1429     {
1430       GNUNET_assert (pml->pm == pm);
1431       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1432       destroy_pending_message_list_entry (pml);
1433     }
1434   else
1435     {
1436       GNUNET_free (pm);
1437     }
1438   if (cont != NULL)
1439     cont (cont_cls, tpid);  
1440 }
1441
1442
1443 /**
1444  * We're done processing a particular request.
1445  * Free all associated resources.
1446  *
1447  * @param pr request to destroy
1448  */
1449 static void
1450 destroy_pending_request (struct PendingRequest *pr)
1451 {
1452   struct GNUNET_PeerIdentity pid;
1453   unsigned int i;
1454
1455   if (pr->hnode != NULL)
1456     {
1457       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1458                                          pr->hnode);
1459       pr->hnode = NULL;
1460     }
1461   if (NULL == pr->client_request_list)
1462     {
1463       GNUNET_STATISTICS_update (stats,
1464                                 gettext_noop ("# P2P searches active"),
1465                                 -1,
1466                                 GNUNET_NO);
1467     }
1468   else
1469     {
1470       GNUNET_STATISTICS_update (stats,
1471                                 gettext_noop ("# client searches active"),
1472                                 -1,
1473                                 GNUNET_NO);
1474     }
1475   if (GNUNET_YES == 
1476       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1477                                             &pr->query,
1478                                             pr))
1479     {
1480       GNUNET_LOAD_update (rt_entry_lifetime,
1481                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1482     }
1483   if (pr->qe != NULL)
1484      {
1485       GNUNET_DATASTORE_cancel (pr->qe);
1486       pr->qe = NULL;
1487     }
1488   if (pr->dht_get != NULL)
1489     {
1490       GNUNET_DHT_get_stop (pr->dht_get);
1491       pr->dht_get = NULL;
1492     }
1493   if (pr->client_request_list != NULL)
1494     {
1495       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1496                                    pr->client_request_list->client_list->rl_tail,
1497                                    pr->client_request_list);
1498       GNUNET_free (pr->client_request_list);
1499       pr->client_request_list = NULL;
1500     }
1501   if (pr->cp != NULL)
1502     {
1503       GNUNET_PEER_resolve (pr->cp->pid,
1504                            &pid);
1505       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1506                                                    &pid.hashPubKey,
1507                                                    pr);
1508       pr->cp = NULL;
1509     }
1510   if (pr->bf != NULL)
1511     {
1512       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1513       pr->bf = NULL;
1514     }
1515   if (pr->irc != NULL)
1516     {
1517       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1518       pr->irc = NULL;
1519     }
1520   if (pr->replies_seen != NULL)
1521     {
1522       GNUNET_free (pr->replies_seen);
1523       pr->replies_seen = NULL;
1524     }
1525   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1526     {
1527       GNUNET_SCHEDULER_cancel (sched,
1528                                pr->task);
1529       pr->task = GNUNET_SCHEDULER_NO_TASK;
1530     }
1531   while (NULL != pr->pending_head)    
1532     destroy_pending_message_list_entry (pr->pending_head);
1533   GNUNET_PEER_change_rc (pr->target_pid, -1);
1534   if (pr->used_targets != NULL)
1535     {
1536       for (i=0;i<pr->used_targets_off;i++)
1537         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1538       GNUNET_free (pr->used_targets);
1539       pr->used_targets_off = 0;
1540       pr->used_targets_size = 0;
1541       pr->used_targets = NULL;
1542     }
1543   GNUNET_free (pr);
1544 }
1545
1546
1547 /**
1548  * Method called whenever a given peer connects.
1549  *
1550  * @param cls closure, not used
1551  * @param peer peer identity this notification is about
1552  * @param latency reported latency of the connection with 'other'
1553  * @param distance reported distance (DV) to 'other' 
1554  */
1555 static void 
1556 peer_connect_handler (void *cls,
1557                       const struct
1558                       GNUNET_PeerIdentity * peer,
1559                       struct GNUNET_TIME_Relative latency,
1560                       uint32_t distance)
1561 {
1562   struct ConnectedPeer *cp;
1563   struct MigrationReadyBlock *pos;
1564   char *fn;
1565   uint32_t trust;
1566   
1567   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1568   cp->transmission_delay = GNUNET_LOAD_value_init (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
1569   cp->pid = GNUNET_PEER_intern (peer);
1570
1571   fn = get_trust_filename (peer);
1572   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1573       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1574     cp->disk_trust = cp->trust = ntohl (trust);
1575   GNUNET_free (fn);
1576
1577   GNUNET_break (GNUNET_OK ==
1578                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1579                                                    &peer->hashPubKey,
1580                                                    cp,
1581                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1582
1583   pos = mig_head;
1584   while (NULL != pos)
1585     {
1586       (void) consider_migration (pos, &peer->hashPubKey, cp);
1587       pos = pos->next;
1588     }
1589 }
1590
1591
1592 /**
1593  * Increase the host credit by a value.
1594  *
1595  * @param host which peer to change the trust value on
1596  * @param value is the int value by which the
1597  *  host credit is to be increased or decreased
1598  * @returns the actual change in trust (positive or negative)
1599  */
1600 static int
1601 change_host_trust (struct ConnectedPeer *host, int value)
1602 {
1603   unsigned int old_trust;
1604
1605   if (value == 0)
1606     return 0;
1607   GNUNET_assert (host != NULL);
1608   old_trust = host->trust;
1609   if (value > 0)
1610     {
1611       if (host->trust + value < host->trust)
1612         {
1613           value = UINT32_MAX - host->trust;
1614           host->trust = UINT32_MAX;
1615         }
1616       else
1617         host->trust += value;
1618     }
1619   else
1620     {
1621       if (host->trust < -value)
1622         {
1623           value = -host->trust;
1624           host->trust = 0;
1625         }
1626       else
1627         host->trust += value;
1628     }
1629   return value;
1630 }
1631
1632
1633 /**
1634  * Write host-trust information to a file - flush the buffer entry!
1635  */
1636 static int
1637 flush_trust (void *cls,
1638              const GNUNET_HashCode *key,
1639              void *value)
1640 {
1641   struct ConnectedPeer *host = value;
1642   char *fn;
1643   uint32_t trust;
1644   struct GNUNET_PeerIdentity pid;
1645
1646   if (host->trust == host->disk_trust)
1647     return GNUNET_OK;                     /* unchanged */
1648   GNUNET_PEER_resolve (host->pid,
1649                        &pid);
1650   fn = get_trust_filename (&pid);
1651   if (host->trust == 0)
1652     {
1653       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1654         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1655                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1656     }
1657   else
1658     {
1659       trust = htonl (host->trust);
1660       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1661                                                     sizeof(uint32_t),
1662                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1663                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1664         host->disk_trust = host->trust;
1665     }
1666   GNUNET_free (fn);
1667   return GNUNET_OK;
1668 }
1669
1670 /**
1671  * Call this method periodically to scan data/hosts for new hosts.
1672  */
1673 static void
1674 cron_flush_trust (void *cls,
1675                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1676 {
1677
1678   if (NULL == connected_peers)
1679     return;
1680   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1681                                          &flush_trust,
1682                                          NULL);
1683   if (NULL == tc)
1684     return;
1685   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1686     return;
1687   GNUNET_SCHEDULER_add_delayed (tc->sched,
1688                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1689 }
1690
1691
1692 /**
1693  * Free (each) request made by the peer.
1694  *
1695  * @param cls closure, points to peer that the request belongs to
1696  * @param key current key code
1697  * @param value value in the hash map
1698  * @return GNUNET_YES (we should continue to iterate)
1699  */
1700 static int
1701 destroy_request (void *cls,
1702                  const GNUNET_HashCode * key,
1703                  void *value)
1704 {
1705   const struct GNUNET_PeerIdentity * peer = cls;
1706   struct PendingRequest *pr = value;
1707   
1708   GNUNET_break (GNUNET_YES ==
1709                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1710                                                       &peer->hashPubKey,
1711                                                       pr));
1712   destroy_pending_request (pr);
1713   return GNUNET_YES;
1714 }
1715
1716
1717 /**
1718  * Method called whenever a peer disconnects.
1719  *
1720  * @param cls closure, not used
1721  * @param peer peer identity this notification is about
1722  */
1723 static void
1724 peer_disconnect_handler (void *cls,
1725                          const struct
1726                          GNUNET_PeerIdentity * peer)
1727 {
1728   struct ConnectedPeer *cp;
1729   struct PendingMessage *pm;
1730   unsigned int i;
1731   struct MigrationReadyBlock *pos;
1732   struct MigrationReadyBlock *next;
1733
1734   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1735                                               &peer->hashPubKey,
1736                                               &destroy_request,
1737                                               (void*) peer);
1738   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1739                                           &peer->hashPubKey);
1740   if (cp == NULL)
1741     return;
1742   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1743     {
1744       if (NULL != cp->last_client_replies[i])
1745         {
1746           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1747           cp->last_client_replies[i] = NULL;
1748         }
1749     }
1750   GNUNET_break (GNUNET_YES ==
1751                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1752                                                       &peer->hashPubKey,
1753                                                       cp));
1754   /* remove this peer from migration considerations; schedule
1755      alternatives */
1756   next = mig_head;
1757   while (NULL != (pos = next))
1758     {
1759       next = pos->next;
1760       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1761         {
1762           if (pos->target_list[i] == cp->pid)
1763             {
1764               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1765               pos->target_list[i] = 0;
1766             }
1767          }
1768       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1769         {
1770           delete_migration_block (pos);
1771           consider_migration_gathering ();
1772           continue;
1773         }
1774       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1775                                              &consider_migration,
1776                                              pos);
1777     }
1778   GNUNET_PEER_change_rc (cp->pid, -1);
1779   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1780   if (NULL != cp->cth)
1781     {
1782       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1783       cp->cth = NULL;
1784     }
1785   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1786     {
1787       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1788       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1789     }
1790   while (NULL != (pm = cp->pending_messages_head))
1791     destroy_pending_message (pm, 0 /* delivery failed */);
1792   GNUNET_LOAD_value_free (cp->transmission_delay);
1793   GNUNET_break (0 == cp->pending_requests);
1794   GNUNET_free (cp);
1795 }
1796
1797
1798 /**
1799  * Iterator over hash map entries that removes all occurences
1800  * of the given 'client' from the 'last_client_replies' of the
1801  * given connected peer.
1802  *
1803  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1804  * @param key current key code (unused)
1805  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1806  * @return GNUNET_YES (we should continue to iterate)
1807  */
1808 static int
1809 remove_client_from_last_client_replies (void *cls,
1810                                         const GNUNET_HashCode * key,
1811                                         void *value)
1812 {
1813   struct GNUNET_SERVER_Client *client = cls;
1814   struct ConnectedPeer *cp = value;
1815   unsigned int i;
1816
1817   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1818     {
1819       if (cp->last_client_replies[i] == client)
1820         {
1821           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1822           cp->last_client_replies[i] = NULL;
1823         }
1824     }  
1825   return GNUNET_YES;
1826 }
1827
1828
1829 /**
1830  * A client disconnected.  Remove all of its pending queries.
1831  *
1832  * @param cls closure, NULL
1833  * @param client identification of the client
1834  */
1835 static void
1836 handle_client_disconnect (void *cls,
1837                           struct GNUNET_SERVER_Client
1838                           * client)
1839 {
1840   struct ClientList *pos;
1841   struct ClientList *prev;
1842   struct ClientRequestList *rcl;
1843   struct ClientResponseMessage *creply;
1844
1845   if (client == NULL)
1846     return;
1847   prev = NULL;
1848   pos = client_list;
1849   while ( (NULL != pos) &&
1850           (pos->client != client) )
1851     {
1852       prev = pos;
1853       pos = pos->next;
1854     }
1855   if (pos == NULL)
1856     return; /* no requests pending for this client */
1857   while (NULL != (rcl = pos->rl_head))
1858     {
1859       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1860                   "Destroying pending request `%s' on disconnect\n",
1861                   GNUNET_h2s (&rcl->req->query));
1862       destroy_pending_request (rcl->req);
1863     }
1864   if (prev == NULL)
1865     client_list = pos->next;
1866   else
1867     prev->next = pos->next;
1868   if (pos->th != NULL)
1869     {
1870       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1871       pos->th = NULL;
1872     }
1873   while (NULL != (creply = pos->res_head))
1874     {
1875       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1876                                    pos->res_tail,
1877                                    creply);
1878       GNUNET_free (creply);
1879     }    
1880   GNUNET_SERVER_client_drop (pos->client);
1881   GNUNET_free (pos);
1882   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1883                                          &remove_client_from_last_client_replies,
1884                                          client);
1885 }
1886
1887
1888 /**
1889  * Iterator to free peer entries.
1890  *
1891  * @param cls closure, unused
1892  * @param key current key code
1893  * @param value value in the hash map (peer entry)
1894  * @return GNUNET_YES (we should continue to iterate)
1895  */
1896 static int 
1897 clean_peer (void *cls,
1898             const GNUNET_HashCode * key,
1899             void *value)
1900 {
1901   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1902   return GNUNET_YES;
1903 }
1904
1905
1906 /**
1907  * Task run during shutdown.
1908  *
1909  * @param cls unused
1910  * @param tc unused
1911  */
1912 static void
1913 shutdown_task (void *cls,
1914                const struct GNUNET_SCHEDULER_TaskContext *tc)
1915 {
1916   if (mig_qe != NULL)
1917     {
1918       GNUNET_DATASTORE_cancel (mig_qe);
1919       mig_qe = NULL;
1920     }
1921   if (dht_qe != NULL)
1922     {
1923       GNUNET_DATASTORE_cancel (dht_qe);
1924       dht_qe = NULL;
1925     }
1926   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1927     {
1928       GNUNET_SCHEDULER_cancel (sched, mig_task);
1929       mig_task = GNUNET_SCHEDULER_NO_TASK;
1930     }
1931   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1932     {
1933       GNUNET_SCHEDULER_cancel (sched, dht_task);
1934       dht_task = GNUNET_SCHEDULER_NO_TASK;
1935     }
1936   while (client_list != NULL)
1937     handle_client_disconnect (NULL,
1938                               client_list->client);
1939   cron_flush_trust (NULL, NULL);
1940   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1941                                          &clean_peer,
1942                                          NULL);
1943   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1944   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1945   requests_by_expiration_heap = 0;
1946   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1947   connected_peers = NULL;
1948   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1949   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1950   query_request_map = NULL;
1951   GNUNET_LOAD_value_free (rt_entry_lifetime);
1952   rt_entry_lifetime = NULL;
1953   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1954   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1955   peer_request_map = NULL;
1956   GNUNET_assert (NULL != core);
1957   GNUNET_CORE_disconnect (core);
1958   core = NULL;
1959   if (stats != NULL)
1960     {
1961       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1962       stats = NULL;
1963     }
1964   if (dsh != NULL)
1965     {
1966       GNUNET_DATASTORE_disconnect (dsh,
1967                                    GNUNET_NO);
1968       dsh = NULL;
1969     }
1970   while (mig_head != NULL)
1971     delete_migration_block (mig_head);
1972   GNUNET_assert (0 == mig_size);
1973   GNUNET_DHT_disconnect (dht_handle);
1974   dht_handle = NULL;
1975   GNUNET_LOAD_value_free (datastore_get_load);
1976   datastore_get_load = NULL;
1977   GNUNET_LOAD_value_free (datastore_put_load);
1978   datastore_put_load = NULL;
1979   GNUNET_BLOCK_context_destroy (block_ctx);
1980   block_ctx = NULL;
1981   GNUNET_CONFIGURATION_destroy (block_cfg);
1982   block_cfg = NULL;
1983   sched = NULL;
1984   cfg = NULL;  
1985   GNUNET_free_non_null (trustDirectory);
1986   trustDirectory = NULL;
1987 }
1988
1989
1990 /* ******************* Utility functions  ******************** */
1991
1992
1993 /**
1994  * We've had to delay a request for transmission to core, but now
1995  * we should be ready.  Run it.
1996  *
1997  * @param cls the 'struct ConnectedPeer' for which a request was delayed
1998  * @param tc task context (unused)
1999  */
2000 static void
2001 delayed_transmission_request (void *cls,
2002                               const struct GNUNET_SCHEDULER_TaskContext *tc)
2003 {
2004   struct ConnectedPeer *cp = cls;
2005   struct GNUNET_PeerIdentity pid;
2006   struct PendingMessage *pm;
2007
2008   pm = cp->pending_messages_head;
2009   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2010   GNUNET_assert (cp->cth == NULL);
2011   if (pm == NULL)
2012     return;
2013   GNUNET_PEER_resolve (cp->pid,
2014                        &pid);
2015   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2016   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2017                                                pm->priority,
2018                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2019                                                &pid,
2020                                                pm->msize,
2021                                                &transmit_to_peer,
2022                                                cp);
2023 }
2024
2025
2026 /**
2027  * Transmit messages by copying it to the target buffer
2028  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2029  * for writing in the meantime.  In that case, do nothing
2030  * (the disconnect or shutdown handler will take care of the rest).
2031  * If we were able to transmit messages and there are still more
2032  * pending, ask core again for further calls to this function.
2033  *
2034  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2035  * @param size number of bytes available in buf
2036  * @param buf where the callee should write the message
2037  * @return number of bytes written to buf
2038  */
2039 static size_t
2040 transmit_to_peer (void *cls,
2041                   size_t size, void *buf)
2042 {
2043   struct ConnectedPeer *cp = cls;
2044   char *cbuf = buf;
2045   struct PendingMessage *pm;
2046   struct PendingMessage *next_pm;
2047   struct GNUNET_TIME_Absolute now;
2048   struct GNUNET_TIME_Relative min_delay;
2049   struct MigrationReadyBlock *mb;
2050   struct MigrationReadyBlock *next;
2051   struct PutMessage migm;
2052   size_t msize;
2053   unsigned int i;
2054   struct GNUNET_PeerIdentity pid;
2055  
2056   cp->cth = NULL;
2057   if (NULL == buf)
2058     {
2059 #if DEBUG_FS
2060       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2061                   "Dropping message, core too busy.\n");
2062 #endif
2063       GNUNET_LOAD_update (cp->transmission_delay,
2064                           UINT64_MAX);
2065       return 0;
2066     }  
2067   GNUNET_LOAD_update (cp->transmission_delay,
2068                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2069   now = GNUNET_TIME_absolute_get ();
2070   msize = 0;
2071   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2072   next_pm = cp->pending_messages_head;
2073   while ( (NULL != (pm = next_pm) ) &&
2074           (pm->msize <= size) )
2075     {
2076       next_pm = pm->next;
2077       if (pm->delay_until.value > now.value)
2078         {
2079           min_delay = GNUNET_TIME_relative_min (min_delay,
2080                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2081           continue;
2082         }
2083       memcpy (&cbuf[msize], &pm[1], pm->msize);
2084       msize += pm->msize;
2085       size -= pm->msize;
2086       if (NULL == pm->pml)
2087         {
2088           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2089                                        cp->pending_messages_tail,
2090                                        pm);
2091           cp->pending_requests--;
2092         }
2093       destroy_pending_message (pm, cp->pid);
2094     }
2095   if (pm != NULL)
2096     min_delay = GNUNET_TIME_UNIT_ZERO;
2097   if (NULL != cp->pending_messages_head)
2098     {     
2099       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2100       cp->delayed_transmission_request_task
2101         = GNUNET_SCHEDULER_add_delayed (sched,
2102                                         min_delay,
2103                                         &delayed_transmission_request,
2104                                         cp);
2105     }
2106   if (pm == NULL)
2107     {      
2108       GNUNET_PEER_resolve (cp->pid,
2109                            &pid);
2110       next = mig_head;
2111       while (NULL != (mb = next))
2112         {
2113           next = mb->next;
2114           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2115             {
2116               if ( (cp->pid == mb->target_list[i]) &&
2117                    (mb->size + sizeof (migm) <= size) )
2118                 {
2119                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2120                   mb->target_list[i] = 0;
2121                   mb->used_targets++;
2122                   memset (&migm, 0, sizeof (migm));
2123                   migm.header.size = htons (sizeof (migm) + mb->size);
2124                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2125                   migm.type = htonl (mb->type);
2126                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2127                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2128                   msize += sizeof (migm);
2129                   size -= sizeof (migm);
2130                   memcpy (&cbuf[msize], &mb[1], mb->size);
2131                   msize += mb->size;
2132                   size -= mb->size;
2133 #if DEBUG_FS
2134                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2135                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2136                               GNUNET_h2s (&mb->query),
2137                               (unsigned int) mb->size,
2138                               GNUNET_i2s (&pid));
2139 #endif    
2140                   break;
2141                 }
2142               else
2143                 {
2144 #if DEBUG_FS
2145                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2146                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2147                               GNUNET_h2s (&mb->query),
2148                               (unsigned int) mb->size,
2149                               GNUNET_i2s (&pid));
2150 #endif    
2151                 }
2152             }
2153           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2154                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2155             {
2156               delete_migration_block (mb);
2157               consider_migration_gathering ();
2158             }
2159         }
2160       consider_migration (NULL, 
2161                           &pid.hashPubKey,
2162                           cp);
2163     }
2164 #if DEBUG_FS
2165   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2166               "Transmitting %u bytes to peer with PID %u\n",
2167               (unsigned int) msize,
2168               (unsigned int) cp->pid);
2169 #endif
2170   return msize;
2171 }
2172
2173
2174 /**
2175  * Add a message to the set of pending messages for the given peer.
2176  *
2177  * @param cp peer to send message to
2178  * @param pm message to queue
2179  * @param pr request on which behalf this message is being queued
2180  */
2181 static void
2182 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2183                                   struct PendingMessage *pm,
2184                                   struct PendingRequest *pr)
2185 {
2186   struct PendingMessage *pos;
2187   struct PendingMessageList *pml;
2188   struct GNUNET_PeerIdentity pid;
2189
2190   GNUNET_assert (pm->next == NULL);
2191   GNUNET_assert (pm->pml == NULL);    
2192   if (pr != NULL)
2193     {
2194       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2195       pml->req = pr;
2196       pml->target = cp;
2197       pml->pm = pm;
2198       pm->pml = pml;  
2199       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2200                                    pr->pending_tail,
2201                                    pml);
2202     }
2203   pos = cp->pending_messages_head;
2204   while ( (pos != NULL) &&
2205           (pm->priority < pos->priority) )
2206     pos = pos->next;    
2207   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2208                                      cp->pending_messages_tail,
2209                                      pos,
2210                                      pm);
2211   cp->pending_requests++;
2212   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2213     {
2214       GNUNET_STATISTICS_update (stats,
2215                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2216                                 1,
2217                                 GNUNET_NO);
2218       destroy_pending_message (cp->pending_messages_tail, 0);  
2219     }
2220   GNUNET_PEER_resolve (cp->pid, &pid);
2221   if (NULL != cp->cth)
2222     {
2223       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2224       cp->cth = NULL;
2225     }
2226   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2227     {
2228       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2229       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2230     }
2231   /* need to schedule transmission */
2232   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2233   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2234                                                cp->pending_messages_head->priority,
2235                                                MAX_TRANSMIT_DELAY,
2236                                                &pid,
2237                                                cp->pending_messages_head->msize,
2238                                                &transmit_to_peer,
2239                                                cp);
2240   if (cp->cth == NULL)
2241     {
2242 #if DEBUG_FS
2243       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2244                   "Failed to schedule transmission with core!\n");
2245 #endif
2246       GNUNET_STATISTICS_update (stats,
2247                                 gettext_noop ("# CORE transmission failures"),
2248                                 1,
2249                                 GNUNET_NO);
2250     }
2251 }
2252
2253
2254 /**
2255  * Test if the DATABASE (GET) load on this peer is too high
2256  * to even consider processing the query at
2257  * all.  
2258  * 
2259  * @return GNUNET_YES if the load is too high to do anything (load high)
2260  *         GNUNET_NO to process normally (load normal)
2261  *         GNUNET_SYSERR to process for free (load low)
2262  */
2263 static int
2264 test_get_load_too_high (uint32_t priority)
2265 {
2266   double ld;
2267
2268   ld = GNUNET_LOAD_get_load (datastore_get_load);
2269   if (ld < 1)
2270     return GNUNET_SYSERR;    
2271   if (ld <= priority)    
2272     return GNUNET_NO;    
2273   return GNUNET_YES;
2274 }
2275
2276
2277
2278
2279 /**
2280  * Test if the DATABASE (PUT) load on this peer is too high
2281  * to even consider processing the query at
2282  * all.  
2283  * 
2284  * @return GNUNET_YES if the load is too high to do anything (load high)
2285  *         GNUNET_NO to process normally (load normal or low)
2286  */
2287 static int
2288 test_put_load_too_high (uint32_t priority)
2289 {
2290   double ld;
2291
2292   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2293     return GNUNET_NO; /* very fast */
2294   ld = GNUNET_LOAD_get_load (datastore_put_load);
2295   if (ld < 2.0 * (1 + priority))
2296     return GNUNET_NO;
2297   GNUNET_STATISTICS_update (stats,
2298                             gettext_noop ("# storage requests dropped due to high load"),
2299                             1,
2300                             GNUNET_NO);
2301   return GNUNET_YES;
2302 }
2303
2304
2305 /* ******************* Pending Request Refresh Task ******************** */
2306
2307
2308
2309 /**
2310  * We use a random delay to make the timing of requests less
2311  * predictable.  This function returns such a random delay.  We add a base
2312  * delay of MAX_CORK_DELAY (1s).
2313  *
2314  * FIXME: make schedule dependent on the specifics of the request?
2315  * Or bandwidth and number of connected peers and load?
2316  *
2317  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2318  */
2319 static struct GNUNET_TIME_Relative
2320 get_processing_delay ()
2321 {
2322   return 
2323     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2324                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2325                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2326                                                                                        TTL_DECREMENT)));
2327 }
2328
2329
2330 /**
2331  * We're processing a GET request from another peer and have decided
2332  * to forward it to other peers.  This function is called periodically
2333  * and should forward the request to other peers until we have all
2334  * possible replies.  If we have transmitted the *only* reply to
2335  * the initiator we should destroy the pending request.  If we have
2336  * many replies in the queue to the initiator, we should delay sending
2337  * out more queries until the reply queue has shrunk some.
2338  *
2339  * @param cls our "struct ProcessGetContext *"
2340  * @param tc unused
2341  */
2342 static void
2343 forward_request_task (void *cls,
2344                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2345
2346
2347 /**
2348  * Function called after we either failed or succeeded
2349  * at transmitting a query to a peer.  
2350  *
2351  * @param cls the requests "struct PendingRequest*"
2352  * @param tpid ID of receiving peer, 0 on transmission error
2353  */
2354 static void
2355 transmit_query_continuation (void *cls,
2356                              GNUNET_PEER_Id tpid)
2357 {
2358   struct PendingRequest *pr = cls;
2359   unsigned int i;
2360
2361   GNUNET_STATISTICS_update (stats,
2362                             gettext_noop ("# queries scheduled for forwarding"),
2363                             -1,
2364                             GNUNET_NO);
2365   if (tpid == 0)   
2366     {
2367 #if DEBUG_FS
2368       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2369                   "Transmission of request failed, will try again later.\n");
2370 #endif
2371       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2372         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2373                                                  get_processing_delay (),
2374                                                  &forward_request_task,
2375                                                  pr); 
2376       return;    
2377     }
2378 #if DEBUG_FS
2379   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2380               "Transmitted query `%s'\n",
2381               GNUNET_h2s (&pr->query));
2382 #endif
2383   GNUNET_STATISTICS_update (stats,
2384                             gettext_noop ("# queries forwarded"),
2385                             1,
2386                             GNUNET_NO);
2387   for (i=0;i<pr->used_targets_off;i++)
2388     if (pr->used_targets[i].pid == tpid)
2389       break; /* found match! */    
2390   if (i == pr->used_targets_off)
2391     {
2392       /* need to create new entry */
2393       if (pr->used_targets_off == pr->used_targets_size)
2394         GNUNET_array_grow (pr->used_targets,
2395                            pr->used_targets_size,
2396                            pr->used_targets_size * 2 + 2);
2397       GNUNET_PEER_change_rc (tpid, 1);
2398       pr->used_targets[pr->used_targets_off].pid = tpid;
2399       pr->used_targets[pr->used_targets_off].num_requests = 0;
2400       i = pr->used_targets_off++;
2401     }
2402   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2403   pr->used_targets[i].num_requests++;
2404   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2405     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2406                                              get_processing_delay (),
2407                                              &forward_request_task,
2408                                              pr);
2409 }
2410
2411
2412 /**
2413  * How many bytes should a bloomfilter be if we have already seen
2414  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2415  * of bits set per entry.  Furthermore, we should not re-size the
2416  * filter too often (to keep it cheap).
2417  *
2418  * Since other peers will also add entries but not resize the filter,
2419  * we should generally pick a slightly larger size than what the
2420  * strict math would suggest.
2421  *
2422  * @return must be a power of two and smaller or equal to 2^15.
2423  */
2424 static size_t
2425 compute_bloomfilter_size (unsigned int entry_count)
2426 {
2427   size_t size;
2428   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2429   uint16_t max = 1 << 15;
2430
2431   if (entry_count > max)
2432     return max;
2433   size = 8;
2434   while ((size < max) && (size < ideal))
2435     size *= 2;
2436   if (size > max)
2437     return max;
2438   return size;
2439 }
2440
2441
2442 /**
2443  * Recalculate our bloom filter for filtering replies.  This function
2444  * will create a new bloom filter from scratch, so it should only be
2445  * called if we have no bloomfilter at all (and hence can create a
2446  * fresh one of minimal size without problems) OR if our peer is the
2447  * initiator (in which case we may resize to larger than mimimum size).
2448  *
2449  * @param pr request for which the BF is to be recomputed
2450  */
2451 static void
2452 refresh_bloomfilter (struct PendingRequest *pr)
2453 {
2454   unsigned int i;
2455   size_t nsize;
2456   GNUNET_HashCode mhash;
2457
2458   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2459   if (nsize == pr->bf_size)
2460     return; /* size not changed */
2461   if (pr->bf != NULL)
2462     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2463   pr->bf_size = nsize;
2464   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2465   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2466                                               pr->bf_size,
2467                                               BLOOMFILTER_K);
2468   for (i=0;i<pr->replies_seen_off;i++)
2469     {
2470       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2471                                 pr->mingle,
2472                                 &mhash);
2473       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2474     }
2475 }
2476
2477
2478 /**
2479  * Function called after we've tried to reserve a certain amount of
2480  * bandwidth for a reply.  Check if we succeeded and if so send our
2481  * query.
2482  *
2483  * @param cls the requests "struct PendingRequest*"
2484  * @param peer identifies the peer
2485  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2486  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2487  * @param amount set to the amount that was actually reserved or unreserved
2488  * @param preference current traffic preference for the given peer
2489  */
2490 static void
2491 target_reservation_cb (void *cls,
2492                        const struct
2493                        GNUNET_PeerIdentity * peer,
2494                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2495                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2496                        int amount,
2497                        uint64_t preference)
2498 {
2499   struct PendingRequest *pr = cls;
2500   struct ConnectedPeer *cp;
2501   struct PendingMessage *pm;
2502   struct GetMessage *gm;
2503   GNUNET_HashCode *ext;
2504   char *bfdata;
2505   size_t msize;
2506   unsigned int k;
2507   int no_route;
2508   uint32_t bm;
2509   unsigned int i;
2510
2511   pr->irc = NULL;
2512   if (peer == NULL)
2513     {
2514       /* error in communication with core, try again later */
2515       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2516         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2517                                                  get_processing_delay (),
2518                                                  &forward_request_task,
2519                                                  pr);
2520       return;
2521     }
2522   /* (3) transmit, update ttl/priority */
2523   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2524                                           &peer->hashPubKey);
2525   if (cp == NULL)
2526     {
2527       /* Peer must have just left */
2528 #if DEBUG_FS
2529       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2530                   "Selected peer disconnected!\n");
2531 #endif
2532       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2533         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2534                                                  get_processing_delay (),
2535                                                  &forward_request_task,
2536                                                  pr);
2537       return;
2538     }
2539   no_route = GNUNET_NO;
2540   if (amount == 0)
2541     {
2542       if (pr->cp == NULL)
2543         {
2544 #if DEBUG_FS > 1
2545           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2546                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2547                       amount,
2548                       DBLOCK_SIZE);
2549 #endif
2550           GNUNET_STATISTICS_update (stats,
2551                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2552                                     1,
2553                                     GNUNET_NO);
2554           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2555             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2556                                                      get_processing_delay (),
2557                                                      &forward_request_task,
2558                                                      pr);
2559           return;  /* this target round failed */
2560         }
2561       no_route = GNUNET_YES;
2562     }
2563   
2564   GNUNET_STATISTICS_update (stats,
2565                             gettext_noop ("# queries scheduled for forwarding"),
2566                             1,
2567                             GNUNET_NO);
2568   for (i=0;i<pr->used_targets_off;i++)
2569     if (pr->used_targets[i].pid == cp->pid) 
2570       {
2571         GNUNET_STATISTICS_update (stats,
2572                                   gettext_noop ("# queries retransmitted to same target"),
2573                                   1,
2574                                   GNUNET_NO);
2575         break;
2576       } 
2577
2578   /* build message and insert message into priority queue */
2579 #if DEBUG_FS
2580   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2581               "Forwarding request `%s' to `%4s'!\n",
2582               GNUNET_h2s (&pr->query),
2583               GNUNET_i2s (peer));
2584 #endif
2585   k = 0;
2586   bm = 0;
2587   if (GNUNET_YES == no_route)
2588     {
2589       bm |= GET_MESSAGE_BIT_RETURN_TO;
2590       k++;      
2591     }
2592   if (pr->namespace != NULL)
2593     {
2594       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2595       k++;
2596     }
2597   if (pr->target_pid != 0)
2598     {
2599       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2600       k++;
2601     }
2602   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2603   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2604   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2605   pm->msize = msize;
2606   gm = (struct GetMessage*) &pm[1];
2607   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2608   gm->header.size = htons (msize);
2609   gm->type = htonl (pr->type);
2610   pr->remaining_priority /= 2;
2611   gm->priority = htonl (pr->remaining_priority);
2612   gm->ttl = htonl (pr->ttl);
2613   gm->filter_mutator = htonl(pr->mingle); 
2614   gm->hash_bitmap = htonl (bm);
2615   gm->query = pr->query;
2616   ext = (GNUNET_HashCode*) &gm[1];
2617   k = 0;
2618   if (GNUNET_YES == no_route)
2619     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2620   if (pr->namespace != NULL)
2621     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2622   if (pr->target_pid != 0)
2623     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2624   bfdata = (char *) &ext[k];
2625   if (pr->bf != NULL)
2626     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2627                                                bfdata,
2628                                                pr->bf_size);
2629   pm->cont = &transmit_query_continuation;
2630   pm->cont_cls = pr;
2631   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2632   add_to_pending_messages_for_peer (cp, pm, pr);
2633 }
2634
2635
2636 /**
2637  * Closure used for "target_peer_select_cb".
2638  */
2639 struct PeerSelectionContext 
2640 {
2641   /**
2642    * The request for which we are selecting
2643    * peers.
2644    */
2645   struct PendingRequest *pr;
2646
2647   /**
2648    * Current "prime" target.
2649    */
2650   struct GNUNET_PeerIdentity target;
2651
2652   /**
2653    * How much do we like this target?
2654    */
2655   double target_score;
2656
2657 };
2658
2659
2660 /**
2661  * Function called for each connected peer to determine
2662  * which one(s) would make good targets for forwarding.
2663  *
2664  * @param cls closure (struct PeerSelectionContext)
2665  * @param key current key code (peer identity)
2666  * @param value value in the hash map (struct ConnectedPeer)
2667  * @return GNUNET_YES if we should continue to
2668  *         iterate,
2669  *         GNUNET_NO if not.
2670  */
2671 static int
2672 target_peer_select_cb (void *cls,
2673                        const GNUNET_HashCode * key,
2674                        void *value)
2675 {
2676   struct PeerSelectionContext *psc = cls;
2677   struct ConnectedPeer *cp = value;
2678   struct PendingRequest *pr = psc->pr;
2679   struct GNUNET_TIME_Relative delay;
2680   double score;
2681   unsigned int i;
2682   unsigned int pc;
2683
2684   /* 1) check that this peer is not the initiator */
2685   if (cp == pr->cp)
2686     {
2687 #if DEBUG_FS
2688       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2689                   "Skipping initiator in forwarding selection\n");
2690 #endif
2691       return GNUNET_YES; /* skip */        
2692     }
2693
2694   /* 2) check if we have already (recently) forwarded to this peer */
2695   /* 2a) this particular request */
2696   pc = 0;
2697   for (i=0;i<pr->used_targets_off;i++)
2698     if (pr->used_targets[i].pid == cp->pid) 
2699       {
2700         pc = pr->used_targets[i].num_requests;
2701         GNUNET_assert (pc > 0);
2702         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2703                                            RETRY_PROBABILITY_INV * pc))
2704           {
2705 #if DEBUG_FS
2706             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2707                         "NOT re-trying query that was previously transmitted %u times\n",
2708                         (unsigned int) pc);
2709 #endif
2710             return GNUNET_YES; /* skip */
2711           }
2712         break;
2713       }
2714 #if DEBUG_FS
2715   if (0 < pc)
2716     {
2717       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2718                   "Re-trying query that was previously transmitted %u times to this peer\n",
2719                   (unsigned int) pc);
2720     }
2721 #endif
2722   /* 2b) many other requests to this peer */
2723   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2724   if (delay.value <= cp->avg_delay.value)
2725     {
2726 #if DEBUG_FS
2727       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2728                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2729                   MAX_QUEUE_PER_PEER,
2730                   cp->avg_delay.value);
2731 #endif
2732       return GNUNET_YES; /* skip */      
2733     }
2734
2735   /* 3) calculate how much we'd like to forward to this peer,
2736      starting with a random value that is strong enough
2737      to at least give any peer a chance sometimes 
2738      (compared to the other factors that come later) */
2739   /* 3a) count successful (recent) routes from cp for same source */
2740   if (pr->cp != NULL)
2741     {
2742       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2743                                         P2P_SUCCESS_LIST_SIZE);
2744       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2745         if (cp->last_p2p_replies[i] == pr->cp->pid)
2746           score += 1.0; /* likely successful based on hot path */
2747     }
2748   else
2749     {
2750       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2751                                         CS2P_SUCCESS_LIST_SIZE);
2752       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2753         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2754           score += 1.0; /* likely successful based on hot path */
2755     }
2756   /* 3b) include latency */
2757   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2758     score += 1.0; /* likely fast based on latency */
2759   /* 3c) include priorities */
2760   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2761     score += 1.0; /* likely successful based on priorities */
2762   /* 3d) penalize for queue size */  
2763   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2764   /* 3e) include peer proximity */
2765   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2766                                                     &pr->query)) / (double) UINT32_MAX);
2767   /* 4) super-bonus for being the known target */
2768   if (pr->target_pid == cp->pid)
2769     score += 100.0;
2770   /* store best-fit in closure */
2771 #if DEBUG_FS
2772   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2773               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2774               GNUNET_h2s (key),
2775               score,
2776               psc->target_score);
2777 #endif  
2778   score++; /* avoid zero */
2779   if (score > psc->target_score)
2780     {
2781       psc->target_score = score;
2782       psc->target.hashPubKey = *key; 
2783     }
2784   return GNUNET_YES;
2785 }
2786   
2787
2788 /**
2789  * The priority level imposes a bound on the maximum
2790  * value for the ttl that can be requested.
2791  *
2792  * @param ttl_in requested ttl
2793  * @param prio given priority
2794  * @return ttl_in if ttl_in is below the limit,
2795  *         otherwise the ttl-limit for the given priority
2796  */
2797 static int32_t
2798 bound_ttl (int32_t ttl_in, uint32_t prio)
2799 {
2800   unsigned long long allowed;
2801
2802   if (ttl_in <= 0)
2803     return ttl_in;
2804   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2805   if (ttl_in > allowed)      
2806     {
2807       if (allowed >= (1 << 30))
2808         return 1 << 30;
2809       return allowed;
2810     }
2811   return ttl_in;
2812 }
2813
2814
2815 /**
2816  * Iterator called on each result obtained for a DHT
2817  * operation that expects a reply
2818  *
2819  * @param cls closure
2820  * @param exp when will this value expire
2821  * @param key key of the result
2822  * @param get_path NULL-terminated array of pointers
2823  *                 to the peers on reverse GET path (or NULL if not recorded)
2824  * @param put_path NULL-terminated array of pointers
2825  *                 to the peers on the PUT path (or NULL if not recorded)
2826  * @param type type of the result
2827  * @param size number of bytes in data
2828  * @param data pointer to the result data
2829  */
2830 static void
2831 process_dht_reply (void *cls,
2832                    struct GNUNET_TIME_Absolute exp,
2833                    const GNUNET_HashCode * key,
2834                    const struct GNUNET_PeerIdentity * const *get_path,
2835                    const struct GNUNET_PeerIdentity * const *put_path,
2836                    enum GNUNET_BLOCK_Type type,
2837                    size_t size,
2838                    const void *data);
2839
2840
2841 /**
2842  * We're processing a GET request and have decided
2843  * to forward it to other peers.  This function is called periodically
2844  * and should forward the request to other peers until we have all
2845  * possible replies.  If we have transmitted the *only* reply to
2846  * the initiator we should destroy the pending request.  If we have
2847  * many replies in the queue to the initiator, we should delay sending
2848  * out more queries until the reply queue has shrunk some.
2849  *
2850  * @param cls our "struct ProcessGetContext *"
2851  * @param tc unused
2852  */
2853 static void
2854 forward_request_task (void *cls,
2855                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2856 {
2857   struct PendingRequest *pr = cls;
2858   struct PeerSelectionContext psc;
2859   struct ConnectedPeer *cp; 
2860   struct GNUNET_TIME_Relative delay;
2861
2862   pr->task = GNUNET_SCHEDULER_NO_TASK;
2863   if (pr->irc != NULL)
2864     {
2865 #if DEBUG_FS
2866       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2867                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2868                   GNUNET_h2s (&pr->query));
2869 #endif
2870       return; /* already pending */
2871     }
2872   if (GNUNET_YES == pr->local_only)
2873     return; /* configured to not do P2P search */
2874   /* (0) try DHT */
2875   if ( (0 == pr->anonymity_level) &&
2876        (GNUNET_YES != pr->forward_only) &&
2877        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2878        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2879     {
2880       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2881                                           GNUNET_TIME_UNIT_FOREVER_REL,
2882                                           pr->type,
2883                                           &pr->query,
2884                                           GNUNET_DHT_RO_NONE,
2885                                           pr->bf,
2886                                           pr->mingle,
2887                                           pr->namespace,
2888                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2889                                           &process_dht_reply,
2890                                           pr);
2891     }
2892   /* (1) select target */
2893   psc.pr = pr;
2894   psc.target_score = -DBL_MAX;
2895   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2896                                          &target_peer_select_cb,
2897                                          &psc);  
2898   if (psc.target_score == -DBL_MAX)
2899     {
2900       delay = get_processing_delay ();
2901 #if DEBUG_FS 
2902       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2903                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2904                   GNUNET_h2s (&pr->query),
2905                   delay.value);
2906 #endif
2907       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2908                                                delay,
2909                                                &forward_request_task,
2910                                                pr);
2911       return; /* nobody selected */
2912     }
2913   /* (3) update TTL/priority */
2914   if (pr->client_request_list != NULL)
2915     {
2916       /* FIXME: use better algorithm!? */
2917       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2918                                          4))
2919         pr->priority++;
2920       /* bound priority we use by priorities we see from other peers
2921          rounded up (must round up so that we can see non-zero
2922          priorities, but round up as little as possible to make it
2923          plausible that we forwarded another peers request) */
2924       if (pr->priority > current_priorities + 1.0)
2925         pr->priority = (uint32_t) current_priorities + 1.0;
2926       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2927                            pr->priority);
2928 #if DEBUG_FS
2929       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2930                   "Trying query `%s' with priority %u and TTL %d.\n",
2931                   GNUNET_h2s (&pr->query),
2932                   pr->priority,
2933                   pr->ttl);
2934 #endif
2935     }
2936
2937   /* (3) reserve reply bandwidth */
2938   if (GNUNET_NO == pr->forward_only)
2939     {
2940       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2941                                               &psc.target.hashPubKey);
2942       GNUNET_assert (NULL != cp);
2943       pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2944                                                     &psc.target,
2945                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2946                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2947                                                     DBLOCK_SIZE * 2, 
2948                                                     cp->inc_preference,
2949                                                     &target_reservation_cb,
2950                                                     pr);
2951       cp->inc_preference = 0;
2952     }
2953   else
2954     {
2955       /* force forwarding */
2956       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2957       target_reservation_cb (pr, &psc.target,
2958                              zerobw, zerobw, 0, 0.0);
2959     }
2960 }
2961
2962
2963 /* **************************** P2P PUT Handling ************************ */
2964
2965
2966 /**
2967  * Function called after we either failed or succeeded
2968  * at transmitting a reply to a peer.  
2969  *
2970  * @param cls the requests "struct PendingRequest*"
2971  * @param tpid ID of receiving peer, 0 on transmission error
2972  */
2973 static void
2974 transmit_reply_continuation (void *cls,
2975                              GNUNET_PEER_Id tpid)
2976 {
2977   struct PendingRequest *pr = cls;
2978   
2979   switch (pr->type)
2980     {
2981     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2982     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2983       /* only one reply expected, done with the request! */
2984       destroy_pending_request (pr);
2985       break;
2986     case GNUNET_BLOCK_TYPE_ANY:
2987     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2988     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2989       break;
2990     default:
2991       GNUNET_break (0);
2992       break;
2993     }
2994 }
2995
2996
2997 /**
2998  * Transmit the given message by copying it to the target buffer
2999  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3000  * for writing in the meantime.  In that case, do nothing
3001  * (the disconnect or shutdown handler will take care of the rest).
3002  * If we were able to transmit messages and there are still more
3003  * pending, ask core again for further calls to this function.
3004  *
3005  * @param cls closure, pointer to the 'struct ClientList*'
3006  * @param size number of bytes available in buf
3007  * @param buf where the callee should write the message
3008  * @return number of bytes written to buf
3009  */
3010 static size_t
3011 transmit_to_client (void *cls,
3012                   size_t size, void *buf)
3013 {
3014   struct ClientList *cl = cls;
3015   char *cbuf = buf;
3016   struct ClientResponseMessage *creply;
3017   size_t msize;
3018   
3019   cl->th = NULL;
3020   if (NULL == buf)
3021     {
3022 #if DEBUG_FS
3023       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3024                   "Not sending reply, client communication problem.\n");
3025 #endif
3026       return 0;
3027     }
3028   msize = 0;
3029   while ( (NULL != (creply = cl->res_head) ) &&
3030           (creply->msize <= size) )
3031     {
3032       memcpy (&cbuf[msize], &creply[1], creply->msize);
3033       msize += creply->msize;
3034       size -= creply->msize;
3035       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3036                                    cl->res_tail,
3037                                    creply);
3038       GNUNET_free (creply);
3039     }
3040   if (NULL != creply)
3041     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3042                                                   creply->msize,
3043                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3044                                                   &transmit_to_client,
3045                                                   cl);
3046 #if DEBUG_FS
3047   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3048               "Transmitted %u bytes to client\n",
3049               (unsigned int) msize);
3050 #endif
3051   return msize;
3052 }
3053
3054
3055 /**
3056  * Closure for "process_reply" function.
3057  */
3058 struct ProcessReplyClosure
3059 {
3060   /**
3061    * The data for the reply.
3062    */
3063   const void *data;
3064
3065   /**
3066    * Who gave us this reply? NULL for local host (or DHT)
3067    */
3068   struct ConnectedPeer *sender;
3069
3070   /**
3071    * When the reply expires.
3072    */
3073   struct GNUNET_TIME_Absolute expiration;
3074
3075   /**
3076    * Size of data.
3077    */
3078   size_t size;
3079
3080   /**
3081    * Type of the block.
3082    */
3083   enum GNUNET_BLOCK_Type type;
3084
3085   /**
3086    * How much was this reply worth to us?
3087    */
3088   uint32_t priority;
3089
3090   /**
3091    * Evaluation result (returned).
3092    */
3093   enum GNUNET_BLOCK_EvaluationResult eval;
3094
3095   /**
3096    * Did we finish processing the associated request?
3097    */ 
3098   int finished;
3099
3100   /**
3101    * Did we find a matching request?
3102    */
3103   int request_found;
3104 };
3105
3106
3107 /**
3108  * We have received a reply; handle it!
3109  *
3110  * @param cls response (struct ProcessReplyClosure)
3111  * @param key our query
3112  * @param value value in the hash map (info about the query)
3113  * @return GNUNET_YES (we should continue to iterate)
3114  */
3115 static int
3116 process_reply (void *cls,
3117                const GNUNET_HashCode * key,
3118                void *value)
3119 {
3120   struct ProcessReplyClosure *prq = cls;
3121   struct PendingRequest *pr = value;
3122   struct PendingMessage *reply;
3123   struct ClientResponseMessage *creply;
3124   struct ClientList *cl;
3125   struct PutMessage *pm;
3126   struct ConnectedPeer *cp;
3127   struct GNUNET_TIME_Relative cur_delay;
3128 #if SUPPORT_DELAYS  
3129 struct GNUNET_TIME_Relative art_delay;
3130 #endif
3131   size_t msize;
3132   unsigned int i;
3133
3134 #if DEBUG_FS
3135   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3136               "Matched result (type %u) for query `%s' with pending request\n",
3137               (unsigned int) prq->type,
3138               GNUNET_h2s (key));
3139 #endif  
3140   GNUNET_STATISTICS_update (stats,
3141                             gettext_noop ("# replies received and matched"),
3142                             1,
3143                             GNUNET_NO);
3144   if (prq->sender != NULL)
3145     {
3146       for (i=0;i<pr->used_targets_off;i++)
3147         if (pr->used_targets[i].pid == prq->sender->pid)
3148           break;
3149       if (i < pr->used_targets_off)
3150         {
3151           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3152           prq->sender->avg_delay.value
3153             = (prq->sender->avg_delay.value * 
3154                (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
3155           prq->sender->avg_priority
3156             = (prq->sender->avg_priority * 
3157                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3158         }
3159       if (pr->cp != NULL)
3160         {
3161           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3162                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3163                                  -1);
3164           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3165           prq->sender->last_p2p_replies
3166             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3167             = pr->cp->pid;
3168         }
3169       else
3170         {
3171           if (NULL != prq->sender->last_client_replies
3172               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3173             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3174                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3175           prq->sender->last_client_replies
3176             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3177             = pr->client_request_list->client_list->client;
3178           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3179         }
3180     }
3181   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3182                                      prq->type,
3183                                      key,
3184                                      &pr->bf,
3185                                      pr->mingle,
3186                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3187                                      prq->data,
3188                                      prq->size);
3189   switch (prq->eval)
3190     {
3191     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3192       break;
3193     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3194       while (NULL != pr->pending_head)
3195         destroy_pending_message_list_entry (pr->pending_head);
3196       if (pr->qe != NULL)
3197         {
3198           if (pr->client_request_list != NULL)
3199             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3200                                         GNUNET_YES);
3201           GNUNET_DATASTORE_cancel (pr->qe);
3202           pr->qe = NULL;
3203         }
3204       pr->do_remove = GNUNET_YES;
3205       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3206         {
3207           GNUNET_SCHEDULER_cancel (sched,
3208                                    pr->task);
3209           pr->task = GNUNET_SCHEDULER_NO_TASK;
3210         }
3211       GNUNET_break (GNUNET_YES ==
3212                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3213                                                           key,
3214                                                           pr));
3215       GNUNET_LOAD_update (rt_entry_lifetime,
3216                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3217       break;
3218     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3219       GNUNET_STATISTICS_update (stats,
3220                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3221                                 1,
3222                                 GNUNET_NO);
3223 #if DEBUG_FS
3224       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3225                   "Duplicate response `%s', discarding.\n",
3226                   GNUNET_h2s (&mhash));
3227 #endif
3228       return GNUNET_YES; /* duplicate */
3229     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3230       return GNUNET_YES; /* wrong namespace */  
3231     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3232       GNUNET_break (0);
3233       return GNUNET_YES;
3234     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3235       GNUNET_break (0);
3236       return GNUNET_YES;
3237     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3238       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3239                   _("Unsupported block type %u\n"),
3240                   prq->type);
3241       return GNUNET_NO;
3242     }
3243   if (pr->client_request_list != NULL)
3244     {
3245       if (pr->replies_seen_size == pr->replies_seen_off)
3246         GNUNET_array_grow (pr->replies_seen,
3247                            pr->replies_seen_size,
3248                            pr->replies_seen_size * 2 + 4);      
3249       GNUNET_CRYPTO_hash (prq->data,
3250                           prq->size,
3251                           &pr->replies_seen[pr->replies_seen_off++]);         
3252       refresh_bloomfilter (pr);
3253     }
3254   if (NULL == prq->sender)
3255     {
3256 #if DEBUG_FS
3257       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3258                   "Found result for query `%s' in local datastore\n",
3259                   GNUNET_h2s (key));
3260 #endif
3261       GNUNET_STATISTICS_update (stats,
3262                                 gettext_noop ("# results found locally"),
3263                                 1,
3264                                 GNUNET_NO);      
3265     }
3266   prq->priority += pr->remaining_priority;
3267   pr->remaining_priority = 0;
3268   pr->results_found++;
3269   prq->request_found = GNUNET_YES;
3270   if (NULL != pr->client_request_list)
3271     {
3272       GNUNET_STATISTICS_update (stats,
3273                                 gettext_noop ("# replies received for local clients"),
3274                                 1,
3275                                 GNUNET_NO);
3276       cl = pr->client_request_list->client_list;
3277       msize = sizeof (struct PutMessage) + prq->size;
3278       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3279       creply->msize = msize;
3280       creply->client_list = cl;
3281       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3282                                          cl->res_tail,
3283                                          cl->res_tail,
3284                                          creply);      
3285       pm = (struct PutMessage*) &creply[1];
3286       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3287       pm->header.size = htons (msize);
3288       pm->type = htonl (prq->type);
3289       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3290       memcpy (&pm[1], prq->data, prq->size);      
3291       if (NULL == cl->th)
3292         {
3293 #if DEBUG_FS
3294           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3295                       "Transmitting result for query `%s' to client\n",
3296                       GNUNET_h2s (key));
3297 #endif  
3298           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3299                                                         msize,
3300                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3301                                                         &transmit_to_client,
3302                                                         cl);
3303         }
3304       GNUNET_break (cl->th != NULL);
3305       if (pr->do_remove)                
3306         {
3307           prq->finished = GNUNET_YES;
3308           destroy_pending_request (pr);         
3309         }
3310     }
3311   else
3312     {
3313       cp = pr->cp;
3314 #if DEBUG_FS
3315       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3316                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3317                   GNUNET_h2s (key),
3318                   (unsigned int) cp->pid);
3319 #endif  
3320       GNUNET_STATISTICS_update (stats,
3321                                 gettext_noop ("# replies received for other peers"),
3322                                 1,
3323                                 GNUNET_NO);
3324       msize = sizeof (struct PutMessage) + prq->size;
3325       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3326       reply->cont = &transmit_reply_continuation;
3327       reply->cont_cls = pr;
3328 #if SUPPORT_DELAYS
3329       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3330                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3331                                                                            TTL_DECREMENT));
3332       reply->delay_until 
3333         = GNUNET_TIME_relative_to_absolute (art_delay);
3334       GNUNET_STATISTICS_update (stats,
3335                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3336                                 art_delay.value,
3337                                 GNUNET_NO);
3338 #endif
3339       reply->msize = msize;
3340       reply->priority = UINT32_MAX; /* send replies first! */
3341       pm = (struct PutMessage*) &reply[1];
3342       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3343       pm->header.size = htons (msize);
3344       pm->type = htonl (prq->type);
3345       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3346       memcpy (&pm[1], prq->data, prq->size);
3347       add_to_pending_messages_for_peer (cp, reply, pr);
3348     }
3349   return GNUNET_YES;
3350 }
3351
3352
3353 /**
3354  * Iterator called on each result obtained for a DHT
3355  * operation that expects a reply
3356  *
3357  * @param cls closure
3358  * @param exp when will this value expire
3359  * @param key key of the result
3360  * @param get_path NULL-terminated array of pointers
3361  *                 to the peers on reverse GET path (or NULL if not recorded)
3362  * @param put_path NULL-terminated array of pointers
3363  *                 to the peers on the PUT path (or NULL if not recorded)
3364  * @param type type of the result
3365  * @param size number of bytes in data
3366  * @param data pointer to the result data
3367  */
3368 static void
3369 process_dht_reply (void *cls,
3370                    struct GNUNET_TIME_Absolute exp,
3371                    const GNUNET_HashCode * key,
3372                    const struct GNUNET_PeerIdentity * const *get_path,
3373                    const struct GNUNET_PeerIdentity * const *put_path,
3374                    enum GNUNET_BLOCK_Type type,
3375                    size_t size,
3376                    const void *data)
3377 {
3378   struct PendingRequest *pr = cls;
3379   struct ProcessReplyClosure prq;
3380
3381   memset (&prq, 0, sizeof (prq));
3382   prq.data = data;
3383   prq.expiration = exp;
3384   prq.size = size;  
3385   prq.type = type;
3386   process_reply (&prq, key, pr);
3387 }
3388
3389
3390
3391 /**
3392  * Continuation called to notify client about result of the
3393  * operation.
3394  *
3395  * @param cls closure
3396  * @param success GNUNET_SYSERR on failure
3397  * @param msg NULL on success, otherwise an error message
3398  */
3399 static void 
3400 put_migration_continuation (void *cls,
3401                             int success,
3402                             const char *msg)
3403 {
3404   struct GNUNET_TIME_Absolute *start = cls;
3405   struct GNUNET_TIME_Relative delay;
3406   
3407   delay = GNUNET_TIME_absolute_get_duration (*start);
3408   GNUNET_free (start);
3409   GNUNET_LOAD_update (datastore_put_load,
3410                       delay.value);
3411   if (GNUNET_OK == success)
3412     return;
3413   GNUNET_STATISTICS_update (stats,
3414                             gettext_noop ("# datastore 'put' failures"),
3415                             1,
3416                             GNUNET_NO);
3417 }
3418
3419
3420 /**
3421  * Handle P2P "PUT" message.
3422  *
3423  * @param cls closure, always NULL
3424  * @param other the other peer involved (sender or receiver, NULL
3425  *        for loopback messages where we are both sender and receiver)
3426  * @param message the actual message
3427  * @param latency reported latency of the connection with 'other'
3428  * @param distance reported distance (DV) to 'other' 
3429  * @return GNUNET_OK to keep the connection open,
3430  *         GNUNET_SYSERR to close it (signal serious error)
3431  */
3432 static int
3433 handle_p2p_put (void *cls,
3434                 const struct GNUNET_PeerIdentity *other,
3435                 const struct GNUNET_MessageHeader *message,
3436                 struct GNUNET_TIME_Relative latency,
3437                 uint32_t distance)
3438 {
3439   const struct PutMessage *put;
3440   uint16_t msize;
3441   size_t dsize;
3442   enum GNUNET_BLOCK_Type type;
3443   struct GNUNET_TIME_Absolute expiration;
3444   GNUNET_HashCode query;
3445   struct ProcessReplyClosure prq;
3446   struct GNUNET_TIME_Absolute *start;
3447   struct GNUNET_TIME_Relative block_time;  
3448   double putl;
3449   struct ConnectedPeer *cp; 
3450   struct PendingMessage *pm;
3451   struct MigrationStopMessage *msm;
3452
3453   msize = ntohs (message->size);
3454   if (msize < sizeof (struct PutMessage))
3455     {
3456       GNUNET_break_op(0);
3457       return GNUNET_SYSERR;
3458     }
3459   put = (const struct PutMessage*) message;
3460   dsize = msize - sizeof (struct PutMessage);
3461   type = ntohl (put->type);
3462   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3463
3464   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3465     return GNUNET_SYSERR;
3466   if (GNUNET_OK !=
3467       GNUNET_BLOCK_get_key (block_ctx,
3468                             type,
3469                             &put[1],
3470                             dsize,
3471                             &query))
3472     {
3473       GNUNET_break_op (0);
3474       return GNUNET_SYSERR;
3475     }
3476 #if DEBUG_FS
3477   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3478               "Received result for query `%s' from peer `%4s'\n",
3479               GNUNET_h2s (&query),
3480               GNUNET_i2s (other));
3481 #endif
3482   GNUNET_STATISTICS_update (stats,
3483                             gettext_noop ("# replies received (overall)"),
3484                             1,
3485                             GNUNET_NO);
3486   /* now, lookup 'query' */
3487   prq.data = (const void*) &put[1];
3488   if (other != NULL)
3489     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3490                                                     &other->hashPubKey);
3491   else
3492     prq.sender = NULL;
3493   prq.size = dsize;
3494   prq.type = type;
3495   prq.expiration = expiration;
3496   prq.priority = 0;
3497   prq.finished = GNUNET_NO;
3498   prq.request_found = GNUNET_NO;
3499   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3500                                               &query,
3501                                               &process_reply,
3502                                               &prq);
3503   if (prq.sender != NULL)
3504     {
3505       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3506       prq.sender->trust += prq.priority;
3507     }
3508   if ( (GNUNET_YES == active_migration) &&
3509        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3510     {      
3511 #if DEBUG_FS
3512       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3513                   "Replicating result for query `%s' with priority %u\n",
3514                   GNUNET_h2s (&query),
3515                   prq.priority);
3516 #endif
3517       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3518       *start = GNUNET_TIME_absolute_get ();
3519       GNUNET_DATASTORE_put (dsh,
3520                             0, &query, dsize, &put[1],
3521                             type, prq.priority, 1 /* anonymity */, 
3522                             expiration, 
3523                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3524                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3525                             &put_migration_continuation, 
3526                             start);
3527     }
3528   putl = GNUNET_LOAD_get_load (datastore_put_load);
3529   if ( (GNUNET_NO == prq.request_found) &&
3530        ( (GNUNET_YES != active_migration) ||
3531          (putl > 2.5 * (1 + prq.priority)) ) )
3532     {
3533       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3534                                               &other->hashPubKey);
3535       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3536         return GNUNET_OK; /* already blocked */
3537       /* We're too busy; send MigrationStop message! */
3538       if (GNUNET_YES != active_migration) 
3539         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3540       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3541                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3542                                                                                    (unsigned int) (60000 * putl * putl)));
3543       
3544       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3545       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3546                           sizeof (struct MigrationStopMessage));
3547       pm->msize = sizeof (struct MigrationStopMessage);
3548       pm->priority = UINT32_MAX;
3549       msm = (struct MigrationStopMessage*) &pm[1];
3550       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3551       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3552       msm->duration = GNUNET_TIME_relative_hton (block_time);
3553       add_to_pending_messages_for_peer (cp,
3554                                         pm,
3555                                         NULL);
3556     }
3557   return GNUNET_OK;
3558 }
3559
3560
3561 /**
3562  * Handle P2P "MIGRATION_STOP" message.
3563  *
3564  * @param cls closure, always NULL
3565  * @param other the other peer involved (sender or receiver, NULL
3566  *        for loopback messages where we are both sender and receiver)
3567  * @param message the actual message
3568  * @param latency reported latency of the connection with 'other'
3569  * @param distance reported distance (DV) to 'other' 
3570  * @return GNUNET_OK to keep the connection open,
3571  *         GNUNET_SYSERR to close it (signal serious error)
3572  */
3573 static int
3574 handle_p2p_migration_stop (void *cls,
3575                            const struct GNUNET_PeerIdentity *other,
3576                            const struct GNUNET_MessageHeader *message,
3577                            struct GNUNET_TIME_Relative latency,
3578                            uint32_t distance)
3579 {
3580   struct ConnectedPeer *cp; 
3581   const struct MigrationStopMessage *msm;
3582
3583   msm = (const struct MigrationStopMessage*) message;
3584   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3585                                           &other->hashPubKey);
3586   if (cp == NULL)
3587     {
3588       GNUNET_break (0);
3589       return GNUNET_OK;
3590     }
3591   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3592   return GNUNET_OK;
3593 }
3594
3595
3596
3597 /* **************************** P2P GET Handling ************************ */
3598
3599
3600 /**
3601  * Closure for 'check_duplicate_request_{peer,client}'.
3602  */
3603 struct CheckDuplicateRequestClosure
3604 {
3605   /**
3606    * The new request we should check if it already exists.
3607    */
3608   const struct PendingRequest *pr;
3609
3610   /**
3611    * Existing request found by the checker, NULL if none.
3612    */
3613   struct PendingRequest *have;
3614 };
3615
3616
3617 /**
3618  * Iterator over entries in the 'query_request_map' that
3619  * tries to see if we have the same request pending from
3620  * the same client already.
3621  *
3622  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3623  * @param key current key code (query, ignored, must match)
3624  * @param value value in the hash map (a 'struct PendingRequest' 
3625  *              that already exists)
3626  * @return GNUNET_YES if we should continue to
3627  *         iterate (no match yet)
3628  *         GNUNET_NO if not (match found).
3629  */
3630 static int
3631 check_duplicate_request_client (void *cls,
3632                                 const GNUNET_HashCode * key,
3633                                 void *value)
3634 {
3635   struct CheckDuplicateRequestClosure *cdc = cls;
3636   struct PendingRequest *have = value;
3637
3638   if (have->client_request_list == NULL)
3639     return GNUNET_YES;
3640   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3641        (cdc->pr != have) )
3642     {
3643       cdc->have = have;
3644       return GNUNET_NO;
3645     }
3646   return GNUNET_YES;
3647 }
3648
3649
3650 /**
3651  * We're processing (local) results for a search request
3652  * from another peer.  Pass applicable results to the
3653  * peer and if we are done either clean up (operation
3654  * complete) or forward to other peers (more results possible).
3655  *
3656  * @param cls our closure (struct LocalGetContext)
3657  * @param key key for the content
3658  * @param size number of bytes in data
3659  * @param data content stored
3660  * @param type type of the content
3661  * @param priority priority of the content
3662  * @param anonymity anonymity-level for the content
3663  * @param expiration expiration time for the content
3664  * @param uid unique identifier for the datum;
3665  *        maybe 0 if no unique identifier is available
3666  */
3667 static void
3668 process_local_reply (void *cls,
3669                      const GNUNET_HashCode * key,
3670                      size_t size,
3671                      const void *data,
3672                      enum GNUNET_BLOCK_Type type,
3673                      uint32_t priority,
3674                      uint32_t anonymity,
3675                      struct GNUNET_TIME_Absolute
3676                      expiration, 
3677                      uint64_t uid)
3678 {
3679   struct PendingRequest *pr = cls;
3680   struct ProcessReplyClosure prq;
3681   struct CheckDuplicateRequestClosure cdrc;
3682   GNUNET_HashCode query;
3683   unsigned int old_rf;
3684   
3685   if (NULL == key)
3686     {
3687 #if DEBUG_FS > 1
3688       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3689                   "Done processing local replies, forwarding request to other peers.\n");
3690 #endif
3691       pr->qe = NULL;
3692       if (pr->client_request_list != NULL)
3693         {
3694           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3695                                       GNUNET_YES);
3696           /* Figure out if this is a duplicate request and possibly
3697              merge 'struct PendingRequest' entries */
3698           cdrc.have = NULL;
3699           cdrc.pr = pr;
3700           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3701                                                       &pr->query,
3702                                                       &check_duplicate_request_client,
3703                                                       &cdrc);
3704           if (cdrc.have != NULL)
3705             {
3706 #if DEBUG_FS
3707               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3708                           "Received request for block `%s' twice from client, will only request once.\n",
3709                           GNUNET_h2s (&pr->query));
3710 #endif
3711               
3712               destroy_pending_request (pr);
3713               return;
3714             }
3715         }
3716       if (pr->local_only == GNUNET_YES)
3717         {
3718           destroy_pending_request (pr);
3719           return;
3720         }
3721       /* no more results */
3722       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3723         pr->task = GNUNET_SCHEDULER_add_now (sched,
3724                                              &forward_request_task,
3725                                              pr);      
3726       return;
3727     }
3728 #if DEBUG_FS
3729   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3730               "New local response to `%s' of type %u.\n",
3731               GNUNET_h2s (key),
3732               type);
3733 #endif
3734   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3735     {
3736 #if DEBUG_FS
3737       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3738                   "Found ONDEMAND block, performing on-demand encoding\n");
3739 #endif
3740       GNUNET_STATISTICS_update (stats,
3741                                 gettext_noop ("# on-demand blocks matched requests"),
3742                                 1,
3743                                 GNUNET_NO);
3744       if (GNUNET_OK != 
3745           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3746                                             anonymity, expiration, uid, 
3747                                             &process_local_reply,
3748                                             pr))
3749       if (pr->qe != NULL)
3750         {
3751           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3752         }
3753       return;
3754     }
3755   old_rf = pr->results_found;
3756   memset (&prq, 0, sizeof (prq));
3757   prq.data = data;
3758   prq.expiration = expiration;
3759   prq.size = size;  
3760   if (GNUNET_OK != 
3761       GNUNET_BLOCK_get_key (block_ctx,
3762                             type,
3763                             data,
3764                             size,
3765                             &query))
3766     {
3767       GNUNET_break (0);
3768       GNUNET_DATASTORE_remove (dsh,
3769                                key,
3770                                size, data,
3771                                -1, -1, 
3772                                GNUNET_TIME_UNIT_FOREVER_REL,
3773                                NULL, NULL);
3774       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3775       return;
3776     }
3777   prq.type = type;
3778   prq.priority = priority;  
3779   prq.finished = GNUNET_NO;
3780   prq.request_found = GNUNET_NO;
3781   if ( (old_rf == 0) &&
3782        (pr->results_found == 0) )
3783     update_datastore_delays (pr->start_time);
3784   process_reply (&prq, key, pr);
3785   if (prq.finished == GNUNET_YES)
3786     return;
3787   if (pr->qe == NULL)
3788     return; /* done here */
3789   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3790     {
3791       pr->local_only = GNUNET_YES; /* do not forward */
3792       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3793       return;
3794     }
3795   if ( (pr->client_request_list == NULL) &&
3796        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3797          (pr->results_found > 5 + 2 * pr->priority) ) )
3798     {
3799 #if DEBUG_FS > 2
3800       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3801                   "Load too high, done with request\n");
3802 #endif
3803       GNUNET_STATISTICS_update (stats,
3804                                 gettext_noop ("# processing result set cut short due to load"),
3805                                 1,
3806                                 GNUNET_NO);
3807       /* FIXME: if this is activated, we might stall large downloads
3808          indefinitely since (presumably) the load can never go down again! */
3809 #if ENABLE_LOAD_MGMT
3810       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3811       return;
3812 #endif
3813     }
3814   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3815 }
3816
3817
3818 /**
3819  * We've received a request with the specified priority.  Bound it
3820  * according to how much we trust the given peer.
3821  * 
3822  * @param prio_in requested priority
3823  * @param cp the peer making the request
3824  * @return effective priority
3825  */
3826 static int32_t
3827 bound_priority (uint32_t prio_in,
3828                 struct ConnectedPeer *cp)
3829 {
3830 #define N ((double)128.0)
3831   uint32_t ret;
3832   double rret;
3833   int ld;
3834
3835   ld = test_get_load_too_high (0);
3836   if (ld == GNUNET_SYSERR)
3837     {
3838       GNUNET_STATISTICS_update (stats,
3839                                 gettext_noop ("# requests done for free (low load)"),
3840                                 1,
3841                                 GNUNET_NO);
3842       return 0; /* excess resources */
3843     }
3844   ret = change_host_trust (cp, prio_in);
3845   if (ret > 0)
3846     {
3847       if (ret > current_priorities + N)
3848         rret = current_priorities + N;
3849       else
3850         rret = ret;
3851       current_priorities 
3852         = (current_priorities * (N-1) + rret)/N;
3853     }
3854   if ( (ld == GNUNET_YES) && (ret > 0) )
3855     {
3856       /* try with charging */
3857       ld = test_get_load_too_high (ret);
3858     }
3859   if (ld == GNUNET_YES)
3860     {
3861       GNUNET_STATISTICS_update (stats,
3862                                 gettext_noop ("# request dropped, priority insufficient"),
3863                                 1,
3864                                 GNUNET_NO);
3865       /* undo charge */
3866       if (ret != 0)
3867         change_host_trust (cp, -ret);
3868       return -1; /* not enough resources */
3869     }
3870   else
3871     {
3872       GNUNET_STATISTICS_update (stats,
3873                                 gettext_noop ("# requests done for a price (normal load)"),
3874                                 1,
3875                                 GNUNET_NO);
3876     }
3877 #undef N
3878   return ret;
3879 }
3880
3881
3882 /**
3883  * Iterator over entries in the 'query_request_map' that
3884  * tries to see if we have the same request pending from
3885  * the same peer already.
3886  *
3887  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3888  * @param key current key code (query, ignored, must match)
3889  * @param value value in the hash map (a 'struct PendingRequest' 
3890  *              that already exists)
3891  * @return GNUNET_YES if we should continue to
3892  *         iterate (no match yet)
3893  *         GNUNET_NO if not (match found).
3894  */
3895 static int
3896 check_duplicate_request_peer (void *cls,
3897                               const GNUNET_HashCode * key,
3898                               void *value)
3899 {
3900   struct CheckDuplicateRequestClosure *cdc = cls;
3901   struct PendingRequest *have = value;
3902
3903   if (cdc->pr->target_pid == have->target_pid)
3904     {
3905       cdc->have = have;
3906       return GNUNET_NO;
3907     }
3908   return GNUNET_YES;
3909 }
3910
3911
3912 /**
3913  * Handle P2P "GET" request.
3914  *
3915  * @param cls closure, always NULL
3916  * @param other the other peer involved (sender or receiver, NULL
3917  *        for loopback messages where we are both sender and receiver)
3918  * @param message the actual message
3919  * @param latency reported latency of the connection with 'other'
3920  * @param distance reported distance (DV) to 'other' 
3921  * @return GNUNET_OK to keep the connection open,
3922  *         GNUNET_SYSERR to close it (signal serious error)
3923  */
3924 static int
3925 handle_p2p_get (void *cls,
3926                 const struct GNUNET_PeerIdentity *other,
3927                 const struct GNUNET_MessageHeader *message,
3928                 struct GNUNET_TIME_Relative latency,
3929                 uint32_t distance)
3930 {
3931   struct PendingRequest *pr;
3932   struct ConnectedPeer *cp;
3933   struct ConnectedPeer *cps;
3934   struct CheckDuplicateRequestClosure cdc;
3935   struct GNUNET_TIME_Relative timeout;
3936   uint16_t msize;
3937   const struct GetMessage *gm;
3938   unsigned int bits;
3939   const GNUNET_HashCode *opt;
3940   uint32_t bm;
3941   size_t bfsize;
3942   uint32_t ttl_decrement;
3943   int32_t priority;
3944   enum GNUNET_BLOCK_Type type;
3945   int have_ns;
3946
3947   msize = ntohs(message->size);
3948   if (msize < sizeof (struct GetMessage))
3949     {
3950       GNUNET_break_op (0);
3951       return GNUNET_SYSERR;
3952     }
3953   gm = (const struct GetMessage*) message;
3954 #if DEBUG_FS
3955   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3956               "Received request for `%s'\n",
3957               GNUNET_h2s (&gm->query));
3958 #endif
3959   type = ntohl (gm->type);
3960   bm = ntohl (gm->hash_bitmap);
3961   bits = 0;
3962   while (bm > 0)
3963     {
3964       if (1 == (bm & 1))
3965         bits++;
3966       bm >>= 1;
3967     }
3968   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3969     {
3970       GNUNET_break_op (0);
3971       return GNUNET_SYSERR;
3972     }  
3973   opt = (const GNUNET_HashCode*) &gm[1];
3974   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3975   bm = ntohl (gm->hash_bitmap);
3976   bits = 0;
3977   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3978                                            &other->hashPubKey);
3979   if (NULL == cps)
3980     {
3981       /* peer must have just disconnected */
3982       GNUNET_STATISTICS_update (stats,
3983                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3984                                 1,
3985                                 GNUNET_NO);
3986       return GNUNET_SYSERR;
3987     }
3988   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3989     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3990                                             &opt[bits++]);
3991   else
3992     cp = cps;
3993   if (cp == NULL)
3994     {
3995 #if DEBUG_FS
3996       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3997         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3998                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3999                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
4000       
4001       else
4002         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4003                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
4004                     GNUNET_i2s (other));
4005 #endif
4006       GNUNET_STATISTICS_update (stats,
4007                                 gettext_noop ("# requests dropped due to missing reverse route"),
4008                                 1,
4009                                 GNUNET_NO);
4010      /* FIXME: try connect? */
4011       return GNUNET_OK;
4012     }
4013   /* note that we can really only check load here since otherwise
4014      peers could find out that we are overloaded by not being
4015      disconnected after sending us a malformed query... */
4016   priority = bound_priority (ntohl (gm->priority), cps);
4017   if (priority < 0)
4018     {
4019 #if DEBUG_FS
4020       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4021                   "Dropping query from `%s', this peer is too busy.\n",
4022                   GNUNET_i2s (other));
4023 #endif
4024       GNUNET_STATISTICS_update (stats,
4025                                 gettext_noop ("# requests dropped due to high load"),
4026                                 1,
4027                                 GNUNET_NO);
4028 #if ENABLE_LOAD_MGMT
4029       /* FIXME: this causes problems... */
4030       return GNUNET_OK;
4031 #endif
4032     }
4033 #if DEBUG_FS 
4034   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4035               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4036               GNUNET_h2s (&gm->query),
4037               (unsigned int) type,
4038               GNUNET_i2s (other),
4039               (unsigned int) bm);
4040 #endif
4041   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4042   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4043                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4044   if (have_ns)
4045     {
4046       pr->namespace = (GNUNET_HashCode*) &pr[1];
4047       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4048     }
4049   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4050        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4051         GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4052     {
4053       /* don't have BW to send to peer, or would likely take longer than we have for it,
4054          so at best indirect the query */
4055       priority = 0;
4056 #if ENABLE_LOAD_MGMT
4057       /* FIXME: if this line is enabled, the 'perf' test for larger files simply "hangs";
4058          the cause seems to be that the load goes up (to the point where we do this)
4059          and then never goes down again... (outch) */
4060       pr->forward_only = GNUNET_YES;
4061 #endif
4062     }
4063   pr->type = type;
4064   pr->mingle = ntohl (gm->filter_mutator);
4065   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4066     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4067   pr->anonymity_level = 1;
4068   pr->priority = (uint32_t) priority;
4069   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4070   pr->query = gm->query;
4071   /* decrement ttl (always) */
4072   ttl_decrement = 2 * TTL_DECREMENT +
4073     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4074                               TTL_DECREMENT);
4075   if ( (pr->ttl < 0) &&
4076        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4077     {
4078 #if DEBUG_FS
4079       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4080                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4081                   GNUNET_i2s (other),
4082                   pr->ttl,
4083                   ttl_decrement);
4084 #endif
4085       GNUNET_STATISTICS_update (stats,
4086                                 gettext_noop ("# requests dropped due TTL underflow"),
4087                                 1,
4088                                 GNUNET_NO);
4089       /* integer underflow => drop (should be very rare)! */      
4090       GNUNET_free (pr);
4091       return GNUNET_OK;
4092     } 
4093   pr->ttl -= ttl_decrement;
4094   pr->start_time = GNUNET_TIME_absolute_get ();
4095
4096   /* get bloom filter */
4097   if (bfsize > 0)
4098     {
4099       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4100                                                   bfsize,
4101                                                   BLOOMFILTER_K);
4102       pr->bf_size = bfsize;
4103     }
4104   cdc.have = NULL;
4105   cdc.pr = pr;
4106   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4107                                               &gm->query,
4108                                               &check_duplicate_request_peer,
4109                                               &cdc);
4110   if (cdc.have != NULL)
4111     {
4112       if (cdc.have->start_time.value + cdc.have->ttl >=
4113           pr->start_time.value + pr->ttl)
4114         {
4115           /* existing request has higher TTL, drop new one! */
4116           cdc.have->priority += pr->priority;
4117           destroy_pending_request (pr);
4118 #if DEBUG_FS
4119           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4120                       "Have existing request with higher TTL, dropping new request.\n",
4121                       GNUNET_i2s (other));
4122 #endif
4123           GNUNET_STATISTICS_update (stats,
4124                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4125                                     1,
4126                                     GNUNET_NO);
4127           return GNUNET_OK;
4128         }
4129       else
4130         {
4131           /* existing request has lower TTL, drop old one! */
4132           pr->priority += cdc.have->priority;
4133           /* Possible optimization: if we have applicable pending
4134              replies in 'cdc.have', we might want to move those over
4135              (this is a really rare special-case, so it is not clear
4136              that this would be worth it) */
4137           destroy_pending_request (cdc.have);
4138           /* keep processing 'pr'! */
4139         }
4140     }
4141
4142   pr->cp = cp;
4143   GNUNET_break (GNUNET_OK ==
4144                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4145                                                    &gm->query,
4146                                                    pr,
4147                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4148   GNUNET_break (GNUNET_OK ==
4149                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4150                                                    &other->hashPubKey,
4151                                                    pr,
4152                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4153   
4154   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4155                                             pr,
4156                                             pr->start_time.value + pr->ttl);
4157
4158   GNUNET_STATISTICS_update (stats,
4159                             gettext_noop ("# P2P searches received"),
4160                             1,
4161                             GNUNET_NO);
4162   GNUNET_STATISTICS_update (stats,
4163                             gettext_noop ("# P2P searches active"),
4164                             1,
4165                             GNUNET_NO);
4166
4167   /* calculate change in traffic preference */
4168   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4169   /* process locally */
4170   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4171     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4172   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4173                                            (pr->priority + 1)); 
4174   if (GNUNET_YES != pr->forward_only)
4175     {
4176 #if DEBUG_FS
4177       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4178                   "Handing request for `%s' to datastore\n",
4179                   GNUNET_h2s (&gm->query));
4180 #endif
4181       pr->qe = GNUNET_DATASTORE_get (dsh,
4182                                      &gm->query,
4183                                      type,                             
4184                                      pr->priority + 1,
4185                                      MAX_DATASTORE_QUEUE,                                
4186                                      timeout,
4187                                      &process_local_reply,
4188                                      pr);
4189       if (NULL == pr->qe)
4190         {
4191           GNUNET_STATISTICS_update (stats,
4192                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4193                                     1,
4194                                     GNUNET_NO);
4195         }
4196     }
4197   else
4198     {
4199       GNUNET_STATISTICS_update (stats,
4200                                 gettext_noop ("# requests forwarded due to high load"),
4201                                 1,
4202                                 GNUNET_NO);
4203     }
4204
4205   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4206   switch (pr->type)
4207     {
4208     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4209     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4210       /* only one result, wait for datastore */
4211       if (GNUNET_YES != pr->forward_only)
4212         {
4213           GNUNET_STATISTICS_update (stats,
4214                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4215                                     1,
4216                                     GNUNET_NO);
4217           break;
4218         }
4219     default:
4220       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4221         pr->task = GNUNET_SCHEDULER_add_now (sched,
4222                                              &forward_request_task,
4223                                              pr);
4224     }
4225
4226   /* make sure we don't track too many requests */
4227   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4228     {
4229       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4230       GNUNET_assert (pr != NULL);
4231       destroy_pending_request (pr);
4232     }
4233   return GNUNET_OK;
4234 }
4235
4236
4237 /* **************************** CS GET Handling ************************ */
4238
4239
4240 /**
4241  * Handle START_SEARCH-message (search request from client).
4242  *
4243  * @param cls closure
4244  * @param client identification of the client
4245  * @param message the actual message
4246  */
4247 static void
4248 handle_start_search (void *cls,
4249                      struct GNUNET_SERVER_Client *client,
4250                      const struct GNUNET_MessageHeader *message)
4251 {
4252   static GNUNET_HashCode all_zeros;
4253   const struct SearchMessage *sm;
4254   struct ClientList *cl;
4255   struct ClientRequestList *crl;
4256   struct PendingRequest *pr;
4257   uint16_t msize;
4258   unsigned int sc;
4259   enum GNUNET_BLOCK_Type type;
4260
4261   msize = ntohs (message->size);
4262   if ( (msize < sizeof (struct SearchMessage)) ||
4263        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4264     {
4265       GNUNET_break (0);
4266       GNUNET_SERVER_receive_done (client,
4267                                   GNUNET_SYSERR);
4268       return;
4269     }
4270   GNUNET_STATISTICS_update (stats,
4271                             gettext_noop ("# client searches received"),
4272                             1,
4273                             GNUNET_NO);
4274   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4275   sm = (const struct SearchMessage*) message;
4276   type = ntohl (sm->type);
4277 #if DEBUG_FS
4278   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4279               "Received request for `%s' of type %u from local client\n",
4280               GNUNET_h2s (&sm->query),
4281               (unsigned int) type);
4282 #endif
4283   cl = client_list;
4284   while ( (cl != NULL) &&
4285           (cl->client != client) )
4286     cl = cl->next;
4287   if (cl == NULL)
4288     {
4289       cl = GNUNET_malloc (sizeof (struct ClientList));
4290       cl->client = client;
4291       GNUNET_SERVER_client_keep (client);
4292       cl->next = client_list;
4293       client_list = cl;
4294     }
4295   /* detect duplicate KBLOCK requests */
4296   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4297        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4298        (type == GNUNET_BLOCK_TYPE_ANY) )
4299     {
4300       crl = cl->rl_head;
4301       while ( (crl != NULL) &&
4302               ( (0 != memcmp (&crl->req->query,
4303                               &sm->query,
4304                               sizeof (GNUNET_HashCode))) ||
4305                 (crl->req->type != type) ) )
4306         crl = crl->next;
4307       if (crl != NULL)  
4308         { 
4309 #if DEBUG_FS
4310           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4311                       "Have existing request, merging content-seen lists.\n");
4312 #endif
4313           pr = crl->req;
4314           /* Duplicate request (used to send long list of
4315              known/blocked results); merge 'pr->replies_seen'
4316              and update bloom filter */
4317           GNUNET_array_grow (pr->replies_seen,
4318                              pr->replies_seen_size,
4319                              pr->replies_seen_off + sc);
4320           memcpy (&pr->replies_seen[pr->replies_seen_off],
4321                   &sm[1],
4322                   sc * sizeof (GNUNET_HashCode));
4323           pr->replies_seen_off += sc;
4324           refresh_bloomfilter (pr);
4325           GNUNET_STATISTICS_update (stats,
4326                                     gettext_noop ("# client searches updated (merged content seen list)"),
4327                                     1,
4328                                     GNUNET_NO);
4329           GNUNET_SERVER_receive_done (client,
4330                                       GNUNET_OK);
4331           return;
4332         }
4333     }
4334   GNUNET_STATISTICS_update (stats,
4335                             gettext_noop ("# client searches active"),
4336                             1,
4337                             GNUNET_NO);
4338   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4339                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4340   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4341   memset (crl, 0, sizeof (struct ClientRequestList));
4342   crl->client_list = cl;
4343   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4344                                cl->rl_tail,
4345                                crl);  
4346   crl->req = pr;
4347   pr->type = type;
4348   pr->client_request_list = crl;
4349   GNUNET_array_grow (pr->replies_seen,
4350                      pr->replies_seen_size,
4351                      sc);
4352   memcpy (pr->replies_seen,
4353           &sm[1],
4354           sc * sizeof (GNUNET_HashCode));
4355   pr->replies_seen_off = sc;
4356   pr->anonymity_level = ntohl (sm->anonymity_level); 
4357   pr->start_time = GNUNET_TIME_absolute_get ();
4358   refresh_bloomfilter (pr);
4359   pr->query = sm->query;
4360   if (0 == (1 & ntohl (sm->options)))
4361     pr->local_only = GNUNET_NO;
4362   else
4363     pr->local_only = GNUNET_YES;
4364   switch (type)
4365     {
4366     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4367     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4368       if (0 != memcmp (&sm->target,
4369                        &all_zeros,
4370                        sizeof (GNUNET_HashCode)))
4371         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4372       break;
4373     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4374       pr->namespace = (GNUNET_HashCode*) &pr[1];
4375       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4376       break;
4377     default:
4378       break;
4379     }
4380   GNUNET_break (GNUNET_OK ==
4381                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4382                                                    &sm->query,
4383                                                    pr,
4384                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4385   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4386     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4387   pr->qe = GNUNET_DATASTORE_get (dsh,
4388                                  &sm->query,
4389                                  type,
4390                                  -3, -1,
4391                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4392                                  &process_local_reply,
4393                                  pr);
4394 }
4395
4396
4397 /* **************************** Startup ************************ */
4398
4399 /**
4400  * Process fs requests.
4401  *
4402  * @param s scheduler to use
4403  * @param server the initialized server
4404  * @param c configuration to use
4405  */
4406 static int
4407 main_init (struct GNUNET_SCHEDULER_Handle *s,
4408            struct GNUNET_SERVER_Handle *server,
4409            const struct GNUNET_CONFIGURATION_Handle *c)
4410 {
4411   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4412     {
4413       { &handle_p2p_get, 
4414         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4415       { &handle_p2p_put, 
4416         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4417       { &handle_p2p_migration_stop, 
4418         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4419         sizeof (struct MigrationStopMessage) },
4420       { NULL, 0, 0 }
4421     };
4422   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4423     {&GNUNET_FS_handle_index_start, NULL, 
4424      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4425     {&GNUNET_FS_handle_index_list_get, NULL, 
4426      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4427     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4428      sizeof (struct UnindexMessage) },
4429     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4430      0 },
4431     {NULL, NULL, 0, 0}
4432   };
4433   unsigned long long enc = 128;
4434
4435   sched = s;
4436   cfg = c;
4437   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4438   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4439   if ( (GNUNET_OK !=
4440         GNUNET_CONFIGURATION_get_value_number (cfg,
4441                                                "fs",
4442                                                "MAX_PENDING_REQUESTS",
4443                                                &max_pending_requests)) ||
4444        (GNUNET_OK !=
4445         GNUNET_CONFIGURATION_get_value_number (cfg,
4446                                                "fs",
4447                                                "EXPECTED_NEIGHBOUR_COUNT",
4448                                                &enc)) ||
4449        (GNUNET_OK != 
4450         GNUNET_CONFIGURATION_get_value_time (cfg,
4451                                              "fs",
4452                                              "MIN_MIGRATION_DELAY",
4453                                              &min_migration_delay)) )
4454     {
4455       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4456                   _("Configuration fails to specify certain parameters, assuming default values."));
4457     }
4458   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4459   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4460   rt_entry_lifetime = GNUNET_LOAD_value_init (GNUNET_TIME_UNIT_SECONDS);
4461   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4462   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4463   core = GNUNET_CORE_connect (sched,
4464                               cfg,
4465                               GNUNET_TIME_UNIT_FOREVER_REL,
4466                               NULL,
4467                               NULL,
4468                               &peer_connect_handler,
4469                               &peer_disconnect_handler,
4470                               NULL,
4471                               NULL, GNUNET_NO,
4472                               NULL, GNUNET_NO,
4473                               p2p_handlers);
4474   if (NULL == core)
4475     {
4476       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4477                   _("Failed to connect to `%s' service.\n"),
4478                   "core");
4479       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4480       connected_peers = NULL;
4481       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4482       query_request_map = NULL;
4483       GNUNET_LOAD_value_free (rt_entry_lifetime);
4484       rt_entry_lifetime = NULL;
4485       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4486       requests_by_expiration_heap = NULL;
4487       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4488       peer_request_map = NULL;
4489       if (dsh != NULL)
4490         {
4491           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4492           dsh = NULL;
4493         }
4494       return GNUNET_SYSERR;
4495     }
4496   /* FIXME: distinguish between sending and storing in options? */
4497   if (active_migration) 
4498     {
4499       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4500                   _("Content migration is enabled, will start to gather data\n"));
4501       consider_migration_gathering ();
4502     }
4503   consider_dht_put_gathering (NULL);
4504   GNUNET_SERVER_disconnect_notify (server, 
4505                                    &handle_client_disconnect,
4506                                    NULL);
4507   GNUNET_assert (GNUNET_OK ==
4508                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4509                                                           "fs",
4510                                                           "TRUST",
4511                                                           &trustDirectory));
4512   GNUNET_DISK_directory_create (trustDirectory);
4513   GNUNET_SCHEDULER_add_with_priority (sched,
4514                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4515                                       &cron_flush_trust, NULL);
4516
4517
4518   GNUNET_SERVER_add_handlers (server, handlers);
4519   GNUNET_SCHEDULER_add_delayed (sched,
4520                                 GNUNET_TIME_UNIT_FOREVER_REL,
4521                                 &shutdown_task,
4522                                 NULL);
4523   return GNUNET_OK;
4524 }
4525
4526
4527 /**
4528  * Process fs requests.
4529  *
4530  * @param cls closure
4531  * @param sched scheduler to use
4532  * @param server the initialized server
4533  * @param cfg configuration to use
4534  */
4535 static void
4536 run (void *cls,
4537      struct GNUNET_SCHEDULER_Handle *sched,
4538      struct GNUNET_SERVER_Handle *server,
4539      const struct GNUNET_CONFIGURATION_Handle *cfg)
4540 {
4541   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4542                                                            "FS",
4543                                                            "ACTIVEMIGRATION");
4544   dsh = GNUNET_DATASTORE_connect (cfg,
4545                                   sched);
4546   if (dsh == NULL)
4547     {
4548       GNUNET_SCHEDULER_shutdown (sched);
4549       return;
4550     }
4551   datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4552   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
4553   block_cfg = GNUNET_CONFIGURATION_create ();
4554   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4555                                          "block",
4556                                          "PLUGINS",
4557                                          "fs");
4558   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4559   GNUNET_assert (NULL != block_ctx);
4560   dht_handle = GNUNET_DHT_connect (sched,
4561                                    cfg,
4562                                    FS_DHT_HT_SIZE);
4563   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4564        (GNUNET_OK != main_init (sched, server, cfg)) )
4565     {    
4566       GNUNET_SCHEDULER_shutdown (sched);
4567       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4568       dsh = NULL;
4569       GNUNET_DHT_disconnect (dht_handle);
4570       dht_handle = NULL;
4571       GNUNET_BLOCK_context_destroy (block_ctx);
4572       block_ctx = NULL;
4573       GNUNET_CONFIGURATION_destroy (block_cfg);
4574       block_cfg = NULL;
4575       GNUNET_LOAD_value_free (datastore_get_load);
4576       datastore_get_load = NULL;
4577       GNUNET_LOAD_value_free (datastore_put_load);
4578       datastore_put_load = NULL;
4579       return;   
4580     }
4581 }
4582
4583
4584 /**
4585  * The main function for the fs service.
4586  *
4587  * @param argc number of arguments from the command line
4588  * @param argv command line arguments
4589  * @return 0 ok, 1 on error
4590  */
4591 int
4592 main (int argc, char *const *argv)
4593 {
4594   return (GNUNET_OK ==
4595           GNUNET_SERVICE_run (argc,
4596                               argv,
4597                               "fs",
4598                               GNUNET_SERVICE_OPTION_NONE,
4599                               &run, NULL)) ? 0 : 1;
4600 }
4601
4602 /* end of gnunet-service-fs.c */