fixed
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - more statistics
28  */
29 #include "platform.h"
30 #include <float.h>
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "gnunet_dht_service.h"
34 #include "gnunet_datastore_service.h"
35 #include "gnunet_load_lib.h"
36 #include "gnunet_peer_lib.h"
37 #include "gnunet_protocols.h"
38 #include "gnunet_signatures.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet-service-fs_indexing.h"
42 #include "fs.h"
43
44 #define DEBUG_FS GNUNET_NO
45
46 /**
47  * Should we introduce random latency in processing?  Required for proper
48  * implementation of GAP, but can be disabled for performance evaluation of
49  * the basic routing algorithm.
50  *
51  * Note that with delays enabled, performance can be significantly lower
52  * (several orders of magnitude in 2-peer test runs); if you want to
53  * measure throughput of other components, set this to NO.  Also, you
54  * might want to consider changing 'RETRY_PROBABILITY_INV' to 1 for
55  * a rather wasteful mode of operation (that might still get the highest
56  * throughput overall).
57  */
58 #define SUPPORT_DELAYS GNUNET_NO
59
60 /**
61  * Size for the hash map for DHT requests from the FS
62  * service.  Should be about the number of concurrent
63  * DHT requests we plan to make.
64  */
65 #define FS_DHT_HT_SIZE 1024
66
67 /**
68  * How often do we flush trust values to disk?
69  */
70 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
71
72 /**
73  * How often do we at most PUT content into the DHT?
74  */
75 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
76
77 /**
78  * Inverse of the probability that we will submit the same query
79  * to the same peer again.  If the same peer already got the query
80  * repeatedly recently, the probability is multiplied by the inverse
81  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
82  * plus MAX_CORK_DELAY (so roughly every 3.5s).
83  *
84  * Note that this factor is a key influence to performance in small
85  * networks (especially test networks of 2 peers) because if there is
86  * only a single peer with the data, this value will determine how
87  * soon we might re-try.  For example, a value of 3 can result in 
88  * 1.7 MB/s transfer rates for a 10 MB file when a value of 1 would
89  * give us 5 MB/s.  OTOH, obviously re-trying the same peer can be
90  * rather inefficient in larger networks, hence picking 1 is in 
91  * general not the best choice.
92  */
93 #define RETRY_PROBABILITY_INV 1
94
95 /**
96  * What is the maximum delay for a P2P FS message (in our interaction
97  * with core)?  FS-internal delays are another story.  The value is
98  * chosen based on the 32k block size.  Given that peers typcially
99  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
100  * transmit one message even to the lowest-bandwidth peers.
101  */
102 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
103
104 /**
105  * Maximum number of requests (from other peers, overall) that we're
106  * willing to have pending at any given point in time.  Can be changed
107  * via the configuration file (32k is just the default).
108  */
109 static unsigned long long max_pending_requests = (32 * 1024);
110
111
112 /**
113  * Information we keep for each pending reply.  The
114  * actual message follows at the end of this struct.
115  */
116 struct PendingMessage;
117
118 /**
119  * Function called upon completion of a transmission.
120  *
121  * @param cls closure
122  * @param pid ID of receiving peer, 0 on transmission error
123  */
124 typedef void (*TransmissionContinuation)(void * cls, 
125                                          GNUNET_PEER_Id tpid);
126
127
128 /**
129  * Information we keep for each pending message (GET/PUT).  The
130  * actual message follows at the end of this struct.
131  */
132 struct PendingMessage
133 {
134   /**
135    * This is a doubly-linked list of messages to the same peer.
136    */
137   struct PendingMessage *next;
138
139   /**
140    * This is a doubly-linked list of messages to the same peer.
141    */
142   struct PendingMessage *prev;
143
144   /**
145    * Entry in pending message list for this pending message.
146    */ 
147   struct PendingMessageList *pml;  
148
149   /**
150    * Function to call immediately once we have transmitted this
151    * message.
152    */
153   TransmissionContinuation cont;
154
155   /**
156    * Closure for cont.
157    */
158   void *cont_cls;
159
160   /**
161    * Do not transmit this pending message until this deadline.
162    */
163   struct GNUNET_TIME_Absolute delay_until;
164
165   /**
166    * Size of the reply; actual reply message follows
167    * at the end of this struct.
168    */
169   size_t msize;
170   
171   /**
172    * How important is this message for us?
173    */
174   uint32_t priority;
175  
176 };
177
178
179 /**
180  * Information about a peer that we are connected to.
181  * We track data that is useful for determining which
182  * peers should receive our requests.  We also keep
183  * a list of messages to transmit to this peer.
184  */
185 struct ConnectedPeer
186 {
187
188   /**
189    * List of the last clients for which this peer successfully
190    * answered a query.
191    */
192   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
193
194   /**
195    * List of the last PIDs for which
196    * this peer successfully answered a query;
197    * We use 0 to indicate no successful reply.
198    */
199   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
200
201   /**
202    * Average delay between sending the peer a request and
203    * getting a reply (only calculated over the requests for
204    * which we actually got a reply).   Calculated
205    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
206    */ 
207   struct GNUNET_TIME_Relative avg_delay;
208
209   /**
210    * Point in time until which this peer does not want us to migrate content
211    * to it.
212    */
213   struct GNUNET_TIME_Absolute migration_blocked;
214
215   /**
216    * Time until when we blocked this peer from migrating
217    * data to us.
218    */
219   struct GNUNET_TIME_Absolute last_migration_block;
220
221   /**
222    * Transmission times for the last MAX_QUEUE_PER_PEER
223    * requests for this peer.  Used as a ring buffer, current
224    * offset is stored in 'last_request_times_off'.  If the
225    * oldest entry is more recent than the 'avg_delay', we should
226    * not send any more requests right now.
227    */
228   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
229
230   /**
231    * Handle for an active request for transmission to this
232    * peer, or NULL.
233    */
234   struct GNUNET_CORE_TransmitHandle *cth;
235
236   /**
237    * Messages (replies, queries, content migration) we would like to
238    * send to this peer in the near future.  Sorted by priority, head.
239    */
240   struct PendingMessage *pending_messages_head;
241
242   /**
243    * Messages (replies, queries, content migration) we would like to
244    * send to this peer in the near future.  Sorted by priority, tail.
245    */
246   struct PendingMessage *pending_messages_tail;
247
248   /**
249    * How long does it typically take for us to transmit a message
250    * to this peer?  (delay between the request being issued and
251    * the callback being invoked).
252    */
253   struct GNUNET_LOAD_Value *transmission_delay;
254
255   /**
256    * Time when the last transmission request was issued.
257    */
258   struct GNUNET_TIME_Absolute last_transmission_request_start;
259
260   /**
261    * ID of delay task for scheduling transmission.
262    */
263   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
264
265   /**
266    * Average priority of successful replies.  Calculated
267    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
268    */
269   double avg_priority;
270
271   /**
272    * Increase in traffic preference still to be submitted
273    * to the core service for this peer.
274    */
275   uint64_t inc_preference;
276
277   /**
278    * Trust rating for this peer
279    */
280   uint32_t trust;
281
282   /**
283    * Trust rating for this peer on disk.
284    */
285   uint32_t disk_trust;
286
287   /**
288    * The peer's identity.
289    */
290   GNUNET_PEER_Id pid;  
291
292   /**
293    * Size of the linked list of 'pending_messages'.
294    */
295   unsigned int pending_requests;
296
297   /**
298    * Which offset in "last_p2p_replies" will be updated next?
299    * (we go round-robin).
300    */
301   unsigned int last_p2p_replies_woff;
302
303   /**
304    * Which offset in "last_client_replies" will be updated next?
305    * (we go round-robin).
306    */
307   unsigned int last_client_replies_woff;
308
309   /**
310    * Current offset into 'last_request_times' ring buffer.
311    */
312   unsigned int last_request_times_off;
313
314 };
315
316
317 /**
318  * Information we keep for each pending request.  We should try to
319  * keep this struct as small as possible since its memory consumption
320  * is key to how many requests we can have pending at once.
321  */
322 struct PendingRequest;
323
324
325 /**
326  * Doubly-linked list of requests we are performing
327  * on behalf of the same client.
328  */
329 struct ClientRequestList
330 {
331
332   /**
333    * This is a doubly-linked list.
334    */
335   struct ClientRequestList *next;
336
337   /**
338    * This is a doubly-linked list.
339    */
340   struct ClientRequestList *prev;
341
342   /**
343    * Request this entry represents.
344    */
345   struct PendingRequest *req;
346
347   /**
348    * Client list this request belongs to.
349    */
350   struct ClientList *client_list;
351
352 };
353
354
355 /**
356  * Replies to be transmitted to the client.  The actual
357  * response message is allocated after this struct.
358  */
359 struct ClientResponseMessage
360 {
361   /**
362    * This is a doubly-linked list.
363    */
364   struct ClientResponseMessage *next;
365
366   /**
367    * This is a doubly-linked list.
368    */
369   struct ClientResponseMessage *prev;
370
371   /**
372    * Client list entry this response belongs to.
373    */
374   struct ClientList *client_list;
375
376   /**
377    * Number of bytes in the response.
378    */
379   size_t msize;
380 };
381
382
383 /**
384  * Linked list of clients we are performing requests
385  * for right now.
386  */
387 struct ClientList
388 {
389   /**
390    * This is a linked list.
391    */
392   struct ClientList *next;
393
394   /**
395    * ID of a client making a request, NULL if this entry is for a
396    * peer.
397    */
398   struct GNUNET_SERVER_Client *client;
399
400   /**
401    * Head of list of requests performed on behalf
402    * of this client right now.
403    */
404   struct ClientRequestList *rl_head;
405
406   /**
407    * Tail of list of requests performed on behalf
408    * of this client right now.
409    */
410   struct ClientRequestList *rl_tail;
411
412   /**
413    * Head of linked list of responses.
414    */
415   struct ClientResponseMessage *res_head;
416
417   /**
418    * Tail of linked list of responses.
419    */
420   struct ClientResponseMessage *res_tail;
421
422   /**
423    * Context for sending replies.
424    */
425   struct GNUNET_CONNECTION_TransmitHandle *th;
426
427 };
428
429
430 /**
431  * Information about a peer that we have forwarded this
432  * request to already.  
433  */
434 struct UsedTargetEntry
435 {
436   /**
437    * What was the last time we have transmitted this request to this
438    * peer?
439    */
440   struct GNUNET_TIME_Absolute last_request_time;
441
442   /**
443    * How often have we transmitted this request to this peer?
444    */
445   unsigned int num_requests;
446
447   /**
448    * PID of the target peer.
449    */
450   GNUNET_PEER_Id pid;
451
452 };
453
454
455
456
457
458 /**
459  * Doubly-linked list of messages we are performing
460  * due to a pending request.
461  */
462 struct PendingMessageList
463 {
464
465   /**
466    * This is a doubly-linked list of messages on behalf of the same request.
467    */
468   struct PendingMessageList *next;
469
470   /**
471    * This is a doubly-linked list of messages on behalf of the same request.
472    */
473   struct PendingMessageList *prev;
474
475   /**
476    * Message this entry represents.
477    */
478   struct PendingMessage *pm;
479
480   /**
481    * Request this entry belongs to.
482    */
483   struct PendingRequest *req;
484
485   /**
486    * Peer this message is targeted for.
487    */
488   struct ConnectedPeer *target;
489
490 };
491
492
493 /**
494  * Information we keep for each pending request.  We should try to
495  * keep this struct as small as possible since its memory consumption
496  * is key to how many requests we can have pending at once.
497  */
498 struct PendingRequest
499 {
500
501   /**
502    * If this request was made by a client, this is our entry in the
503    * client request list; otherwise NULL.
504    */
505   struct ClientRequestList *client_request_list;
506
507   /**
508    * Entry of peer responsible for this entry (if this request
509    * was made by a peer).
510    */
511   struct ConnectedPeer *cp;
512
513   /**
514    * If this is a namespace query, pointer to the hash of the public
515    * key of the namespace; otherwise NULL.  Pointer will be to the 
516    * end of this struct (so no need to free it).
517    */
518   const GNUNET_HashCode *namespace;
519
520   /**
521    * Bloomfilter we use to filter out replies that we don't care about
522    * (anymore).  NULL as long as we are interested in all replies.
523    */
524   struct GNUNET_CONTAINER_BloomFilter *bf;
525
526   /**
527    * Context of our GNUNET_CORE_peer_change_preference call.
528    */
529   struct GNUNET_CORE_InformationRequestContext *irc;
530
531   /**
532    * Reference to DHT get operation for this request (or NULL).
533    */
534   struct GNUNET_DHT_GetHandle *dht_get;
535
536   /**
537    * Hash code of all replies that we have seen so far (only valid
538    * if client is not NULL since we only track replies like this for
539    * our own clients).
540    */
541   GNUNET_HashCode *replies_seen;
542
543   /**
544    * Node in the heap representing this entry; NULL
545    * if we have no heap node.
546    */
547   struct GNUNET_CONTAINER_HeapNode *hnode;
548
549   /**
550    * Head of list of messages being performed on behalf of this
551    * request.
552    */
553   struct PendingMessageList *pending_head;
554
555   /**
556    * Tail of list of messages being performed on behalf of this
557    * request.
558    */
559   struct PendingMessageList *pending_tail;
560
561   /**
562    * When did we first see this request (form this peer), or, if our
563    * client is initiating, when did we last initiate a search?
564    */
565   struct GNUNET_TIME_Absolute start_time;
566
567   /**
568    * The query that this request is for.
569    */
570   GNUNET_HashCode query;
571
572   /**
573    * The task responsible for transmitting queries
574    * for this request.
575    */
576   GNUNET_SCHEDULER_TaskIdentifier task;
577
578   /**
579    * (Interned) Peer identifier that identifies a preferred target
580    * for requests.
581    */
582   GNUNET_PEER_Id target_pid;
583
584   /**
585    * (Interned) Peer identifiers of peers that have already
586    * received our query for this content.
587    */
588   struct UsedTargetEntry *used_targets;
589   
590   /**
591    * Our entry in the queue (non-NULL while we wait for our
592    * turn to interact with the local database).
593    */
594   struct GNUNET_DATASTORE_QueueEntry *qe;
595
596   /**
597    * Size of the 'bf' (in bytes).
598    */
599   size_t bf_size;
600
601   /**
602    * Desired anonymity level; only valid for requests from a local client.
603    */
604   uint32_t anonymity_level;
605
606   /**
607    * How many entries in "used_targets" are actually valid?
608    */
609   unsigned int used_targets_off;
610
611   /**
612    * How long is the "used_targets" array?
613    */
614   unsigned int used_targets_size;
615
616   /**
617    * Number of results found for this request.
618    */
619   unsigned int results_found;
620
621   /**
622    * How many entries in "replies_seen" are actually valid?
623    */
624   unsigned int replies_seen_off;
625
626   /**
627    * How long is the "replies_seen" array?
628    */
629   unsigned int replies_seen_size;
630   
631   /**
632    * Priority with which this request was made.  If one of our clients
633    * made the request, then this is the current priority that we are
634    * using when initiating the request.  This value is used when
635    * we decide to reward other peers with trust for providing a reply.
636    */
637   uint32_t priority;
638
639   /**
640    * Priority points left for us to spend when forwarding this request
641    * to other peers.
642    */
643   uint32_t remaining_priority;
644
645   /**
646    * Number to mingle hashes for bloom-filter tests with.
647    */
648   int32_t mingle;
649
650   /**
651    * TTL with which we saw this request (or, if we initiated, TTL that
652    * we used for the request).
653    */
654   int32_t ttl;
655   
656   /**
657    * Type of the content that this request is for.
658    */
659   enum GNUNET_BLOCK_Type type;
660
661   /**
662    * Remove this request after transmission of the current response.
663    */
664   int8_t do_remove;
665
666   /**
667    * GNUNET_YES if we should not forward this request to other peers.
668    */
669   int8_t local_only;
670
671   /**
672    * GNUNET_YES if we should not forward this request to other peers.
673    */
674   int8_t forward_only;
675
676 };
677
678
679 /**
680  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
681  */
682 struct MigrationReadyBlock
683 {
684
685   /**
686    * This is a doubly-linked list.
687    */
688   struct MigrationReadyBlock *next;
689
690   /**
691    * This is a doubly-linked list.
692    */
693   struct MigrationReadyBlock *prev;
694
695   /**
696    * Query for the block.
697    */
698   GNUNET_HashCode query;
699
700   /**
701    * When does this block expire? 
702    */
703   struct GNUNET_TIME_Absolute expiration;
704
705   /**
706    * Peers we would consider forwarding this
707    * block to.  Zero for empty entries.
708    */
709   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
710
711   /**
712    * Size of the block.
713    */
714   size_t size;
715
716   /**
717    *  Number of targets already used.
718    */
719   unsigned int used_targets;
720
721   /**
722    * Type of the block.
723    */
724   enum GNUNET_BLOCK_Type type;
725 };
726
727
728 /**
729  * Our connection to the datastore.
730  */
731 static struct GNUNET_DATASTORE_Handle *dsh;
732
733 /**
734  * Our block context.
735  */
736 static struct GNUNET_BLOCK_Context *block_ctx;
737
738 /**
739  * Our block configuration.
740  */
741 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
742
743 /**
744  * Our scheduler.
745  */
746 static struct GNUNET_SCHEDULER_Handle *sched;
747
748 /**
749  * Our configuration.
750  */
751 static const struct GNUNET_CONFIGURATION_Handle *cfg;
752
753 /**
754  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
755  */
756 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
757
758 /**
759  * Map of peer identifiers to "struct PendingRequest" (for that peer).
760  */
761 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
762
763 /**
764  * Map of query identifiers to "struct PendingRequest" (for that query).
765  */
766 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
767
768 /**
769  * Heap with the request that will expire next at the top.  Contains
770  * pointers of type "struct PendingRequest*"; these will *also* be
771  * aliased from the "requests_by_peer" data structures and the
772  * "requests_by_query" table.  Note that requests from our clients
773  * don't expire and are thus NOT in the "requests_by_expiration"
774  * (or the "requests_by_peer" tables).
775  */
776 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
777
778 /**
779  * Handle for reporting statistics.
780  */
781 static struct GNUNET_STATISTICS_Handle *stats;
782
783 /**
784  * Linked list of clients we are currently processing requests for.
785  */
786 static struct ClientList *client_list;
787
788 /**
789  * Pointer to handle to the core service (points to NULL until we've
790  * connected to it).
791  */
792 static struct GNUNET_CORE_Handle *core;
793
794 /**
795  * Head of linked list of blocks that can be migrated.
796  */
797 static struct MigrationReadyBlock *mig_head;
798
799 /**
800  * Tail of linked list of blocks that can be migrated.
801  */
802 static struct MigrationReadyBlock *mig_tail;
803
804 /**
805  * Request to datastore for migration (or NULL).
806  */
807 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
808
809 /**
810  * Request to datastore for DHT PUTs (or NULL).
811  */
812 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
813
814 /**
815  * Type we will request for the next DHT PUT round from the datastore.
816  */
817 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
818
819 /**
820  * Where do we store trust information?
821  */
822 static char *trustDirectory;
823
824 /**
825  * ID of task that collects blocks for migration.
826  */
827 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
828
829 /**
830  * ID of task that collects blocks for DHT PUTs.
831  */
832 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
833
834 /**
835  * What is the maximum frequency at which we are allowed to
836  * poll the datastore for migration content?
837  */
838 static struct GNUNET_TIME_Relative min_migration_delay;
839
840 /**
841  * Handle for DHT operations.
842  */
843 static struct GNUNET_DHT_Handle *dht_handle;
844
845 /**
846  * Size of the doubly-linked list of migration blocks.
847  */
848 static unsigned int mig_size;
849
850 /**
851  * Are we allowed to migrate content to this peer.
852  */
853 static int active_migration;
854
855 /**
856  * How many entires with zero anonymity do we currently estimate
857  * to have in the database?
858  */
859 static unsigned int zero_anonymity_count_estimate;
860
861 /**
862  * Typical priorities we're seeing from other peers right now.  Since
863  * most priorities will be zero, this value is the weighted average of
864  * non-zero priorities seen "recently".  In order to ensure that new
865  * values do not dramatically change the ratio, values are first
866  * "capped" to a reasonable range (+N of the current value) and then
867  * averaged into the existing value by a ratio of 1:N.  Hence
868  * receiving the largest possible priority can still only raise our
869  * "current_priorities" by at most 1.
870  */
871 static double current_priorities;
872
873 /**
874  * Datastore 'GET' load tracking.
875  */
876 static struct GNUNET_LOAD_Value *datastore_get_load;
877
878 /**
879  * Datastore 'PUT' load tracking.
880  */
881 static struct GNUNET_LOAD_Value *datastore_put_load;
882
883 /**
884  * How long do requests typically stay in the routing table?
885  */
886 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
887
888 /**
889  * We've just now completed a datastore request.  Update our
890  * datastore load calculations.
891  *
892  * @param start time when the datastore request was issued
893  */
894 static void
895 update_datastore_delays (struct GNUNET_TIME_Absolute start)
896 {
897   struct GNUNET_TIME_Relative delay;
898
899   delay = GNUNET_TIME_absolute_get_duration (start);
900   GNUNET_LOAD_update (datastore_get_load,
901                       delay.value);
902 }
903
904
905 /**
906  * Get the filename under which we would store the GNUNET_HELLO_Message
907  * for the given host and protocol.
908  * @return filename of the form DIRECTORY/HOSTID
909  */
910 static char *
911 get_trust_filename (const struct GNUNET_PeerIdentity *id)
912 {
913   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
914   char *fn;
915
916   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
917   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
918   return fn;
919 }
920
921
922
923 /**
924  * Transmit messages by copying it to the target buffer
925  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
926  * for writing in the meantime.  In that case, do nothing
927  * (the disconnect or shutdown handler will take care of the rest).
928  * If we were able to transmit messages and there are still more
929  * pending, ask core again for further calls to this function.
930  *
931  * @param cls closure, pointer to the 'struct ConnectedPeer*'
932  * @param size number of bytes available in buf
933  * @param buf where the callee should write the message
934  * @return number of bytes written to buf
935  */
936 static size_t
937 transmit_to_peer (void *cls,
938                   size_t size, void *buf);
939
940
941 /* ******************* clean up functions ************************ */
942
943 /**
944  * Delete the given migration block.
945  *
946  * @param mb block to delete
947  */
948 static void
949 delete_migration_block (struct MigrationReadyBlock *mb)
950 {
951   GNUNET_CONTAINER_DLL_remove (mig_head,
952                                mig_tail,
953                                mb);
954   GNUNET_PEER_decrement_rcs (mb->target_list,
955                              MIGRATION_LIST_SIZE);
956   mig_size--;
957   GNUNET_free (mb);
958 }
959
960
961 /**
962  * Compare the distance of two peers to a key.
963  *
964  * @param key key
965  * @param p1 first peer
966  * @param p2 second peer
967  * @return GNUNET_YES if P1 is closer to key than P2
968  */
969 static int
970 is_closer (const GNUNET_HashCode *key,
971            const struct GNUNET_PeerIdentity *p1,
972            const struct GNUNET_PeerIdentity *p2)
973 {
974   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
975                                     &p2->hashPubKey,
976                                     key);
977 }
978
979
980 /**
981  * Consider migrating content to a given peer.
982  *
983  * @param cls 'struct MigrationReadyBlock*' to select
984  *            targets for (or NULL for none)
985  * @param key ID of the peer 
986  * @param value 'struct ConnectedPeer' of the peer
987  * @return GNUNET_YES (always continue iteration)
988  */
989 static int
990 consider_migration (void *cls,
991                     const GNUNET_HashCode *key,
992                     void *value)
993 {
994   struct MigrationReadyBlock *mb = cls;
995   struct ConnectedPeer *cp = value;
996   struct MigrationReadyBlock *pos;
997   struct GNUNET_PeerIdentity cppid;
998   struct GNUNET_PeerIdentity otherpid;
999   struct GNUNET_PeerIdentity worstpid;
1000   size_t msize;
1001   unsigned int i;
1002   unsigned int repl;
1003   
1004   /* consider 'cp' as a migration target for mb */
1005   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
1006     return GNUNET_YES; /* peer has requested no migration! */
1007   if (mb != NULL)
1008     {
1009       GNUNET_PEER_resolve (cp->pid,
1010                            &cppid);
1011       repl = MIGRATION_LIST_SIZE;
1012       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1013         {
1014           if (mb->target_list[i] == 0)
1015             {
1016               mb->target_list[i] = cp->pid;
1017               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1018               repl = MIGRATION_LIST_SIZE;
1019               break;
1020             }
1021           GNUNET_PEER_resolve (mb->target_list[i],
1022                                &otherpid);
1023           if ( (repl == MIGRATION_LIST_SIZE) &&
1024                is_closer (&mb->query,
1025                           &cppid,
1026                           &otherpid)) 
1027             {
1028               repl = i;
1029               worstpid = otherpid;
1030             }
1031           else if ( (repl != MIGRATION_LIST_SIZE) &&
1032                     (is_closer (&mb->query,
1033                                 &worstpid,
1034                                 &otherpid) ) )
1035             {
1036               repl = i;
1037               worstpid = otherpid;
1038             }       
1039         }
1040       if (repl != MIGRATION_LIST_SIZE) 
1041         {
1042           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1043           mb->target_list[repl] = cp->pid;
1044           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1045         }
1046     }
1047
1048   /* consider scheduling transmission to cp for content migration */
1049   if (cp->cth != NULL)        
1050     return GNUNET_YES; 
1051   msize = 0;
1052   pos = mig_head;
1053   while (pos != NULL)
1054     {
1055       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1056         {
1057           if (cp->pid == pos->target_list[i])
1058             {
1059               if (msize == 0)
1060                 msize = pos->size;
1061               else
1062                 msize = GNUNET_MIN (msize,
1063                                     pos->size);
1064               break;
1065             }
1066         }
1067       pos = pos->next;
1068     }
1069   if (msize == 0)
1070     return GNUNET_YES; /* no content available */
1071 #if DEBUG_FS
1072   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1073               "Trying to migrate at least %u bytes to peer `%s'\n",
1074               msize,
1075               GNUNET_h2s (key));
1076 #endif
1077   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1078     {
1079       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1080       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1081     }
1082   cp->cth 
1083     = GNUNET_CORE_notify_transmit_ready (core,
1084                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1085                                          (const struct GNUNET_PeerIdentity*) key,
1086                                          msize + sizeof (struct PutMessage),
1087                                          &transmit_to_peer,
1088                                          cp);
1089   return GNUNET_YES;
1090 }
1091
1092
1093 /**
1094  * Task that is run periodically to obtain blocks for content
1095  * migration
1096  * 
1097  * @param cls unused
1098  * @param tc scheduler context (also unused)
1099  */
1100 static void
1101 gather_migration_blocks (void *cls,
1102                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1103
1104
1105
1106
1107 /**
1108  * Task that is run periodically to obtain blocks for DHT PUTs.
1109  * 
1110  * @param cls type of blocks to gather
1111  * @param tc scheduler context (unused)
1112  */
1113 static void
1114 gather_dht_put_blocks (void *cls,
1115                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1116
1117
1118 /**
1119  * If the migration task is not currently running, consider
1120  * (re)scheduling it with the appropriate delay.
1121  */
1122 static void
1123 consider_migration_gathering ()
1124 {
1125   struct GNUNET_TIME_Relative delay;
1126
1127   if (dsh == NULL)
1128     return;
1129   if (mig_qe != NULL)
1130     return;
1131   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1132     return;
1133   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1134                                          mig_size);
1135   delay = GNUNET_TIME_relative_divide (delay,
1136                                        MAX_MIGRATION_QUEUE);
1137   delay = GNUNET_TIME_relative_max (delay,
1138                                     min_migration_delay);
1139   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1140                                            delay,
1141                                            &gather_migration_blocks,
1142                                            NULL);
1143 }
1144
1145
1146 /**
1147  * If the DHT PUT gathering task is not currently running, consider
1148  * (re)scheduling it with the appropriate delay.
1149  */
1150 static void
1151 consider_dht_put_gathering (void *cls)
1152 {
1153   struct GNUNET_TIME_Relative delay;
1154
1155   if (dsh == NULL)
1156     return;
1157   if (dht_qe != NULL)
1158     return;
1159   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1160     return;
1161   if (zero_anonymity_count_estimate > 0)
1162     {
1163       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1164                                            zero_anonymity_count_estimate);
1165       delay = GNUNET_TIME_relative_min (delay,
1166                                         MAX_DHT_PUT_FREQ);
1167     }
1168   else
1169     {
1170       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1171          (hopefully) appear */
1172       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1173     }
1174   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1175                                            delay,
1176                                            &gather_dht_put_blocks,
1177                                            cls);
1178 }
1179
1180
1181 /**
1182  * Process content offered for migration.
1183  *
1184  * @param cls closure
1185  * @param key key for the content
1186  * @param size number of bytes in data
1187  * @param data content stored
1188  * @param type type of the content
1189  * @param priority priority of the content
1190  * @param anonymity anonymity-level for the content
1191  * @param expiration expiration time for the content
1192  * @param uid unique identifier for the datum;
1193  *        maybe 0 if no unique identifier is available
1194  */
1195 static void
1196 process_migration_content (void *cls,
1197                            const GNUNET_HashCode * key,
1198                            size_t size,
1199                            const void *data,
1200                            enum GNUNET_BLOCK_Type type,
1201                            uint32_t priority,
1202                            uint32_t anonymity,
1203                            struct GNUNET_TIME_Absolute
1204                            expiration, uint64_t uid)
1205 {
1206   struct MigrationReadyBlock *mb;
1207   
1208   if (key == NULL)
1209     {
1210       mig_qe = NULL;
1211       if (mig_size < MAX_MIGRATION_QUEUE)  
1212         consider_migration_gathering ();
1213       return;
1214     }
1215   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1216     {
1217       if (GNUNET_OK !=
1218           GNUNET_FS_handle_on_demand_block (key, size, data,
1219                                             type, priority, anonymity,
1220                                             expiration, uid, 
1221                                             &process_migration_content,
1222                                             NULL))
1223         {
1224           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1225         }
1226       return;
1227     }
1228 #if DEBUG_FS
1229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230               "Retrieved block `%s' of type %u for migration\n",
1231               GNUNET_h2s (key),
1232               type);
1233 #endif
1234   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1235   mb->query = *key;
1236   mb->expiration = expiration;
1237   mb->size = size;
1238   mb->type = type;
1239   memcpy (&mb[1], data, size);
1240   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1241                                      mig_tail,
1242                                      mig_tail,
1243                                      mb);
1244   mig_size++;
1245   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1246                                          &consider_migration,
1247                                          mb);
1248   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1249 }
1250
1251
1252 /**
1253  * Function called upon completion of the DHT PUT operation.
1254  */
1255 static void
1256 dht_put_continuation (void *cls,
1257                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1258 {
1259   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1260 }
1261
1262
1263 /**
1264  * Store content in DHT.
1265  *
1266  * @param cls closure
1267  * @param key key for the content
1268  * @param size number of bytes in data
1269  * @param data content stored
1270  * @param type type of the content
1271  * @param priority priority of the content
1272  * @param anonymity anonymity-level for the content
1273  * @param expiration expiration time for the content
1274  * @param uid unique identifier for the datum;
1275  *        maybe 0 if no unique identifier is available
1276  */
1277 static void
1278 process_dht_put_content (void *cls,
1279                          const GNUNET_HashCode * key,
1280                          size_t size,
1281                          const void *data,
1282                          enum GNUNET_BLOCK_Type type,
1283                          uint32_t priority,
1284                          uint32_t anonymity,
1285                          struct GNUNET_TIME_Absolute
1286                          expiration, uint64_t uid)
1287
1288   static unsigned int counter;
1289   static GNUNET_HashCode last_vhash;
1290   static GNUNET_HashCode vhash;
1291
1292   if (key == NULL)
1293     {
1294       dht_qe = NULL;
1295       consider_dht_put_gathering (cls);
1296       return;
1297     }
1298   /* slightly funky code to estimate the total number of values with zero
1299      anonymity from the maximum observed length of a monotonically increasing 
1300      sequence of hashes over the contents */
1301   GNUNET_CRYPTO_hash (data, size, &vhash);
1302   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1303     {
1304       if (zero_anonymity_count_estimate > 0)
1305         zero_anonymity_count_estimate /= 2;
1306       counter = 0;
1307     }
1308   last_vhash = vhash;
1309   if (counter < 31)
1310     counter++;
1311   if (zero_anonymity_count_estimate < (1 << counter))
1312     zero_anonymity_count_estimate = (1 << counter);
1313 #if DEBUG_FS
1314   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1315               "Retrieved block `%s' of type %u for DHT PUT\n",
1316               GNUNET_h2s (key),
1317               type);
1318 #endif
1319   GNUNET_DHT_put (dht_handle,
1320                   key,
1321                   GNUNET_DHT_RO_NONE,
1322                   type,
1323                   size,
1324                   data,
1325                   expiration,
1326                   GNUNET_TIME_UNIT_FOREVER_REL,
1327                   &dht_put_continuation,
1328                   cls);
1329 }
1330
1331
1332 /**
1333  * Task that is run periodically to obtain blocks for content
1334  * migration
1335  * 
1336  * @param cls unused
1337  * @param tc scheduler context (also unused)
1338  */
1339 static void
1340 gather_migration_blocks (void *cls,
1341                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1342 {
1343   mig_task = GNUNET_SCHEDULER_NO_TASK;
1344   if (dsh != NULL)
1345     {
1346       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1347                                             GNUNET_TIME_UNIT_FOREVER_REL,
1348                                             &process_migration_content, NULL);
1349       GNUNET_assert (mig_qe != NULL);
1350     }
1351 }
1352
1353
1354 /**
1355  * Task that is run periodically to obtain blocks for DHT PUTs.
1356  * 
1357  * @param cls type of blocks to gather
1358  * @param tc scheduler context (unused)
1359  */
1360 static void
1361 gather_dht_put_blocks (void *cls,
1362                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1363 {
1364   dht_task = GNUNET_SCHEDULER_NO_TASK;
1365   if (dsh != NULL)
1366     {
1367       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1368         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1369       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1370                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1371                                                     dht_put_type++,
1372                                                     &process_dht_put_content, NULL);
1373       GNUNET_assert (dht_qe != NULL);
1374     }
1375 }
1376
1377
1378 /**
1379  * We're done with a particular message list entry.
1380  * Free all associated resources.
1381  * 
1382  * @param pml entry to destroy
1383  */
1384 static void
1385 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1386 {
1387   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1388                                pml->req->pending_tail,
1389                                pml);
1390   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1391                                pml->target->pending_messages_tail,
1392                                pml->pm);
1393   pml->target->pending_requests--;
1394   GNUNET_free (pml->pm);
1395   GNUNET_free (pml);
1396 }
1397
1398
1399 /**
1400  * Destroy the given pending message (and call the respective
1401  * continuation).
1402  *
1403  * @param pm message to destroy
1404  * @param tpid id of peer that the message was delivered to, or 0 for none
1405  */
1406 static void
1407 destroy_pending_message (struct PendingMessage *pm,
1408                          GNUNET_PEER_Id tpid)
1409 {
1410   struct PendingMessageList *pml = pm->pml;
1411   TransmissionContinuation cont;
1412   void *cont_cls;
1413
1414   cont = pm->cont;
1415   cont_cls = pm->cont_cls;
1416   if (pml != NULL)
1417     {
1418       GNUNET_assert (pml->pm == pm);
1419       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1420       destroy_pending_message_list_entry (pml);
1421     }
1422   else
1423     {
1424       GNUNET_free (pm);
1425     }
1426   if (cont != NULL)
1427     cont (cont_cls, tpid);  
1428 }
1429
1430
1431 /**
1432  * We're done processing a particular request.
1433  * Free all associated resources.
1434  *
1435  * @param pr request to destroy
1436  */
1437 static void
1438 destroy_pending_request (struct PendingRequest *pr)
1439 {
1440   struct GNUNET_PeerIdentity pid;
1441   unsigned int i;
1442
1443   if (pr->hnode != NULL)
1444     {
1445       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1446                                          pr->hnode);
1447       pr->hnode = NULL;
1448     }
1449   if (NULL == pr->client_request_list)
1450     {
1451       GNUNET_STATISTICS_update (stats,
1452                                 gettext_noop ("# P2P searches active"),
1453                                 -1,
1454                                 GNUNET_NO);
1455     }
1456   else
1457     {
1458       GNUNET_STATISTICS_update (stats,
1459                                 gettext_noop ("# client searches active"),
1460                                 -1,
1461                                 GNUNET_NO);
1462     }
1463   if (GNUNET_YES == 
1464       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1465                                             &pr->query,
1466                                             pr))
1467     {
1468       GNUNET_LOAD_update (rt_entry_lifetime,
1469                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1470     }
1471   if (pr->qe != NULL)
1472      {
1473       GNUNET_DATASTORE_cancel (pr->qe);
1474       pr->qe = NULL;
1475     }
1476   if (pr->dht_get != NULL)
1477     {
1478       GNUNET_DHT_get_stop (pr->dht_get);
1479       pr->dht_get = NULL;
1480     }
1481   if (pr->client_request_list != NULL)
1482     {
1483       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1484                                    pr->client_request_list->client_list->rl_tail,
1485                                    pr->client_request_list);
1486       GNUNET_free (pr->client_request_list);
1487       pr->client_request_list = NULL;
1488     }
1489   if (pr->cp != NULL)
1490     {
1491       GNUNET_PEER_resolve (pr->cp->pid,
1492                            &pid);
1493       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1494                                                    &pid.hashPubKey,
1495                                                    pr);
1496       pr->cp = NULL;
1497     }
1498   if (pr->bf != NULL)
1499     {
1500       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1501       pr->bf = NULL;
1502     }
1503   if (pr->irc != NULL)
1504     {
1505       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1506       pr->irc = NULL;
1507     }
1508   if (pr->replies_seen != NULL)
1509     {
1510       GNUNET_free (pr->replies_seen);
1511       pr->replies_seen = NULL;
1512     }
1513   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1514     {
1515       GNUNET_SCHEDULER_cancel (sched,
1516                                pr->task);
1517       pr->task = GNUNET_SCHEDULER_NO_TASK;
1518     }
1519   while (NULL != pr->pending_head)    
1520     destroy_pending_message_list_entry (pr->pending_head);
1521   GNUNET_PEER_change_rc (pr->target_pid, -1);
1522   if (pr->used_targets != NULL)
1523     {
1524       for (i=0;i<pr->used_targets_off;i++)
1525         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1526       GNUNET_free (pr->used_targets);
1527       pr->used_targets_off = 0;
1528       pr->used_targets_size = 0;
1529       pr->used_targets = NULL;
1530     }
1531   GNUNET_free (pr);
1532 }
1533
1534
1535 /**
1536  * Method called whenever a given peer connects.
1537  *
1538  * @param cls closure, not used
1539  * @param peer peer identity this notification is about
1540  * @param latency reported latency of the connection with 'other'
1541  * @param distance reported distance (DV) to 'other' 
1542  */
1543 static void 
1544 peer_connect_handler (void *cls,
1545                       const struct
1546                       GNUNET_PeerIdentity * peer,
1547                       struct GNUNET_TIME_Relative latency,
1548                       uint32_t distance)
1549 {
1550   struct ConnectedPeer *cp;
1551   struct MigrationReadyBlock *pos;
1552   char *fn;
1553   uint32_t trust;
1554   
1555   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1556   cp->transmission_delay = GNUNET_LOAD_value_init ();
1557   cp->pid = GNUNET_PEER_intern (peer);
1558
1559   fn = get_trust_filename (peer);
1560   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1561       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1562     cp->disk_trust = cp->trust = ntohl (trust);
1563   GNUNET_free (fn);
1564
1565   GNUNET_break (GNUNET_OK ==
1566                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1567                                                    &peer->hashPubKey,
1568                                                    cp,
1569                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1570
1571   pos = mig_head;
1572   while (NULL != pos)
1573     {
1574       (void) consider_migration (pos, &peer->hashPubKey, cp);
1575       pos = pos->next;
1576     }
1577 }
1578
1579
1580 /**
1581  * Increase the host credit by a value.
1582  *
1583  * @param host which peer to change the trust value on
1584  * @param value is the int value by which the
1585  *  host credit is to be increased or decreased
1586  * @returns the actual change in trust (positive or negative)
1587  */
1588 static int
1589 change_host_trust (struct ConnectedPeer *host, int value)
1590 {
1591   unsigned int old_trust;
1592
1593   if (value == 0)
1594     return 0;
1595   GNUNET_assert (host != NULL);
1596   old_trust = host->trust;
1597   if (value > 0)
1598     {
1599       if (host->trust + value < host->trust)
1600         {
1601           value = UINT32_MAX - host->trust;
1602           host->trust = UINT32_MAX;
1603         }
1604       else
1605         host->trust += value;
1606     }
1607   else
1608     {
1609       if (host->trust < -value)
1610         {
1611           value = -host->trust;
1612           host->trust = 0;
1613         }
1614       else
1615         host->trust += value;
1616     }
1617   return value;
1618 }
1619
1620
1621 /**
1622  * Write host-trust information to a file - flush the buffer entry!
1623  */
1624 static int
1625 flush_trust (void *cls,
1626              const GNUNET_HashCode *key,
1627              void *value)
1628 {
1629   struct ConnectedPeer *host = value;
1630   char *fn;
1631   uint32_t trust;
1632   struct GNUNET_PeerIdentity pid;
1633
1634   if (host->trust == host->disk_trust)
1635     return GNUNET_OK;                     /* unchanged */
1636   GNUNET_PEER_resolve (host->pid,
1637                        &pid);
1638   fn = get_trust_filename (&pid);
1639   if (host->trust == 0)
1640     {
1641       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1642         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1643                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1644     }
1645   else
1646     {
1647       trust = htonl (host->trust);
1648       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1649                                                     sizeof(uint32_t),
1650                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1651                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1652         host->disk_trust = host->trust;
1653     }
1654   GNUNET_free (fn);
1655   return GNUNET_OK;
1656 }
1657
1658 /**
1659  * Call this method periodically to scan data/hosts for new hosts.
1660  */
1661 static void
1662 cron_flush_trust (void *cls,
1663                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1664 {
1665
1666   if (NULL == connected_peers)
1667     return;
1668   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1669                                          &flush_trust,
1670                                          NULL);
1671   if (NULL == tc)
1672     return;
1673   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1674     return;
1675   GNUNET_SCHEDULER_add_delayed (tc->sched,
1676                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1677 }
1678
1679
1680 /**
1681  * Free (each) request made by the peer.
1682  *
1683  * @param cls closure, points to peer that the request belongs to
1684  * @param key current key code
1685  * @param value value in the hash map
1686  * @return GNUNET_YES (we should continue to iterate)
1687  */
1688 static int
1689 destroy_request (void *cls,
1690                  const GNUNET_HashCode * key,
1691                  void *value)
1692 {
1693   const struct GNUNET_PeerIdentity * peer = cls;
1694   struct PendingRequest *pr = value;
1695   
1696   GNUNET_break (GNUNET_YES ==
1697                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1698                                                       &peer->hashPubKey,
1699                                                       pr));
1700   destroy_pending_request (pr);
1701   return GNUNET_YES;
1702 }
1703
1704
1705 /**
1706  * Method called whenever a peer disconnects.
1707  *
1708  * @param cls closure, not used
1709  * @param peer peer identity this notification is about
1710  */
1711 static void
1712 peer_disconnect_handler (void *cls,
1713                          const struct
1714                          GNUNET_PeerIdentity * peer)
1715 {
1716   struct ConnectedPeer *cp;
1717   struct PendingMessage *pm;
1718   unsigned int i;
1719   struct MigrationReadyBlock *pos;
1720   struct MigrationReadyBlock *next;
1721
1722   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1723                                               &peer->hashPubKey,
1724                                               &destroy_request,
1725                                               (void*) peer);
1726   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1727                                           &peer->hashPubKey);
1728   if (cp == NULL)
1729     return;
1730   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1731     {
1732       if (NULL != cp->last_client_replies[i])
1733         {
1734           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1735           cp->last_client_replies[i] = NULL;
1736         }
1737     }
1738   GNUNET_break (GNUNET_YES ==
1739                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1740                                                       &peer->hashPubKey,
1741                                                       cp));
1742   /* remove this peer from migration considerations; schedule
1743      alternatives */
1744   next = mig_head;
1745   while (NULL != (pos = next))
1746     {
1747       next = pos->next;
1748       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1749         {
1750           if (pos->target_list[i] == cp->pid)
1751             {
1752               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1753               pos->target_list[i] = 0;
1754             }
1755          }
1756       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1757         {
1758           delete_migration_block (pos);
1759           consider_migration_gathering ();
1760           continue;
1761         }
1762       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1763                                              &consider_migration,
1764                                              pos);
1765     }
1766   GNUNET_PEER_change_rc (cp->pid, -1);
1767   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1768   if (NULL != cp->cth)
1769     {
1770       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1771       cp->cth = NULL;
1772     }
1773   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1774     {
1775       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1776       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1777     }
1778   while (NULL != (pm = cp->pending_messages_head))
1779     destroy_pending_message (pm, 0 /* delivery failed */);
1780   GNUNET_LOAD_value_free (cp->transmission_delay);
1781   GNUNET_break (0 == cp->pending_requests);
1782   GNUNET_free (cp);
1783 }
1784
1785
1786 /**
1787  * Iterator over hash map entries that removes all occurences
1788  * of the given 'client' from the 'last_client_replies' of the
1789  * given connected peer.
1790  *
1791  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1792  * @param key current key code (unused)
1793  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1794  * @return GNUNET_YES (we should continue to iterate)
1795  */
1796 static int
1797 remove_client_from_last_client_replies (void *cls,
1798                                         const GNUNET_HashCode * key,
1799                                         void *value)
1800 {
1801   struct GNUNET_SERVER_Client *client = cls;
1802   struct ConnectedPeer *cp = value;
1803   unsigned int i;
1804
1805   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1806     {
1807       if (cp->last_client_replies[i] == client)
1808         {
1809           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1810           cp->last_client_replies[i] = NULL;
1811         }
1812     }  
1813   return GNUNET_YES;
1814 }
1815
1816
1817 /**
1818  * A client disconnected.  Remove all of its pending queries.
1819  *
1820  * @param cls closure, NULL
1821  * @param client identification of the client
1822  */
1823 static void
1824 handle_client_disconnect (void *cls,
1825                           struct GNUNET_SERVER_Client
1826                           * client)
1827 {
1828   struct ClientList *pos;
1829   struct ClientList *prev;
1830   struct ClientRequestList *rcl;
1831   struct ClientResponseMessage *creply;
1832
1833   if (client == NULL)
1834     return;
1835   prev = NULL;
1836   pos = client_list;
1837   while ( (NULL != pos) &&
1838           (pos->client != client) )
1839     {
1840       prev = pos;
1841       pos = pos->next;
1842     }
1843   if (pos == NULL)
1844     return; /* no requests pending for this client */
1845   while (NULL != (rcl = pos->rl_head))
1846     {
1847       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1848                   "Destroying pending request `%s' on disconnect\n",
1849                   GNUNET_h2s (&rcl->req->query));
1850       destroy_pending_request (rcl->req);
1851     }
1852   if (prev == NULL)
1853     client_list = pos->next;
1854   else
1855     prev->next = pos->next;
1856   if (pos->th != NULL)
1857     {
1858       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1859       pos->th = NULL;
1860     }
1861   while (NULL != (creply = pos->res_head))
1862     {
1863       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1864                                    pos->res_tail,
1865                                    creply);
1866       GNUNET_free (creply);
1867     }    
1868   GNUNET_SERVER_client_drop (pos->client);
1869   GNUNET_free (pos);
1870   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1871                                          &remove_client_from_last_client_replies,
1872                                          client);
1873 }
1874
1875
1876 /**
1877  * Iterator to free peer entries.
1878  *
1879  * @param cls closure, unused
1880  * @param key current key code
1881  * @param value value in the hash map (peer entry)
1882  * @return GNUNET_YES (we should continue to iterate)
1883  */
1884 static int 
1885 clean_peer (void *cls,
1886             const GNUNET_HashCode * key,
1887             void *value)
1888 {
1889   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1890   return GNUNET_YES;
1891 }
1892
1893
1894 /**
1895  * Task run during shutdown.
1896  *
1897  * @param cls unused
1898  * @param tc unused
1899  */
1900 static void
1901 shutdown_task (void *cls,
1902                const struct GNUNET_SCHEDULER_TaskContext *tc)
1903 {
1904   if (mig_qe != NULL)
1905     {
1906       GNUNET_DATASTORE_cancel (mig_qe);
1907       mig_qe = NULL;
1908     }
1909   if (dht_qe != NULL)
1910     {
1911       GNUNET_DATASTORE_cancel (dht_qe);
1912       dht_qe = NULL;
1913     }
1914   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1915     {
1916       GNUNET_SCHEDULER_cancel (sched, mig_task);
1917       mig_task = GNUNET_SCHEDULER_NO_TASK;
1918     }
1919   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1920     {
1921       GNUNET_SCHEDULER_cancel (sched, dht_task);
1922       dht_task = GNUNET_SCHEDULER_NO_TASK;
1923     }
1924   while (client_list != NULL)
1925     handle_client_disconnect (NULL,
1926                               client_list->client);
1927   cron_flush_trust (NULL, NULL);
1928   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1929                                          &clean_peer,
1930                                          NULL);
1931   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1932   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1933   requests_by_expiration_heap = 0;
1934   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1935   connected_peers = NULL;
1936   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1937   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1938   query_request_map = NULL;
1939   GNUNET_LOAD_value_free (rt_entry_lifetime);
1940   rt_entry_lifetime = NULL;
1941   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1942   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1943   peer_request_map = NULL;
1944   GNUNET_assert (NULL != core);
1945   GNUNET_CORE_disconnect (core);
1946   core = NULL;
1947   if (stats != NULL)
1948     {
1949       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1950       stats = NULL;
1951     }
1952   if (dsh != NULL)
1953     {
1954       GNUNET_DATASTORE_disconnect (dsh,
1955                                    GNUNET_NO);
1956       dsh = NULL;
1957     }
1958   while (mig_head != NULL)
1959     delete_migration_block (mig_head);
1960   GNUNET_assert (0 == mig_size);
1961   GNUNET_DHT_disconnect (dht_handle);
1962   dht_handle = NULL;
1963   GNUNET_LOAD_value_free (datastore_get_load);
1964   datastore_get_load = NULL;
1965   GNUNET_LOAD_value_free (datastore_put_load);
1966   datastore_put_load = NULL;
1967   GNUNET_BLOCK_context_destroy (block_ctx);
1968   block_ctx = NULL;
1969   GNUNET_CONFIGURATION_destroy (block_cfg);
1970   block_cfg = NULL;
1971   sched = NULL;
1972   cfg = NULL;  
1973   GNUNET_free_non_null (trustDirectory);
1974   trustDirectory = NULL;
1975 }
1976
1977
1978 /* ******************* Utility functions  ******************** */
1979
1980
1981 /**
1982  * We've had to delay a request for transmission to core, but now
1983  * we should be ready.  Run it.
1984  *
1985  * @param cls the 'struct ConnectedPeer' for which a request was delayed
1986  * @param tc task context (unused)
1987  */
1988 static void
1989 delayed_transmission_request (void *cls,
1990                               const struct GNUNET_SCHEDULER_TaskContext *tc)
1991 {
1992   struct ConnectedPeer *cp = cls;
1993   struct GNUNET_PeerIdentity pid;
1994   struct PendingMessage *pm;
1995
1996   pm = cp->pending_messages_head;
1997   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1998   GNUNET_assert (cp->cth == NULL);
1999   if (pm == NULL)
2000     return;
2001   GNUNET_PEER_resolve (cp->pid,
2002                        &pid);
2003   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2004   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2005                                                pm->priority,
2006                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2007                                                &pid,
2008                                                pm->msize,
2009                                                &transmit_to_peer,
2010                                                cp);
2011 }
2012
2013
2014 /**
2015  * Transmit messages by copying it to the target buffer
2016  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2017  * for writing in the meantime.  In that case, do nothing
2018  * (the disconnect or shutdown handler will take care of the rest).
2019  * If we were able to transmit messages and there are still more
2020  * pending, ask core again for further calls to this function.
2021  *
2022  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2023  * @param size number of bytes available in buf
2024  * @param buf where the callee should write the message
2025  * @return number of bytes written to buf
2026  */
2027 static size_t
2028 transmit_to_peer (void *cls,
2029                   size_t size, void *buf)
2030 {
2031   struct ConnectedPeer *cp = cls;
2032   char *cbuf = buf;
2033   struct PendingMessage *pm;
2034   struct PendingMessage *next_pm;
2035   struct GNUNET_TIME_Absolute now;
2036   struct GNUNET_TIME_Relative min_delay;
2037   struct MigrationReadyBlock *mb;
2038   struct MigrationReadyBlock *next;
2039   struct PutMessage migm;
2040   size_t msize;
2041   unsigned int i;
2042   struct GNUNET_PeerIdentity pid;
2043  
2044   cp->cth = NULL;
2045   if (NULL == buf)
2046     {
2047 #if DEBUG_FS
2048       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2049                   "Dropping message, core too busy.\n");
2050 #endif
2051       GNUNET_LOAD_update (cp->transmission_delay,
2052                           UINT64_MAX);
2053       return 0;
2054     }  
2055   GNUNET_LOAD_update (cp->transmission_delay,
2056                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2057   now = GNUNET_TIME_absolute_get ();
2058   msize = 0;
2059   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2060   next_pm = cp->pending_messages_head;
2061   while ( (NULL != (pm = next_pm) ) &&
2062           (pm->msize <= size) )
2063     {
2064       next_pm = pm->next;
2065       if (pm->delay_until.value > now.value)
2066         {
2067           min_delay = GNUNET_TIME_relative_min (min_delay,
2068                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2069           continue;
2070         }
2071       memcpy (&cbuf[msize], &pm[1], pm->msize);
2072       msize += pm->msize;
2073       size -= pm->msize;
2074       if (NULL == pm->pml)
2075         {
2076           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2077                                        cp->pending_messages_tail,
2078                                        pm);
2079           cp->pending_requests--;
2080         }
2081       destroy_pending_message (pm, cp->pid);
2082     }
2083   if (pm != NULL)
2084     min_delay = GNUNET_TIME_UNIT_ZERO;
2085   if (NULL != cp->pending_messages_head)
2086     {     
2087       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2088       cp->delayed_transmission_request_task
2089         = GNUNET_SCHEDULER_add_delayed (sched,
2090                                         min_delay,
2091                                         &delayed_transmission_request,
2092                                         cp);
2093     }
2094   if (pm == NULL)
2095     {      
2096       GNUNET_PEER_resolve (cp->pid,
2097                            &pid);
2098       next = mig_head;
2099       while (NULL != (mb = next))
2100         {
2101           next = mb->next;
2102           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2103             {
2104               if ( (cp->pid == mb->target_list[i]) &&
2105                    (mb->size + sizeof (migm) <= size) )
2106                 {
2107                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2108                   mb->target_list[i] = 0;
2109                   mb->used_targets++;
2110                   memset (&migm, 0, sizeof (migm));
2111                   migm.header.size = htons (sizeof (migm) + mb->size);
2112                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2113                   migm.type = htonl (mb->type);
2114                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2115                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2116                   msize += sizeof (migm);
2117                   size -= sizeof (migm);
2118                   memcpy (&cbuf[msize], &mb[1], mb->size);
2119                   msize += mb->size;
2120                   size -= mb->size;
2121 #if DEBUG_FS
2122                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2123                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2124                               GNUNET_h2s (&mb->query),
2125                               (unsigned int) mb->size,
2126                               GNUNET_i2s (&pid));
2127 #endif    
2128                   break;
2129                 }
2130               else
2131                 {
2132 #if DEBUG_FS
2133                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2134                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2135                               GNUNET_h2s (&mb->query),
2136                               (unsigned int) mb->size,
2137                               GNUNET_i2s (&pid));
2138 #endif    
2139                 }
2140             }
2141           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2142                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2143             {
2144               delete_migration_block (mb);
2145               consider_migration_gathering ();
2146             }
2147         }
2148       consider_migration (NULL, 
2149                           &pid.hashPubKey,
2150                           cp);
2151     }
2152 #if DEBUG_FS
2153   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2154               "Transmitting %u bytes to peer with PID %u\n",
2155               (unsigned int) msize,
2156               (unsigned int) cp->pid);
2157 #endif
2158   return msize;
2159 }
2160
2161
2162 /**
2163  * Add a message to the set of pending messages for the given peer.
2164  *
2165  * @param cp peer to send message to
2166  * @param pm message to queue
2167  * @param pr request on which behalf this message is being queued
2168  */
2169 static void
2170 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2171                                   struct PendingMessage *pm,
2172                                   struct PendingRequest *pr)
2173 {
2174   struct PendingMessage *pos;
2175   struct PendingMessageList *pml;
2176   struct GNUNET_PeerIdentity pid;
2177
2178   GNUNET_assert (pm->next == NULL);
2179   GNUNET_assert (pm->pml == NULL);    
2180   if (pr != NULL)
2181     {
2182       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2183       pml->req = pr;
2184       pml->target = cp;
2185       pml->pm = pm;
2186       pm->pml = pml;  
2187       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2188                                    pr->pending_tail,
2189                                    pml);
2190     }
2191   pos = cp->pending_messages_head;
2192   while ( (pos != NULL) &&
2193           (pm->priority < pos->priority) )
2194     pos = pos->next;    
2195   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2196                                      cp->pending_messages_tail,
2197                                      pos,
2198                                      pm);
2199   cp->pending_requests++;
2200   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2201     {
2202       GNUNET_STATISTICS_update (stats,
2203                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2204                                 1,
2205                                 GNUNET_NO);
2206       destroy_pending_message (cp->pending_messages_tail, 0);  
2207     }
2208   GNUNET_PEER_resolve (cp->pid, &pid);
2209   if (NULL != cp->cth)
2210     {
2211       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2212       cp->cth = NULL;
2213     }
2214   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2215     {
2216       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2217       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2218     }
2219   /* need to schedule transmission */
2220   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2221   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2222                                                cp->pending_messages_head->priority,
2223                                                MAX_TRANSMIT_DELAY,
2224                                                &pid,
2225                                                cp->pending_messages_head->msize,
2226                                                &transmit_to_peer,
2227                                                cp);
2228   if (cp->cth == NULL)
2229     {
2230 #if DEBUG_FS
2231       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2232                   "Failed to schedule transmission with core!\n");
2233 #endif
2234       GNUNET_STATISTICS_update (stats,
2235                                 gettext_noop ("# CORE transmission failures"),
2236                                 1,
2237                                 GNUNET_NO);
2238     }
2239 }
2240
2241
2242 /**
2243  * Test if the DATABASE (GET) load on this peer is too high
2244  * to even consider processing the query at
2245  * all.  
2246  * 
2247  * @return GNUNET_YES if the load is too high to do anything (load high)
2248  *         GNUNET_NO to process normally (load normal)
2249  *         GNUNET_SYSERR to process for free (load low)
2250  */
2251 static int
2252 test_get_load_too_high (uint32_t priority)
2253 {
2254   double ld;
2255
2256   ld = GNUNET_LOAD_get_load (datastore_get_load);
2257   if (ld < 1)
2258     {
2259       GNUNET_STATISTICS_update (stats,
2260                                 gettext_noop ("# requests done for free (low load)"),
2261                                 1,
2262                                 GNUNET_NO);
2263       return GNUNET_SYSERR;
2264     }
2265   if (ld <= priority)
2266     {
2267       GNUNET_STATISTICS_update (stats,
2268                                 gettext_noop ("# requests done for a price (normal load)"),
2269                                 1,
2270                                 GNUNET_NO);
2271       return GNUNET_NO;
2272     }
2273   GNUNET_STATISTICS_update (stats,
2274                             gettext_noop ("# priority determined to be high"),
2275                             1,
2276                             GNUNET_NO);
2277   return GNUNET_YES;
2278 }
2279
2280
2281
2282
2283 /**
2284  * Test if the DATABASE (PUT) load on this peer is too high
2285  * to even consider processing the query at
2286  * all.  
2287  * 
2288  * @return GNUNET_YES if the load is too high to do anything (load high)
2289  *         GNUNET_NO to process normally (load normal or low)
2290  */
2291 static int
2292 test_put_load_too_high (uint32_t priority)
2293 {
2294   double ld;
2295
2296   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2297     return GNUNET_NO; /* very fast */
2298   ld = GNUNET_LOAD_get_load (datastore_put_load);
2299   if (ld < 2.0 * (1 + priority))
2300     return GNUNET_NO;
2301   GNUNET_STATISTICS_update (stats,
2302                             gettext_noop ("# storage requests dropped due to high load"),
2303                             1,
2304                             GNUNET_NO);
2305   return GNUNET_YES;
2306 }
2307
2308
2309 /* ******************* Pending Request Refresh Task ******************** */
2310
2311
2312
2313 /**
2314  * We use a random delay to make the timing of requests less
2315  * predictable.  This function returns such a random delay.  We add a base
2316  * delay of MAX_CORK_DELAY (1s).
2317  *
2318  * FIXME: make schedule dependent on the specifics of the request?
2319  * Or bandwidth and number of connected peers and load?
2320  *
2321  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2322  */
2323 static struct GNUNET_TIME_Relative
2324 get_processing_delay ()
2325 {
2326   return 
2327     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2328                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2329                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2330                                                                                        TTL_DECREMENT)));
2331 }
2332
2333
2334 /**
2335  * We're processing a GET request from another peer and have decided
2336  * to forward it to other peers.  This function is called periodically
2337  * and should forward the request to other peers until we have all
2338  * possible replies.  If we have transmitted the *only* reply to
2339  * the initiator we should destroy the pending request.  If we have
2340  * many replies in the queue to the initiator, we should delay sending
2341  * out more queries until the reply queue has shrunk some.
2342  *
2343  * @param cls our "struct ProcessGetContext *"
2344  * @param tc unused
2345  */
2346 static void
2347 forward_request_task (void *cls,
2348                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2349
2350
2351 /**
2352  * Function called after we either failed or succeeded
2353  * at transmitting a query to a peer.  
2354  *
2355  * @param cls the requests "struct PendingRequest*"
2356  * @param tpid ID of receiving peer, 0 on transmission error
2357  */
2358 static void
2359 transmit_query_continuation (void *cls,
2360                              GNUNET_PEER_Id tpid)
2361 {
2362   struct PendingRequest *pr = cls;
2363   unsigned int i;
2364
2365   GNUNET_STATISTICS_update (stats,
2366                             gettext_noop ("# queries scheduled for forwarding"),
2367                             -1,
2368                             GNUNET_NO);
2369   if (tpid == 0)   
2370     {
2371 #if DEBUG_FS
2372       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2373                   "Transmission of request failed, will try again later.\n");
2374 #endif
2375       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2376         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2377                                                  get_processing_delay (),
2378                                                  &forward_request_task,
2379                                                  pr); 
2380       return;    
2381     }
2382 #if DEBUG_FS
2383   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2384               "Transmitted query `%s'\n",
2385               GNUNET_h2s (&pr->query));
2386 #endif
2387   GNUNET_STATISTICS_update (stats,
2388                             gettext_noop ("# queries forwarded"),
2389                             1,
2390                             GNUNET_NO);
2391   for (i=0;i<pr->used_targets_off;i++)
2392     if (pr->used_targets[i].pid == tpid)
2393       break; /* found match! */    
2394   if (i == pr->used_targets_off)
2395     {
2396       /* need to create new entry */
2397       if (pr->used_targets_off == pr->used_targets_size)
2398         GNUNET_array_grow (pr->used_targets,
2399                            pr->used_targets_size,
2400                            pr->used_targets_size * 2 + 2);
2401       GNUNET_PEER_change_rc (tpid, 1);
2402       pr->used_targets[pr->used_targets_off].pid = tpid;
2403       pr->used_targets[pr->used_targets_off].num_requests = 0;
2404       i = pr->used_targets_off++;
2405     }
2406   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2407   pr->used_targets[i].num_requests++;
2408   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2409     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2410                                              get_processing_delay (),
2411                                              &forward_request_task,
2412                                              pr);
2413 }
2414
2415
2416 /**
2417  * How many bytes should a bloomfilter be if we have already seen
2418  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2419  * of bits set per entry.  Furthermore, we should not re-size the
2420  * filter too often (to keep it cheap).
2421  *
2422  * Since other peers will also add entries but not resize the filter,
2423  * we should generally pick a slightly larger size than what the
2424  * strict math would suggest.
2425  *
2426  * @return must be a power of two and smaller or equal to 2^15.
2427  */
2428 static size_t
2429 compute_bloomfilter_size (unsigned int entry_count)
2430 {
2431   size_t size;
2432   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2433   uint16_t max = 1 << 15;
2434
2435   if (entry_count > max)
2436     return max;
2437   size = 8;
2438   while ((size < max) && (size < ideal))
2439     size *= 2;
2440   if (size > max)
2441     return max;
2442   return size;
2443 }
2444
2445
2446 /**
2447  * Recalculate our bloom filter for filtering replies.  This function
2448  * will create a new bloom filter from scratch, so it should only be
2449  * called if we have no bloomfilter at all (and hence can create a
2450  * fresh one of minimal size without problems) OR if our peer is the
2451  * initiator (in which case we may resize to larger than mimimum size).
2452  *
2453  * @param pr request for which the BF is to be recomputed
2454  */
2455 static void
2456 refresh_bloomfilter (struct PendingRequest *pr)
2457 {
2458   unsigned int i;
2459   size_t nsize;
2460   GNUNET_HashCode mhash;
2461
2462   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2463   if (nsize == pr->bf_size)
2464     return; /* size not changed */
2465   if (pr->bf != NULL)
2466     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2467   pr->bf_size = nsize;
2468   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2469   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2470                                               pr->bf_size,
2471                                               BLOOMFILTER_K);
2472   for (i=0;i<pr->replies_seen_off;i++)
2473     {
2474       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2475                                 pr->mingle,
2476                                 &mhash);
2477       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2478     }
2479 }
2480
2481
2482 /**
2483  * Function called after we've tried to reserve a certain amount of
2484  * bandwidth for a reply.  Check if we succeeded and if so send our
2485  * query.
2486  *
2487  * @param cls the requests "struct PendingRequest*"
2488  * @param peer identifies the peer
2489  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2490  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2491  * @param amount set to the amount that was actually reserved or unreserved
2492  * @param preference current traffic preference for the given peer
2493  */
2494 static void
2495 target_reservation_cb (void *cls,
2496                        const struct
2497                        GNUNET_PeerIdentity * peer,
2498                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2499                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2500                        int amount,
2501                        uint64_t preference)
2502 {
2503   struct PendingRequest *pr = cls;
2504   struct ConnectedPeer *cp;
2505   struct PendingMessage *pm;
2506   struct GetMessage *gm;
2507   GNUNET_HashCode *ext;
2508   char *bfdata;
2509   size_t msize;
2510   unsigned int k;
2511   int no_route;
2512   uint32_t bm;
2513   unsigned int i;
2514
2515   pr->irc = NULL;
2516   if (peer == NULL)
2517     {
2518       /* error in communication with core, try again later */
2519       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2520         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2521                                                  get_processing_delay (),
2522                                                  &forward_request_task,
2523                                                  pr);
2524       return;
2525     }
2526   /* (3) transmit, update ttl/priority */
2527   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2528                                           &peer->hashPubKey);
2529   if (cp == NULL)
2530     {
2531       /* Peer must have just left */
2532 #if DEBUG_FS
2533       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2534                   "Selected peer disconnected!\n");
2535 #endif
2536       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2537         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2538                                                  get_processing_delay (),
2539                                                  &forward_request_task,
2540                                                  pr);
2541       return;
2542     }
2543   no_route = GNUNET_NO;
2544   if (amount == 0)
2545     {
2546       if (pr->cp == NULL)
2547         {
2548 #if DEBUG_FS > 1
2549           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2550                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2551                       amount,
2552                       DBLOCK_SIZE);
2553 #endif
2554           GNUNET_STATISTICS_update (stats,
2555                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2556                                     1,
2557                                     GNUNET_NO);
2558           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2559             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2560                                                      get_processing_delay (),
2561                                                      &forward_request_task,
2562                                                      pr);
2563           return;  /* this target round failed */
2564         }
2565       no_route = GNUNET_YES;
2566     }
2567   
2568   GNUNET_STATISTICS_update (stats,
2569                             gettext_noop ("# queries scheduled for forwarding"),
2570                             1,
2571                             GNUNET_NO);
2572   for (i=0;i<pr->used_targets_off;i++)
2573     if (pr->used_targets[i].pid == cp->pid) 
2574       {
2575         GNUNET_STATISTICS_update (stats,
2576                                   gettext_noop ("# queries retransmitted to same target"),
2577                                   1,
2578                                   GNUNET_NO);
2579         break;
2580       } 
2581
2582   /* build message and insert message into priority queue */
2583 #if DEBUG_FS
2584   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2585               "Forwarding request `%s' to `%4s'!\n",
2586               GNUNET_h2s (&pr->query),
2587               GNUNET_i2s (peer));
2588 #endif
2589   k = 0;
2590   bm = 0;
2591   if (GNUNET_YES == no_route)
2592     {
2593       bm |= GET_MESSAGE_BIT_RETURN_TO;
2594       k++;      
2595     }
2596   if (pr->namespace != NULL)
2597     {
2598       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2599       k++;
2600     }
2601   if (pr->target_pid != 0)
2602     {
2603       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2604       k++;
2605     }
2606   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2607   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2608   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2609   pm->msize = msize;
2610   gm = (struct GetMessage*) &pm[1];
2611   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2612   gm->header.size = htons (msize);
2613   gm->type = htonl (pr->type);
2614   pr->remaining_priority /= 2;
2615   gm->priority = htonl (pr->remaining_priority);
2616   gm->ttl = htonl (pr->ttl);
2617   gm->filter_mutator = htonl(pr->mingle); 
2618   gm->hash_bitmap = htonl (bm);
2619   gm->query = pr->query;
2620   ext = (GNUNET_HashCode*) &gm[1];
2621   k = 0;
2622   if (GNUNET_YES == no_route)
2623     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2624   if (pr->namespace != NULL)
2625     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2626   if (pr->target_pid != 0)
2627     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2628   bfdata = (char *) &ext[k];
2629   if (pr->bf != NULL)
2630     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2631                                                bfdata,
2632                                                pr->bf_size);
2633   pm->cont = &transmit_query_continuation;
2634   pm->cont_cls = pr;
2635   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2636   add_to_pending_messages_for_peer (cp, pm, pr);
2637 }
2638
2639
2640 /**
2641  * Closure used for "target_peer_select_cb".
2642  */
2643 struct PeerSelectionContext 
2644 {
2645   /**
2646    * The request for which we are selecting
2647    * peers.
2648    */
2649   struct PendingRequest *pr;
2650
2651   /**
2652    * Current "prime" target.
2653    */
2654   struct GNUNET_PeerIdentity target;
2655
2656   /**
2657    * How much do we like this target?
2658    */
2659   double target_score;
2660
2661 };
2662
2663
2664 /**
2665  * Function called for each connected peer to determine
2666  * which one(s) would make good targets for forwarding.
2667  *
2668  * @param cls closure (struct PeerSelectionContext)
2669  * @param key current key code (peer identity)
2670  * @param value value in the hash map (struct ConnectedPeer)
2671  * @return GNUNET_YES if we should continue to
2672  *         iterate,
2673  *         GNUNET_NO if not.
2674  */
2675 static int
2676 target_peer_select_cb (void *cls,
2677                        const GNUNET_HashCode * key,
2678                        void *value)
2679 {
2680   struct PeerSelectionContext *psc = cls;
2681   struct ConnectedPeer *cp = value;
2682   struct PendingRequest *pr = psc->pr;
2683   struct GNUNET_TIME_Relative delay;
2684   double score;
2685   unsigned int i;
2686   unsigned int pc;
2687
2688   /* 1) check that this peer is not the initiator */
2689   if (cp == pr->cp)
2690     {
2691 #if DEBUG_FS
2692       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2693                   "Skipping initiator in forwarding selection\n");
2694 #endif
2695       return GNUNET_YES; /* skip */        
2696     }
2697
2698   /* 2) check if we have already (recently) forwarded to this peer */
2699   /* 2a) this particular request */
2700   pc = 0;
2701   for (i=0;i<pr->used_targets_off;i++)
2702     if (pr->used_targets[i].pid == cp->pid) 
2703       {
2704         pc = pr->used_targets[i].num_requests;
2705         GNUNET_assert (pc > 0);
2706         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2707                                            RETRY_PROBABILITY_INV * pc))
2708           {
2709 #if DEBUG_FS
2710             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2711                         "NOT re-trying query that was previously transmitted %u times\n",
2712                         (unsigned int) pc);
2713 #endif
2714             return GNUNET_YES; /* skip */
2715           }
2716         break;
2717       }
2718 #if DEBUG_FS
2719   if (0 < pc)
2720     {
2721       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2722                   "Re-trying query that was previously transmitted %u times to this peer\n",
2723                   (unsigned int) pc);
2724     }
2725 #endif
2726   /* 2b) many other requests to this peer */
2727   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2728   if (delay.value <= cp->avg_delay.value)
2729     {
2730 #if DEBUG_FS
2731       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2732                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2733                   MAX_QUEUE_PER_PEER,
2734                   cp->avg_delay.value);
2735 #endif
2736       return GNUNET_YES; /* skip */      
2737     }
2738
2739   /* 3) calculate how much we'd like to forward to this peer,
2740      starting with a random value that is strong enough
2741      to at least give any peer a chance sometimes 
2742      (compared to the other factors that come later) */
2743   /* 3a) count successful (recent) routes from cp for same source */
2744   if (pr->cp != NULL)
2745     {
2746       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2747                                         P2P_SUCCESS_LIST_SIZE);
2748       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2749         if (cp->last_p2p_replies[i] == pr->cp->pid)
2750           score += 1.0; /* likely successful based on hot path */
2751     }
2752   else
2753     {
2754       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2755                                         CS2P_SUCCESS_LIST_SIZE);
2756       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2757         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2758           score += 1.0; /* likely successful based on hot path */
2759     }
2760   /* 3b) include latency */
2761   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2762     score += 1.0; /* likely fast based on latency */
2763   /* 3c) include priorities */
2764   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2765     score += 1.0; /* likely successful based on priorities */
2766   /* 3d) penalize for queue size */  
2767   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2768   /* 3e) include peer proximity */
2769   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2770                                                     &pr->query)) / (double) UINT32_MAX);
2771   /* 4) super-bonus for being the known target */
2772   if (pr->target_pid == cp->pid)
2773     score += 100.0;
2774   /* store best-fit in closure */
2775 #if DEBUG_FS
2776   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2777               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2778               GNUNET_h2s (key),
2779               score,
2780               psc->target_score);
2781 #endif  
2782   score++; /* avoid zero */
2783   if (score > psc->target_score)
2784     {
2785       psc->target_score = score;
2786       psc->target.hashPubKey = *key; 
2787     }
2788   return GNUNET_YES;
2789 }
2790   
2791
2792 /**
2793  * The priority level imposes a bound on the maximum
2794  * value for the ttl that can be requested.
2795  *
2796  * @param ttl_in requested ttl
2797  * @param prio given priority
2798  * @return ttl_in if ttl_in is below the limit,
2799  *         otherwise the ttl-limit for the given priority
2800  */
2801 static int32_t
2802 bound_ttl (int32_t ttl_in, uint32_t prio)
2803 {
2804   unsigned long long allowed;
2805
2806   if (ttl_in <= 0)
2807     return ttl_in;
2808   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2809   if (ttl_in > allowed)      
2810     {
2811       if (allowed >= (1 << 30))
2812         return 1 << 30;
2813       return allowed;
2814     }
2815   return ttl_in;
2816 }
2817
2818
2819 /**
2820  * Iterator called on each result obtained for a DHT
2821  * operation that expects a reply
2822  *
2823  * @param cls closure
2824  * @param exp when will this value expire
2825  * @param key key of the result
2826  * @param get_path NULL-terminated array of pointers
2827  *                 to the peers on reverse GET path (or NULL if not recorded)
2828  * @param put_path NULL-terminated array of pointers
2829  *                 to the peers on the PUT path (or NULL if not recorded)
2830  * @param type type of the result
2831  * @param size number of bytes in data
2832  * @param data pointer to the result data
2833  */
2834 static void
2835 process_dht_reply (void *cls,
2836                    struct GNUNET_TIME_Absolute exp,
2837                    const GNUNET_HashCode * key,
2838                    const struct GNUNET_PeerIdentity * const *get_path,
2839                    const struct GNUNET_PeerIdentity * const *put_path,
2840                    enum GNUNET_BLOCK_Type type,
2841                    size_t size,
2842                    const void *data);
2843
2844
2845 /**
2846  * We're processing a GET request and have decided
2847  * to forward it to other peers.  This function is called periodically
2848  * and should forward the request to other peers until we have all
2849  * possible replies.  If we have transmitted the *only* reply to
2850  * the initiator we should destroy the pending request.  If we have
2851  * many replies in the queue to the initiator, we should delay sending
2852  * out more queries until the reply queue has shrunk some.
2853  *
2854  * @param cls our "struct ProcessGetContext *"
2855  * @param tc unused
2856  */
2857 static void
2858 forward_request_task (void *cls,
2859                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2860 {
2861   struct PendingRequest *pr = cls;
2862   struct PeerSelectionContext psc;
2863   struct ConnectedPeer *cp; 
2864   struct GNUNET_TIME_Relative delay;
2865
2866   pr->task = GNUNET_SCHEDULER_NO_TASK;
2867   if (pr->irc != NULL)
2868     {
2869 #if DEBUG_FS
2870       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2871                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2872                   GNUNET_h2s (&pr->query));
2873 #endif
2874       return; /* already pending */
2875     }
2876   if (GNUNET_YES == pr->local_only)
2877     return; /* configured to not do P2P search */
2878   /* (0) try DHT */
2879   if ( (0 == pr->anonymity_level) &&
2880        (GNUNET_YES != pr->forward_only) &&
2881        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2882        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2883     {
2884       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2885                                           GNUNET_TIME_UNIT_FOREVER_REL,
2886                                           pr->type,
2887                                           &pr->query,
2888                                           GNUNET_DHT_RO_NONE,
2889                                           pr->bf,
2890                                           pr->mingle,
2891                                           pr->namespace,
2892                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2893                                           &process_dht_reply,
2894                                           pr);
2895     }
2896   /* (1) select target */
2897   psc.pr = pr;
2898   psc.target_score = -DBL_MAX;
2899   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2900                                          &target_peer_select_cb,
2901                                          &psc);  
2902   if (psc.target_score == -DBL_MAX)
2903     {
2904       delay = get_processing_delay ();
2905 #if DEBUG_FS 
2906       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2907                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2908                   GNUNET_h2s (&pr->query),
2909                   delay.value);
2910 #endif
2911       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2912                                                delay,
2913                                                &forward_request_task,
2914                                                pr);
2915       return; /* nobody selected */
2916     }
2917   /* (3) update TTL/priority */
2918   if (pr->client_request_list != NULL)
2919     {
2920       /* FIXME: use better algorithm!? */
2921       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2922                                          4))
2923         pr->priority++;
2924       /* bound priority we use by priorities we see from other peers
2925          rounded up (must round up so that we can see non-zero
2926          priorities, but round up as little as possible to make it
2927          plausible that we forwarded another peers request) */
2928       if (pr->priority > current_priorities + 1.0)
2929         pr->priority = (uint32_t) current_priorities + 1.0;
2930       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2931                            pr->priority);
2932 #if DEBUG_FS
2933       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2934                   "Trying query `%s' with priority %u and TTL %d.\n",
2935                   GNUNET_h2s (&pr->query),
2936                   pr->priority,
2937                   pr->ttl);
2938 #endif
2939     }
2940
2941   /* (3) reserve reply bandwidth */
2942   if (GNUNET_NO == pr->forward_only)
2943     {
2944       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2945                                               &psc.target.hashPubKey);
2946       GNUNET_assert (NULL != cp);
2947       pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2948                                                     &psc.target,
2949                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2950                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2951                                                     DBLOCK_SIZE * 2, 
2952                                                     cp->inc_preference,
2953                                                     &target_reservation_cb,
2954                                                     pr);
2955       cp->inc_preference = 0;
2956     }
2957   else
2958     {
2959       /* force forwarding */
2960       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2961       target_reservation_cb (pr, &psc.target,
2962                              zerobw, zerobw, 0, 0.0);
2963     }
2964 }
2965
2966
2967 /* **************************** P2P PUT Handling ************************ */
2968
2969
2970 /**
2971  * Function called after we either failed or succeeded
2972  * at transmitting a reply to a peer.  
2973  *
2974  * @param cls the requests "struct PendingRequest*"
2975  * @param tpid ID of receiving peer, 0 on transmission error
2976  */
2977 static void
2978 transmit_reply_continuation (void *cls,
2979                              GNUNET_PEER_Id tpid)
2980 {
2981   struct PendingRequest *pr = cls;
2982   
2983   switch (pr->type)
2984     {
2985     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2986     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2987       /* only one reply expected, done with the request! */
2988       destroy_pending_request (pr);
2989       break;
2990     case GNUNET_BLOCK_TYPE_ANY:
2991     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2992     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2993       break;
2994     default:
2995       GNUNET_break (0);
2996       break;
2997     }
2998 }
2999
3000
3001 /**
3002  * Transmit the given message by copying it to the target buffer
3003  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
3004  * for writing in the meantime.  In that case, do nothing
3005  * (the disconnect or shutdown handler will take care of the rest).
3006  * If we were able to transmit messages and there are still more
3007  * pending, ask core again for further calls to this function.
3008  *
3009  * @param cls closure, pointer to the 'struct ClientList*'
3010  * @param size number of bytes available in buf
3011  * @param buf where the callee should write the message
3012  * @return number of bytes written to buf
3013  */
3014 static size_t
3015 transmit_to_client (void *cls,
3016                   size_t size, void *buf)
3017 {
3018   struct ClientList *cl = cls;
3019   char *cbuf = buf;
3020   struct ClientResponseMessage *creply;
3021   size_t msize;
3022   
3023   cl->th = NULL;
3024   if (NULL == buf)
3025     {
3026 #if DEBUG_FS
3027       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3028                   "Not sending reply, client communication problem.\n");
3029 #endif
3030       return 0;
3031     }
3032   msize = 0;
3033   while ( (NULL != (creply = cl->res_head) ) &&
3034           (creply->msize <= size) )
3035     {
3036       memcpy (&cbuf[msize], &creply[1], creply->msize);
3037       msize += creply->msize;
3038       size -= creply->msize;
3039       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3040                                    cl->res_tail,
3041                                    creply);
3042       GNUNET_free (creply);
3043     }
3044   if (NULL != creply)
3045     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3046                                                   creply->msize,
3047                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3048                                                   &transmit_to_client,
3049                                                   cl);
3050 #if DEBUG_FS
3051   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3052               "Transmitted %u bytes to client\n",
3053               (unsigned int) msize);
3054 #endif
3055   return msize;
3056 }
3057
3058
3059 /**
3060  * Closure for "process_reply" function.
3061  */
3062 struct ProcessReplyClosure
3063 {
3064   /**
3065    * The data for the reply.
3066    */
3067   const void *data;
3068
3069   /**
3070    * Who gave us this reply? NULL for local host (or DHT)
3071    */
3072   struct ConnectedPeer *sender;
3073
3074   /**
3075    * When the reply expires.
3076    */
3077   struct GNUNET_TIME_Absolute expiration;
3078
3079   /**
3080    * Size of data.
3081    */
3082   size_t size;
3083
3084   /**
3085    * Type of the block.
3086    */
3087   enum GNUNET_BLOCK_Type type;
3088
3089   /**
3090    * How much was this reply worth to us?
3091    */
3092   uint32_t priority;
3093
3094   /**
3095    * Evaluation result (returned).
3096    */
3097   enum GNUNET_BLOCK_EvaluationResult eval;
3098
3099   /**
3100    * Did we finish processing the associated request?
3101    */ 
3102   int finished;
3103
3104   /**
3105    * Did we find a matching request?
3106    */
3107   int request_found;
3108 };
3109
3110
3111 /**
3112  * We have received a reply; handle it!
3113  *
3114  * @param cls response (struct ProcessReplyClosure)
3115  * @param key our query
3116  * @param value value in the hash map (info about the query)
3117  * @return GNUNET_YES (we should continue to iterate)
3118  */
3119 static int
3120 process_reply (void *cls,
3121                const GNUNET_HashCode * key,
3122                void *value)
3123 {
3124   struct ProcessReplyClosure *prq = cls;
3125   struct PendingRequest *pr = value;
3126   struct PendingMessage *reply;
3127   struct ClientResponseMessage *creply;
3128   struct ClientList *cl;
3129   struct PutMessage *pm;
3130   struct ConnectedPeer *cp;
3131   struct GNUNET_TIME_Relative cur_delay;
3132 #if SUPPORT_DELAYS  
3133 struct GNUNET_TIME_Relative art_delay;
3134 #endif
3135   size_t msize;
3136   unsigned int i;
3137
3138 #if DEBUG_FS
3139   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3140               "Matched result (type %u) for query `%s' with pending request\n",
3141               (unsigned int) prq->type,
3142               GNUNET_h2s (key));
3143 #endif  
3144   GNUNET_STATISTICS_update (stats,
3145                             gettext_noop ("# replies received and matched"),
3146                             1,
3147                             GNUNET_NO);
3148   if (prq->sender != NULL)
3149     {
3150       for (i=0;i<pr->used_targets_off;i++)
3151         if (pr->used_targets[i].pid == prq->sender->pid)
3152           break;
3153       if (i < pr->used_targets_off)
3154         {
3155           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3156           prq->sender->avg_delay.value
3157             = (prq->sender->avg_delay.value * 
3158                (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
3159           prq->sender->avg_priority
3160             = (prq->sender->avg_priority * 
3161                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3162         }
3163       if (pr->cp != NULL)
3164         {
3165           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3166                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3167                                  -1);
3168           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3169           prq->sender->last_p2p_replies
3170             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3171             = pr->cp->pid;
3172         }
3173       else
3174         {
3175           if (NULL != prq->sender->last_client_replies
3176               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3177             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3178                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3179           prq->sender->last_client_replies
3180             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3181             = pr->client_request_list->client_list->client;
3182           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3183         }
3184     }
3185   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3186                                      prq->type,
3187                                      key,
3188                                      &pr->bf,
3189                                      pr->mingle,
3190                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3191                                      prq->data,
3192                                      prq->size);
3193   switch (prq->eval)
3194     {
3195     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3196       break;
3197     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3198       while (NULL != pr->pending_head)
3199         destroy_pending_message_list_entry (pr->pending_head);
3200       if (pr->qe != NULL)
3201         {
3202           if (pr->client_request_list != NULL)
3203             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3204                                         GNUNET_YES);
3205           GNUNET_DATASTORE_cancel (pr->qe);
3206           pr->qe = NULL;
3207         }
3208       pr->do_remove = GNUNET_YES;
3209       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3210         {
3211           GNUNET_SCHEDULER_cancel (sched,
3212                                    pr->task);
3213           pr->task = GNUNET_SCHEDULER_NO_TASK;
3214         }
3215       GNUNET_break (GNUNET_YES ==
3216                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3217                                                           key,
3218                                                           pr));
3219       GNUNET_LOAD_update (rt_entry_lifetime,
3220                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3221       break;
3222     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3223       GNUNET_STATISTICS_update (stats,
3224                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3225                                 1,
3226                                 GNUNET_NO);
3227 #if DEBUG_FS
3228       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3229                   "Duplicate response `%s', discarding.\n",
3230                   GNUNET_h2s (&mhash));
3231 #endif
3232       return GNUNET_YES; /* duplicate */
3233     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3234       return GNUNET_YES; /* wrong namespace */  
3235     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3236       GNUNET_break (0);
3237       return GNUNET_YES;
3238     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3239       GNUNET_break (0);
3240       return GNUNET_YES;
3241     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3242       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3243                   _("Unsupported block type %u\n"),
3244                   prq->type);
3245       return GNUNET_NO;
3246     }
3247   if (pr->client_request_list != NULL)
3248     {
3249       if (pr->replies_seen_size == pr->replies_seen_off)
3250         GNUNET_array_grow (pr->replies_seen,
3251                            pr->replies_seen_size,
3252                            pr->replies_seen_size * 2 + 4);      
3253       GNUNET_CRYPTO_hash (prq->data,
3254                           prq->size,
3255                           &pr->replies_seen[pr->replies_seen_off++]);         
3256       refresh_bloomfilter (pr);
3257     }
3258   if (NULL == prq->sender)
3259     {
3260 #if DEBUG_FS
3261       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3262                   "Found result for query `%s' in local datastore\n",
3263                   GNUNET_h2s (key));
3264 #endif
3265       GNUNET_STATISTICS_update (stats,
3266                                 gettext_noop ("# results found locally"),
3267                                 1,
3268                                 GNUNET_NO);      
3269     }
3270   prq->priority += pr->remaining_priority;
3271   pr->remaining_priority = 0;
3272   pr->results_found++;
3273   prq->request_found = GNUNET_YES;
3274   if (NULL != pr->client_request_list)
3275     {
3276       GNUNET_STATISTICS_update (stats,
3277                                 gettext_noop ("# replies received for local clients"),
3278                                 1,
3279                                 GNUNET_NO);
3280       cl = pr->client_request_list->client_list;
3281       msize = sizeof (struct PutMessage) + prq->size;
3282       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3283       creply->msize = msize;
3284       creply->client_list = cl;
3285       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3286                                          cl->res_tail,
3287                                          cl->res_tail,
3288                                          creply);      
3289       pm = (struct PutMessage*) &creply[1];
3290       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3291       pm->header.size = htons (msize);
3292       pm->type = htonl (prq->type);
3293       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3294       memcpy (&pm[1], prq->data, prq->size);      
3295       if (NULL == cl->th)
3296         {
3297 #if DEBUG_FS
3298           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3299                       "Transmitting result for query `%s' to client\n",
3300                       GNUNET_h2s (key));
3301 #endif  
3302           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3303                                                         msize,
3304                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3305                                                         &transmit_to_client,
3306                                                         cl);
3307         }
3308       GNUNET_break (cl->th != NULL);
3309       if (pr->do_remove)                
3310         {
3311           prq->finished = GNUNET_YES;
3312           destroy_pending_request (pr);         
3313         }
3314     }
3315   else
3316     {
3317       cp = pr->cp;
3318 #if DEBUG_FS
3319       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3320                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3321                   GNUNET_h2s (key),
3322                   (unsigned int) cp->pid);
3323 #endif  
3324       GNUNET_STATISTICS_update (stats,
3325                                 gettext_noop ("# replies received for other peers"),
3326                                 1,
3327                                 GNUNET_NO);
3328       msize = sizeof (struct PutMessage) + prq->size;
3329       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3330       reply->cont = &transmit_reply_continuation;
3331       reply->cont_cls = pr;
3332 #if SUPPORT_DELAYS
3333       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3334                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3335                                                                            TTL_DECREMENT));
3336       reply->delay_until 
3337         = GNUNET_TIME_relative_to_absolute (art_delay);
3338       GNUNET_STATISTICS_update (stats,
3339                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3340                                 art_delay.value,
3341                                 GNUNET_NO);
3342 #endif
3343       reply->msize = msize;
3344       reply->priority = UINT32_MAX; /* send replies first! */
3345       pm = (struct PutMessage*) &reply[1];
3346       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3347       pm->header.size = htons (msize);
3348       pm->type = htonl (prq->type);
3349       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3350       memcpy (&pm[1], prq->data, prq->size);
3351       add_to_pending_messages_for_peer (cp, reply, pr);
3352     }
3353   return GNUNET_YES;
3354 }
3355
3356
3357 /**
3358  * Iterator called on each result obtained for a DHT
3359  * operation that expects a reply
3360  *
3361  * @param cls closure
3362  * @param exp when will this value expire
3363  * @param key key of the result
3364  * @param get_path NULL-terminated array of pointers
3365  *                 to the peers on reverse GET path (or NULL if not recorded)
3366  * @param put_path NULL-terminated array of pointers
3367  *                 to the peers on the PUT path (or NULL if not recorded)
3368  * @param type type of the result
3369  * @param size number of bytes in data
3370  * @param data pointer to the result data
3371  */
3372 static void
3373 process_dht_reply (void *cls,
3374                    struct GNUNET_TIME_Absolute exp,
3375                    const GNUNET_HashCode * key,
3376                    const struct GNUNET_PeerIdentity * const *get_path,
3377                    const struct GNUNET_PeerIdentity * const *put_path,
3378                    enum GNUNET_BLOCK_Type type,
3379                    size_t size,
3380                    const void *data)
3381 {
3382   struct PendingRequest *pr = cls;
3383   struct ProcessReplyClosure prq;
3384
3385   memset (&prq, 0, sizeof (prq));
3386   prq.data = data;
3387   prq.expiration = exp;
3388   prq.size = size;  
3389   prq.type = type;
3390   process_reply (&prq, key, pr);
3391 }
3392
3393
3394
3395 /**
3396  * Continuation called to notify client about result of the
3397  * operation.
3398  *
3399  * @param cls closure
3400  * @param success GNUNET_SYSERR on failure
3401  * @param msg NULL on success, otherwise an error message
3402  */
3403 static void 
3404 put_migration_continuation (void *cls,
3405                             int success,
3406                             const char *msg)
3407 {
3408   struct GNUNET_TIME_Absolute *start = cls;
3409   struct GNUNET_TIME_Relative delay;
3410   
3411   delay = GNUNET_TIME_absolute_get_duration (*start);
3412   GNUNET_free (start);
3413   GNUNET_LOAD_update (datastore_put_load,
3414                       delay.value);
3415   if (GNUNET_OK == success)
3416     return;
3417   GNUNET_STATISTICS_update (stats,
3418                             gettext_noop ("# datastore 'put' failures"),
3419                             1,
3420                             GNUNET_NO);
3421 }
3422
3423
3424 /**
3425  * Handle P2P "PUT" message.
3426  *
3427  * @param cls closure, always NULL
3428  * @param other the other peer involved (sender or receiver, NULL
3429  *        for loopback messages where we are both sender and receiver)
3430  * @param message the actual message
3431  * @param latency reported latency of the connection with 'other'
3432  * @param distance reported distance (DV) to 'other' 
3433  * @return GNUNET_OK to keep the connection open,
3434  *         GNUNET_SYSERR to close it (signal serious error)
3435  */
3436 static int
3437 handle_p2p_put (void *cls,
3438                 const struct GNUNET_PeerIdentity *other,
3439                 const struct GNUNET_MessageHeader *message,
3440                 struct GNUNET_TIME_Relative latency,
3441                 uint32_t distance)
3442 {
3443   const struct PutMessage *put;
3444   uint16_t msize;
3445   size_t dsize;
3446   enum GNUNET_BLOCK_Type type;
3447   struct GNUNET_TIME_Absolute expiration;
3448   GNUNET_HashCode query;
3449   struct ProcessReplyClosure prq;
3450   struct GNUNET_TIME_Absolute *start;
3451   struct GNUNET_TIME_Relative block_time;  
3452   double putl;
3453   struct ConnectedPeer *cp; 
3454   struct PendingMessage *pm;
3455   struct MigrationStopMessage *msm;
3456
3457   msize = ntohs (message->size);
3458   if (msize < sizeof (struct PutMessage))
3459     {
3460       GNUNET_break_op(0);
3461       return GNUNET_SYSERR;
3462     }
3463   put = (const struct PutMessage*) message;
3464   dsize = msize - sizeof (struct PutMessage);
3465   type = ntohl (put->type);
3466   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3467
3468   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3469     return GNUNET_SYSERR;
3470   if (GNUNET_OK !=
3471       GNUNET_BLOCK_get_key (block_ctx,
3472                             type,
3473                             &put[1],
3474                             dsize,
3475                             &query))
3476     {
3477       GNUNET_break_op (0);
3478       return GNUNET_SYSERR;
3479     }
3480 #if DEBUG_FS
3481   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3482               "Received result for query `%s' from peer `%4s'\n",
3483               GNUNET_h2s (&query),
3484               GNUNET_i2s (other));
3485 #endif
3486   GNUNET_STATISTICS_update (stats,
3487                             gettext_noop ("# replies received (overall)"),
3488                             1,
3489                             GNUNET_NO);
3490   /* now, lookup 'query' */
3491   prq.data = (const void*) &put[1];
3492   if (other != NULL)
3493     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3494                                                     &other->hashPubKey);
3495   else
3496     prq.sender = NULL;
3497   prq.size = dsize;
3498   prq.type = type;
3499   prq.expiration = expiration;
3500   prq.priority = 0;
3501   prq.finished = GNUNET_NO;
3502   prq.request_found = GNUNET_NO;
3503   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3504                                               &query,
3505                                               &process_reply,
3506                                               &prq);
3507   if (prq.sender != NULL)
3508     {
3509       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3510       prq.sender->trust += prq.priority;
3511     }
3512   if ( (GNUNET_YES == active_migration) &&
3513        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3514     {      
3515 #if DEBUG_FS
3516       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3517                   "Replicating result for query `%s' with priority %u\n",
3518                   GNUNET_h2s (&query),
3519                   prq.priority);
3520 #endif
3521       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3522       *start = GNUNET_TIME_absolute_get ();
3523       GNUNET_DATASTORE_put (dsh,
3524                             0, &query, dsize, &put[1],
3525                             type, prq.priority, 1 /* anonymity */, 
3526                             expiration, 
3527                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3528                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3529                             &put_migration_continuation, 
3530                             start);
3531     }
3532   putl = GNUNET_LOAD_get_load (datastore_put_load);
3533   if ( (GNUNET_NO == prq.request_found) &&
3534        ( (GNUNET_YES != active_migration) ||
3535          (putl > 2.5 * (1 + prq.priority)) ) )
3536     {
3537       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3538                                               &other->hashPubKey);
3539       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3540         return GNUNET_OK; /* already blocked */
3541       /* We're too busy; send MigrationStop message! */
3542       if (GNUNET_YES != active_migration) 
3543         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3544       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3545                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3546                                                                                    (unsigned int) (60000 * putl * putl)));
3547       
3548       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3549       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3550                           sizeof (struct MigrationStopMessage));
3551       pm->msize = sizeof (struct MigrationStopMessage);
3552       pm->priority = UINT32_MAX;
3553       msm = (struct MigrationStopMessage*) &pm[1];
3554       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3555       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3556       msm->duration = GNUNET_TIME_relative_hton (block_time);
3557       add_to_pending_messages_for_peer (cp,
3558                                         pm,
3559                                         NULL);
3560     }
3561   return GNUNET_OK;
3562 }
3563
3564
3565 /**
3566  * Handle P2P "MIGRATION_STOP" message.
3567  *
3568  * @param cls closure, always NULL
3569  * @param other the other peer involved (sender or receiver, NULL
3570  *        for loopback messages where we are both sender and receiver)
3571  * @param message the actual message
3572  * @param latency reported latency of the connection with 'other'
3573  * @param distance reported distance (DV) to 'other' 
3574  * @return GNUNET_OK to keep the connection open,
3575  *         GNUNET_SYSERR to close it (signal serious error)
3576  */
3577 static int
3578 handle_p2p_migration_stop (void *cls,
3579                            const struct GNUNET_PeerIdentity *other,
3580                            const struct GNUNET_MessageHeader *message,
3581                            struct GNUNET_TIME_Relative latency,
3582                            uint32_t distance)
3583 {
3584   struct ConnectedPeer *cp; 
3585   const struct MigrationStopMessage *msm;
3586
3587   msm = (const struct MigrationStopMessage*) message;
3588   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3589                                           &other->hashPubKey);
3590   if (cp == NULL)
3591     {
3592       GNUNET_break (0);
3593       return GNUNET_OK;
3594     }
3595   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3596   return GNUNET_OK;
3597 }
3598
3599
3600
3601 /* **************************** P2P GET Handling ************************ */
3602
3603
3604 /**
3605  * Closure for 'check_duplicate_request_{peer,client}'.
3606  */
3607 struct CheckDuplicateRequestClosure
3608 {
3609   /**
3610    * The new request we should check if it already exists.
3611    */
3612   const struct PendingRequest *pr;
3613
3614   /**
3615    * Existing request found by the checker, NULL if none.
3616    */
3617   struct PendingRequest *have;
3618 };
3619
3620
3621 /**
3622  * Iterator over entries in the 'query_request_map' that
3623  * tries to see if we have the same request pending from
3624  * the same client already.
3625  *
3626  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3627  * @param key current key code (query, ignored, must match)
3628  * @param value value in the hash map (a 'struct PendingRequest' 
3629  *              that already exists)
3630  * @return GNUNET_YES if we should continue to
3631  *         iterate (no match yet)
3632  *         GNUNET_NO if not (match found).
3633  */
3634 static int
3635 check_duplicate_request_client (void *cls,
3636                                 const GNUNET_HashCode * key,
3637                                 void *value)
3638 {
3639   struct CheckDuplicateRequestClosure *cdc = cls;
3640   struct PendingRequest *have = value;
3641
3642   if (have->client_request_list == NULL)
3643     return GNUNET_YES;
3644   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3645        (cdc->pr != have) )
3646     {
3647       cdc->have = have;
3648       return GNUNET_NO;
3649     }
3650   return GNUNET_YES;
3651 }
3652
3653
3654 /**
3655  * We're processing (local) results for a search request
3656  * from another peer.  Pass applicable results to the
3657  * peer and if we are done either clean up (operation
3658  * complete) or forward to other peers (more results possible).
3659  *
3660  * @param cls our closure (struct LocalGetContext)
3661  * @param key key for the content
3662  * @param size number of bytes in data
3663  * @param data content stored
3664  * @param type type of the content
3665  * @param priority priority of the content
3666  * @param anonymity anonymity-level for the content
3667  * @param expiration expiration time for the content
3668  * @param uid unique identifier for the datum;
3669  *        maybe 0 if no unique identifier is available
3670  */
3671 static void
3672 process_local_reply (void *cls,
3673                      const GNUNET_HashCode * key,
3674                      size_t size,
3675                      const void *data,
3676                      enum GNUNET_BLOCK_Type type,
3677                      uint32_t priority,
3678                      uint32_t anonymity,
3679                      struct GNUNET_TIME_Absolute
3680                      expiration, 
3681                      uint64_t uid)
3682 {
3683   struct PendingRequest *pr = cls;
3684   struct ProcessReplyClosure prq;
3685   struct CheckDuplicateRequestClosure cdrc;
3686   GNUNET_HashCode query;
3687   unsigned int old_rf;
3688   
3689   if (NULL == key)
3690     {
3691 #if DEBUG_FS > 1
3692       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3693                   "Done processing local replies, forwarding request to other peers.\n");
3694 #endif
3695       pr->qe = NULL;
3696       if (pr->client_request_list != NULL)
3697         {
3698           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3699                                       GNUNET_YES);
3700           /* Figure out if this is a duplicate request and possibly
3701              merge 'struct PendingRequest' entries */
3702           cdrc.have = NULL;
3703           cdrc.pr = pr;
3704           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3705                                                       &pr->query,
3706                                                       &check_duplicate_request_client,
3707                                                       &cdrc);
3708           if (cdrc.have != NULL)
3709             {
3710 #if DEBUG_FS
3711               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3712                           "Received request for block `%s' twice from client, will only request once.\n",
3713                           GNUNET_h2s (&pr->query));
3714 #endif
3715               
3716               destroy_pending_request (pr);
3717               return;
3718             }
3719         }
3720       if (pr->local_only == GNUNET_YES)
3721         {
3722           destroy_pending_request (pr);
3723           return;
3724         }
3725       /* no more results */
3726       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3727         pr->task = GNUNET_SCHEDULER_add_now (sched,
3728                                              &forward_request_task,
3729                                              pr);      
3730       return;
3731     }
3732 #if DEBUG_FS
3733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3734               "New local response to `%s' of type %u.\n",
3735               GNUNET_h2s (key),
3736               type);
3737 #endif
3738   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3739     {
3740 #if DEBUG_FS
3741       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3742                   "Found ONDEMAND block, performing on-demand encoding\n");
3743 #endif
3744       GNUNET_STATISTICS_update (stats,
3745                                 gettext_noop ("# on-demand blocks matched requests"),
3746                                 1,
3747                                 GNUNET_NO);
3748       if (GNUNET_OK != 
3749           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3750                                             anonymity, expiration, uid, 
3751                                             &process_local_reply,
3752                                             pr))
3753       if (pr->qe != NULL)
3754         {
3755           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3756         }
3757       return;
3758     }
3759   old_rf = pr->results_found;
3760   memset (&prq, 0, sizeof (prq));
3761   prq.data = data;
3762   prq.expiration = expiration;
3763   prq.size = size;  
3764   if (GNUNET_OK != 
3765       GNUNET_BLOCK_get_key (block_ctx,
3766                             type,
3767                             data,
3768                             size,
3769                             &query))
3770     {
3771       GNUNET_break (0);
3772       GNUNET_DATASTORE_remove (dsh,
3773                                key,
3774                                size, data,
3775                                -1, -1, 
3776                                GNUNET_TIME_UNIT_FOREVER_REL,
3777                                NULL, NULL);
3778       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3779       return;
3780     }
3781   prq.type = type;
3782   prq.priority = priority;  
3783   prq.finished = GNUNET_NO;
3784   prq.request_found = GNUNET_NO;
3785   if ( (old_rf == 0) &&
3786        (pr->results_found == 0) )
3787     update_datastore_delays (pr->start_time);
3788   process_reply (&prq, key, pr);
3789   if (prq.finished == GNUNET_YES)
3790     return;
3791   if (pr->qe == NULL)
3792     return; /* done here */
3793   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3794     {
3795       pr->local_only = GNUNET_YES; /* do not forward */
3796       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3797       return;
3798     }
3799   if ( (pr->client_request_list == NULL) &&
3800        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3801          (pr->results_found > 5 + 2 * pr->priority) ) )
3802     {
3803 #if DEBUG_FS > 2
3804       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3805                   "Load too high, done with request\n");
3806 #endif
3807       GNUNET_STATISTICS_update (stats,
3808                                 gettext_noop ("# processing result set cut short due to load"),
3809                                 1,
3810                                 GNUNET_NO);
3811       /* FIXME: if this is activated, we might stall large downloads
3812          indefinitely since (presumably) the load can never go down again! */
3813 #if 0
3814       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3815       return;
3816 #endif
3817     }
3818   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3819 }
3820
3821
3822 /**
3823  * We've received a request with the specified priority.  Bound it
3824  * according to how much we trust the given peer.
3825  * 
3826  * @param prio_in requested priority
3827  * @param cp the peer making the request
3828  * @return effective priority
3829  */
3830 static int32_t
3831 bound_priority (uint32_t prio_in,
3832                 struct ConnectedPeer *cp)
3833 {
3834 #define N ((double)128.0)
3835   uint32_t ret;
3836   double rret;
3837   int ld;
3838
3839   ld = test_get_load_too_high (0);
3840   if (ld == GNUNET_SYSERR)
3841     return 0; /* excess resources */
3842   ret = change_host_trust (cp, prio_in);
3843   if (ret > 0)
3844     {
3845       if (ret > current_priorities + N)
3846         rret = current_priorities + N;
3847       else
3848         rret = ret;
3849       current_priorities 
3850         = (current_priorities * (N-1) + rret)/N;
3851     }
3852   if ( (ld == GNUNET_YES) && (ret > 0) )
3853     {
3854       /* try with charging */
3855       ld = test_get_load_too_high (ret);
3856     }
3857   if (ld == GNUNET_YES)
3858     {
3859       /* undo charge */
3860       if (ret != 0)
3861         change_host_trust (cp, -ret);
3862       return -1; /* not enough resources */
3863     }
3864 #undef N
3865   return ret;
3866 }
3867
3868
3869 /**
3870  * Iterator over entries in the 'query_request_map' that
3871  * tries to see if we have the same request pending from
3872  * the same peer already.
3873  *
3874  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3875  * @param key current key code (query, ignored, must match)
3876  * @param value value in the hash map (a 'struct PendingRequest' 
3877  *              that already exists)
3878  * @return GNUNET_YES if we should continue to
3879  *         iterate (no match yet)
3880  *         GNUNET_NO if not (match found).
3881  */
3882 static int
3883 check_duplicate_request_peer (void *cls,
3884                               const GNUNET_HashCode * key,
3885                               void *value)
3886 {
3887   struct CheckDuplicateRequestClosure *cdc = cls;
3888   struct PendingRequest *have = value;
3889
3890   if (cdc->pr->target_pid == have->target_pid)
3891     {
3892       cdc->have = have;
3893       return GNUNET_NO;
3894     }
3895   return GNUNET_YES;
3896 }
3897
3898
3899 /**
3900  * Handle P2P "GET" request.
3901  *
3902  * @param cls closure, always NULL
3903  * @param other the other peer involved (sender or receiver, NULL
3904  *        for loopback messages where we are both sender and receiver)
3905  * @param message the actual message
3906  * @param latency reported latency of the connection with 'other'
3907  * @param distance reported distance (DV) to 'other' 
3908  * @return GNUNET_OK to keep the connection open,
3909  *         GNUNET_SYSERR to close it (signal serious error)
3910  */
3911 static int
3912 handle_p2p_get (void *cls,
3913                 const struct GNUNET_PeerIdentity *other,
3914                 const struct GNUNET_MessageHeader *message,
3915                 struct GNUNET_TIME_Relative latency,
3916                 uint32_t distance)
3917 {
3918   struct PendingRequest *pr;
3919   struct ConnectedPeer *cp;
3920   struct ConnectedPeer *cps;
3921   struct CheckDuplicateRequestClosure cdc;
3922   struct GNUNET_TIME_Relative timeout;
3923   uint16_t msize;
3924   const struct GetMessage *gm;
3925   unsigned int bits;
3926   const GNUNET_HashCode *opt;
3927   uint32_t bm;
3928   size_t bfsize;
3929   uint32_t ttl_decrement;
3930   int32_t priority;
3931   enum GNUNET_BLOCK_Type type;
3932   int have_ns;
3933
3934   msize = ntohs(message->size);
3935   if (msize < sizeof (struct GetMessage))
3936     {
3937       GNUNET_break_op (0);
3938       return GNUNET_SYSERR;
3939     }
3940   gm = (const struct GetMessage*) message;
3941 #if DEBUG_FS
3942   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3943               "Received request for `%s'\n",
3944               GNUNET_h2s (&gm->query));
3945 #endif
3946   type = ntohl (gm->type);
3947   bm = ntohl (gm->hash_bitmap);
3948   bits = 0;
3949   while (bm > 0)
3950     {
3951       if (1 == (bm & 1))
3952         bits++;
3953       bm >>= 1;
3954     }
3955   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3956     {
3957       GNUNET_break_op (0);
3958       return GNUNET_SYSERR;
3959     }  
3960   opt = (const GNUNET_HashCode*) &gm[1];
3961   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3962   bm = ntohl (gm->hash_bitmap);
3963   bits = 0;
3964   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3965                                            &other->hashPubKey);
3966   if (NULL == cps)
3967     {
3968       /* peer must have just disconnected */
3969       GNUNET_STATISTICS_update (stats,
3970                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3971                                 1,
3972                                 GNUNET_NO);
3973       return GNUNET_SYSERR;
3974     }
3975   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3976     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3977                                             &opt[bits++]);
3978   else
3979     cp = cps;
3980   if (cp == NULL)
3981     {
3982 #if DEBUG_FS
3983       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3984         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3985                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3986                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3987       
3988       else
3989         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3990                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3991                     GNUNET_i2s (other));
3992 #endif
3993       GNUNET_STATISTICS_update (stats,
3994                                 gettext_noop ("# requests dropped due to missing reverse route"),
3995                                 1,
3996                                 GNUNET_NO);
3997      /* FIXME: try connect? */
3998       return GNUNET_OK;
3999     }
4000   /* note that we can really only check load here since otherwise
4001      peers could find out that we are overloaded by not being
4002      disconnected after sending us a malformed query... */
4003   priority = bound_priority (ntohl (gm->priority), cps);
4004   if (priority < 0)
4005     {
4006 #if DEBUG_FS
4007       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4008                   "Dropping query from `%s', this peer is too busy.\n",
4009                   GNUNET_i2s (other));
4010 #endif
4011       GNUNET_STATISTICS_update (stats,
4012                                 gettext_noop ("# requests dropped due to high load"),
4013                                 1,
4014                                 GNUNET_NO);
4015 #if 0
4016       /* FIXME: this causes problems... */
4017       return GNUNET_OK;
4018 #endif
4019     }
4020 #if DEBUG_FS 
4021   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4022               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
4023               GNUNET_h2s (&gm->query),
4024               (unsigned int) type,
4025               GNUNET_i2s (other),
4026               (unsigned int) bm);
4027 #endif
4028   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4029   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4030                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4031   if (have_ns)
4032     {
4033       pr->namespace = (GNUNET_HashCode*) &pr[1];
4034       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4035     }
4036   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4037        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4038         GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4039     {
4040       /* don't have BW to send to peer, or would likely take longer than we have for it,
4041          so at best indirect the query */
4042       priority = 0;
4043       /* FIXME: if this line is enabled, the 'perf' test for larger files simply "hangs";
4044          the cause seems to be that the load goes up (to the point where we do this)
4045          and then never goes down again... (outch) */
4046       // pr->forward_only = GNUNET_YES;
4047     }
4048   pr->type = type;
4049   pr->mingle = ntohl (gm->filter_mutator);
4050   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4051     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4052   pr->anonymity_level = 1;
4053   pr->priority = (uint32_t) priority;
4054   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4055   pr->query = gm->query;
4056   /* decrement ttl (always) */
4057   ttl_decrement = 2 * TTL_DECREMENT +
4058     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4059                               TTL_DECREMENT);
4060   if ( (pr->ttl < 0) &&
4061        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4062     {
4063 #if DEBUG_FS
4064       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4065                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4066                   GNUNET_i2s (other),
4067                   pr->ttl,
4068                   ttl_decrement);
4069 #endif
4070       GNUNET_STATISTICS_update (stats,
4071                                 gettext_noop ("# requests dropped due TTL underflow"),
4072                                 1,
4073                                 GNUNET_NO);
4074       /* integer underflow => drop (should be very rare)! */      
4075       GNUNET_free (pr);
4076       return GNUNET_OK;
4077     } 
4078   pr->ttl -= ttl_decrement;
4079   pr->start_time = GNUNET_TIME_absolute_get ();
4080
4081   /* get bloom filter */
4082   if (bfsize > 0)
4083     {
4084       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4085                                                   bfsize,
4086                                                   BLOOMFILTER_K);
4087       pr->bf_size = bfsize;
4088     }
4089   cdc.have = NULL;
4090   cdc.pr = pr;
4091   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4092                                               &gm->query,
4093                                               &check_duplicate_request_peer,
4094                                               &cdc);
4095   if (cdc.have != NULL)
4096     {
4097       if (cdc.have->start_time.value + cdc.have->ttl >=
4098           pr->start_time.value + pr->ttl)
4099         {
4100           /* existing request has higher TTL, drop new one! */
4101           cdc.have->priority += pr->priority;
4102           destroy_pending_request (pr);
4103 #if DEBUG_FS
4104           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4105                       "Have existing request with higher TTL, dropping new request.\n",
4106                       GNUNET_i2s (other));
4107 #endif
4108           GNUNET_STATISTICS_update (stats,
4109                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4110                                     1,
4111                                     GNUNET_NO);
4112           return GNUNET_OK;
4113         }
4114       else
4115         {
4116           /* existing request has lower TTL, drop old one! */
4117           pr->priority += cdc.have->priority;
4118           /* Possible optimization: if we have applicable pending
4119              replies in 'cdc.have', we might want to move those over
4120              (this is a really rare special-case, so it is not clear
4121              that this would be worth it) */
4122           destroy_pending_request (cdc.have);
4123           /* keep processing 'pr'! */
4124         }
4125     }
4126
4127   pr->cp = cp;
4128   GNUNET_break (GNUNET_OK ==
4129                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4130                                                    &gm->query,
4131                                                    pr,
4132                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4133   GNUNET_break (GNUNET_OK ==
4134                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4135                                                    &other->hashPubKey,
4136                                                    pr,
4137                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4138   
4139   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4140                                             pr,
4141                                             pr->start_time.value + pr->ttl);
4142
4143   GNUNET_STATISTICS_update (stats,
4144                             gettext_noop ("# P2P searches received"),
4145                             1,
4146                             GNUNET_NO);
4147   GNUNET_STATISTICS_update (stats,
4148                             gettext_noop ("# P2P searches active"),
4149                             1,
4150                             GNUNET_NO);
4151
4152   /* calculate change in traffic preference */
4153   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4154   /* process locally */
4155   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4156     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4157   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4158                                            (pr->priority + 1)); 
4159   if (GNUNET_YES != pr->forward_only)
4160     {
4161 #if DEBUG_FS
4162       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4163                   "Handing request for `%s' to datastore\n",
4164                   GNUNET_h2s (&gm->query));
4165 #endif
4166       pr->qe = GNUNET_DATASTORE_get (dsh,
4167                                      &gm->query,
4168                                      type,                             
4169                                      pr->priority + 1,
4170                                      MAX_DATASTORE_QUEUE,                                
4171                                      timeout,
4172                                      &process_local_reply,
4173                                      pr);
4174       if (NULL == pr->qe)
4175         {
4176           GNUNET_STATISTICS_update (stats,
4177                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4178                                     1,
4179                                     GNUNET_NO);
4180         }
4181     }
4182   else
4183     {
4184       GNUNET_STATISTICS_update (stats,
4185                                 gettext_noop ("# requests forwarded due to high load"),
4186                                 1,
4187                                 GNUNET_NO);
4188     }
4189
4190   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4191   switch (pr->type)
4192     {
4193     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4194     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4195       /* only one result, wait for datastore */
4196       if (GNUNET_YES != pr->forward_only)
4197         {
4198           GNUNET_STATISTICS_update (stats,
4199                                     gettext_noop ("# requests not instantly forwarded (waiting for datastore)"),
4200                                     1,
4201                                     GNUNET_NO);
4202           break;
4203         }
4204     default:
4205       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4206         pr->task = GNUNET_SCHEDULER_add_now (sched,
4207                                              &forward_request_task,
4208                                              pr);
4209     }
4210
4211   /* make sure we don't track too many requests */
4212   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4213     {
4214       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4215       GNUNET_assert (pr != NULL);
4216       destroy_pending_request (pr);
4217     }
4218   return GNUNET_OK;
4219 }
4220
4221
4222 /* **************************** CS GET Handling ************************ */
4223
4224
4225 /**
4226  * Handle START_SEARCH-message (search request from client).
4227  *
4228  * @param cls closure
4229  * @param client identification of the client
4230  * @param message the actual message
4231  */
4232 static void
4233 handle_start_search (void *cls,
4234                      struct GNUNET_SERVER_Client *client,
4235                      const struct GNUNET_MessageHeader *message)
4236 {
4237   static GNUNET_HashCode all_zeros;
4238   const struct SearchMessage *sm;
4239   struct ClientList *cl;
4240   struct ClientRequestList *crl;
4241   struct PendingRequest *pr;
4242   uint16_t msize;
4243   unsigned int sc;
4244   enum GNUNET_BLOCK_Type type;
4245
4246   msize = ntohs (message->size);
4247   if ( (msize < sizeof (struct SearchMessage)) ||
4248        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4249     {
4250       GNUNET_break (0);
4251       GNUNET_SERVER_receive_done (client,
4252                                   GNUNET_SYSERR);
4253       return;
4254     }
4255   GNUNET_STATISTICS_update (stats,
4256                             gettext_noop ("# client searches received"),
4257                             1,
4258                             GNUNET_NO);
4259   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4260   sm = (const struct SearchMessage*) message;
4261   type = ntohl (sm->type);
4262 #if DEBUG_FS
4263   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4264               "Received request for `%s' of type %u from local client\n",
4265               GNUNET_h2s (&sm->query),
4266               (unsigned int) type);
4267 #endif
4268   cl = client_list;
4269   while ( (cl != NULL) &&
4270           (cl->client != client) )
4271     cl = cl->next;
4272   if (cl == NULL)
4273     {
4274       cl = GNUNET_malloc (sizeof (struct ClientList));
4275       cl->client = client;
4276       GNUNET_SERVER_client_keep (client);
4277       cl->next = client_list;
4278       client_list = cl;
4279     }
4280   /* detect duplicate KBLOCK requests */
4281   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4282        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4283        (type == GNUNET_BLOCK_TYPE_ANY) )
4284     {
4285       crl = cl->rl_head;
4286       while ( (crl != NULL) &&
4287               ( (0 != memcmp (&crl->req->query,
4288                               &sm->query,
4289                               sizeof (GNUNET_HashCode))) ||
4290                 (crl->req->type != type) ) )
4291         crl = crl->next;
4292       if (crl != NULL)  
4293         { 
4294 #if DEBUG_FS
4295           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4296                       "Have existing request, merging content-seen lists.\n");
4297 #endif
4298           pr = crl->req;
4299           /* Duplicate request (used to send long list of
4300              known/blocked results); merge 'pr->replies_seen'
4301              and update bloom filter */
4302           GNUNET_array_grow (pr->replies_seen,
4303                              pr->replies_seen_size,
4304                              pr->replies_seen_off + sc);
4305           memcpy (&pr->replies_seen[pr->replies_seen_off],
4306                   &sm[1],
4307                   sc * sizeof (GNUNET_HashCode));
4308           pr->replies_seen_off += sc;
4309           refresh_bloomfilter (pr);
4310           GNUNET_STATISTICS_update (stats,
4311                                     gettext_noop ("# client searches updated (merged content seen list)"),
4312                                     1,
4313                                     GNUNET_NO);
4314           GNUNET_SERVER_receive_done (client,
4315                                       GNUNET_OK);
4316           return;
4317         }
4318     }
4319   GNUNET_STATISTICS_update (stats,
4320                             gettext_noop ("# client searches active"),
4321                             1,
4322                             GNUNET_NO);
4323   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4324                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4325   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4326   memset (crl, 0, sizeof (struct ClientRequestList));
4327   crl->client_list = cl;
4328   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4329                                cl->rl_tail,
4330                                crl);  
4331   crl->req = pr;
4332   pr->type = type;
4333   pr->client_request_list = crl;
4334   GNUNET_array_grow (pr->replies_seen,
4335                      pr->replies_seen_size,
4336                      sc);
4337   memcpy (pr->replies_seen,
4338           &sm[1],
4339           sc * sizeof (GNUNET_HashCode));
4340   pr->replies_seen_off = sc;
4341   pr->anonymity_level = ntohl (sm->anonymity_level); 
4342   pr->start_time = GNUNET_TIME_absolute_get ();
4343   refresh_bloomfilter (pr);
4344   pr->query = sm->query;
4345   if (0 == (1 & ntohl (sm->options)))
4346     pr->local_only = GNUNET_NO;
4347   else
4348     pr->local_only = GNUNET_YES;
4349   switch (type)
4350     {
4351     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4352     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4353       if (0 != memcmp (&sm->target,
4354                        &all_zeros,
4355                        sizeof (GNUNET_HashCode)))
4356         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4357       break;
4358     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4359       pr->namespace = (GNUNET_HashCode*) &pr[1];
4360       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4361       break;
4362     default:
4363       break;
4364     }
4365   GNUNET_break (GNUNET_OK ==
4366                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4367                                                    &sm->query,
4368                                                    pr,
4369                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4370   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4371     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4372   pr->qe = GNUNET_DATASTORE_get (dsh,
4373                                  &sm->query,
4374                                  type,
4375                                  -3, -1,
4376                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4377                                  &process_local_reply,
4378                                  pr);
4379 }
4380
4381
4382 /* **************************** Startup ************************ */
4383
4384 /**
4385  * Process fs requests.
4386  *
4387  * @param s scheduler to use
4388  * @param server the initialized server
4389  * @param c configuration to use
4390  */
4391 static int
4392 main_init (struct GNUNET_SCHEDULER_Handle *s,
4393            struct GNUNET_SERVER_Handle *server,
4394            const struct GNUNET_CONFIGURATION_Handle *c)
4395 {
4396   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4397     {
4398       { &handle_p2p_get, 
4399         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4400       { &handle_p2p_put, 
4401         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4402       { &handle_p2p_migration_stop, 
4403         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4404         sizeof (struct MigrationStopMessage) },
4405       { NULL, 0, 0 }
4406     };
4407   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4408     {&GNUNET_FS_handle_index_start, NULL, 
4409      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4410     {&GNUNET_FS_handle_index_list_get, NULL, 
4411      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4412     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4413      sizeof (struct UnindexMessage) },
4414     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4415      0 },
4416     {NULL, NULL, 0, 0}
4417   };
4418   unsigned long long enc = 128;
4419
4420   sched = s;
4421   cfg = c;
4422   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4423   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4424   if ( (GNUNET_OK !=
4425         GNUNET_CONFIGURATION_get_value_number (cfg,
4426                                                "fs",
4427                                                "MAX_PENDING_REQUESTS",
4428                                                &max_pending_requests)) ||
4429        (GNUNET_OK !=
4430         GNUNET_CONFIGURATION_get_value_number (cfg,
4431                                                "fs",
4432                                                "EXPECTED_NEIGHBOUR_COUNT",
4433                                                &enc)) ||
4434        (GNUNET_OK != 
4435         GNUNET_CONFIGURATION_get_value_time (cfg,
4436                                              "fs",
4437                                              "MIN_MIGRATION_DELAY",
4438                                              &min_migration_delay)) )
4439     {
4440       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4441                   _("Configuration fails to specify certain parameters, assuming default values."));
4442     }
4443   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4444   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4445   rt_entry_lifetime = GNUNET_LOAD_value_init ();
4446   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4447   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4448   core = GNUNET_CORE_connect (sched,
4449                               cfg,
4450                               GNUNET_TIME_UNIT_FOREVER_REL,
4451                               NULL,
4452                               NULL,
4453                               &peer_connect_handler,
4454                               &peer_disconnect_handler,
4455                               NULL,
4456                               NULL, GNUNET_NO,
4457                               NULL, GNUNET_NO,
4458                               p2p_handlers);
4459   if (NULL == core)
4460     {
4461       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4462                   _("Failed to connect to `%s' service.\n"),
4463                   "core");
4464       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4465       connected_peers = NULL;
4466       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4467       query_request_map = NULL;
4468       GNUNET_LOAD_value_free (rt_entry_lifetime);
4469       rt_entry_lifetime = NULL;
4470       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4471       requests_by_expiration_heap = NULL;
4472       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4473       peer_request_map = NULL;
4474       if (dsh != NULL)
4475         {
4476           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4477           dsh = NULL;
4478         }
4479       return GNUNET_SYSERR;
4480     }
4481   /* FIXME: distinguish between sending and storing in options? */
4482   if (active_migration) 
4483     {
4484       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4485                   _("Content migration is enabled, will start to gather data\n"));
4486       consider_migration_gathering ();
4487     }
4488   consider_dht_put_gathering (NULL);
4489   GNUNET_SERVER_disconnect_notify (server, 
4490                                    &handle_client_disconnect,
4491                                    NULL);
4492   GNUNET_assert (GNUNET_OK ==
4493                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4494                                                           "fs",
4495                                                           "TRUST",
4496                                                           &trustDirectory));
4497   GNUNET_DISK_directory_create (trustDirectory);
4498   GNUNET_SCHEDULER_add_with_priority (sched,
4499                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4500                                       &cron_flush_trust, NULL);
4501
4502
4503   GNUNET_SERVER_add_handlers (server, handlers);
4504   GNUNET_SCHEDULER_add_delayed (sched,
4505                                 GNUNET_TIME_UNIT_FOREVER_REL,
4506                                 &shutdown_task,
4507                                 NULL);
4508   return GNUNET_OK;
4509 }
4510
4511
4512 /**
4513  * Process fs requests.
4514  *
4515  * @param cls closure
4516  * @param sched scheduler to use
4517  * @param server the initialized server
4518  * @param cfg configuration to use
4519  */
4520 static void
4521 run (void *cls,
4522      struct GNUNET_SCHEDULER_Handle *sched,
4523      struct GNUNET_SERVER_Handle *server,
4524      const struct GNUNET_CONFIGURATION_Handle *cfg)
4525 {
4526   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4527                                                            "FS",
4528                                                            "ACTIVEMIGRATION");
4529   dsh = GNUNET_DATASTORE_connect (cfg,
4530                                   sched);
4531   if (dsh == NULL)
4532     {
4533       GNUNET_SCHEDULER_shutdown (sched);
4534       return;
4535     }
4536   datastore_get_load = GNUNET_LOAD_value_init ();
4537   datastore_put_load = GNUNET_LOAD_value_init ();
4538   block_cfg = GNUNET_CONFIGURATION_create ();
4539   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4540                                          "block",
4541                                          "PLUGINS",
4542                                          "fs");
4543   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4544   GNUNET_assert (NULL != block_ctx);
4545   dht_handle = GNUNET_DHT_connect (sched,
4546                                    cfg,
4547                                    FS_DHT_HT_SIZE);
4548   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4549        (GNUNET_OK != main_init (sched, server, cfg)) )
4550     {    
4551       GNUNET_SCHEDULER_shutdown (sched);
4552       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4553       dsh = NULL;
4554       GNUNET_DHT_disconnect (dht_handle);
4555       dht_handle = NULL;
4556       GNUNET_BLOCK_context_destroy (block_ctx);
4557       block_ctx = NULL;
4558       GNUNET_CONFIGURATION_destroy (block_cfg);
4559       block_cfg = NULL;
4560       GNUNET_LOAD_value_free (datastore_get_load);
4561       datastore_get_load = NULL;
4562       GNUNET_LOAD_value_free (datastore_put_load);
4563       datastore_put_load = NULL;
4564       return;   
4565     }
4566 }
4567
4568
4569 /**
4570  * The main function for the fs service.
4571  *
4572  * @param argc number of arguments from the command line
4573  * @param argv command line arguments
4574  * @return 0 ok, 1 on error
4575  */
4576 int
4577 main (int argc, char *const *argv)
4578 {
4579   return (GNUNET_OK ==
4580           GNUNET_SERVICE_run (argc,
4581                               argv,
4582                               "fs",
4583                               GNUNET_SERVICE_OPTION_NONE,
4584                               &run, NULL)) ? 0 : 1;
4585 }
4586
4587 /* end of gnunet-service-fs.c */