bugfixes, more todo
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - more statistics
28  */
29 #include "platform.h"
30 #include <float.h>
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "gnunet_dht_service.h"
34 #include "gnunet_datastore_service.h"
35 #include "gnunet_load_lib.h"
36 #include "gnunet_peer_lib.h"
37 #include "gnunet_protocols.h"
38 #include "gnunet_signatures.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet-service-fs_indexing.h"
42 #include "fs.h"
43
44 #define DEBUG_FS GNUNET_NO
45
46 /**
47  * Should we introduce random latency in processing?  Required for proper
48  * implementation of GAP, but can be disabled for performance evaluation of
49  * the basic routing algorithm.
50  */
51 #define SUPPORT_DELAYS GNUNET_NO
52
53 /**
54  * Size for the hash map for DHT requests from the FS
55  * service.  Should be about the number of concurrent
56  * DHT requests we plan to make.
57  */
58 #define FS_DHT_HT_SIZE 1024
59
60 /**
61  * How often do we flush trust values to disk?
62  */
63 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
64
65 /**
66  * How often do we at most PUT content into the DHT?
67  */
68 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
69
70 /**
71  * Inverse of the probability that we will submit the same query
72  * to the same peer again.  If the same peer already got the query
73  * repeatedly recently, the probability is multiplied by the inverse
74  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
75  * plus MAX_CORK_DELAY (so roughly every 3.5s).
76  */
77 #define RETRY_PROBABILITY_INV 3
78
79 /**
80  * What is the maximum delay for a P2P FS message (in our interaction
81  * with core)?  FS-internal delays are another story.  The value is
82  * chosen based on the 32k block size.  Given that peers typcially
83  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
84  * transmit one message even to the lowest-bandwidth peers.
85  */
86 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
87
88 /**
89  * Maximum number of requests (from other peers) that we're
90  * willing to have pending at any given point in time.
91  */
92 static unsigned long long max_pending_requests = (32 * 1024);
93
94
95 /**
96  * Information we keep for each pending reply.  The
97  * actual message follows at the end of this struct.
98  */
99 struct PendingMessage;
100
101 /**
102  * Function called upon completion of a transmission.
103  *
104  * @param cls closure
105  * @param pid ID of receiving peer, 0 on transmission error
106  */
107 typedef void (*TransmissionContinuation)(void * cls, 
108                                          GNUNET_PEER_Id tpid);
109
110
111 /**
112  * Information we keep for each pending message (GET/PUT).  The
113  * actual message follows at the end of this struct.
114  */
115 struct PendingMessage
116 {
117   /**
118    * This is a doubly-linked list of messages to the same peer.
119    */
120   struct PendingMessage *next;
121
122   /**
123    * This is a doubly-linked list of messages to the same peer.
124    */
125   struct PendingMessage *prev;
126
127   /**
128    * Entry in pending message list for this pending message.
129    */ 
130   struct PendingMessageList *pml;  
131
132   /**
133    * Function to call immediately once we have transmitted this
134    * message.
135    */
136   TransmissionContinuation cont;
137
138   /**
139    * Closure for cont.
140    */
141   void *cont_cls;
142
143   /**
144    * Do not transmit this pending message until this deadline.
145    */
146   struct GNUNET_TIME_Absolute delay_until;
147
148   /**
149    * Size of the reply; actual reply message follows
150    * at the end of this struct.
151    */
152   size_t msize;
153   
154   /**
155    * How important is this message for us?
156    */
157   uint32_t priority;
158  
159 };
160
161
162 /**
163  * Information about a peer that we are connected to.
164  * We track data that is useful for determining which
165  * peers should receive our requests.  We also keep
166  * a list of messages to transmit to this peer.
167  */
168 struct ConnectedPeer
169 {
170
171   /**
172    * List of the last clients for which this peer successfully
173    * answered a query.
174    */
175   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
176
177   /**
178    * List of the last PIDs for which
179    * this peer successfully answered a query;
180    * We use 0 to indicate no successful reply.
181    */
182   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
183
184   /**
185    * Average delay between sending the peer a request and
186    * getting a reply (only calculated over the requests for
187    * which we actually got a reply).   Calculated
188    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
189    *
190    * FIXME: actually, this is currently the delay between us originally
191    * receiving (not forwarding!) a request and us receiving a reply from
192    * this peer (regardless of when we transmitted this request to this peer!)
193    */ 
194   struct GNUNET_TIME_Relative avg_delay;
195
196   /**
197    * Point in time until which this peer does not want us to migrate content
198    * to it.
199    */
200   struct GNUNET_TIME_Absolute migration_blocked;
201
202   /**
203    * Time until when we blocked this peer from migrating
204    * data to us.
205    */
206   struct GNUNET_TIME_Absolute last_migration_block;
207
208   /**
209    * Transmission times for the last MAX_QUEUE_PER_PEER
210    * requests for this peer.  Used as a ring buffer, current
211    * offset is stored in 'last_request_times_off'.  If the
212    * oldest entry is more recent than the 'avg_delay', we should
213    * not send any more requests right now.
214    */
215   struct GNUNET_TIME_Absolute last_request_times[MAX_QUEUE_PER_PEER];
216
217   /**
218    * Handle for an active request for transmission to this
219    * peer, or NULL.
220    */
221   struct GNUNET_CORE_TransmitHandle *cth;
222
223   /**
224    * Messages (replies, queries, content migration) we would like to
225    * send to this peer in the near future.  Sorted by priority, head.
226    */
227   struct PendingMessage *pending_messages_head;
228
229   /**
230    * Messages (replies, queries, content migration) we would like to
231    * send to this peer in the near future.  Sorted by priority, tail.
232    */
233   struct PendingMessage *pending_messages_tail;
234
235   /**
236    * How long does it typically take for us to transmit a message
237    * to this peer?  (delay between the request being issued and
238    * the callback being invoked).
239    */
240   struct GNUNET_LOAD_Value *transmission_delay;
241
242   /**
243    * Time when the last transmission request was issued.
244    */
245   struct GNUNET_TIME_Absolute last_transmission_request_start;
246
247   /**
248    * ID of delay task for scheduling transmission.
249    */
250   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
251
252   /**
253    * Average priority of successful replies.  Calculated
254    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
255    */
256   double avg_priority;
257
258   /**
259    * Increase in traffic preference still to be submitted
260    * to the core service for this peer.
261    */
262   uint64_t inc_preference;
263
264   /**
265    * Trust rating for this peer
266    */
267   uint32_t trust;
268
269   /**
270    * Trust rating for this peer on disk.
271    */
272   uint32_t disk_trust;
273
274   /**
275    * The peer's identity.
276    */
277   GNUNET_PEER_Id pid;  
278
279   /**
280    * Size of the linked list of 'pending_messages'.
281    */
282   unsigned int pending_requests;
283
284   /**
285    * Which offset in "last_p2p_replies" will be updated next?
286    * (we go round-robin).
287    */
288   unsigned int last_p2p_replies_woff;
289
290   /**
291    * Which offset in "last_client_replies" will be updated next?
292    * (we go round-robin).
293    */
294   unsigned int last_client_replies_woff;
295
296   /**
297    * Current offset into 'last_request_times' ring buffer.
298    */
299   unsigned int last_request_times_off;
300
301 };
302
303
304 /**
305  * Information we keep for each pending request.  We should try to
306  * keep this struct as small as possible since its memory consumption
307  * is key to how many requests we can have pending at once.
308  */
309 struct PendingRequest;
310
311
312 /**
313  * Doubly-linked list of requests we are performing
314  * on behalf of the same client.
315  */
316 struct ClientRequestList
317 {
318
319   /**
320    * This is a doubly-linked list.
321    */
322   struct ClientRequestList *next;
323
324   /**
325    * This is a doubly-linked list.
326    */
327   struct ClientRequestList *prev;
328
329   /**
330    * Request this entry represents.
331    */
332   struct PendingRequest *req;
333
334   /**
335    * Client list this request belongs to.
336    */
337   struct ClientList *client_list;
338
339 };
340
341
342 /**
343  * Replies to be transmitted to the client.  The actual
344  * response message is allocated after this struct.
345  */
346 struct ClientResponseMessage
347 {
348   /**
349    * This is a doubly-linked list.
350    */
351   struct ClientResponseMessage *next;
352
353   /**
354    * This is a doubly-linked list.
355    */
356   struct ClientResponseMessage *prev;
357
358   /**
359    * Client list entry this response belongs to.
360    */
361   struct ClientList *client_list;
362
363   /**
364    * Number of bytes in the response.
365    */
366   size_t msize;
367 };
368
369
370 /**
371  * Linked list of clients we are performing requests
372  * for right now.
373  */
374 struct ClientList
375 {
376   /**
377    * This is a linked list.
378    */
379   struct ClientList *next;
380
381   /**
382    * ID of a client making a request, NULL if this entry is for a
383    * peer.
384    */
385   struct GNUNET_SERVER_Client *client;
386
387   /**
388    * Head of list of requests performed on behalf
389    * of this client right now.
390    */
391   struct ClientRequestList *rl_head;
392
393   /**
394    * Tail of list of requests performed on behalf
395    * of this client right now.
396    */
397   struct ClientRequestList *rl_tail;
398
399   /**
400    * Head of linked list of responses.
401    */
402   struct ClientResponseMessage *res_head;
403
404   /**
405    * Tail of linked list of responses.
406    */
407   struct ClientResponseMessage *res_tail;
408
409   /**
410    * Context for sending replies.
411    */
412   struct GNUNET_CONNECTION_TransmitHandle *th;
413
414 };
415
416
417 /**
418  * Information about a peer that we have forwarded this
419  * request to already.  
420  */
421 struct UsedTargetEntry
422 {
423   /**
424    * What was the last time we have transmitted this request to this
425    * peer?
426    */
427   struct GNUNET_TIME_Absolute last_request_time;
428
429   /**
430    * How often have we transmitted this request to this peer?
431    */
432   unsigned int num_requests;
433
434   /**
435    * PID of the target peer.
436    */
437   GNUNET_PEER_Id pid;
438
439 };
440
441
442
443
444
445 /**
446  * Doubly-linked list of messages we are performing
447  * due to a pending request.
448  */
449 struct PendingMessageList
450 {
451
452   /**
453    * This is a doubly-linked list of messages on behalf of the same request.
454    */
455   struct PendingMessageList *next;
456
457   /**
458    * This is a doubly-linked list of messages on behalf of the same request.
459    */
460   struct PendingMessageList *prev;
461
462   /**
463    * Message this entry represents.
464    */
465   struct PendingMessage *pm;
466
467   /**
468    * Request this entry belongs to.
469    */
470   struct PendingRequest *req;
471
472   /**
473    * Peer this message is targeted for.
474    */
475   struct ConnectedPeer *target;
476
477 };
478
479
480 /**
481  * Information we keep for each pending request.  We should try to
482  * keep this struct as small as possible since its memory consumption
483  * is key to how many requests we can have pending at once.
484  */
485 struct PendingRequest
486 {
487
488   /**
489    * If this request was made by a client, this is our entry in the
490    * client request list; otherwise NULL.
491    */
492   struct ClientRequestList *client_request_list;
493
494   /**
495    * Entry of peer responsible for this entry (if this request
496    * was made by a peer).
497    */
498   struct ConnectedPeer *cp;
499
500   /**
501    * If this is a namespace query, pointer to the hash of the public
502    * key of the namespace; otherwise NULL.  Pointer will be to the 
503    * end of this struct (so no need to free it).
504    */
505   const GNUNET_HashCode *namespace;
506
507   /**
508    * Bloomfilter we use to filter out replies that we don't care about
509    * (anymore).  NULL as long as we are interested in all replies.
510    */
511   struct GNUNET_CONTAINER_BloomFilter *bf;
512
513   /**
514    * Context of our GNUNET_CORE_peer_change_preference call.
515    */
516   struct GNUNET_CORE_InformationRequestContext *irc;
517
518   /**
519    * Reference to DHT get operation for this request (or NULL).
520    */
521   struct GNUNET_DHT_GetHandle *dht_get;
522
523   /**
524    * Hash code of all replies that we have seen so far (only valid
525    * if client is not NULL since we only track replies like this for
526    * our own clients).
527    */
528   GNUNET_HashCode *replies_seen;
529
530   /**
531    * Node in the heap representing this entry; NULL
532    * if we have no heap node.
533    */
534   struct GNUNET_CONTAINER_HeapNode *hnode;
535
536   /**
537    * Head of list of messages being performed on behalf of this
538    * request.
539    */
540   struct PendingMessageList *pending_head;
541
542   /**
543    * Tail of list of messages being performed on behalf of this
544    * request.
545    */
546   struct PendingMessageList *pending_tail;
547
548   /**
549    * When did we first see this request (form this peer), or, if our
550    * client is initiating, when did we last initiate a search?
551    */
552   struct GNUNET_TIME_Absolute start_time;
553
554   /**
555    * The query that this request is for.
556    */
557   GNUNET_HashCode query;
558
559   /**
560    * The task responsible for transmitting queries
561    * for this request.
562    */
563   GNUNET_SCHEDULER_TaskIdentifier task;
564
565   /**
566    * (Interned) Peer identifier that identifies a preferred target
567    * for requests.
568    */
569   GNUNET_PEER_Id target_pid;
570
571   /**
572    * (Interned) Peer identifiers of peers that have already
573    * received our query for this content.
574    */
575   struct UsedTargetEntry *used_targets;
576   
577   /**
578    * Our entry in the queue (non-NULL while we wait for our
579    * turn to interact with the local database).
580    */
581   struct GNUNET_DATASTORE_QueueEntry *qe;
582
583   /**
584    * Size of the 'bf' (in bytes).
585    */
586   size_t bf_size;
587
588   /**
589    * Desired anonymity level; only valid for requests from a local client.
590    */
591   uint32_t anonymity_level;
592
593   /**
594    * How many entries in "used_targets" are actually valid?
595    */
596   unsigned int used_targets_off;
597
598   /**
599    * How long is the "used_targets" array?
600    */
601   unsigned int used_targets_size;
602
603   /**
604    * Number of results found for this request.
605    */
606   unsigned int results_found;
607
608   /**
609    * How many entries in "replies_seen" are actually valid?
610    */
611   unsigned int replies_seen_off;
612
613   /**
614    * How long is the "replies_seen" array?
615    */
616   unsigned int replies_seen_size;
617   
618   /**
619    * Priority with which this request was made.  If one of our clients
620    * made the request, then this is the current priority that we are
621    * using when initiating the request.  This value is used when
622    * we decide to reward other peers with trust for providing a reply.
623    */
624   uint32_t priority;
625
626   /**
627    * Priority points left for us to spend when forwarding this request
628    * to other peers.
629    */
630   uint32_t remaining_priority;
631
632   /**
633    * Number to mingle hashes for bloom-filter tests with.
634    */
635   int32_t mingle;
636
637   /**
638    * TTL with which we saw this request (or, if we initiated, TTL that
639    * we used for the request).
640    */
641   int32_t ttl;
642   
643   /**
644    * Type of the content that this request is for.
645    */
646   enum GNUNET_BLOCK_Type type;
647
648   /**
649    * Remove this request after transmission of the current response.
650    */
651   int8_t do_remove;
652
653   /**
654    * GNUNET_YES if we should not forward this request to other peers.
655    */
656   int8_t local_only;
657
658   /**
659    * GNUNET_YES if we should not forward this request to other peers.
660    */
661   int8_t forward_only;
662
663 };
664
665
666 /**
667  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
668  */
669 struct MigrationReadyBlock
670 {
671
672   /**
673    * This is a doubly-linked list.
674    */
675   struct MigrationReadyBlock *next;
676
677   /**
678    * This is a doubly-linked list.
679    */
680   struct MigrationReadyBlock *prev;
681
682   /**
683    * Query for the block.
684    */
685   GNUNET_HashCode query;
686
687   /**
688    * When does this block expire? 
689    */
690   struct GNUNET_TIME_Absolute expiration;
691
692   /**
693    * Peers we would consider forwarding this
694    * block to.  Zero for empty entries.
695    */
696   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
697
698   /**
699    * Size of the block.
700    */
701   size_t size;
702
703   /**
704    *  Number of targets already used.
705    */
706   unsigned int used_targets;
707
708   /**
709    * Type of the block.
710    */
711   enum GNUNET_BLOCK_Type type;
712 };
713
714
715 /**
716  * Our connection to the datastore.
717  */
718 static struct GNUNET_DATASTORE_Handle *dsh;
719
720 /**
721  * Our block context.
722  */
723 static struct GNUNET_BLOCK_Context *block_ctx;
724
725 /**
726  * Our block configuration.
727  */
728 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
729
730 /**
731  * Our scheduler.
732  */
733 static struct GNUNET_SCHEDULER_Handle *sched;
734
735 /**
736  * Our configuration.
737  */
738 static const struct GNUNET_CONFIGURATION_Handle *cfg;
739
740 /**
741  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
742  */
743 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
744
745 /**
746  * Map of peer identifiers to "struct PendingRequest" (for that peer).
747  */
748 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
749
750 /**
751  * Map of query identifiers to "struct PendingRequest" (for that query).
752  */
753 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
754
755 /**
756  * Heap with the request that will expire next at the top.  Contains
757  * pointers of type "struct PendingRequest*"; these will *also* be
758  * aliased from the "requests_by_peer" data structures and the
759  * "requests_by_query" table.  Note that requests from our clients
760  * don't expire and are thus NOT in the "requests_by_expiration"
761  * (or the "requests_by_peer" tables).
762  */
763 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
764
765 /**
766  * Handle for reporting statistics.
767  */
768 static struct GNUNET_STATISTICS_Handle *stats;
769
770 /**
771  * Linked list of clients we are currently processing requests for.
772  */
773 static struct ClientList *client_list;
774
775 /**
776  * Pointer to handle to the core service (points to NULL until we've
777  * connected to it).
778  */
779 static struct GNUNET_CORE_Handle *core;
780
781 /**
782  * Head of linked list of blocks that can be migrated.
783  */
784 static struct MigrationReadyBlock *mig_head;
785
786 /**
787  * Tail of linked list of blocks that can be migrated.
788  */
789 static struct MigrationReadyBlock *mig_tail;
790
791 /**
792  * Request to datastore for migration (or NULL).
793  */
794 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
795
796 /**
797  * Request to datastore for DHT PUTs (or NULL).
798  */
799 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
800
801 /**
802  * Type we will request for the next DHT PUT round from the datastore.
803  */
804 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
805
806 /**
807  * Where do we store trust information?
808  */
809 static char *trustDirectory;
810
811 /**
812  * ID of task that collects blocks for migration.
813  */
814 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
815
816 /**
817  * ID of task that collects blocks for DHT PUTs.
818  */
819 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
820
821 /**
822  * What is the maximum frequency at which we are allowed to
823  * poll the datastore for migration content?
824  */
825 static struct GNUNET_TIME_Relative min_migration_delay;
826
827 /**
828  * Handle for DHT operations.
829  */
830 static struct GNUNET_DHT_Handle *dht_handle;
831
832 /**
833  * Size of the doubly-linked list of migration blocks.
834  */
835 static unsigned int mig_size;
836
837 /**
838  * Are we allowed to migrate content to this peer.
839  */
840 static int active_migration;
841
842 /**
843  * How many entires with zero anonymity do we currently estimate
844  * to have in the database?
845  */
846 static unsigned int zero_anonymity_count_estimate;
847
848 /**
849  * Typical priorities we're seeing from other peers right now.  Since
850  * most priorities will be zero, this value is the weighted average of
851  * non-zero priorities seen "recently".  In order to ensure that new
852  * values do not dramatically change the ratio, values are first
853  * "capped" to a reasonable range (+N of the current value) and then
854  * averaged into the existing value by a ratio of 1:N.  Hence
855  * receiving the largest possible priority can still only raise our
856  * "current_priorities" by at most 1.
857  */
858 static double current_priorities;
859
860 /**
861  * Datastore 'GET' load tracking.
862  */
863 static struct GNUNET_LOAD_Value *datastore_get_load;
864
865 /**
866  * Datastore 'PUT' load tracking.
867  */
868 static struct GNUNET_LOAD_Value *datastore_put_load;
869
870 /**
871  * How long do requests typically stay in the routing table?
872  */
873 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
874
875 /**
876  * We've just now completed a datastore request.  Update our
877  * datastore load calculations.
878  *
879  * @param start time when the datastore request was issued
880  */
881 static void
882 update_datastore_delays (struct GNUNET_TIME_Absolute start)
883 {
884   struct GNUNET_TIME_Relative delay;
885
886   delay = GNUNET_TIME_absolute_get_duration (start);
887   GNUNET_LOAD_update (datastore_get_load,
888                       delay.value);
889 }
890
891
892 /**
893  * Get the filename under which we would store the GNUNET_HELLO_Message
894  * for the given host and protocol.
895  * @return filename of the form DIRECTORY/HOSTID
896  */
897 static char *
898 get_trust_filename (const struct GNUNET_PeerIdentity *id)
899 {
900   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
901   char *fn;
902
903   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
904   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
905   return fn;
906 }
907
908
909
910 /**
911  * Transmit messages by copying it to the target buffer
912  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
913  * for writing in the meantime.  In that case, do nothing
914  * (the disconnect or shutdown handler will take care of the rest).
915  * If we were able to transmit messages and there are still more
916  * pending, ask core again for further calls to this function.
917  *
918  * @param cls closure, pointer to the 'struct ConnectedPeer*'
919  * @param size number of bytes available in buf
920  * @param buf where the callee should write the message
921  * @return number of bytes written to buf
922  */
923 static size_t
924 transmit_to_peer (void *cls,
925                   size_t size, void *buf);
926
927
928 /* ******************* clean up functions ************************ */
929
930 /**
931  * Delete the given migration block.
932  *
933  * @param mb block to delete
934  */
935 static void
936 delete_migration_block (struct MigrationReadyBlock *mb)
937 {
938   GNUNET_CONTAINER_DLL_remove (mig_head,
939                                mig_tail,
940                                mb);
941   GNUNET_PEER_decrement_rcs (mb->target_list,
942                              MIGRATION_LIST_SIZE);
943   mig_size--;
944   GNUNET_free (mb);
945 }
946
947
948 /**
949  * Compare the distance of two peers to a key.
950  *
951  * @param key key
952  * @param p1 first peer
953  * @param p2 second peer
954  * @return GNUNET_YES if P1 is closer to key than P2
955  */
956 static int
957 is_closer (const GNUNET_HashCode *key,
958            const struct GNUNET_PeerIdentity *p1,
959            const struct GNUNET_PeerIdentity *p2)
960 {
961   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
962                                     &p2->hashPubKey,
963                                     key);
964 }
965
966
967 /**
968  * Consider migrating content to a given peer.
969  *
970  * @param cls 'struct MigrationReadyBlock*' to select
971  *            targets for (or NULL for none)
972  * @param key ID of the peer 
973  * @param value 'struct ConnectedPeer' of the peer
974  * @return GNUNET_YES (always continue iteration)
975  */
976 static int
977 consider_migration (void *cls,
978                     const GNUNET_HashCode *key,
979                     void *value)
980 {
981   struct MigrationReadyBlock *mb = cls;
982   struct ConnectedPeer *cp = value;
983   struct MigrationReadyBlock *pos;
984   struct GNUNET_PeerIdentity cppid;
985   struct GNUNET_PeerIdentity otherpid;
986   struct GNUNET_PeerIdentity worstpid;
987   size_t msize;
988   unsigned int i;
989   unsigned int repl;
990   
991   /* consider 'cp' as a migration target for mb */
992   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
993     return GNUNET_YES; /* peer has requested no migration! */
994   if (mb != NULL)
995     {
996       GNUNET_PEER_resolve (cp->pid,
997                            &cppid);
998       repl = MIGRATION_LIST_SIZE;
999       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1000         {
1001           if (mb->target_list[i] == 0)
1002             {
1003               mb->target_list[i] = cp->pid;
1004               GNUNET_PEER_change_rc (mb->target_list[i], 1);
1005               repl = MIGRATION_LIST_SIZE;
1006               break;
1007             }
1008           GNUNET_PEER_resolve (mb->target_list[i],
1009                                &otherpid);
1010           if ( (repl == MIGRATION_LIST_SIZE) &&
1011                is_closer (&mb->query,
1012                           &cppid,
1013                           &otherpid)) 
1014             {
1015               repl = i;
1016               worstpid = otherpid;
1017             }
1018           else if ( (repl != MIGRATION_LIST_SIZE) &&
1019                     (is_closer (&mb->query,
1020                                 &worstpid,
1021                                 &otherpid) ) )
1022             {
1023               repl = i;
1024               worstpid = otherpid;
1025             }       
1026         }
1027       if (repl != MIGRATION_LIST_SIZE) 
1028         {
1029           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
1030           mb->target_list[repl] = cp->pid;
1031           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
1032         }
1033     }
1034
1035   /* consider scheduling transmission to cp for content migration */
1036   if (cp->cth != NULL)        
1037     return GNUNET_YES; 
1038   msize = 0;
1039   pos = mig_head;
1040   while (pos != NULL)
1041     {
1042       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1043         {
1044           if (cp->pid == pos->target_list[i])
1045             {
1046               if (msize == 0)
1047                 msize = pos->size;
1048               else
1049                 msize = GNUNET_MIN (msize,
1050                                     pos->size);
1051               break;
1052             }
1053         }
1054       pos = pos->next;
1055     }
1056   if (msize == 0)
1057     return GNUNET_YES; /* no content available */
1058 #if DEBUG_FS
1059   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060               "Trying to migrate at least %u bytes to peer `%s'\n",
1061               msize,
1062               GNUNET_h2s (key));
1063 #endif
1064   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1065     {
1066       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1067       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1068     }
1069   cp->cth 
1070     = GNUNET_CORE_notify_transmit_ready (core,
1071                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1072                                          (const struct GNUNET_PeerIdentity*) key,
1073                                          msize + sizeof (struct PutMessage),
1074                                          &transmit_to_peer,
1075                                          cp);
1076   return GNUNET_YES;
1077 }
1078
1079
1080 /**
1081  * Task that is run periodically to obtain blocks for content
1082  * migration
1083  * 
1084  * @param cls unused
1085  * @param tc scheduler context (also unused)
1086  */
1087 static void
1088 gather_migration_blocks (void *cls,
1089                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1090
1091
1092
1093
1094 /**
1095  * Task that is run periodically to obtain blocks for DHT PUTs.
1096  * 
1097  * @param cls type of blocks to gather
1098  * @param tc scheduler context (unused)
1099  */
1100 static void
1101 gather_dht_put_blocks (void *cls,
1102                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1103
1104
1105 /**
1106  * If the migration task is not currently running, consider
1107  * (re)scheduling it with the appropriate delay.
1108  */
1109 static void
1110 consider_migration_gathering ()
1111 {
1112   struct GNUNET_TIME_Relative delay;
1113
1114   if (dsh == NULL)
1115     return;
1116   if (mig_qe != NULL)
1117     return;
1118   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1119     return;
1120   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1121                                          mig_size);
1122   delay = GNUNET_TIME_relative_divide (delay,
1123                                        MAX_MIGRATION_QUEUE);
1124   delay = GNUNET_TIME_relative_max (delay,
1125                                     min_migration_delay);
1126   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1127                                            delay,
1128                                            &gather_migration_blocks,
1129                                            NULL);
1130 }
1131
1132
1133 /**
1134  * If the DHT PUT gathering task is not currently running, consider
1135  * (re)scheduling it with the appropriate delay.
1136  */
1137 static void
1138 consider_dht_put_gathering (void *cls)
1139 {
1140   struct GNUNET_TIME_Relative delay;
1141
1142   if (dsh == NULL)
1143     return;
1144   if (dht_qe != NULL)
1145     return;
1146   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1147     return;
1148   if (zero_anonymity_count_estimate > 0)
1149     {
1150       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1151                                            zero_anonymity_count_estimate);
1152       delay = GNUNET_TIME_relative_min (delay,
1153                                         MAX_DHT_PUT_FREQ);
1154     }
1155   else
1156     {
1157       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1158          (hopefully) appear */
1159       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1160     }
1161   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1162                                            delay,
1163                                            &gather_dht_put_blocks,
1164                                            cls);
1165 }
1166
1167
1168 /**
1169  * Process content offered for migration.
1170  *
1171  * @param cls closure
1172  * @param key key for the content
1173  * @param size number of bytes in data
1174  * @param data content stored
1175  * @param type type of the content
1176  * @param priority priority of the content
1177  * @param anonymity anonymity-level for the content
1178  * @param expiration expiration time for the content
1179  * @param uid unique identifier for the datum;
1180  *        maybe 0 if no unique identifier is available
1181  */
1182 static void
1183 process_migration_content (void *cls,
1184                            const GNUNET_HashCode * key,
1185                            size_t size,
1186                            const void *data,
1187                            enum GNUNET_BLOCK_Type type,
1188                            uint32_t priority,
1189                            uint32_t anonymity,
1190                            struct GNUNET_TIME_Absolute
1191                            expiration, uint64_t uid)
1192 {
1193   struct MigrationReadyBlock *mb;
1194   
1195   if (key == NULL)
1196     {
1197       mig_qe = NULL;
1198       if (mig_size < MAX_MIGRATION_QUEUE)  
1199         consider_migration_gathering ();
1200       return;
1201     }
1202   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1203     {
1204       if (GNUNET_OK !=
1205           GNUNET_FS_handle_on_demand_block (key, size, data,
1206                                             type, priority, anonymity,
1207                                             expiration, uid, 
1208                                             &process_migration_content,
1209                                             NULL))
1210         {
1211           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1212         }
1213       return;
1214     }
1215 #if DEBUG_FS
1216   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1217               "Retrieved block `%s' of type %u for migration\n",
1218               GNUNET_h2s (key),
1219               type);
1220 #endif
1221   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1222   mb->query = *key;
1223   mb->expiration = expiration;
1224   mb->size = size;
1225   mb->type = type;
1226   memcpy (&mb[1], data, size);
1227   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1228                                      mig_tail,
1229                                      mig_tail,
1230                                      mb);
1231   mig_size++;
1232   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1233                                          &consider_migration,
1234                                          mb);
1235   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1236 }
1237
1238
1239 /**
1240  * Function called upon completion of the DHT PUT operation.
1241  */
1242 static void
1243 dht_put_continuation (void *cls,
1244                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1245 {
1246   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1247 }
1248
1249
1250 /**
1251  * Store content in DHT.
1252  *
1253  * @param cls closure
1254  * @param key key for the content
1255  * @param size number of bytes in data
1256  * @param data content stored
1257  * @param type type of the content
1258  * @param priority priority of the content
1259  * @param anonymity anonymity-level for the content
1260  * @param expiration expiration time for the content
1261  * @param uid unique identifier for the datum;
1262  *        maybe 0 if no unique identifier is available
1263  */
1264 static void
1265 process_dht_put_content (void *cls,
1266                          const GNUNET_HashCode * key,
1267                          size_t size,
1268                          const void *data,
1269                          enum GNUNET_BLOCK_Type type,
1270                          uint32_t priority,
1271                          uint32_t anonymity,
1272                          struct GNUNET_TIME_Absolute
1273                          expiration, uint64_t uid)
1274
1275   static unsigned int counter;
1276   static GNUNET_HashCode last_vhash;
1277   static GNUNET_HashCode vhash;
1278
1279   if (key == NULL)
1280     {
1281       dht_qe = NULL;
1282       consider_dht_put_gathering (cls);
1283       return;
1284     }
1285   /* slightly funky code to estimate the total number of values with zero
1286      anonymity from the maximum observed length of a monotonically increasing 
1287      sequence of hashes over the contents */
1288   GNUNET_CRYPTO_hash (data, size, &vhash);
1289   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1290     {
1291       if (zero_anonymity_count_estimate > 0)
1292         zero_anonymity_count_estimate /= 2;
1293       counter = 0;
1294     }
1295   last_vhash = vhash;
1296   if (counter < 31)
1297     counter++;
1298   if (zero_anonymity_count_estimate < (1 << counter))
1299     zero_anonymity_count_estimate = (1 << counter);
1300 #if DEBUG_FS
1301   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1302               "Retrieved block `%s' of type %u for DHT PUT\n",
1303               GNUNET_h2s (key),
1304               type);
1305 #endif
1306   GNUNET_DHT_put (dht_handle,
1307                   key,
1308                   GNUNET_DHT_RO_NONE,
1309                   type,
1310                   size,
1311                   data,
1312                   expiration,
1313                   GNUNET_TIME_UNIT_FOREVER_REL,
1314                   &dht_put_continuation,
1315                   cls);
1316 }
1317
1318
1319 /**
1320  * Task that is run periodically to obtain blocks for content
1321  * migration
1322  * 
1323  * @param cls unused
1324  * @param tc scheduler context (also unused)
1325  */
1326 static void
1327 gather_migration_blocks (void *cls,
1328                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1329 {
1330   mig_task = GNUNET_SCHEDULER_NO_TASK;
1331   if (dsh != NULL)
1332     {
1333       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1334                                             GNUNET_TIME_UNIT_FOREVER_REL,
1335                                             &process_migration_content, NULL);
1336       GNUNET_assert (mig_qe != NULL);
1337     }
1338 }
1339
1340
1341 /**
1342  * Task that is run periodically to obtain blocks for DHT PUTs.
1343  * 
1344  * @param cls type of blocks to gather
1345  * @param tc scheduler context (unused)
1346  */
1347 static void
1348 gather_dht_put_blocks (void *cls,
1349                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1350 {
1351   dht_task = GNUNET_SCHEDULER_NO_TASK;
1352   if (dsh != NULL)
1353     {
1354       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1355         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1356       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1357                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1358                                                     dht_put_type++,
1359                                                     &process_dht_put_content, NULL);
1360       GNUNET_assert (dht_qe != NULL);
1361     }
1362 }
1363
1364
1365 /**
1366  * We're done with a particular message list entry.
1367  * Free all associated resources.
1368  * 
1369  * @param pml entry to destroy
1370  */
1371 static void
1372 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1373 {
1374   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1375                                pml->req->pending_tail,
1376                                pml);
1377   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1378                                pml->target->pending_messages_tail,
1379                                pml->pm);
1380   pml->target->pending_requests--;
1381   GNUNET_free (pml->pm);
1382   GNUNET_free (pml);
1383 }
1384
1385
1386 /**
1387  * Destroy the given pending message (and call the respective
1388  * continuation).
1389  *
1390  * @param pm message to destroy
1391  * @param tpid id of peer that the message was delivered to, or 0 for none
1392  */
1393 static void
1394 destroy_pending_message (struct PendingMessage *pm,
1395                          GNUNET_PEER_Id tpid)
1396 {
1397   struct PendingMessageList *pml = pm->pml;
1398   TransmissionContinuation cont;
1399   void *cont_cls;
1400
1401   cont = pm->cont;
1402   cont_cls = pm->cont_cls;
1403   if (pml != NULL)
1404     {
1405       GNUNET_assert (pml->pm == pm);
1406       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1407       destroy_pending_message_list_entry (pml);
1408     }
1409   else
1410     {
1411       GNUNET_free (pm);
1412     }
1413   if (cont != NULL)
1414     cont (cont_cls, tpid);  
1415 }
1416
1417
1418 /**
1419  * We're done processing a particular request.
1420  * Free all associated resources.
1421  *
1422  * @param pr request to destroy
1423  */
1424 static void
1425 destroy_pending_request (struct PendingRequest *pr)
1426 {
1427   struct GNUNET_PeerIdentity pid;
1428   unsigned int i;
1429
1430   if (pr->hnode != NULL)
1431     {
1432       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1433                                          pr->hnode);
1434       pr->hnode = NULL;
1435     }
1436   if (NULL == pr->client_request_list)
1437     {
1438       GNUNET_STATISTICS_update (stats,
1439                                 gettext_noop ("# P2P searches active"),
1440                                 -1,
1441                                 GNUNET_NO);
1442     }
1443   else
1444     {
1445       GNUNET_STATISTICS_update (stats,
1446                                 gettext_noop ("# client searches active"),
1447                                 -1,
1448                                 GNUNET_NO);
1449     }
1450   if (GNUNET_YES == 
1451       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1452                                             &pr->query,
1453                                             pr))
1454     {
1455       GNUNET_LOAD_update (rt_entry_lifetime,
1456                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1457     }
1458   if (pr->qe != NULL)
1459      {
1460       GNUNET_DATASTORE_cancel (pr->qe);
1461       pr->qe = NULL;
1462     }
1463   if (pr->dht_get != NULL)
1464     {
1465       GNUNET_DHT_get_stop (pr->dht_get);
1466       pr->dht_get = NULL;
1467     }
1468   if (pr->client_request_list != NULL)
1469     {
1470       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1471                                    pr->client_request_list->client_list->rl_tail,
1472                                    pr->client_request_list);
1473       GNUNET_free (pr->client_request_list);
1474       pr->client_request_list = NULL;
1475     }
1476   if (pr->cp != NULL)
1477     {
1478       GNUNET_PEER_resolve (pr->cp->pid,
1479                            &pid);
1480       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1481                                                    &pid.hashPubKey,
1482                                                    pr);
1483       pr->cp = NULL;
1484     }
1485   if (pr->bf != NULL)
1486     {
1487       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1488       pr->bf = NULL;
1489     }
1490   if (pr->irc != NULL)
1491     {
1492       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1493       pr->irc = NULL;
1494     }
1495   if (pr->replies_seen != NULL)
1496     {
1497       GNUNET_free (pr->replies_seen);
1498       pr->replies_seen = NULL;
1499     }
1500   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1501     {
1502       GNUNET_SCHEDULER_cancel (sched,
1503                                pr->task);
1504       pr->task = GNUNET_SCHEDULER_NO_TASK;
1505     }
1506   while (NULL != pr->pending_head)    
1507     destroy_pending_message_list_entry (pr->pending_head);
1508   GNUNET_PEER_change_rc (pr->target_pid, -1);
1509   if (pr->used_targets != NULL)
1510     {
1511       for (i=0;i<pr->used_targets_off;i++)
1512         GNUNET_PEER_change_rc (pr->used_targets[i].pid, -1);
1513       GNUNET_free (pr->used_targets);
1514       pr->used_targets_off = 0;
1515       pr->used_targets_size = 0;
1516       pr->used_targets = NULL;
1517     }
1518   GNUNET_free (pr);
1519 }
1520
1521
1522 /**
1523  * Method called whenever a given peer connects.
1524  *
1525  * @param cls closure, not used
1526  * @param peer peer identity this notification is about
1527  * @param latency reported latency of the connection with 'other'
1528  * @param distance reported distance (DV) to 'other' 
1529  */
1530 static void 
1531 peer_connect_handler (void *cls,
1532                       const struct
1533                       GNUNET_PeerIdentity * peer,
1534                       struct GNUNET_TIME_Relative latency,
1535                       uint32_t distance)
1536 {
1537   struct ConnectedPeer *cp;
1538   struct MigrationReadyBlock *pos;
1539   char *fn;
1540   uint32_t trust;
1541   
1542   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1543   cp->transmission_delay = GNUNET_LOAD_value_init ();
1544   cp->pid = GNUNET_PEER_intern (peer);
1545
1546   fn = get_trust_filename (peer);
1547   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1548       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1549     cp->disk_trust = cp->trust = ntohl (trust);
1550   GNUNET_free (fn);
1551
1552   GNUNET_break (GNUNET_OK ==
1553                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1554                                                    &peer->hashPubKey,
1555                                                    cp,
1556                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1557
1558   pos = mig_head;
1559   while (NULL != pos)
1560     {
1561       (void) consider_migration (pos, &peer->hashPubKey, cp);
1562       pos = pos->next;
1563     }
1564 }
1565
1566
1567 /**
1568  * Increase the host credit by a value.
1569  *
1570  * @param host which peer to change the trust value on
1571  * @param value is the int value by which the
1572  *  host credit is to be increased or decreased
1573  * @returns the actual change in trust (positive or negative)
1574  */
1575 static int
1576 change_host_trust (struct ConnectedPeer *host, int value)
1577 {
1578   unsigned int old_trust;
1579
1580   if (value == 0)
1581     return 0;
1582   GNUNET_assert (host != NULL);
1583   old_trust = host->trust;
1584   if (value > 0)
1585     {
1586       if (host->trust + value < host->trust)
1587         {
1588           value = UINT32_MAX - host->trust;
1589           host->trust = UINT32_MAX;
1590         }
1591       else
1592         host->trust += value;
1593     }
1594   else
1595     {
1596       if (host->trust < -value)
1597         {
1598           value = -host->trust;
1599           host->trust = 0;
1600         }
1601       else
1602         host->trust += value;
1603     }
1604   return value;
1605 }
1606
1607
1608 /**
1609  * Write host-trust information to a file - flush the buffer entry!
1610  */
1611 static int
1612 flush_trust (void *cls,
1613              const GNUNET_HashCode *key,
1614              void *value)
1615 {
1616   struct ConnectedPeer *host = value;
1617   char *fn;
1618   uint32_t trust;
1619   struct GNUNET_PeerIdentity pid;
1620
1621   if (host->trust == host->disk_trust)
1622     return GNUNET_OK;                     /* unchanged */
1623   GNUNET_PEER_resolve (host->pid,
1624                        &pid);
1625   fn = get_trust_filename (&pid);
1626   if (host->trust == 0)
1627     {
1628       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1629         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1630                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1631     }
1632   else
1633     {
1634       trust = htonl (host->trust);
1635       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1636                                                     sizeof(uint32_t),
1637                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1638                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1639         host->disk_trust = host->trust;
1640     }
1641   GNUNET_free (fn);
1642   return GNUNET_OK;
1643 }
1644
1645 /**
1646  * Call this method periodically to scan data/hosts for new hosts.
1647  */
1648 static void
1649 cron_flush_trust (void *cls,
1650                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1651 {
1652
1653   if (NULL == connected_peers)
1654     return;
1655   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1656                                          &flush_trust,
1657                                          NULL);
1658   if (NULL == tc)
1659     return;
1660   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1661     return;
1662   GNUNET_SCHEDULER_add_delayed (tc->sched,
1663                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1664 }
1665
1666
1667 /**
1668  * Free (each) request made by the peer.
1669  *
1670  * @param cls closure, points to peer that the request belongs to
1671  * @param key current key code
1672  * @param value value in the hash map
1673  * @return GNUNET_YES (we should continue to iterate)
1674  */
1675 static int
1676 destroy_request (void *cls,
1677                  const GNUNET_HashCode * key,
1678                  void *value)
1679 {
1680   const struct GNUNET_PeerIdentity * peer = cls;
1681   struct PendingRequest *pr = value;
1682   
1683   GNUNET_break (GNUNET_YES ==
1684                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1685                                                       &peer->hashPubKey,
1686                                                       pr));
1687   destroy_pending_request (pr);
1688   return GNUNET_YES;
1689 }
1690
1691
1692 /**
1693  * Method called whenever a peer disconnects.
1694  *
1695  * @param cls closure, not used
1696  * @param peer peer identity this notification is about
1697  */
1698 static void
1699 peer_disconnect_handler (void *cls,
1700                          const struct
1701                          GNUNET_PeerIdentity * peer)
1702 {
1703   struct ConnectedPeer *cp;
1704   struct PendingMessage *pm;
1705   unsigned int i;
1706   struct MigrationReadyBlock *pos;
1707   struct MigrationReadyBlock *next;
1708
1709   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1710                                               &peer->hashPubKey,
1711                                               &destroy_request,
1712                                               (void*) peer);
1713   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1714                                           &peer->hashPubKey);
1715   if (cp == NULL)
1716     return;
1717   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1718     {
1719       if (NULL != cp->last_client_replies[i])
1720         {
1721           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1722           cp->last_client_replies[i] = NULL;
1723         }
1724     }
1725   GNUNET_break (GNUNET_YES ==
1726                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1727                                                       &peer->hashPubKey,
1728                                                       cp));
1729   /* remove this peer from migration considerations; schedule
1730      alternatives */
1731   next = mig_head;
1732   while (NULL != (pos = next))
1733     {
1734       next = pos->next;
1735       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1736         {
1737           if (pos->target_list[i] == cp->pid)
1738             {
1739               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1740               pos->target_list[i] = 0;
1741             }
1742          }
1743       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1744         {
1745           delete_migration_block (pos);
1746           consider_migration_gathering ();
1747           continue;
1748         }
1749       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1750                                              &consider_migration,
1751                                              pos);
1752     }
1753   GNUNET_PEER_change_rc (cp->pid, -1);
1754   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1755   if (NULL != cp->cth)
1756     {
1757       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1758       cp->cth = NULL;
1759     }
1760   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1761     {
1762       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1763       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1764     }
1765   while (NULL != (pm = cp->pending_messages_head))
1766     destroy_pending_message (pm, 0 /* delivery failed */);
1767   GNUNET_LOAD_value_free (cp->transmission_delay);
1768   GNUNET_break (0 == cp->pending_requests);
1769   GNUNET_free (cp);
1770 }
1771
1772
1773 /**
1774  * Iterator over hash map entries that removes all occurences
1775  * of the given 'client' from the 'last_client_replies' of the
1776  * given connected peer.
1777  *
1778  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1779  * @param key current key code (unused)
1780  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1781  * @return GNUNET_YES (we should continue to iterate)
1782  */
1783 static int
1784 remove_client_from_last_client_replies (void *cls,
1785                                         const GNUNET_HashCode * key,
1786                                         void *value)
1787 {
1788   struct GNUNET_SERVER_Client *client = cls;
1789   struct ConnectedPeer *cp = value;
1790   unsigned int i;
1791
1792   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1793     {
1794       if (cp->last_client_replies[i] == client)
1795         {
1796           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1797           cp->last_client_replies[i] = NULL;
1798         }
1799     }  
1800   return GNUNET_YES;
1801 }
1802
1803
1804 /**
1805  * A client disconnected.  Remove all of its pending queries.
1806  *
1807  * @param cls closure, NULL
1808  * @param client identification of the client
1809  */
1810 static void
1811 handle_client_disconnect (void *cls,
1812                           struct GNUNET_SERVER_Client
1813                           * client)
1814 {
1815   struct ClientList *pos;
1816   struct ClientList *prev;
1817   struct ClientRequestList *rcl;
1818   struct ClientResponseMessage *creply;
1819
1820   if (client == NULL)
1821     return;
1822   prev = NULL;
1823   pos = client_list;
1824   while ( (NULL != pos) &&
1825           (pos->client != client) )
1826     {
1827       prev = pos;
1828       pos = pos->next;
1829     }
1830   if (pos == NULL)
1831     return; /* no requests pending for this client */
1832   while (NULL != (rcl = pos->rl_head))
1833     {
1834       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1835                   "Destroying pending request `%s' on disconnect\n",
1836                   GNUNET_h2s (&rcl->req->query));
1837       destroy_pending_request (rcl->req);
1838     }
1839   if (prev == NULL)
1840     client_list = pos->next;
1841   else
1842     prev->next = pos->next;
1843   if (pos->th != NULL)
1844     {
1845       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1846       pos->th = NULL;
1847     }
1848   while (NULL != (creply = pos->res_head))
1849     {
1850       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1851                                    pos->res_tail,
1852                                    creply);
1853       GNUNET_free (creply);
1854     }    
1855   GNUNET_SERVER_client_drop (pos->client);
1856   GNUNET_free (pos);
1857   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1858                                          &remove_client_from_last_client_replies,
1859                                          client);
1860 }
1861
1862
1863 /**
1864  * Iterator to free peer entries.
1865  *
1866  * @param cls closure, unused
1867  * @param key current key code
1868  * @param value value in the hash map (peer entry)
1869  * @return GNUNET_YES (we should continue to iterate)
1870  */
1871 static int 
1872 clean_peer (void *cls,
1873             const GNUNET_HashCode * key,
1874             void *value)
1875 {
1876   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1877   return GNUNET_YES;
1878 }
1879
1880
1881 /**
1882  * Task run during shutdown.
1883  *
1884  * @param cls unused
1885  * @param tc unused
1886  */
1887 static void
1888 shutdown_task (void *cls,
1889                const struct GNUNET_SCHEDULER_TaskContext *tc)
1890 {
1891   if (mig_qe != NULL)
1892     {
1893       GNUNET_DATASTORE_cancel (mig_qe);
1894       mig_qe = NULL;
1895     }
1896   if (dht_qe != NULL)
1897     {
1898       GNUNET_DATASTORE_cancel (dht_qe);
1899       dht_qe = NULL;
1900     }
1901   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1902     {
1903       GNUNET_SCHEDULER_cancel (sched, mig_task);
1904       mig_task = GNUNET_SCHEDULER_NO_TASK;
1905     }
1906   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1907     {
1908       GNUNET_SCHEDULER_cancel (sched, dht_task);
1909       dht_task = GNUNET_SCHEDULER_NO_TASK;
1910     }
1911   while (client_list != NULL)
1912     handle_client_disconnect (NULL,
1913                               client_list->client);
1914   cron_flush_trust (NULL, NULL);
1915   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1916                                          &clean_peer,
1917                                          NULL);
1918   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1919   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1920   requests_by_expiration_heap = 0;
1921   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1922   connected_peers = NULL;
1923   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1924   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1925   query_request_map = NULL;
1926   GNUNET_LOAD_value_free (rt_entry_lifetime);
1927   rt_entry_lifetime = NULL;
1928   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1929   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1930   peer_request_map = NULL;
1931   GNUNET_assert (NULL != core);
1932   GNUNET_CORE_disconnect (core);
1933   core = NULL;
1934   if (stats != NULL)
1935     {
1936       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1937       stats = NULL;
1938     }
1939   if (dsh != NULL)
1940     {
1941       GNUNET_DATASTORE_disconnect (dsh,
1942                                    GNUNET_NO);
1943       dsh = NULL;
1944     }
1945   while (mig_head != NULL)
1946     delete_migration_block (mig_head);
1947   GNUNET_assert (0 == mig_size);
1948   GNUNET_DHT_disconnect (dht_handle);
1949   dht_handle = NULL;
1950   GNUNET_LOAD_value_free (datastore_get_load);
1951   datastore_get_load = NULL;
1952   GNUNET_LOAD_value_free (datastore_put_load);
1953   datastore_put_load = NULL;
1954   GNUNET_BLOCK_context_destroy (block_ctx);
1955   block_ctx = NULL;
1956   GNUNET_CONFIGURATION_destroy (block_cfg);
1957   block_cfg = NULL;
1958   sched = NULL;
1959   cfg = NULL;  
1960   GNUNET_free_non_null (trustDirectory);
1961   trustDirectory = NULL;
1962 }
1963
1964
1965 /* ******************* Utility functions  ******************** */
1966
1967
1968 /**
1969  * We've had to delay a request for transmission to core, but now
1970  * we should be ready.  Run it.
1971  *
1972  * @param cls the 'struct ConnectedPeer' for which a request was delayed
1973  * @param tc task context (unused)
1974  */
1975 static void
1976 delayed_transmission_request (void *cls,
1977                               const struct GNUNET_SCHEDULER_TaskContext *tc)
1978 {
1979   struct ConnectedPeer *cp = cls;
1980   struct GNUNET_PeerIdentity pid;
1981   struct PendingMessage *pm;
1982
1983   pm = cp->pending_messages_head;
1984   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1985   GNUNET_assert (cp->cth == NULL);
1986   if (pm == NULL)
1987     return;
1988   GNUNET_PEER_resolve (cp->pid,
1989                        &pid);
1990   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
1991   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1992                                                pm->priority,
1993                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1994                                                &pid,
1995                                                pm->msize,
1996                                                &transmit_to_peer,
1997                                                cp);
1998 }
1999
2000
2001 /**
2002  * Transmit messages by copying it to the target buffer
2003  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2004  * for writing in the meantime.  In that case, do nothing
2005  * (the disconnect or shutdown handler will take care of the rest).
2006  * If we were able to transmit messages and there are still more
2007  * pending, ask core again for further calls to this function.
2008  *
2009  * @param cls closure, pointer to the 'struct ConnectedPeer*'
2010  * @param size number of bytes available in buf
2011  * @param buf where the callee should write the message
2012  * @return number of bytes written to buf
2013  */
2014 static size_t
2015 transmit_to_peer (void *cls,
2016                   size_t size, void *buf)
2017 {
2018   struct ConnectedPeer *cp = cls;
2019   char *cbuf = buf;
2020   struct PendingMessage *pm;
2021   struct PendingMessage *next_pm;
2022   struct GNUNET_TIME_Absolute now;
2023   struct GNUNET_TIME_Relative min_delay;
2024   struct MigrationReadyBlock *mb;
2025   struct MigrationReadyBlock *next;
2026   struct PutMessage migm;
2027   size_t msize;
2028   unsigned int i;
2029   struct GNUNET_PeerIdentity pid;
2030  
2031   cp->cth = NULL;
2032   if (NULL == buf)
2033     {
2034 #if DEBUG_FS
2035       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2036                   "Dropping message, core too busy.\n");
2037 #endif
2038       GNUNET_LOAD_update (cp->transmission_delay,
2039                           UINT64_MAX);
2040       return 0;
2041     }  
2042   GNUNET_LOAD_update (cp->transmission_delay,
2043                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2044   now = GNUNET_TIME_absolute_get ();
2045   msize = 0;
2046   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2047   next_pm = cp->pending_messages_head;
2048   while ( (NULL != (pm = next_pm) ) &&
2049           (pm->msize <= size) )
2050     {
2051       next_pm = pm->next;
2052       if (pm->delay_until.value > now.value)
2053         {
2054           min_delay = GNUNET_TIME_relative_min (min_delay,
2055                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2056           continue;
2057         }
2058       memcpy (&cbuf[msize], &pm[1], pm->msize);
2059       msize += pm->msize;
2060       size -= pm->msize;
2061       if (NULL == pm->pml)
2062         {
2063           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2064                                        cp->pending_messages_tail,
2065                                        pm);
2066           cp->pending_requests--;
2067         }
2068       destroy_pending_message (pm, cp->pid);
2069     }
2070   if (pm != NULL)
2071     min_delay = GNUNET_TIME_UNIT_ZERO;
2072   if (NULL != cp->pending_messages_head)
2073     {     
2074       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2075       cp->delayed_transmission_request_task
2076         = GNUNET_SCHEDULER_add_delayed (sched,
2077                                         min_delay,
2078                                         &delayed_transmission_request,
2079                                         cp);
2080     }
2081   if (pm == NULL)
2082     {      
2083       GNUNET_PEER_resolve (cp->pid,
2084                            &pid);
2085       next = mig_head;
2086       while (NULL != (mb = next))
2087         {
2088           next = mb->next;
2089           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2090             {
2091               if ( (cp->pid == mb->target_list[i]) &&
2092                    (mb->size + sizeof (migm) <= size) )
2093                 {
2094                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2095                   mb->target_list[i] = 0;
2096                   mb->used_targets++;
2097                   memset (&migm, 0, sizeof (migm));
2098                   migm.header.size = htons (sizeof (migm) + mb->size);
2099                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2100                   migm.type = htonl (mb->type);
2101                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2102                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2103                   msize += sizeof (migm);
2104                   size -= sizeof (migm);
2105                   memcpy (&cbuf[msize], &mb[1], mb->size);
2106                   msize += mb->size;
2107                   size -= mb->size;
2108 #if DEBUG_FS
2109                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2110                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2111                               GNUNET_h2s (&mb->query),
2112                               (unsigned int) mb->size,
2113                               GNUNET_i2s (&pid));
2114 #endif    
2115                   break;
2116                 }
2117               else
2118                 {
2119 #if DEBUG_FS
2120                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2121                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2122                               GNUNET_h2s (&mb->query),
2123                               (unsigned int) mb->size,
2124                               GNUNET_i2s (&pid));
2125 #endif    
2126                 }
2127             }
2128           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2129                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2130             {
2131               delete_migration_block (mb);
2132               consider_migration_gathering ();
2133             }
2134         }
2135       consider_migration (NULL, 
2136                           &pid.hashPubKey,
2137                           cp);
2138     }
2139 #if DEBUG_FS
2140   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2141               "Transmitting %u bytes to peer with PID %u\n",
2142               (unsigned int) msize,
2143               (unsigned int) cp->pid);
2144 #endif
2145   return msize;
2146 }
2147
2148
2149 /**
2150  * Add a message to the set of pending messages for the given peer.
2151  *
2152  * @param cp peer to send message to
2153  * @param pm message to queue
2154  * @param pr request on which behalf this message is being queued
2155  */
2156 static void
2157 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2158                                   struct PendingMessage *pm,
2159                                   struct PendingRequest *pr)
2160 {
2161   struct PendingMessage *pos;
2162   struct PendingMessageList *pml;
2163   struct GNUNET_PeerIdentity pid;
2164
2165   GNUNET_assert (pm->next == NULL);
2166   GNUNET_assert (pm->pml == NULL);    
2167   if (pr != NULL)
2168     {
2169       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2170       pml->req = pr;
2171       pml->target = cp;
2172       pml->pm = pm;
2173       pm->pml = pml;  
2174       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2175                                    pr->pending_tail,
2176                                    pml);
2177     }
2178   pos = cp->pending_messages_head;
2179   while ( (pos != NULL) &&
2180           (pm->priority < pos->priority) )
2181     pos = pos->next;    
2182   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2183                                      cp->pending_messages_tail,
2184                                      pos,
2185                                      pm);
2186   cp->pending_requests++;
2187   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2188     {
2189       GNUNET_STATISTICS_update (stats,
2190                                 gettext_noop ("# P2P searches discarded (queue length bound)"),
2191                                 1,
2192                                 GNUNET_NO);
2193       destroy_pending_message (cp->pending_messages_tail, 0);  
2194     }
2195   GNUNET_PEER_resolve (cp->pid, &pid);
2196   if (NULL != cp->cth)
2197     {
2198       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2199       cp->cth = NULL;
2200     }
2201   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2202     {
2203       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2204       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2205     }
2206   /* need to schedule transmission */
2207   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2208   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2209                                                cp->pending_messages_head->priority,
2210                                                MAX_TRANSMIT_DELAY,
2211                                                &pid,
2212                                                cp->pending_messages_head->msize,
2213                                                &transmit_to_peer,
2214                                                cp);
2215   if (cp->cth == NULL)
2216     {
2217 #if DEBUG_FS
2218       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2219                   "Failed to schedule transmission with core!\n");
2220 #endif
2221       GNUNET_STATISTICS_update (stats,
2222                                 gettext_noop ("# CORE transmission failures"),
2223                                 1,
2224                                 GNUNET_NO);
2225     }
2226 }
2227
2228
2229 /**
2230  * Test if the DATABASE (GET) load on this peer is too high
2231  * to even consider processing the query at
2232  * all.  
2233  * 
2234  * @return GNUNET_YES if the load is too high to do anything (load high)
2235  *         GNUNET_NO to process normally (load normal)
2236  *         GNUNET_SYSERR to process for free (load low)
2237  */
2238 static int
2239 test_get_load_too_high (uint32_t priority)
2240 {
2241   double ld;
2242
2243   ld = GNUNET_LOAD_get_load (datastore_get_load);
2244   if (ld < 1)
2245     {
2246       GNUNET_STATISTICS_update (stats,
2247                                 gettext_noop ("# requests done for free (low load)"),
2248                                 1,
2249                                 GNUNET_NO);
2250       return GNUNET_SYSERR;
2251     }
2252   if (ld <= priority)
2253     {
2254       GNUNET_STATISTICS_update (stats,
2255                                 gettext_noop ("# requests done for a price (normal load)"),
2256                                 1,
2257                                 GNUNET_NO);
2258       return GNUNET_NO;
2259     }
2260   GNUNET_STATISTICS_update (stats,
2261                             gettext_noop ("# requests dropped due to high load"),
2262                             1,
2263                             GNUNET_NO);
2264   return GNUNET_YES;
2265 }
2266
2267
2268
2269
2270 /**
2271  * Test if the DATABASE (PUT) load on this peer is too high
2272  * to even consider processing the query at
2273  * all.  
2274  * 
2275  * @return GNUNET_YES if the load is too high to do anything (load high)
2276  *         GNUNET_NO to process normally (load normal or low)
2277  */
2278 static int
2279 test_put_load_too_high (uint32_t priority)
2280 {
2281   double ld;
2282
2283   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2284     return GNUNET_NO; /* very fast */
2285   ld = GNUNET_LOAD_get_load (datastore_put_load);
2286   if (ld < 2.0 * (1 + priority))
2287     return GNUNET_NO;
2288   GNUNET_STATISTICS_update (stats,
2289                             gettext_noop ("# storage requests dropped due to high load"),
2290                             1,
2291                             GNUNET_NO);
2292   return GNUNET_YES;
2293 }
2294
2295
2296 /* ******************* Pending Request Refresh Task ******************** */
2297
2298
2299
2300 /**
2301  * We use a random delay to make the timing of requests less
2302  * predictable.  This function returns such a random delay.  We add a base
2303  * delay of MAX_CORK_DELAY (1s).
2304  *
2305  * FIXME: make schedule dependent on the specifics of the request?
2306  * Or bandwidth and number of connected peers and load?
2307  *
2308  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2309  */
2310 static struct GNUNET_TIME_Relative
2311 get_processing_delay ()
2312 {
2313   return 
2314     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2315                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2316                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2317                                                                                        TTL_DECREMENT)));
2318 }
2319
2320
2321 /**
2322  * We're processing a GET request from another peer and have decided
2323  * to forward it to other peers.  This function is called periodically
2324  * and should forward the request to other peers until we have all
2325  * possible replies.  If we have transmitted the *only* reply to
2326  * the initiator we should destroy the pending request.  If we have
2327  * many replies in the queue to the initiator, we should delay sending
2328  * out more queries until the reply queue has shrunk some.
2329  *
2330  * @param cls our "struct ProcessGetContext *"
2331  * @param tc unused
2332  */
2333 static void
2334 forward_request_task (void *cls,
2335                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2336
2337
2338 /**
2339  * Function called after we either failed or succeeded
2340  * at transmitting a query to a peer.  
2341  *
2342  * @param cls the requests "struct PendingRequest*"
2343  * @param tpid ID of receiving peer, 0 on transmission error
2344  */
2345 static void
2346 transmit_query_continuation (void *cls,
2347                              GNUNET_PEER_Id tpid)
2348 {
2349   struct PendingRequest *pr = cls;
2350   unsigned int i;
2351
2352   GNUNET_STATISTICS_update (stats,
2353                             gettext_noop ("# queries scheduled for forwarding"),
2354                             -1,
2355                             GNUNET_NO);
2356   if (tpid == 0)   
2357     {
2358 #if DEBUG_FS
2359       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2360                   "Transmission of request failed, will try again later.\n");
2361 #endif
2362       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2363         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2364                                                  get_processing_delay (),
2365                                                  &forward_request_task,
2366                                                  pr); 
2367       return;    
2368     }
2369 #if DEBUG_FS
2370   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2371               "Transmitted query `%s'\n",
2372               GNUNET_h2s (&pr->query));
2373 #endif
2374   GNUNET_STATISTICS_update (stats,
2375                             gettext_noop ("# queries forwarded"),
2376                             1,
2377                             GNUNET_NO);
2378   for (i=0;i<pr->used_targets_off;i++)
2379     if (pr->used_targets[i].pid == tpid)
2380       break; /* found match! */    
2381   if (i == pr->used_targets_off)
2382     {
2383       /* need to create new entry */
2384       if (pr->used_targets_off == pr->used_targets_size)
2385         GNUNET_array_grow (pr->used_targets,
2386                            pr->used_targets_size,
2387                            pr->used_targets_size * 2 + 2);
2388       GNUNET_PEER_change_rc (tpid, 1);
2389       pr->used_targets[pr->used_targets_off].pid = tpid;
2390       pr->used_targets[pr->used_targets_off].num_requests = 0;
2391       i = pr->used_targets_off++;
2392     }
2393   pr->used_targets[i].last_request_time = GNUNET_TIME_absolute_get ();
2394   pr->used_targets[i].num_requests++;
2395   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2396     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2397                                              get_processing_delay (),
2398                                              &forward_request_task,
2399                                              pr);
2400 }
2401
2402
2403 /**
2404  * How many bytes should a bloomfilter be if we have already seen
2405  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2406  * of bits set per entry.  Furthermore, we should not re-size the
2407  * filter too often (to keep it cheap).
2408  *
2409  * Since other peers will also add entries but not resize the filter,
2410  * we should generally pick a slightly larger size than what the
2411  * strict math would suggest.
2412  *
2413  * @return must be a power of two and smaller or equal to 2^15.
2414  */
2415 static size_t
2416 compute_bloomfilter_size (unsigned int entry_count)
2417 {
2418   size_t size;
2419   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2420   uint16_t max = 1 << 15;
2421
2422   if (entry_count > max)
2423     return max;
2424   size = 8;
2425   while ((size < max) && (size < ideal))
2426     size *= 2;
2427   if (size > max)
2428     return max;
2429   return size;
2430 }
2431
2432
2433 /**
2434  * Recalculate our bloom filter for filtering replies.  This function
2435  * will create a new bloom filter from scratch, so it should only be
2436  * called if we have no bloomfilter at all (and hence can create a
2437  * fresh one of minimal size without problems) OR if our peer is the
2438  * initiator (in which case we may resize to larger than mimimum size).
2439  *
2440  * @param pr request for which the BF is to be recomputed
2441  */
2442 static void
2443 refresh_bloomfilter (struct PendingRequest *pr)
2444 {
2445   unsigned int i;
2446   size_t nsize;
2447   GNUNET_HashCode mhash;
2448
2449   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2450   if (nsize == pr->bf_size)
2451     return; /* size not changed */
2452   if (pr->bf != NULL)
2453     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2454   pr->bf_size = nsize;
2455   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2456   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2457                                               pr->bf_size,
2458                                               BLOOMFILTER_K);
2459   for (i=0;i<pr->replies_seen_off;i++)
2460     {
2461       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2462                                 pr->mingle,
2463                                 &mhash);
2464       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2465     }
2466 }
2467
2468
2469 /**
2470  * Function called after we've tried to reserve a certain amount of
2471  * bandwidth for a reply.  Check if we succeeded and if so send our
2472  * query.
2473  *
2474  * @param cls the requests "struct PendingRequest*"
2475  * @param peer identifies the peer
2476  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2477  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2478  * @param amount set to the amount that was actually reserved or unreserved
2479  * @param preference current traffic preference for the given peer
2480  */
2481 static void
2482 target_reservation_cb (void *cls,
2483                        const struct
2484                        GNUNET_PeerIdentity * peer,
2485                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2486                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2487                        int amount,
2488                        uint64_t preference)
2489 {
2490   struct PendingRequest *pr = cls;
2491   struct ConnectedPeer *cp;
2492   struct PendingMessage *pm;
2493   struct GetMessage *gm;
2494   GNUNET_HashCode *ext;
2495   char *bfdata;
2496   size_t msize;
2497   unsigned int k;
2498   int no_route;
2499   uint32_t bm;
2500   unsigned int i;
2501
2502   pr->irc = NULL;
2503   if (peer == NULL)
2504     {
2505       /* error in communication with core, try again later */
2506       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2507         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2508                                                  get_processing_delay (),
2509                                                  &forward_request_task,
2510                                                  pr);
2511       return;
2512     }
2513   /* (3) transmit, update ttl/priority */
2514   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2515                                           &peer->hashPubKey);
2516   if (cp == NULL)
2517     {
2518       /* Peer must have just left */
2519 #if DEBUG_FS
2520       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2521                   "Selected peer disconnected!\n");
2522 #endif
2523       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2524         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2525                                                  get_processing_delay (),
2526                                                  &forward_request_task,
2527                                                  pr);
2528       return;
2529     }
2530   no_route = GNUNET_NO;
2531   if (amount == 0)
2532     {
2533       if (pr->cp == NULL)
2534         {
2535 #if DEBUG_FS > 1
2536           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2537                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2538                       amount,
2539                       DBLOCK_SIZE);
2540 #endif
2541           GNUNET_STATISTICS_update (stats,
2542                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2543                                     1,
2544                                     GNUNET_NO);
2545           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2546             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2547                                                      get_processing_delay (),
2548                                                      &forward_request_task,
2549                                                      pr);
2550           return;  /* this target round failed */
2551         }
2552       no_route = GNUNET_YES;
2553     }
2554   
2555   GNUNET_STATISTICS_update (stats,
2556                             gettext_noop ("# queries scheduled for forwarding"),
2557                             1,
2558                             GNUNET_NO);
2559   for (i=0;i<pr->used_targets_off;i++)
2560     if (pr->used_targets[i].pid == cp->pid) 
2561       {
2562         GNUNET_STATISTICS_update (stats,
2563                                   gettext_noop ("# queries retransmitted to same target"),
2564                                   1,
2565                                   GNUNET_NO);
2566         break;
2567       } 
2568
2569   /* build message and insert message into priority queue */
2570 #if DEBUG_FS
2571   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2572               "Forwarding request `%s' to `%4s'!\n",
2573               GNUNET_h2s (&pr->query),
2574               GNUNET_i2s (peer));
2575 #endif
2576   k = 0;
2577   bm = 0;
2578   if (GNUNET_YES == no_route)
2579     {
2580       bm |= GET_MESSAGE_BIT_RETURN_TO;
2581       k++;      
2582     }
2583   if (pr->namespace != NULL)
2584     {
2585       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2586       k++;
2587     }
2588   if (pr->target_pid != 0)
2589     {
2590       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2591       k++;
2592     }
2593   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2594   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2595   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2596   pm->msize = msize;
2597   gm = (struct GetMessage*) &pm[1];
2598   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2599   gm->header.size = htons (msize);
2600   gm->type = htonl (pr->type);
2601   pr->remaining_priority /= 2;
2602   gm->priority = htonl (pr->remaining_priority);
2603   gm->ttl = htonl (pr->ttl);
2604   gm->filter_mutator = htonl(pr->mingle); 
2605   gm->hash_bitmap = htonl (bm);
2606   gm->query = pr->query;
2607   ext = (GNUNET_HashCode*) &gm[1];
2608   k = 0;
2609   if (GNUNET_YES == no_route)
2610     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2611   if (pr->namespace != NULL)
2612     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2613   if (pr->target_pid != 0)
2614     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2615   bfdata = (char *) &ext[k];
2616   if (pr->bf != NULL)
2617     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2618                                                bfdata,
2619                                                pr->bf_size);
2620   pm->cont = &transmit_query_continuation;
2621   pm->cont_cls = pr;
2622   cp->last_request_times[(cp->last_request_times_off++) % MAX_QUEUE_PER_PEER] = GNUNET_TIME_absolute_get ();
2623   add_to_pending_messages_for_peer (cp, pm, pr);
2624 }
2625
2626
2627 /**
2628  * Closure used for "target_peer_select_cb".
2629  */
2630 struct PeerSelectionContext 
2631 {
2632   /**
2633    * The request for which we are selecting
2634    * peers.
2635    */
2636   struct PendingRequest *pr;
2637
2638   /**
2639    * Current "prime" target.
2640    */
2641   struct GNUNET_PeerIdentity target;
2642
2643   /**
2644    * How much do we like this target?
2645    */
2646   double target_score;
2647
2648 };
2649
2650
2651 /**
2652  * Function called for each connected peer to determine
2653  * which one(s) would make good targets for forwarding.
2654  *
2655  * @param cls closure (struct PeerSelectionContext)
2656  * @param key current key code (peer identity)
2657  * @param value value in the hash map (struct ConnectedPeer)
2658  * @return GNUNET_YES if we should continue to
2659  *         iterate,
2660  *         GNUNET_NO if not.
2661  */
2662 static int
2663 target_peer_select_cb (void *cls,
2664                        const GNUNET_HashCode * key,
2665                        void *value)
2666 {
2667   struct PeerSelectionContext *psc = cls;
2668   struct ConnectedPeer *cp = value;
2669   struct PendingRequest *pr = psc->pr;
2670   struct GNUNET_TIME_Relative delay;
2671   double score;
2672   unsigned int i;
2673   unsigned int pc;
2674
2675   /* 1) check that this peer is not the initiator */
2676   if (cp == pr->cp)
2677     {
2678 #if DEBUG_FS
2679       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2680                   "Skipping initiator in forwarding selection\n");
2681 #endif
2682       return GNUNET_YES; /* skip */        
2683     }
2684
2685   /* 2) check if we have already (recently) forwarded to this peer */
2686   /* 2a) this particular request */
2687   pc = 0;
2688   for (i=0;i<pr->used_targets_off;i++)
2689     if (pr->used_targets[i].pid == cp->pid) 
2690       {
2691         pc = pr->used_targets[i].num_requests;
2692         GNUNET_assert (pc > 0);
2693         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2694                                            RETRY_PROBABILITY_INV * pc))
2695           {
2696 #if DEBUG_FS
2697             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2698                         "NOT re-trying query that was previously transmitted %u times\n",
2699                         (unsigned int) pc);
2700 #endif
2701             return GNUNET_YES; /* skip */
2702           }
2703         break;
2704       }
2705 #if DEBUG_FS
2706   if (0 < pc)
2707     {
2708       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2709                   "Re-trying query that was previously transmitted %u times to this peer\n",
2710                   (unsigned int) pc);
2711     }
2712 #endif
2713   /* 2b) many other requests to this peer */
2714   delay = GNUNET_TIME_absolute_get_duration (cp->last_request_times[cp->last_request_times_off % MAX_QUEUE_PER_PEER]);
2715   if (delay.value <= cp->avg_delay.value)
2716     {
2717 #if DEBUG_FS
2718       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2719                   "NOT sending query since we send %u others to this peer in the last %llums\n",
2720                   MAX_QUEUE_PER_PEER,
2721                   cp->avg_delay.value);
2722 #endif
2723       return GNUNET_YES; /* skip */      
2724     }
2725
2726   /* 3) calculate how much we'd like to forward to this peer,
2727      starting with a random value that is strong enough
2728      to at least give any peer a chance sometimes 
2729      (compared to the other factors that come later) */
2730   /* 3a) count successful (recent) routes from cp for same source */
2731   if (pr->cp != NULL)
2732     {
2733       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2734                                         P2P_SUCCESS_LIST_SIZE);
2735       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2736         if (cp->last_p2p_replies[i] == pr->cp->pid)
2737           score += 1.0; /* likely successful based on hot path */
2738     }
2739   else
2740     {
2741       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2742                                         CS2P_SUCCESS_LIST_SIZE);
2743       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2744         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2745           score += 1.0; /* likely successful based on hot path */
2746     }
2747   /* 3b) include latency */
2748   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2749     score += 1.0; /* likely fast based on latency */
2750   /* 3c) include priorities */
2751   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2752     score += 1.0; /* likely successful based on priorities */
2753   /* 3d) penalize for queue size */  
2754   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2755   /* 3e) include peer proximity */
2756   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2757                                                     &pr->query)) / (double) UINT32_MAX);
2758   /* 4) super-bonus for being the known target */
2759   if (pr->target_pid == cp->pid)
2760     score += 100.0;
2761   /* store best-fit in closure */
2762 #if DEBUG_FS
2763   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2764               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2765               GNUNET_h2s (key),
2766               score,
2767               psc->target_score);
2768 #endif  
2769   score++; /* avoid zero */
2770   if (score > psc->target_score)
2771     {
2772       psc->target_score = score;
2773       psc->target.hashPubKey = *key; 
2774     }
2775   return GNUNET_YES;
2776 }
2777   
2778
2779 /**
2780  * The priority level imposes a bound on the maximum
2781  * value for the ttl that can be requested.
2782  *
2783  * @param ttl_in requested ttl
2784  * @param prio given priority
2785  * @return ttl_in if ttl_in is below the limit,
2786  *         otherwise the ttl-limit for the given priority
2787  */
2788 static int32_t
2789 bound_ttl (int32_t ttl_in, uint32_t prio)
2790 {
2791   unsigned long long allowed;
2792
2793   if (ttl_in <= 0)
2794     return ttl_in;
2795   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2796   if (ttl_in > allowed)      
2797     {
2798       if (allowed >= (1 << 30))
2799         return 1 << 30;
2800       return allowed;
2801     }
2802   return ttl_in;
2803 }
2804
2805
2806 /**
2807  * Iterator called on each result obtained for a DHT
2808  * operation that expects a reply
2809  *
2810  * @param cls closure
2811  * @param exp when will this value expire
2812  * @param key key of the result
2813  * @param get_path NULL-terminated array of pointers
2814  *                 to the peers on reverse GET path (or NULL if not recorded)
2815  * @param put_path NULL-terminated array of pointers
2816  *                 to the peers on the PUT path (or NULL if not recorded)
2817  * @param type type of the result
2818  * @param size number of bytes in data
2819  * @param data pointer to the result data
2820  */
2821 static void
2822 process_dht_reply (void *cls,
2823                    struct GNUNET_TIME_Absolute exp,
2824                    const GNUNET_HashCode * key,
2825                    const struct GNUNET_PeerIdentity * const *get_path,
2826                    const struct GNUNET_PeerIdentity * const *put_path,
2827                    enum GNUNET_BLOCK_Type type,
2828                    size_t size,
2829                    const void *data);
2830
2831
2832 /**
2833  * We're processing a GET request and have decided
2834  * to forward it to other peers.  This function is called periodically
2835  * and should forward the request to other peers until we have all
2836  * possible replies.  If we have transmitted the *only* reply to
2837  * the initiator we should destroy the pending request.  If we have
2838  * many replies in the queue to the initiator, we should delay sending
2839  * out more queries until the reply queue has shrunk some.
2840  *
2841  * @param cls our "struct ProcessGetContext *"
2842  * @param tc unused
2843  */
2844 static void
2845 forward_request_task (void *cls,
2846                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2847 {
2848   struct PendingRequest *pr = cls;
2849   struct PeerSelectionContext psc;
2850   struct ConnectedPeer *cp; 
2851   struct GNUNET_TIME_Relative delay;
2852
2853   pr->task = GNUNET_SCHEDULER_NO_TASK;
2854   if (pr->irc != NULL)
2855     {
2856 #if DEBUG_FS
2857       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2858                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2859                   GNUNET_h2s (&pr->query));
2860 #endif
2861       return; /* already pending */
2862     }
2863   if (GNUNET_YES == pr->local_only)
2864     return; /* configured to not do P2P search */
2865   /* (0) try DHT */
2866   if ( (0 == pr->anonymity_level) &&
2867        (GNUNET_YES != pr->forward_only) &&
2868        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2869        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2870     {
2871       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2872                                           GNUNET_TIME_UNIT_FOREVER_REL,
2873                                           pr->type,
2874                                           &pr->query,
2875                                           GNUNET_DHT_RO_NONE,
2876                                           pr->bf,
2877                                           pr->mingle,
2878                                           pr->namespace,
2879                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2880                                           &process_dht_reply,
2881                                           pr);
2882     }
2883   /* (1) select target */
2884   psc.pr = pr;
2885   psc.target_score = -DBL_MAX;
2886   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2887                                          &target_peer_select_cb,
2888                                          &psc);  
2889   if (psc.target_score == -DBL_MAX)
2890     {
2891       delay = get_processing_delay ();
2892 #if DEBUG_FS 
2893       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2894                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2895                   GNUNET_h2s (&pr->query),
2896                   delay.value);
2897 #endif
2898       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2899                                                delay,
2900                                                &forward_request_task,
2901                                                pr);
2902       return; /* nobody selected */
2903     }
2904   /* (3) update TTL/priority */
2905   if (pr->client_request_list != NULL)
2906     {
2907       /* FIXME: use better algorithm!? */
2908       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2909                                          4))
2910         pr->priority++;
2911       /* bound priority we use by priorities we see from other peers
2912          rounded up (must round up so that we can see non-zero
2913          priorities, but round up as little as possible to make it
2914          plausible that we forwarded another peers request) */
2915       if (pr->priority > current_priorities + 1.0)
2916         pr->priority = (uint32_t) current_priorities + 1.0;
2917       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2918                            pr->priority);
2919 #if DEBUG_FS
2920       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2921                   "Trying query `%s' with priority %u and TTL %d.\n",
2922                   GNUNET_h2s (&pr->query),
2923                   pr->priority,
2924                   pr->ttl);
2925 #endif
2926     }
2927
2928   /* (3) reserve reply bandwidth */
2929   if (GNUNET_NO == pr->forward_only)
2930     {
2931       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2932                                               &psc.target.hashPubKey);
2933       GNUNET_assert (NULL != cp);
2934       pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2935                                                     &psc.target,
2936                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2937                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2938                                                     DBLOCK_SIZE * 2, 
2939                                                     cp->inc_preference,
2940                                                     &target_reservation_cb,
2941                                                     pr);
2942       cp->inc_preference = 0;
2943     }
2944   else
2945     {
2946       /* force forwarding */
2947       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2948       target_reservation_cb (pr, &psc.target,
2949                              zerobw, zerobw, 0, 0.0);
2950     }
2951 }
2952
2953
2954 /* **************************** P2P PUT Handling ************************ */
2955
2956
2957 /**
2958  * Function called after we either failed or succeeded
2959  * at transmitting a reply to a peer.  
2960  *
2961  * @param cls the requests "struct PendingRequest*"
2962  * @param tpid ID of receiving peer, 0 on transmission error
2963  */
2964 static void
2965 transmit_reply_continuation (void *cls,
2966                              GNUNET_PEER_Id tpid)
2967 {
2968   struct PendingRequest *pr = cls;
2969   
2970   switch (pr->type)
2971     {
2972     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2973     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2974       /* only one reply expected, done with the request! */
2975       destroy_pending_request (pr);
2976       break;
2977     case GNUNET_BLOCK_TYPE_ANY:
2978     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2979     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2980       break;
2981     default:
2982       GNUNET_break (0);
2983       break;
2984     }
2985 }
2986
2987
2988 /**
2989  * Transmit the given message by copying it to the target buffer
2990  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2991  * for writing in the meantime.  In that case, do nothing
2992  * (the disconnect or shutdown handler will take care of the rest).
2993  * If we were able to transmit messages and there are still more
2994  * pending, ask core again for further calls to this function.
2995  *
2996  * @param cls closure, pointer to the 'struct ClientList*'
2997  * @param size number of bytes available in buf
2998  * @param buf where the callee should write the message
2999  * @return number of bytes written to buf
3000  */
3001 static size_t
3002 transmit_to_client (void *cls,
3003                   size_t size, void *buf)
3004 {
3005   struct ClientList *cl = cls;
3006   char *cbuf = buf;
3007   struct ClientResponseMessage *creply;
3008   size_t msize;
3009   
3010   cl->th = NULL;
3011   if (NULL == buf)
3012     {
3013 #if DEBUG_FS
3014       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3015                   "Not sending reply, client communication problem.\n");
3016 #endif
3017       return 0;
3018     }
3019   msize = 0;
3020   while ( (NULL != (creply = cl->res_head) ) &&
3021           (creply->msize <= size) )
3022     {
3023       memcpy (&cbuf[msize], &creply[1], creply->msize);
3024       msize += creply->msize;
3025       size -= creply->msize;
3026       GNUNET_CONTAINER_DLL_remove (cl->res_head,
3027                                    cl->res_tail,
3028                                    creply);
3029       GNUNET_free (creply);
3030     }
3031   if (NULL != creply)
3032     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3033                                                   creply->msize,
3034                                                   GNUNET_TIME_UNIT_FOREVER_REL,
3035                                                   &transmit_to_client,
3036                                                   cl);
3037 #if DEBUG_FS
3038   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3039               "Transmitted %u bytes to client\n",
3040               (unsigned int) msize);
3041 #endif
3042   return msize;
3043 }
3044
3045
3046 /**
3047  * Closure for "process_reply" function.
3048  */
3049 struct ProcessReplyClosure
3050 {
3051   /**
3052    * The data for the reply.
3053    */
3054   const void *data;
3055
3056   /**
3057    * Who gave us this reply? NULL for local host (or DHT)
3058    */
3059   struct ConnectedPeer *sender;
3060
3061   /**
3062    * When the reply expires.
3063    */
3064   struct GNUNET_TIME_Absolute expiration;
3065
3066   /**
3067    * Size of data.
3068    */
3069   size_t size;
3070
3071   /**
3072    * Type of the block.
3073    */
3074   enum GNUNET_BLOCK_Type type;
3075
3076   /**
3077    * How much was this reply worth to us?
3078    */
3079   uint32_t priority;
3080
3081   /**
3082    * Evaluation result (returned).
3083    */
3084   enum GNUNET_BLOCK_EvaluationResult eval;
3085
3086   /**
3087    * Did we finish processing the associated request?
3088    */ 
3089   int finished;
3090
3091   /**
3092    * Did we find a matching request?
3093    */
3094   int request_found;
3095 };
3096
3097
3098 /**
3099  * We have received a reply; handle it!
3100  *
3101  * @param cls response (struct ProcessReplyClosure)
3102  * @param key our query
3103  * @param value value in the hash map (info about the query)
3104  * @return GNUNET_YES (we should continue to iterate)
3105  */
3106 static int
3107 process_reply (void *cls,
3108                const GNUNET_HashCode * key,
3109                void *value)
3110 {
3111   struct ProcessReplyClosure *prq = cls;
3112   struct PendingRequest *pr = value;
3113   struct PendingMessage *reply;
3114   struct ClientResponseMessage *creply;
3115   struct ClientList *cl;
3116   struct PutMessage *pm;
3117   struct ConnectedPeer *cp;
3118   struct GNUNET_TIME_Relative cur_delay;
3119 #if SUPPORT_DELAYS  
3120 struct GNUNET_TIME_Relative art_delay;
3121 #endif
3122   size_t msize;
3123   unsigned int i;
3124
3125 #if DEBUG_FS
3126   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3127               "Matched result (type %u) for query `%s' with pending request\n",
3128               (unsigned int) prq->type,
3129               GNUNET_h2s (key));
3130 #endif  
3131   GNUNET_STATISTICS_update (stats,
3132                             gettext_noop ("# replies received and matched"),
3133                             1,
3134                             GNUNET_NO);
3135   if (prq->sender != NULL)
3136     {
3137       for (i=0;i<pr->used_targets_off;i++)
3138         if (pr->used_targets[i].pid == prq->sender->pid)
3139           break;
3140       if (i < pr->used_targets_off)
3141         {
3142           cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
3143           prq->sender->avg_delay.value
3144             = (prq->sender->avg_delay.value * 
3145                (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
3146           prq->sender->avg_priority
3147             = (prq->sender->avg_priority * 
3148                (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3149         }
3150       if (pr->cp != NULL)
3151         {
3152           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3153                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3154                                  -1);
3155           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3156           prq->sender->last_p2p_replies
3157             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3158             = pr->cp->pid;
3159         }
3160       else
3161         {
3162           if (NULL != prq->sender->last_client_replies
3163               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3164             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3165                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3166           prq->sender->last_client_replies
3167             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3168             = pr->client_request_list->client_list->client;
3169           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3170         }
3171     }
3172   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3173                                      prq->type,
3174                                      key,
3175                                      &pr->bf,
3176                                      pr->mingle,
3177                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3178                                      prq->data,
3179                                      prq->size);
3180   switch (prq->eval)
3181     {
3182     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3183       break;
3184     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3185       while (NULL != pr->pending_head)
3186         destroy_pending_message_list_entry (pr->pending_head);
3187       if (pr->qe != NULL)
3188         {
3189           if (pr->client_request_list != NULL)
3190             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3191                                         GNUNET_YES);
3192           GNUNET_DATASTORE_cancel (pr->qe);
3193           pr->qe = NULL;
3194         }
3195       pr->do_remove = GNUNET_YES;
3196       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3197         {
3198           GNUNET_SCHEDULER_cancel (sched,
3199                                    pr->task);
3200           pr->task = GNUNET_SCHEDULER_NO_TASK;
3201         }
3202       GNUNET_break (GNUNET_YES ==
3203                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3204                                                           key,
3205                                                           pr));
3206       GNUNET_LOAD_update (rt_entry_lifetime,
3207                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3208       break;
3209     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3210       GNUNET_STATISTICS_update (stats,
3211                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3212                                 1,
3213                                 GNUNET_NO);
3214 #if DEBUG_FS
3215       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3216                   "Duplicate response `%s', discarding.\n",
3217                   GNUNET_h2s (&mhash));
3218 #endif
3219       return GNUNET_YES; /* duplicate */
3220     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3221       return GNUNET_YES; /* wrong namespace */  
3222     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3223       GNUNET_break (0);
3224       return GNUNET_YES;
3225     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3226       GNUNET_break (0);
3227       return GNUNET_YES;
3228     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3229       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3230                   _("Unsupported block type %u\n"),
3231                   prq->type);
3232       return GNUNET_NO;
3233     }
3234   if (pr->client_request_list != NULL)
3235     {
3236       if (pr->replies_seen_size == pr->replies_seen_off)
3237         GNUNET_array_grow (pr->replies_seen,
3238                            pr->replies_seen_size,
3239                            pr->replies_seen_size * 2 + 4);      
3240       GNUNET_CRYPTO_hash (prq->data,
3241                           prq->size,
3242                           &pr->replies_seen[pr->replies_seen_off++]);         
3243       refresh_bloomfilter (pr);
3244     }
3245   if (NULL == prq->sender)
3246     {
3247 #if DEBUG_FS
3248       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3249                   "Found result for query `%s' in local datastore\n",
3250                   GNUNET_h2s (key));
3251 #endif
3252       GNUNET_STATISTICS_update (stats,
3253                                 gettext_noop ("# results found locally"),
3254                                 1,
3255                                 GNUNET_NO);      
3256     }
3257   prq->priority += pr->remaining_priority;
3258   pr->remaining_priority = 0;
3259   pr->results_found++;
3260   prq->request_found = GNUNET_YES;
3261   if (NULL != pr->client_request_list)
3262     {
3263       GNUNET_STATISTICS_update (stats,
3264                                 gettext_noop ("# replies received for local clients"),
3265                                 1,
3266                                 GNUNET_NO);
3267       cl = pr->client_request_list->client_list;
3268       msize = sizeof (struct PutMessage) + prq->size;
3269       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3270       creply->msize = msize;
3271       creply->client_list = cl;
3272       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3273                                          cl->res_tail,
3274                                          cl->res_tail,
3275                                          creply);      
3276       pm = (struct PutMessage*) &creply[1];
3277       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3278       pm->header.size = htons (msize);
3279       pm->type = htonl (prq->type);
3280       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3281       memcpy (&pm[1], prq->data, prq->size);      
3282       if (NULL == cl->th)
3283         {
3284 #if DEBUG_FS
3285           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3286                       "Transmitting result for query `%s' to client\n",
3287                       GNUNET_h2s (key));
3288 #endif  
3289           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3290                                                         msize,
3291                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3292                                                         &transmit_to_client,
3293                                                         cl);
3294         }
3295       GNUNET_break (cl->th != NULL);
3296       if (pr->do_remove)                
3297         {
3298           prq->finished = GNUNET_YES;
3299           destroy_pending_request (pr);         
3300         }
3301     }
3302   else
3303     {
3304       cp = pr->cp;
3305 #if DEBUG_FS
3306       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3307                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3308                   GNUNET_h2s (key),
3309                   (unsigned int) cp->pid);
3310 #endif  
3311       GNUNET_STATISTICS_update (stats,
3312                                 gettext_noop ("# replies received for other peers"),
3313                                 1,
3314                                 GNUNET_NO);
3315       msize = sizeof (struct PutMessage) + prq->size;
3316       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3317       reply->cont = &transmit_reply_continuation;
3318       reply->cont_cls = pr;
3319 #if SUPPORT_DELAYS
3320       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3321                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3322                                                                            TTL_DECREMENT));
3323       reply->delay_until 
3324         = GNUNET_TIME_relative_to_absolute (art_delay);
3325       GNUNET_STATISTICS_update (stats,
3326                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3327                                 art_delay.value,
3328                                 GNUNET_NO);
3329 #endif
3330       reply->msize = msize;
3331       reply->priority = UINT32_MAX; /* send replies first! */
3332       pm = (struct PutMessage*) &reply[1];
3333       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3334       pm->header.size = htons (msize);
3335       pm->type = htonl (prq->type);
3336       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3337       memcpy (&pm[1], prq->data, prq->size);
3338       add_to_pending_messages_for_peer (cp, reply, pr);
3339     }
3340   return GNUNET_YES;
3341 }
3342
3343
3344 /**
3345  * Iterator called on each result obtained for a DHT
3346  * operation that expects a reply
3347  *
3348  * @param cls closure
3349  * @param exp when will this value expire
3350  * @param key key of the result
3351  * @param get_path NULL-terminated array of pointers
3352  *                 to the peers on reverse GET path (or NULL if not recorded)
3353  * @param put_path NULL-terminated array of pointers
3354  *                 to the peers on the PUT path (or NULL if not recorded)
3355  * @param type type of the result
3356  * @param size number of bytes in data
3357  * @param data pointer to the result data
3358  */
3359 static void
3360 process_dht_reply (void *cls,
3361                    struct GNUNET_TIME_Absolute exp,
3362                    const GNUNET_HashCode * key,
3363                    const struct GNUNET_PeerIdentity * const *get_path,
3364                    const struct GNUNET_PeerIdentity * const *put_path,
3365                    enum GNUNET_BLOCK_Type type,
3366                    size_t size,
3367                    const void *data)
3368 {
3369   struct PendingRequest *pr = cls;
3370   struct ProcessReplyClosure prq;
3371
3372   memset (&prq, 0, sizeof (prq));
3373   prq.data = data;
3374   prq.expiration = exp;
3375   prq.size = size;  
3376   prq.type = type;
3377   process_reply (&prq, key, pr);
3378 }
3379
3380
3381
3382 /**
3383  * Continuation called to notify client about result of the
3384  * operation.
3385  *
3386  * @param cls closure
3387  * @param success GNUNET_SYSERR on failure
3388  * @param msg NULL on success, otherwise an error message
3389  */
3390 static void 
3391 put_migration_continuation (void *cls,
3392                             int success,
3393                             const char *msg)
3394 {
3395   struct GNUNET_TIME_Absolute *start = cls;
3396   struct GNUNET_TIME_Relative delay;
3397   
3398   delay = GNUNET_TIME_absolute_get_duration (*start);
3399   GNUNET_free (start);
3400   GNUNET_LOAD_update (datastore_put_load,
3401                       delay.value);
3402   if (GNUNET_OK == success)
3403     return;
3404   GNUNET_STATISTICS_update (stats,
3405                             gettext_noop ("# datastore 'put' failures"),
3406                             1,
3407                             GNUNET_NO);
3408 }
3409
3410
3411 /**
3412  * Handle P2P "PUT" message.
3413  *
3414  * @param cls closure, always NULL
3415  * @param other the other peer involved (sender or receiver, NULL
3416  *        for loopback messages where we are both sender and receiver)
3417  * @param message the actual message
3418  * @param latency reported latency of the connection with 'other'
3419  * @param distance reported distance (DV) to 'other' 
3420  * @return GNUNET_OK to keep the connection open,
3421  *         GNUNET_SYSERR to close it (signal serious error)
3422  */
3423 static int
3424 handle_p2p_put (void *cls,
3425                 const struct GNUNET_PeerIdentity *other,
3426                 const struct GNUNET_MessageHeader *message,
3427                 struct GNUNET_TIME_Relative latency,
3428                 uint32_t distance)
3429 {
3430   const struct PutMessage *put;
3431   uint16_t msize;
3432   size_t dsize;
3433   enum GNUNET_BLOCK_Type type;
3434   struct GNUNET_TIME_Absolute expiration;
3435   GNUNET_HashCode query;
3436   struct ProcessReplyClosure prq;
3437   struct GNUNET_TIME_Absolute *start;
3438   struct GNUNET_TIME_Relative block_time;  
3439   double putl;
3440   struct ConnectedPeer *cp; 
3441   struct PendingMessage *pm;
3442   struct MigrationStopMessage *msm;
3443
3444   msize = ntohs (message->size);
3445   if (msize < sizeof (struct PutMessage))
3446     {
3447       GNUNET_break_op(0);
3448       return GNUNET_SYSERR;
3449     }
3450   put = (const struct PutMessage*) message;
3451   dsize = msize - sizeof (struct PutMessage);
3452   type = ntohl (put->type);
3453   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3454
3455   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3456     return GNUNET_SYSERR;
3457   if (GNUNET_OK !=
3458       GNUNET_BLOCK_get_key (block_ctx,
3459                             type,
3460                             &put[1],
3461                             dsize,
3462                             &query))
3463     {
3464       GNUNET_break_op (0);
3465       return GNUNET_SYSERR;
3466     }
3467 #if DEBUG_FS
3468   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3469               "Received result for query `%s' from peer `%4s'\n",
3470               GNUNET_h2s (&query),
3471               GNUNET_i2s (other));
3472 #endif
3473   GNUNET_STATISTICS_update (stats,
3474                             gettext_noop ("# replies received (overall)"),
3475                             1,
3476                             GNUNET_NO);
3477   /* now, lookup 'query' */
3478   prq.data = (const void*) &put[1];
3479   if (other != NULL)
3480     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3481                                                     &other->hashPubKey);
3482   else
3483     prq.sender = NULL;
3484   prq.size = dsize;
3485   prq.type = type;
3486   prq.expiration = expiration;
3487   prq.priority = 0;
3488   prq.finished = GNUNET_NO;
3489   prq.request_found = GNUNET_NO;
3490   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3491                                               &query,
3492                                               &process_reply,
3493                                               &prq);
3494   if (prq.sender != NULL)
3495     {
3496       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3497       prq.sender->trust += prq.priority;
3498     }
3499   if ( (GNUNET_YES == active_migration) &&
3500        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3501     {      
3502 #if DEBUG_FS
3503       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3504                   "Replicating result for query `%s' with priority %u\n",
3505                   GNUNET_h2s (&query),
3506                   prq.priority);
3507 #endif
3508       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3509       *start = GNUNET_TIME_absolute_get ();
3510       GNUNET_DATASTORE_put (dsh,
3511                             0, &query, dsize, &put[1],
3512                             type, prq.priority, 1 /* anonymity */, 
3513                             expiration, 
3514                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3515                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3516                             &put_migration_continuation, 
3517                             start);
3518     }
3519   putl = GNUNET_LOAD_get_load (datastore_put_load);
3520   if ( (GNUNET_NO == prq.request_found) &&
3521        ( (GNUNET_YES != active_migration) ||
3522          (putl > 2.5 * (1 + prq.priority)) ) )
3523     {
3524       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3525                                               &other->hashPubKey);
3526       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3527         return GNUNET_OK; /* already blocked */
3528       /* We're too busy; send MigrationStop message! */
3529       if (GNUNET_YES != active_migration) 
3530         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3531       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3532                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3533                                                                                    (unsigned int) (60000 * putl * putl)));
3534       
3535       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3536       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3537                           sizeof (struct MigrationStopMessage));
3538       pm->msize = sizeof (struct MigrationStopMessage);
3539       pm->priority = UINT32_MAX;
3540       msm = (struct MigrationStopMessage*) &pm[1];
3541       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3542       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3543       msm->duration = GNUNET_TIME_relative_hton (block_time);
3544       add_to_pending_messages_for_peer (cp,
3545                                         pm,
3546                                         NULL);
3547     }
3548   return GNUNET_OK;
3549 }
3550
3551
3552 /**
3553  * Handle P2P "MIGRATION_STOP" message.
3554  *
3555  * @param cls closure, always NULL
3556  * @param other the other peer involved (sender or receiver, NULL
3557  *        for loopback messages where we are both sender and receiver)
3558  * @param message the actual message
3559  * @param latency reported latency of the connection with 'other'
3560  * @param distance reported distance (DV) to 'other' 
3561  * @return GNUNET_OK to keep the connection open,
3562  *         GNUNET_SYSERR to close it (signal serious error)
3563  */
3564 static int
3565 handle_p2p_migration_stop (void *cls,
3566                            const struct GNUNET_PeerIdentity *other,
3567                            const struct GNUNET_MessageHeader *message,
3568                            struct GNUNET_TIME_Relative latency,
3569                            uint32_t distance)
3570 {
3571   struct ConnectedPeer *cp; 
3572   const struct MigrationStopMessage *msm;
3573
3574   msm = (const struct MigrationStopMessage*) message;
3575   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3576                                           &other->hashPubKey);
3577   if (cp == NULL)
3578     {
3579       GNUNET_break (0);
3580       return GNUNET_OK;
3581     }
3582   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3583   return GNUNET_OK;
3584 }
3585
3586
3587
3588 /* **************************** P2P GET Handling ************************ */
3589
3590
3591 /**
3592  * Closure for 'check_duplicate_request_{peer,client}'.
3593  */
3594 struct CheckDuplicateRequestClosure
3595 {
3596   /**
3597    * The new request we should check if it already exists.
3598    */
3599   const struct PendingRequest *pr;
3600
3601   /**
3602    * Existing request found by the checker, NULL if none.
3603    */
3604   struct PendingRequest *have;
3605 };
3606
3607
3608 /**
3609  * Iterator over entries in the 'query_request_map' that
3610  * tries to see if we have the same request pending from
3611  * the same client already.
3612  *
3613  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3614  * @param key current key code (query, ignored, must match)
3615  * @param value value in the hash map (a 'struct PendingRequest' 
3616  *              that already exists)
3617  * @return GNUNET_YES if we should continue to
3618  *         iterate (no match yet)
3619  *         GNUNET_NO if not (match found).
3620  */
3621 static int
3622 check_duplicate_request_client (void *cls,
3623                                 const GNUNET_HashCode * key,
3624                                 void *value)
3625 {
3626   struct CheckDuplicateRequestClosure *cdc = cls;
3627   struct PendingRequest *have = value;
3628
3629   if (have->client_request_list == NULL)
3630     return GNUNET_YES;
3631   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3632        (cdc->pr != have) )
3633     {
3634       cdc->have = have;
3635       return GNUNET_NO;
3636     }
3637   return GNUNET_YES;
3638 }
3639
3640
3641 /**
3642  * We're processing (local) results for a search request
3643  * from another peer.  Pass applicable results to the
3644  * peer and if we are done either clean up (operation
3645  * complete) or forward to other peers (more results possible).
3646  *
3647  * @param cls our closure (struct LocalGetContext)
3648  * @param key key for the content
3649  * @param size number of bytes in data
3650  * @param data content stored
3651  * @param type type of the content
3652  * @param priority priority of the content
3653  * @param anonymity anonymity-level for the content
3654  * @param expiration expiration time for the content
3655  * @param uid unique identifier for the datum;
3656  *        maybe 0 if no unique identifier is available
3657  */
3658 static void
3659 process_local_reply (void *cls,
3660                      const GNUNET_HashCode * key,
3661                      size_t size,
3662                      const void *data,
3663                      enum GNUNET_BLOCK_Type type,
3664                      uint32_t priority,
3665                      uint32_t anonymity,
3666                      struct GNUNET_TIME_Absolute
3667                      expiration, 
3668                      uint64_t uid)
3669 {
3670   struct PendingRequest *pr = cls;
3671   struct ProcessReplyClosure prq;
3672   struct CheckDuplicateRequestClosure cdrc;
3673   GNUNET_HashCode query;
3674   unsigned int old_rf;
3675   
3676   if (NULL == key)
3677     {
3678 #if DEBUG_FS > 1
3679       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3680                   "Done processing local replies, forwarding request to other peers.\n");
3681 #endif
3682       pr->qe = NULL;
3683       if (pr->client_request_list != NULL)
3684         {
3685           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3686                                       GNUNET_YES);
3687           /* Figure out if this is a duplicate request and possibly
3688              merge 'struct PendingRequest' entries */
3689           cdrc.have = NULL;
3690           cdrc.pr = pr;
3691           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3692                                                       &pr->query,
3693                                                       &check_duplicate_request_client,
3694                                                       &cdrc);
3695           if (cdrc.have != NULL)
3696             {
3697 #if DEBUG_FS
3698               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3699                           "Received request for block `%s' twice from client, will only request once.\n",
3700                           GNUNET_h2s (&pr->query));
3701 #endif
3702               
3703               destroy_pending_request (pr);
3704               return;
3705             }
3706         }
3707
3708       /* no more results */
3709       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3710         pr->task = GNUNET_SCHEDULER_add_now (sched,
3711                                              &forward_request_task,
3712                                              pr);      
3713       return;
3714     }
3715 #if DEBUG_FS
3716   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3717               "New local response to `%s' of type %u.\n",
3718               GNUNET_h2s (key),
3719               type);
3720 #endif
3721   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3722     {
3723 #if DEBUG_FS
3724       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3725                   "Found ONDEMAND block, performing on-demand encoding\n");
3726 #endif
3727       GNUNET_STATISTICS_update (stats,
3728                                 gettext_noop ("# on-demand blocks matched requests"),
3729                                 1,
3730                                 GNUNET_NO);
3731       if (GNUNET_OK != 
3732           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3733                                             anonymity, expiration, uid, 
3734                                             &process_local_reply,
3735                                             pr))
3736       if (pr->qe != NULL)
3737         {
3738           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3739         }
3740       return;
3741     }
3742   old_rf = pr->results_found;
3743   memset (&prq, 0, sizeof (prq));
3744   prq.data = data;
3745   prq.expiration = expiration;
3746   prq.size = size;  
3747   if (GNUNET_OK != 
3748       GNUNET_BLOCK_get_key (block_ctx,
3749                             type,
3750                             data,
3751                             size,
3752                             &query))
3753     {
3754       GNUNET_break (0);
3755       GNUNET_DATASTORE_remove (dsh,
3756                                key,
3757                                size, data,
3758                                -1, -1, 
3759                                GNUNET_TIME_UNIT_FOREVER_REL,
3760                                NULL, NULL);
3761       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3762       return;
3763     }
3764   prq.type = type;
3765   prq.priority = priority;  
3766   prq.finished = GNUNET_NO;
3767   prq.request_found = GNUNET_NO;
3768   if ( (old_rf == 0) &&
3769        (pr->results_found == 0) )
3770     update_datastore_delays (pr->start_time);
3771   process_reply (&prq, key, pr);
3772   if (prq.finished == GNUNET_YES)
3773     return;
3774   if (pr->qe == NULL)
3775     return; /* done here */
3776   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3777     {
3778       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3779       return;
3780     }
3781   if ( (pr->client_request_list == NULL) &&
3782        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3783          (pr->results_found > 5 + 2 * pr->priority) ) )
3784     {
3785 #if DEBUG_FS > 2
3786       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3787                   "Load too high, done with request\n");
3788 #endif
3789       GNUNET_STATISTICS_update (stats,
3790                                 gettext_noop ("# processing result set cut short due to load"),
3791                                 1,
3792                                 GNUNET_NO);
3793       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3794       return;
3795     }
3796   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3797 }
3798
3799
3800 /**
3801  * We've received a request with the specified priority.  Bound it
3802  * according to how much we trust the given peer.
3803  * 
3804  * @param prio_in requested priority
3805  * @param cp the peer making the request
3806  * @return effective priority
3807  */
3808 static int32_t
3809 bound_priority (uint32_t prio_in,
3810                 struct ConnectedPeer *cp)
3811 {
3812 #define N ((double)128.0)
3813   uint32_t ret;
3814   double rret;
3815   int ld;
3816
3817   ld = test_get_load_too_high (0);
3818   if (ld == GNUNET_SYSERR)
3819     return 0; /* excess resources */
3820   ret = change_host_trust (cp, prio_in);
3821   if (ret > 0)
3822     {
3823       if (ret > current_priorities + N)
3824         rret = current_priorities + N;
3825       else
3826         rret = ret;
3827       current_priorities 
3828         = (current_priorities * (N-1) + rret)/N;
3829     }
3830   if ( (ld == GNUNET_YES) && (ret > 0) )
3831     {
3832       /* try with charging */
3833       ld = test_get_load_too_high (ret);
3834     }
3835   if (ld == GNUNET_YES)
3836     {
3837       /* undo charge */
3838       if (ret != 0)
3839         change_host_trust (cp, -ret);
3840       return -1; /* not enough resources */
3841     }
3842 #undef N
3843   return ret;
3844 }
3845
3846
3847 /**
3848  * Iterator over entries in the 'query_request_map' that
3849  * tries to see if we have the same request pending from
3850  * the same peer already.
3851  *
3852  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3853  * @param key current key code (query, ignored, must match)
3854  * @param value value in the hash map (a 'struct PendingRequest' 
3855  *              that already exists)
3856  * @return GNUNET_YES if we should continue to
3857  *         iterate (no match yet)
3858  *         GNUNET_NO if not (match found).
3859  */
3860 static int
3861 check_duplicate_request_peer (void *cls,
3862                               const GNUNET_HashCode * key,
3863                               void *value)
3864 {
3865   struct CheckDuplicateRequestClosure *cdc = cls;
3866   struct PendingRequest *have = value;
3867
3868   if (cdc->pr->target_pid == have->target_pid)
3869     {
3870       cdc->have = have;
3871       return GNUNET_NO;
3872     }
3873   return GNUNET_YES;
3874 }
3875
3876
3877 /**
3878  * Handle P2P "GET" request.
3879  *
3880  * @param cls closure, always NULL
3881  * @param other the other peer involved (sender or receiver, NULL
3882  *        for loopback messages where we are both sender and receiver)
3883  * @param message the actual message
3884  * @param latency reported latency of the connection with 'other'
3885  * @param distance reported distance (DV) to 'other' 
3886  * @return GNUNET_OK to keep the connection open,
3887  *         GNUNET_SYSERR to close it (signal serious error)
3888  */
3889 static int
3890 handle_p2p_get (void *cls,
3891                 const struct GNUNET_PeerIdentity *other,
3892                 const struct GNUNET_MessageHeader *message,
3893                 struct GNUNET_TIME_Relative latency,
3894                 uint32_t distance)
3895 {
3896   struct PendingRequest *pr;
3897   struct ConnectedPeer *cp;
3898   struct ConnectedPeer *cps;
3899   struct CheckDuplicateRequestClosure cdc;
3900   struct GNUNET_TIME_Relative timeout;
3901   uint16_t msize;
3902   const struct GetMessage *gm;
3903   unsigned int bits;
3904   const GNUNET_HashCode *opt;
3905   uint32_t bm;
3906   size_t bfsize;
3907   uint32_t ttl_decrement;
3908   int32_t priority;
3909   enum GNUNET_BLOCK_Type type;
3910   int have_ns;
3911
3912   msize = ntohs(message->size);
3913   if (msize < sizeof (struct GetMessage))
3914     {
3915       GNUNET_break_op (0);
3916       return GNUNET_SYSERR;
3917     }
3918   gm = (const struct GetMessage*) message;
3919 #if DEBUG_FS
3920   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3921               "Received request for `%s'\n",
3922               GNUNET_h2s (&gm->query));
3923 #endif
3924   type = ntohl (gm->type);
3925   bm = ntohl (gm->hash_bitmap);
3926   bits = 0;
3927   while (bm > 0)
3928     {
3929       if (1 == (bm & 1))
3930         bits++;
3931       bm >>= 1;
3932     }
3933   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3934     {
3935       GNUNET_break_op (0);
3936       return GNUNET_SYSERR;
3937     }  
3938   opt = (const GNUNET_HashCode*) &gm[1];
3939   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3940   bm = ntohl (gm->hash_bitmap);
3941   bits = 0;
3942   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3943                                            &other->hashPubKey);
3944   if (NULL == cps)
3945     {
3946       /* peer must have just disconnected */
3947       GNUNET_STATISTICS_update (stats,
3948                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3949                                 1,
3950                                 GNUNET_NO);
3951       return GNUNET_SYSERR;
3952     }
3953   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3954     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3955                                             &opt[bits++]);
3956   else
3957     cp = cps;
3958   if (cp == NULL)
3959     {
3960 #if DEBUG_FS
3961       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3962         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3963                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3964                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3965       
3966       else
3967         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3968                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3969                     GNUNET_i2s (other));
3970 #endif
3971       GNUNET_STATISTICS_update (stats,
3972                                 gettext_noop ("# requests dropped due to missing reverse route"),
3973                                 1,
3974                                 GNUNET_NO);
3975      /* FIXME: try connect? */
3976       return GNUNET_OK;
3977     }
3978   /* note that we can really only check load here since otherwise
3979      peers could find out that we are overloaded by not being
3980      disconnected after sending us a malformed query... */
3981   priority = bound_priority (ntohl (gm->priority), cps);
3982   if (priority < 0)
3983     {
3984 #if DEBUG_FS
3985       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3986                   "Dropping query from `%s', this peer is too busy.\n",
3987                   GNUNET_i2s (other));
3988 #endif
3989       GNUNET_STATISTICS_update (stats,
3990                                 gettext_noop ("# requests dropped due to high load"),
3991                                 1,
3992                                 GNUNET_NO);
3993       return GNUNET_OK;
3994     }
3995 #if DEBUG_FS 
3996   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3997               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3998               GNUNET_h2s (&gm->query),
3999               (unsigned int) type,
4000               GNUNET_i2s (other),
4001               (unsigned int) bm);
4002 #endif
4003   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
4004   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4005                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
4006   if (have_ns)
4007     {
4008       pr->namespace = (GNUNET_HashCode*) &pr[1];
4009       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
4010     }
4011   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
4012        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
4013         GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
4014     {
4015       /* don't have BW to send to peer, or would likely take longer than we have for it,
4016          so at best indirect the query */
4017       priority = 0;
4018       pr->forward_only = GNUNET_YES;
4019     }
4020   pr->type = type;
4021   pr->mingle = ntohl (gm->filter_mutator);
4022   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
4023     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
4024   pr->anonymity_level = 1;
4025   pr->priority = (uint32_t) priority;
4026   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
4027   pr->query = gm->query;
4028   /* decrement ttl (always) */
4029   ttl_decrement = 2 * TTL_DECREMENT +
4030     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
4031                               TTL_DECREMENT);
4032   if ( (pr->ttl < 0) &&
4033        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
4034     {
4035 #if DEBUG_FS
4036       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4037                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
4038                   GNUNET_i2s (other),
4039                   pr->ttl,
4040                   ttl_decrement);
4041 #endif
4042       GNUNET_STATISTICS_update (stats,
4043                                 gettext_noop ("# requests dropped due TTL underflow"),
4044                                 1,
4045                                 GNUNET_NO);
4046       /* integer underflow => drop (should be very rare)! */      
4047       GNUNET_free (pr);
4048       return GNUNET_OK;
4049     } 
4050   pr->ttl -= ttl_decrement;
4051   pr->start_time = GNUNET_TIME_absolute_get ();
4052
4053   /* get bloom filter */
4054   if (bfsize > 0)
4055     {
4056       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
4057                                                   bfsize,
4058                                                   BLOOMFILTER_K);
4059       pr->bf_size = bfsize;
4060     }
4061   cdc.have = NULL;
4062   cdc.pr = pr;
4063   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
4064                                               &gm->query,
4065                                               &check_duplicate_request_peer,
4066                                               &cdc);
4067   if (cdc.have != NULL)
4068     {
4069       if (cdc.have->start_time.value + cdc.have->ttl >=
4070           pr->start_time.value + pr->ttl)
4071         {
4072           /* existing request has higher TTL, drop new one! */
4073           cdc.have->priority += pr->priority;
4074           destroy_pending_request (pr);
4075 #if DEBUG_FS
4076           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4077                       "Have existing request with higher TTL, dropping new request.\n",
4078                       GNUNET_i2s (other));
4079 #endif
4080           GNUNET_STATISTICS_update (stats,
4081                                     gettext_noop ("# requests dropped due to higher-TTL request"),
4082                                     1,
4083                                     GNUNET_NO);
4084           return GNUNET_OK;
4085         }
4086       else
4087         {
4088           /* existing request has lower TTL, drop old one! */
4089           pr->priority += cdc.have->priority;
4090           /* Possible optimization: if we have applicable pending
4091              replies in 'cdc.have', we might want to move those over
4092              (this is a really rare special-case, so it is not clear
4093              that this would be worth it) */
4094           destroy_pending_request (cdc.have);
4095           /* keep processing 'pr'! */
4096         }
4097     }
4098
4099   pr->cp = cp;
4100   GNUNET_break (GNUNET_OK ==
4101                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4102                                                    &gm->query,
4103                                                    pr,
4104                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4105   GNUNET_break (GNUNET_OK ==
4106                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
4107                                                    &other->hashPubKey,
4108                                                    pr,
4109                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4110   
4111   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4112                                             pr,
4113                                             pr->start_time.value + pr->ttl);
4114
4115   GNUNET_STATISTICS_update (stats,
4116                             gettext_noop ("# P2P searches received"),
4117                             1,
4118                             GNUNET_NO);
4119   GNUNET_STATISTICS_update (stats,
4120                             gettext_noop ("# P2P searches active"),
4121                             1,
4122                             GNUNET_NO);
4123
4124   /* calculate change in traffic preference */
4125   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4126   /* process locally */
4127   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4128     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4129   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4130                                            (pr->priority + 1)); 
4131   if (GNUNET_YES != pr->forward_only)
4132     {
4133 #if DEBUG_FS
4134       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4135                   "Handing request for `%s' to datastore\n",
4136                   GNUNET_h2s (&gm->query));
4137 #endif
4138       pr->qe = GNUNET_DATASTORE_get (dsh,
4139                                      &gm->query,
4140                                      type,                             
4141                                      pr->priority + 1,
4142                                      MAX_DATASTORE_QUEUE,                                
4143                                      timeout,
4144                                      &process_local_reply,
4145                                      pr);
4146       if (NULL == pr->qe)
4147         {
4148           GNUNET_STATISTICS_update (stats,
4149                                     gettext_noop ("# requests dropped by datastore (queue length limit)"),
4150                                     1,
4151                                     GNUNET_NO);
4152         }
4153     }
4154   else
4155     {
4156       GNUNET_STATISTICS_update (stats,
4157                                 gettext_noop ("# requests forwarded due to high load"),
4158                                 1,
4159                                 GNUNET_NO);
4160     }
4161
4162   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4163   switch (pr->type)
4164     {
4165     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4166     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4167       /* only one result, wait for datastore */
4168       if (GNUNET_YES != pr->forward_only)
4169         break;
4170     default:
4171       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4172         pr->task = GNUNET_SCHEDULER_add_now (sched,
4173                                              &forward_request_task,
4174                                              pr);
4175     }
4176
4177   /* make sure we don't track too many requests */
4178   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4179     {
4180       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4181       GNUNET_assert (pr != NULL);
4182       destroy_pending_request (pr);
4183     }
4184   return GNUNET_OK;
4185 }
4186
4187
4188 /* **************************** CS GET Handling ************************ */
4189
4190
4191 /**
4192  * Handle START_SEARCH-message (search request from client).
4193  *
4194  * @param cls closure
4195  * @param client identification of the client
4196  * @param message the actual message
4197  */
4198 static void
4199 handle_start_search (void *cls,
4200                      struct GNUNET_SERVER_Client *client,
4201                      const struct GNUNET_MessageHeader *message)
4202 {
4203   static GNUNET_HashCode all_zeros;
4204   const struct SearchMessage *sm;
4205   struct ClientList *cl;
4206   struct ClientRequestList *crl;
4207   struct PendingRequest *pr;
4208   uint16_t msize;
4209   unsigned int sc;
4210   enum GNUNET_BLOCK_Type type;
4211
4212   msize = ntohs (message->size);
4213   if ( (msize < sizeof (struct SearchMessage)) ||
4214        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4215     {
4216       GNUNET_break (0);
4217       GNUNET_SERVER_receive_done (client,
4218                                   GNUNET_SYSERR);
4219       return;
4220     }
4221   GNUNET_STATISTICS_update (stats,
4222                             gettext_noop ("# client searches received"),
4223                             1,
4224                             GNUNET_NO);
4225   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4226   sm = (const struct SearchMessage*) message;
4227   type = ntohl (sm->type);
4228 #if DEBUG_FS
4229   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4230               "Received request for `%s' of type %u from local client\n",
4231               GNUNET_h2s (&sm->query),
4232               (unsigned int) type);
4233 #endif
4234   cl = client_list;
4235   while ( (cl != NULL) &&
4236           (cl->client != client) )
4237     cl = cl->next;
4238   if (cl == NULL)
4239     {
4240       cl = GNUNET_malloc (sizeof (struct ClientList));
4241       cl->client = client;
4242       GNUNET_SERVER_client_keep (client);
4243       cl->next = client_list;
4244       client_list = cl;
4245     }
4246   /* detect duplicate KBLOCK requests */
4247   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4248        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4249        (type == GNUNET_BLOCK_TYPE_ANY) )
4250     {
4251       crl = cl->rl_head;
4252       while ( (crl != NULL) &&
4253               ( (0 != memcmp (&crl->req->query,
4254                               &sm->query,
4255                               sizeof (GNUNET_HashCode))) ||
4256                 (crl->req->type != type) ) )
4257         crl = crl->next;
4258       if (crl != NULL)  
4259         { 
4260 #if DEBUG_FS
4261           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4262                       "Have existing request, merging content-seen lists.\n");
4263 #endif
4264           pr = crl->req;
4265           /* Duplicate request (used to send long list of
4266              known/blocked results); merge 'pr->replies_seen'
4267              and update bloom filter */
4268           GNUNET_array_grow (pr->replies_seen,
4269                              pr->replies_seen_size,
4270                              pr->replies_seen_off + sc);
4271           memcpy (&pr->replies_seen[pr->replies_seen_off],
4272                   &sm[1],
4273                   sc * sizeof (GNUNET_HashCode));
4274           pr->replies_seen_off += sc;
4275           refresh_bloomfilter (pr);
4276           GNUNET_STATISTICS_update (stats,
4277                                     gettext_noop ("# client searches updated (merged content seen list)"),
4278                                     1,
4279                                     GNUNET_NO);
4280           GNUNET_SERVER_receive_done (client,
4281                                       GNUNET_OK);
4282           return;
4283         }
4284     }
4285   GNUNET_STATISTICS_update (stats,
4286                             gettext_noop ("# client searches active"),
4287                             1,
4288                             GNUNET_NO);
4289   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4290                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4291   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4292   memset (crl, 0, sizeof (struct ClientRequestList));
4293   crl->client_list = cl;
4294   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4295                                cl->rl_tail,
4296                                crl);  
4297   crl->req = pr;
4298   pr->type = type;
4299   pr->client_request_list = crl;
4300   GNUNET_array_grow (pr->replies_seen,
4301                      pr->replies_seen_size,
4302                      sc);
4303   memcpy (pr->replies_seen,
4304           &sm[1],
4305           sc * sizeof (GNUNET_HashCode));
4306   pr->replies_seen_off = sc;
4307   pr->anonymity_level = ntohl (sm->anonymity_level); 
4308   pr->start_time = GNUNET_TIME_absolute_get ();
4309   refresh_bloomfilter (pr);
4310   pr->query = sm->query;
4311   if (0 == (1 & ntohl (sm->options)))
4312     pr->local_only = GNUNET_NO;
4313   else
4314     pr->local_only = GNUNET_YES;
4315   switch (type)
4316     {
4317     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4318     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4319       if (0 != memcmp (&sm->target,
4320                        &all_zeros,
4321                        sizeof (GNUNET_HashCode)))
4322         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4323       break;
4324     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4325       pr->namespace = (GNUNET_HashCode*) &pr[1];
4326       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4327       break;
4328     default:
4329       break;
4330     }
4331   GNUNET_break (GNUNET_OK ==
4332                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4333                                                    &sm->query,
4334                                                    pr,
4335                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4336   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4337     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4338   pr->qe = GNUNET_DATASTORE_get (dsh,
4339                                  &sm->query,
4340                                  type,
4341                                  -3, -1,
4342                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4343                                  &process_local_reply,
4344                                  pr);
4345 }
4346
4347
4348 /* **************************** Startup ************************ */
4349
4350 /**
4351  * Process fs requests.
4352  *
4353  * @param s scheduler to use
4354  * @param server the initialized server
4355  * @param c configuration to use
4356  */
4357 static int
4358 main_init (struct GNUNET_SCHEDULER_Handle *s,
4359            struct GNUNET_SERVER_Handle *server,
4360            const struct GNUNET_CONFIGURATION_Handle *c)
4361 {
4362   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4363     {
4364       { &handle_p2p_get, 
4365         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4366       { &handle_p2p_put, 
4367         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4368       { &handle_p2p_migration_stop, 
4369         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4370         sizeof (struct MigrationStopMessage) },
4371       { NULL, 0, 0 }
4372     };
4373   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4374     {&GNUNET_FS_handle_index_start, NULL, 
4375      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4376     {&GNUNET_FS_handle_index_list_get, NULL, 
4377      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4378     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4379      sizeof (struct UnindexMessage) },
4380     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4381      0 },
4382     {NULL, NULL, 0, 0}
4383   };
4384   unsigned long long enc = 128;
4385
4386   sched = s;
4387   cfg = c;
4388   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4389   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4390   if ( (GNUNET_OK !=
4391         GNUNET_CONFIGURATION_get_value_number (cfg,
4392                                                "fs",
4393                                                "MAX_PENDING_REQUESTS",
4394                                                &max_pending_requests)) ||
4395        (GNUNET_OK !=
4396         GNUNET_CONFIGURATION_get_value_number (cfg,
4397                                                "fs",
4398                                                "EXPECTED_NEIGHBOUR_COUNT",
4399                                                &enc)) ||
4400        (GNUNET_OK != 
4401         GNUNET_CONFIGURATION_get_value_time (cfg,
4402                                              "fs",
4403                                              "MIN_MIGRATION_DELAY",
4404                                              &min_migration_delay)) )
4405     {
4406       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4407                   _("Configuration fails to specify certain parameters, assuming default values."));
4408     }
4409   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4410   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4411   rt_entry_lifetime = GNUNET_LOAD_value_init ();
4412   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4413   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4414   core = GNUNET_CORE_connect (sched,
4415                               cfg,
4416                               GNUNET_TIME_UNIT_FOREVER_REL,
4417                               NULL,
4418                               NULL,
4419                               &peer_connect_handler,
4420                               &peer_disconnect_handler,
4421                               NULL,
4422                               NULL, GNUNET_NO,
4423                               NULL, GNUNET_NO,
4424                               p2p_handlers);
4425   if (NULL == core)
4426     {
4427       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4428                   _("Failed to connect to `%s' service.\n"),
4429                   "core");
4430       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4431       connected_peers = NULL;
4432       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4433       query_request_map = NULL;
4434       GNUNET_LOAD_value_free (rt_entry_lifetime);
4435       rt_entry_lifetime = NULL;
4436       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4437       requests_by_expiration_heap = NULL;
4438       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4439       peer_request_map = NULL;
4440       if (dsh != NULL)
4441         {
4442           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4443           dsh = NULL;
4444         }
4445       return GNUNET_SYSERR;
4446     }
4447   /* FIXME: distinguish between sending and storing in options? */
4448   if (active_migration) 
4449     {
4450       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4451                   _("Content migration is enabled, will start to gather data\n"));
4452       consider_migration_gathering ();
4453     }
4454   consider_dht_put_gathering (NULL);
4455   GNUNET_SERVER_disconnect_notify (server, 
4456                                    &handle_client_disconnect,
4457                                    NULL);
4458   GNUNET_assert (GNUNET_OK ==
4459                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4460                                                           "fs",
4461                                                           "TRUST",
4462                                                           &trustDirectory));
4463   GNUNET_DISK_directory_create (trustDirectory);
4464   GNUNET_SCHEDULER_add_with_priority (sched,
4465                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4466                                       &cron_flush_trust, NULL);
4467
4468
4469   GNUNET_SERVER_add_handlers (server, handlers);
4470   GNUNET_SCHEDULER_add_delayed (sched,
4471                                 GNUNET_TIME_UNIT_FOREVER_REL,
4472                                 &shutdown_task,
4473                                 NULL);
4474   return GNUNET_OK;
4475 }
4476
4477
4478 /**
4479  * Process fs requests.
4480  *
4481  * @param cls closure
4482  * @param sched scheduler to use
4483  * @param server the initialized server
4484  * @param cfg configuration to use
4485  */
4486 static void
4487 run (void *cls,
4488      struct GNUNET_SCHEDULER_Handle *sched,
4489      struct GNUNET_SERVER_Handle *server,
4490      const struct GNUNET_CONFIGURATION_Handle *cfg)
4491 {
4492   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4493                                                            "FS",
4494                                                            "ACTIVEMIGRATION");
4495   dsh = GNUNET_DATASTORE_connect (cfg,
4496                                   sched);
4497   if (dsh == NULL)
4498     {
4499       GNUNET_SCHEDULER_shutdown (sched);
4500       return;
4501     }
4502   datastore_get_load = GNUNET_LOAD_value_init ();
4503   datastore_put_load = GNUNET_LOAD_value_init ();
4504   block_cfg = GNUNET_CONFIGURATION_create ();
4505   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4506                                          "block",
4507                                          "PLUGINS",
4508                                          "fs");
4509   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4510   GNUNET_assert (NULL != block_ctx);
4511   dht_handle = GNUNET_DHT_connect (sched,
4512                                    cfg,
4513                                    FS_DHT_HT_SIZE);
4514   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4515        (GNUNET_OK != main_init (sched, server, cfg)) )
4516     {    
4517       GNUNET_SCHEDULER_shutdown (sched);
4518       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4519       dsh = NULL;
4520       GNUNET_DHT_disconnect (dht_handle);
4521       dht_handle = NULL;
4522       GNUNET_BLOCK_context_destroy (block_ctx);
4523       block_ctx = NULL;
4524       GNUNET_CONFIGURATION_destroy (block_cfg);
4525       block_cfg = NULL;
4526       GNUNET_LOAD_value_free (datastore_get_load);
4527       datastore_get_load = NULL;
4528       GNUNET_LOAD_value_free (datastore_put_load);
4529       datastore_put_load = NULL;
4530       return;   
4531     }
4532 }
4533
4534
4535 /**
4536  * The main function for the fs service.
4537  *
4538  * @param argc number of arguments from the command line
4539  * @param argv command line arguments
4540  * @return 0 ok, 1 on error
4541  */
4542 int
4543 main (int argc, char *const *argv)
4544 {
4545   return (GNUNET_OK ==
4546           GNUNET_SERVICE_run (argc,
4547                               argv,
4548                               "fs",
4549                               GNUNET_SERVICE_OPTION_NONE,
4550                               &run, NULL)) ? 0 : 1;
4551 }
4552
4553 /* end of gnunet-service-fs.c */