authentication of ciphertexts (+ seed)
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - track per-peer request latency (using new load API)
28  * - consider more precise latency estimation (per-peer & request) -- again load API?
29  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
30  * - introduce random latency in processing
31  * - more statistics
32  */
33 #include "platform.h"
34 #include <float.h>
35 #include "gnunet_constants.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_load_lib.h"
40 #include "gnunet_peer_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_signatures.h"
43 #include "gnunet_statistics_service.h"
44 #include "gnunet_util_lib.h"
45 #include "gnunet-service-fs_indexing.h"
46 #include "fs.h"
47
48 #define DEBUG_FS GNUNET_NO
49
50 /**
51  * Maximum number of outgoing messages we queue per peer.
52  */
53 #define MAX_QUEUE_PER_PEER 16
54
55 /**
56  * Size for the hash map for DHT requests from the FS
57  * service.  Should be about the number of concurrent
58  * DHT requests we plan to make.
59  */
60 #define FS_DHT_HT_SIZE 1024
61
62 /**
63  * How often do we flush trust values to disk?
64  */
65 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
66
67 /**
68  * Inverse of the probability that we will submit the same query
69  * to the same peer again.  If the same peer already got the query
70  * repeatedly recently, the probability is multiplied by the inverse
71  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
72  * plus MAX_CORK_DELAY (so roughly every 3.5s).
73  */
74 #define RETRY_PROBABILITY_INV 3
75
76 /**
77  * What is the maximum delay for a P2P FS message (in our interaction
78  * with core)?  FS-internal delays are another story.  The value is
79  * chosen based on the 32k block size.  Given that peers typcially
80  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
81  * transmit one message even to the lowest-bandwidth peers.
82  */
83 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
84
85 /**
86  * Maximum number of requests (from other peers) that we're
87  * willing to have pending at any given point in time.
88  */
89 static unsigned long long max_pending_requests = (32 * 1024);
90
91
92 /**
93  * Information we keep for each pending reply.  The
94  * actual message follows at the end of this struct.
95  */
96 struct PendingMessage;
97
98 /**
99  * Function called upon completion of a transmission.
100  *
101  * @param cls closure
102  * @param pid ID of receiving peer, 0 on transmission error
103  */
104 typedef void (*TransmissionContinuation)(void * cls, 
105                                          GNUNET_PEER_Id tpid);
106
107
108 /**
109  * Information we keep for each pending message (GET/PUT).  The
110  * actual message follows at the end of this struct.
111  */
112 struct PendingMessage
113 {
114   /**
115    * This is a doubly-linked list of messages to the same peer.
116    */
117   struct PendingMessage *next;
118
119   /**
120    * This is a doubly-linked list of messages to the same peer.
121    */
122   struct PendingMessage *prev;
123
124   /**
125    * Entry in pending message list for this pending message.
126    */ 
127   struct PendingMessageList *pml;  
128
129   /**
130    * Function to call immediately once we have transmitted this
131    * message.
132    */
133   TransmissionContinuation cont;
134
135   /**
136    * Closure for cont.
137    */
138   void *cont_cls;
139
140   /**
141    * Size of the reply; actual reply message follows
142    * at the end of this struct.
143    */
144   size_t msize;
145   
146   /**
147    * How important is this message for us?
148    */
149   uint32_t priority;
150  
151 };
152
153
154 /**
155  * Information about a peer that we are connected to.
156  * We track data that is useful for determining which
157  * peers should receive our requests.  We also keep
158  * a list of messages to transmit to this peer.
159  */
160 struct ConnectedPeer
161 {
162
163   /**
164    * List of the last clients for which this peer successfully
165    * answered a query.
166    */
167   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
168
169   /**
170    * List of the last PIDs for which
171    * this peer successfully answered a query;
172    * We use 0 to indicate no successful reply.
173    */
174   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
175
176   /**
177    * Average delay between sending the peer a request and
178    * getting a reply (only calculated over the requests for
179    * which we actually got a reply).   Calculated
180    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
181    */ 
182   struct GNUNET_TIME_Relative avg_delay;
183
184   /**
185    * Point in time until which this peer does not want us to migrate content
186    * to it.
187    */
188   struct GNUNET_TIME_Absolute migration_blocked;
189
190   /**
191    * Time until when we blocked this peer from migrating
192    * data to us.
193    */
194   struct GNUNET_TIME_Absolute last_migration_block;
195
196   /**
197    * Handle for an active request for transmission to this
198    * peer, or NULL.
199    */
200   struct GNUNET_CORE_TransmitHandle *cth;
201
202   /**
203    * Messages (replies, queries, content migration) we would like to
204    * send to this peer in the near future.  Sorted by priority, head.
205    */
206   struct PendingMessage *pending_messages_head;
207
208   /**
209    * Messages (replies, queries, content migration) we would like to
210    * send to this peer in the near future.  Sorted by priority, tail.
211    */
212   struct PendingMessage *pending_messages_tail;
213
214   /**
215    * Average priority of successful replies.  Calculated
216    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
217    */
218   double avg_priority;
219
220   /**
221    * Increase in traffic preference still to be submitted
222    * to the core service for this peer.
223    */
224   uint64_t inc_preference;
225
226   /**
227    * Trust rating for this peer
228    */
229   uint32_t trust;
230
231   /**
232    * Trust rating for this peer on disk.
233    */
234   uint32_t disk_trust;
235
236   /**
237    * The peer's identity.
238    */
239   GNUNET_PEER_Id pid;  
240
241   /**
242    * Size of the linked list of 'pending_messages'.
243    */
244   unsigned int pending_requests;
245
246   /**
247    * Which offset in "last_p2p_replies" will be updated next?
248    * (we go round-robin).
249    */
250   unsigned int last_p2p_replies_woff;
251
252   /**
253    * Which offset in "last_client_replies" will be updated next?
254    * (we go round-robin).
255    */
256   unsigned int last_client_replies_woff;
257
258 };
259
260
261 /**
262  * Information we keep for each pending request.  We should try to
263  * keep this struct as small as possible since its memory consumption
264  * is key to how many requests we can have pending at once.
265  */
266 struct PendingRequest;
267
268
269 /**
270  * Doubly-linked list of requests we are performing
271  * on behalf of the same client.
272  */
273 struct ClientRequestList
274 {
275
276   /**
277    * This is a doubly-linked list.
278    */
279   struct ClientRequestList *next;
280
281   /**
282    * This is a doubly-linked list.
283    */
284   struct ClientRequestList *prev;
285
286   /**
287    * Request this entry represents.
288    */
289   struct PendingRequest *req;
290
291   /**
292    * Client list this request belongs to.
293    */
294   struct ClientList *client_list;
295
296 };
297
298
299 /**
300  * Replies to be transmitted to the client.  The actual
301  * response message is allocated after this struct.
302  */
303 struct ClientResponseMessage
304 {
305   /**
306    * This is a doubly-linked list.
307    */
308   struct ClientResponseMessage *next;
309
310   /**
311    * This is a doubly-linked list.
312    */
313   struct ClientResponseMessage *prev;
314
315   /**
316    * Client list entry this response belongs to.
317    */
318   struct ClientList *client_list;
319
320   /**
321    * Number of bytes in the response.
322    */
323   size_t msize;
324 };
325
326
327 /**
328  * Linked list of clients we are performing requests
329  * for right now.
330  */
331 struct ClientList
332 {
333   /**
334    * This is a linked list.
335    */
336   struct ClientList *next;
337
338   /**
339    * ID of a client making a request, NULL if this entry is for a
340    * peer.
341    */
342   struct GNUNET_SERVER_Client *client;
343
344   /**
345    * Head of list of requests performed on behalf
346    * of this client right now.
347    */
348   struct ClientRequestList *rl_head;
349
350   /**
351    * Tail of list of requests performed on behalf
352    * of this client right now.
353    */
354   struct ClientRequestList *rl_tail;
355
356   /**
357    * Head of linked list of responses.
358    */
359   struct ClientResponseMessage *res_head;
360
361   /**
362    * Tail of linked list of responses.
363    */
364   struct ClientResponseMessage *res_tail;
365
366   /**
367    * Context for sending replies.
368    */
369   struct GNUNET_CONNECTION_TransmitHandle *th;
370
371 };
372
373
374 /**
375  * Doubly-linked list of messages we are performing
376  * due to a pending request.
377  */
378 struct PendingMessageList
379 {
380
381   /**
382    * This is a doubly-linked list of messages on behalf of the same request.
383    */
384   struct PendingMessageList *next;
385
386   /**
387    * This is a doubly-linked list of messages on behalf of the same request.
388    */
389   struct PendingMessageList *prev;
390
391   /**
392    * Message this entry represents.
393    */
394   struct PendingMessage *pm;
395
396   /**
397    * Request this entry belongs to.
398    */
399   struct PendingRequest *req;
400
401   /**
402    * Peer this message is targeted for.
403    */
404   struct ConnectedPeer *target;
405
406 };
407
408
409 /**
410  * Information we keep for each pending request.  We should try to
411  * keep this struct as small as possible since its memory consumption
412  * is key to how many requests we can have pending at once.
413  */
414 struct PendingRequest
415 {
416
417   /**
418    * If this request was made by a client, this is our entry in the
419    * client request list; otherwise NULL.
420    */
421   struct ClientRequestList *client_request_list;
422
423   /**
424    * Entry of peer responsible for this entry (if this request
425    * was made by a peer).
426    */
427   struct ConnectedPeer *cp;
428
429   /**
430    * If this is a namespace query, pointer to the hash of the public
431    * key of the namespace; otherwise NULL.  Pointer will be to the 
432    * end of this struct (so no need to free it).
433    */
434   const GNUNET_HashCode *namespace;
435
436   /**
437    * Bloomfilter we use to filter out replies that we don't care about
438    * (anymore).  NULL as long as we are interested in all replies.
439    */
440   struct GNUNET_CONTAINER_BloomFilter *bf;
441
442   /**
443    * Context of our GNUNET_CORE_peer_change_preference call.
444    */
445   struct GNUNET_CORE_InformationRequestContext *irc;
446
447   /**
448    * Hash code of all replies that we have seen so far (only valid
449    * if client is not NULL since we only track replies like this for
450    * our own clients).
451    */
452   GNUNET_HashCode *replies_seen;
453
454   /**
455    * Node in the heap representing this entry; NULL
456    * if we have no heap node.
457    */
458   struct GNUNET_CONTAINER_HeapNode *hnode;
459
460   /**
461    * Head of list of messages being performed on behalf of this
462    * request.
463    */
464   struct PendingMessageList *pending_head;
465
466   /**
467    * Tail of list of messages being performed on behalf of this
468    * request.
469    */
470   struct PendingMessageList *pending_tail;
471
472   /**
473    * When did we first see this request (form this peer), or, if our
474    * client is initiating, when did we last initiate a search?
475    */
476   struct GNUNET_TIME_Absolute start_time;
477
478   /**
479    * The query that this request is for.
480    */
481   GNUNET_HashCode query;
482
483   /**
484    * The task responsible for transmitting queries
485    * for this request.
486    */
487   GNUNET_SCHEDULER_TaskIdentifier task;
488
489   /**
490    * (Interned) Peer identifier that identifies a preferred target
491    * for requests.
492    */
493   GNUNET_PEER_Id target_pid;
494
495   /**
496    * (Interned) Peer identifiers of peers that have already
497    * received our query for this content.
498    */
499   GNUNET_PEER_Id *used_pids;
500   
501   /**
502    * Our entry in the queue (non-NULL while we wait for our
503    * turn to interact with the local database).
504    */
505   struct GNUNET_DATASTORE_QueueEntry *qe;
506
507   /**
508    * Size of the 'bf' (in bytes).
509    */
510   size_t bf_size;
511
512   /**
513    * Desired anonymity level; only valid for requests from a local client.
514    */
515   uint32_t anonymity_level;
516
517   /**
518    * How many entries in "used_pids" are actually valid?
519    */
520   unsigned int used_pids_off;
521
522   /**
523    * How long is the "used_pids" array?
524    */
525   unsigned int used_pids_size;
526
527   /**
528    * Number of results found for this request.
529    */
530   unsigned int results_found;
531
532   /**
533    * How many entries in "replies_seen" are actually valid?
534    */
535   unsigned int replies_seen_off;
536
537   /**
538    * How long is the "replies_seen" array?
539    */
540   unsigned int replies_seen_size;
541   
542   /**
543    * Priority with which this request was made.  If one of our clients
544    * made the request, then this is the current priority that we are
545    * using when initiating the request.  This value is used when
546    * we decide to reward other peers with trust for providing a reply.
547    */
548   uint32_t priority;
549
550   /**
551    * Priority points left for us to spend when forwarding this request
552    * to other peers.
553    */
554   uint32_t remaining_priority;
555
556   /**
557    * Number to mingle hashes for bloom-filter tests with.
558    */
559   int32_t mingle;
560
561   /**
562    * TTL with which we saw this request (or, if we initiated, TTL that
563    * we used for the request).
564    */
565   int32_t ttl;
566   
567   /**
568    * Type of the content that this request is for.
569    */
570   enum GNUNET_BLOCK_Type type;
571
572   /**
573    * Remove this request after transmission of the current response.
574    */
575   int16_t do_remove;
576
577   /**
578    * GNUNET_YES if we should not forward this request to other peers.
579    */
580   int16_t local_only;
581
582 };
583
584
585 /**
586  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
587  */
588 struct MigrationReadyBlock
589 {
590
591   /**
592    * This is a doubly-linked list.
593    */
594   struct MigrationReadyBlock *next;
595
596   /**
597    * This is a doubly-linked list.
598    */
599   struct MigrationReadyBlock *prev;
600
601   /**
602    * Query for the block.
603    */
604   GNUNET_HashCode query;
605
606   /**
607    * When does this block expire? 
608    */
609   struct GNUNET_TIME_Absolute expiration;
610
611   /**
612    * Peers we would consider forwarding this
613    * block to.  Zero for empty entries.
614    */
615   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
616
617   /**
618    * Size of the block.
619    */
620   size_t size;
621
622   /**
623    *  Number of targets already used.
624    */
625   unsigned int used_targets;
626
627   /**
628    * Type of the block.
629    */
630   enum GNUNET_BLOCK_Type type;
631 };
632
633
634 /**
635  * Our connection to the datastore.
636  */
637 static struct GNUNET_DATASTORE_Handle *dsh;
638
639 /**
640  * Our block context.
641  */
642 static struct GNUNET_BLOCK_Context *block_ctx;
643
644 /**
645  * Our block configuration.
646  */
647 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
648
649 /**
650  * Our scheduler.
651  */
652 static struct GNUNET_SCHEDULER_Handle *sched;
653
654 /**
655  * Our configuration.
656  */
657 static const struct GNUNET_CONFIGURATION_Handle *cfg;
658
659 /**
660  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
661  */
662 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
663
664 /**
665  * Map of peer identifiers to "struct PendingRequest" (for that peer).
666  */
667 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
668
669 /**
670  * Map of query identifiers to "struct PendingRequest" (for that query).
671  */
672 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
673
674 /**
675  * Heap with the request that will expire next at the top.  Contains
676  * pointers of type "struct PendingRequest*"; these will *also* be
677  * aliased from the "requests_by_peer" data structures and the
678  * "requests_by_query" table.  Note that requests from our clients
679  * don't expire and are thus NOT in the "requests_by_expiration"
680  * (or the "requests_by_peer" tables).
681  */
682 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
683
684 /**
685  * Handle for reporting statistics.
686  */
687 static struct GNUNET_STATISTICS_Handle *stats;
688
689 /**
690  * Linked list of clients we are currently processing requests for.
691  */
692 static struct ClientList *client_list;
693
694 /**
695  * Pointer to handle to the core service (points to NULL until we've
696  * connected to it).
697  */
698 static struct GNUNET_CORE_Handle *core;
699
700 /**
701  * Head of linked list of blocks that can be migrated.
702  */
703 static struct MigrationReadyBlock *mig_head;
704
705 /**
706  * Tail of linked list of blocks that can be migrated.
707  */
708 static struct MigrationReadyBlock *mig_tail;
709
710 /**
711  * Request to datastore for migration (or NULL).
712  */
713 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
714
715 /**
716  * Where do we store trust information?
717  */
718 static char *trustDirectory;
719
720 /**
721  * ID of task that collects blocks for migration.
722  */
723 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
724
725 /**
726  * What is the maximum frequency at which we are allowed to
727  * poll the datastore for migration content?
728  */
729 static struct GNUNET_TIME_Relative min_migration_delay;
730
731 /**
732  * Handle for DHT operations.
733  */
734 static struct GNUNET_DHT_Handle *dht_handle;
735
736 /**
737  * Size of the doubly-linked list of migration blocks.
738  */
739 static unsigned int mig_size;
740
741 /**
742  * Are we allowed to migrate content to this peer.
743  */
744 static int active_migration;
745
746 /**
747  * Typical priorities we're seeing from other peers right now.  Since
748  * most priorities will be zero, this value is the weighted average of
749  * non-zero priorities seen "recently".  In order to ensure that new
750  * values do not dramatically change the ratio, values are first
751  * "capped" to a reasonable range (+N of the current value) and then
752  * averaged into the existing value by a ratio of 1:N.  Hence
753  * receiving the largest possible priority can still only raise our
754  * "current_priorities" by at most 1.
755  */
756 static double current_priorities;
757
758 /**
759  * Datastore 'GET' load tracking.
760  */
761 static struct GNUNET_LOAD_Value *datastore_get_load;
762
763 /**
764  * Datastore 'PUT' load tracking.
765  */
766 static struct GNUNET_LOAD_Value *datastore_put_load;
767
768
769 /**
770  * We've just now completed a datastore request.  Update our
771  * datastore load calculations.
772  *
773  * @param start time when the datastore request was issued
774  */
775 static void
776 update_datastore_delays (struct GNUNET_TIME_Absolute start)
777 {
778   struct GNUNET_TIME_Relative delay;
779
780   delay = GNUNET_TIME_absolute_get_duration (start);
781   GNUNET_LOAD_update (datastore_get_load,
782                       delay.value);
783 }
784
785
786 /**
787  * Get the filename under which we would store the GNUNET_HELLO_Message
788  * for the given host and protocol.
789  * @return filename of the form DIRECTORY/HOSTID
790  */
791 static char *
792 get_trust_filename (const struct GNUNET_PeerIdentity *id)
793 {
794   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
795   char *fn;
796
797   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
798   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
799   return fn;
800 }
801
802
803
804 /**
805  * Transmit messages by copying it to the target buffer
806  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
807  * for writing in the meantime.  In that case, do nothing
808  * (the disconnect or shutdown handler will take care of the rest).
809  * If we were able to transmit messages and there are still more
810  * pending, ask core again for further calls to this function.
811  *
812  * @param cls closure, pointer to the 'struct ConnectedPeer*'
813  * @param size number of bytes available in buf
814  * @param buf where the callee should write the message
815  * @return number of bytes written to buf
816  */
817 static size_t
818 transmit_to_peer (void *cls,
819                   size_t size, void *buf);
820
821
822 /* ******************* clean up functions ************************ */
823
824 /**
825  * Delete the given migration block.
826  *
827  * @param mb block to delete
828  */
829 static void
830 delete_migration_block (struct MigrationReadyBlock *mb)
831 {
832   GNUNET_CONTAINER_DLL_remove (mig_head,
833                                mig_tail,
834                                mb);
835   GNUNET_PEER_decrement_rcs (mb->target_list,
836                              MIGRATION_LIST_SIZE);
837   mig_size--;
838   GNUNET_free (mb);
839 }
840
841
842 /**
843  * Compare the distance of two peers to a key.
844  *
845  * @param key key
846  * @param p1 first peer
847  * @param p2 second peer
848  * @return GNUNET_YES if P1 is closer to key than P2
849  */
850 static int
851 is_closer (const GNUNET_HashCode *key,
852            const struct GNUNET_PeerIdentity *p1,
853            const struct GNUNET_PeerIdentity *p2)
854 {
855   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
856                                     &p2->hashPubKey,
857                                     key);
858 }
859
860
861 /**
862  * Consider migrating content to a given peer.
863  *
864  * @param cls 'struct MigrationReadyBlock*' to select
865  *            targets for (or NULL for none)
866  * @param key ID of the peer 
867  * @param value 'struct ConnectedPeer' of the peer
868  * @return GNUNET_YES (always continue iteration)
869  */
870 static int
871 consider_migration (void *cls,
872                     const GNUNET_HashCode *key,
873                     void *value)
874 {
875   struct MigrationReadyBlock *mb = cls;
876   struct ConnectedPeer *cp = value;
877   struct MigrationReadyBlock *pos;
878   struct GNUNET_PeerIdentity cppid;
879   struct GNUNET_PeerIdentity otherpid;
880   struct GNUNET_PeerIdentity worstpid;
881   size_t msize;
882   unsigned int i;
883   unsigned int repl;
884   
885   /* consider 'cp' as a migration target for mb */
886   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
887     return GNUNET_YES; /* peer has requested no migration! */
888   if (mb != NULL)
889     {
890       GNUNET_PEER_resolve (cp->pid,
891                            &cppid);
892       repl = MIGRATION_LIST_SIZE;
893       for (i=0;i<MIGRATION_LIST_SIZE;i++)
894         {
895           if (mb->target_list[i] == 0)
896             {
897               mb->target_list[i] = cp->pid;
898               GNUNET_PEER_change_rc (mb->target_list[i], 1);
899               repl = MIGRATION_LIST_SIZE;
900               break;
901             }
902           GNUNET_PEER_resolve (mb->target_list[i],
903                                &otherpid);
904           if ( (repl == MIGRATION_LIST_SIZE) &&
905                is_closer (&mb->query,
906                           &cppid,
907                           &otherpid)) 
908             {
909               repl = i;
910               worstpid = otherpid;
911             }
912           else if ( (repl != MIGRATION_LIST_SIZE) &&
913                     (is_closer (&mb->query,
914                                 &worstpid,
915                                 &otherpid) ) )
916             {
917               repl = i;
918               worstpid = otherpid;
919             }       
920         }
921       if (repl != MIGRATION_LIST_SIZE) 
922         {
923           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
924           mb->target_list[repl] = cp->pid;
925           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
926         }
927     }
928
929   /* consider scheduling transmission to cp for content migration */
930   if (cp->cth != NULL)
931     return GNUNET_YES; 
932   msize = 0;
933   pos = mig_head;
934   while (pos != NULL)
935     {
936       for (i=0;i<MIGRATION_LIST_SIZE;i++)
937         {
938           if (cp->pid == pos->target_list[i])
939             {
940               if (msize == 0)
941                 msize = pos->size;
942               else
943                 msize = GNUNET_MIN (msize,
944                                     pos->size);
945               break;
946             }
947         }
948       pos = pos->next;
949     }
950   if (msize == 0)
951     return GNUNET_YES; /* no content available */
952 #if DEBUG_FS
953   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
954               "Trying to migrate at least %u bytes to peer `%s'\n",
955               msize,
956               GNUNET_h2s (key));
957 #endif
958   cp->cth 
959     = GNUNET_CORE_notify_transmit_ready (core,
960                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
961                                          (const struct GNUNET_PeerIdentity*) key,
962                                          msize + sizeof (struct PutMessage),
963                                          &transmit_to_peer,
964                                          cp);
965   return GNUNET_YES;
966 }
967
968
969 /**
970  * Task that is run periodically to obtain blocks for content
971  * migration
972  * 
973  * @param cls unused
974  * @param tc scheduler context (also unused)
975  */
976 static void
977 gather_migration_blocks (void *cls,
978                          const struct GNUNET_SCHEDULER_TaskContext *tc);
979
980
981 /**
982  * If the migration task is not currently running, consider
983  * (re)scheduling it with the appropriate delay.
984  */
985 static void
986 consider_migration_gathering ()
987 {
988   struct GNUNET_TIME_Relative delay;
989
990   if (dsh == NULL)
991     return;
992   if (mig_qe != NULL)
993     return;
994   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
995     return;
996   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
997                                          mig_size);
998   delay = GNUNET_TIME_relative_divide (delay,
999                                        MAX_MIGRATION_QUEUE);
1000   delay = GNUNET_TIME_relative_max (delay,
1001                                     min_migration_delay);
1002   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1003                                            delay,
1004                                            &gather_migration_blocks,
1005                                            NULL);
1006 }
1007
1008
1009 /**
1010  * Process content offered for migration.
1011  *
1012  * @param cls closure
1013  * @param key key for the content
1014  * @param size number of bytes in data
1015  * @param data content stored
1016  * @param type type of the content
1017  * @param priority priority of the content
1018  * @param anonymity anonymity-level for the content
1019  * @param expiration expiration time for the content
1020  * @param uid unique identifier for the datum;
1021  *        maybe 0 if no unique identifier is available
1022  */
1023 static void
1024 process_migration_content (void *cls,
1025                            const GNUNET_HashCode * key,
1026                            uint32_t size,
1027                            const void *data,
1028                            enum GNUNET_BLOCK_Type type,
1029                            uint32_t priority,
1030                            uint32_t anonymity,
1031                            struct GNUNET_TIME_Absolute
1032                            expiration, uint64_t uid)
1033 {
1034   struct MigrationReadyBlock *mb;
1035   
1036   if (key == NULL)
1037     {
1038       mig_qe = NULL;
1039       if (mig_size < MAX_MIGRATION_QUEUE)  
1040         consider_migration_gathering ();
1041       return;
1042     }
1043   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1044     {
1045       if (GNUNET_OK !=
1046           GNUNET_FS_handle_on_demand_block (key, size, data,
1047                                             type, priority, anonymity,
1048                                             expiration, uid, 
1049                                             &process_migration_content,
1050                                             NULL))
1051         {
1052           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1053         }
1054       return;
1055     }
1056 #if DEBUG_FS
1057   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1058               "Retrieved block `%s' of type %u for migration\n",
1059               GNUNET_h2s (key),
1060               type);
1061 #endif
1062   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1063   mb->query = *key;
1064   mb->expiration = expiration;
1065   mb->size = size;
1066   mb->type = type;
1067   memcpy (&mb[1], data, size);
1068   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1069                                      mig_tail,
1070                                      mig_tail,
1071                                      mb);
1072   mig_size++;
1073   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1074                                          &consider_migration,
1075                                          mb);
1076   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1077 }
1078
1079
1080 /**
1081  * Task that is run periodically to obtain blocks for content
1082  * migration
1083  * 
1084  * @param cls unused
1085  * @param tc scheduler context (also unused)
1086  */
1087 static void
1088 gather_migration_blocks (void *cls,
1089                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1090 {
1091   mig_task = GNUNET_SCHEDULER_NO_TASK;
1092   if (dsh != NULL)
1093     {
1094       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
1095                                             GNUNET_TIME_UNIT_FOREVER_REL,
1096                                             &process_migration_content, NULL);
1097       GNUNET_assert (mig_qe != NULL);
1098     }
1099 }
1100
1101
1102 /**
1103  * We're done with a particular message list entry.
1104  * Free all associated resources.
1105  * 
1106  * @param pml entry to destroy
1107  */
1108 static void
1109 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1110 {
1111   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1112                                pml->req->pending_tail,
1113                                pml);
1114   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1115                                pml->target->pending_messages_tail,
1116                                pml->pm);
1117   pml->target->pending_requests--;
1118   GNUNET_free (pml->pm);
1119   GNUNET_free (pml);
1120 }
1121
1122
1123 /**
1124  * Destroy the given pending message (and call the respective
1125  * continuation).
1126  *
1127  * @param pm message to destroy
1128  * @param tpid id of peer that the message was delivered to, or 0 for none
1129  */
1130 static void
1131 destroy_pending_message (struct PendingMessage *pm,
1132                          GNUNET_PEER_Id tpid)
1133 {
1134   struct PendingMessageList *pml = pm->pml;
1135   TransmissionContinuation cont;
1136   void *cont_cls;
1137
1138   if (pml != NULL)
1139     {
1140       GNUNET_assert (pml->pm == pm);
1141       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1142       cont = pm->cont;
1143       cont_cls = pm->cont_cls;
1144       destroy_pending_message_list_entry (pml);
1145     }
1146   else
1147     {
1148       GNUNET_free (pm);
1149     }
1150   if (cont != NULL)
1151     cont (cont_cls, tpid);  
1152 }
1153
1154
1155 /**
1156  * We're done processing a particular request.
1157  * Free all associated resources.
1158  *
1159  * @param pr request to destroy
1160  */
1161 static void
1162 destroy_pending_request (struct PendingRequest *pr)
1163 {
1164   struct GNUNET_PeerIdentity pid;
1165
1166   if (pr->hnode != NULL)
1167     {
1168       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1169                                          pr->hnode);
1170       pr->hnode = NULL;
1171     }
1172   if (NULL == pr->client_request_list)
1173     {
1174       GNUNET_STATISTICS_update (stats,
1175                                 gettext_noop ("# P2P searches active"),
1176                                 -1,
1177                                 GNUNET_NO);
1178     }
1179   else
1180     {
1181       GNUNET_STATISTICS_update (stats,
1182                                 gettext_noop ("# client searches active"),
1183                                 -1,
1184                                 GNUNET_NO);
1185     }
1186   /* might have already been removed from map in 'process_reply' (if
1187      there was a unique reply) or never inserted if it was a
1188      duplicate; hence ignore the return value here */
1189   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1190                                                &pr->query,
1191                                                pr);
1192   if (pr->qe != NULL)
1193      {
1194       GNUNET_DATASTORE_cancel (pr->qe);
1195       pr->qe = NULL;
1196     }
1197   if (pr->client_request_list != NULL)
1198     {
1199       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1200                                    pr->client_request_list->client_list->rl_tail,
1201                                    pr->client_request_list);
1202       GNUNET_free (pr->client_request_list);
1203       pr->client_request_list = NULL;
1204     }
1205   if (pr->cp != NULL)
1206     {
1207       GNUNET_PEER_resolve (pr->cp->pid,
1208                            &pid);
1209       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1210                                                    &pid.hashPubKey,
1211                                                    pr);
1212       pr->cp = NULL;
1213     }
1214   if (pr->bf != NULL)
1215     {
1216       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1217       pr->bf = NULL;
1218     }
1219   if (pr->irc != NULL)
1220     {
1221       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1222       pr->irc = NULL;
1223     }
1224   if (pr->replies_seen != NULL)
1225     {
1226       GNUNET_free (pr->replies_seen);
1227       pr->replies_seen = NULL;
1228     }
1229   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1230     {
1231       GNUNET_SCHEDULER_cancel (sched,
1232                                pr->task);
1233       pr->task = GNUNET_SCHEDULER_NO_TASK;
1234     }
1235   while (NULL != pr->pending_head)    
1236     destroy_pending_message_list_entry (pr->pending_head);
1237   GNUNET_PEER_change_rc (pr->target_pid, -1);
1238   if (pr->used_pids != NULL)
1239     {
1240       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1241       GNUNET_free (pr->used_pids);
1242       pr->used_pids_off = 0;
1243       pr->used_pids_size = 0;
1244       pr->used_pids = NULL;
1245     }
1246   GNUNET_free (pr);
1247 }
1248
1249
1250 /**
1251  * Method called whenever a given peer connects.
1252  *
1253  * @param cls closure, not used
1254  * @param peer peer identity this notification is about
1255  * @param latency reported latency of the connection with 'other'
1256  * @param distance reported distance (DV) to 'other' 
1257  */
1258 static void 
1259 peer_connect_handler (void *cls,
1260                       const struct
1261                       GNUNET_PeerIdentity * peer,
1262                       struct GNUNET_TIME_Relative latency,
1263                       uint32_t distance)
1264 {
1265   struct ConnectedPeer *cp;
1266   struct MigrationReadyBlock *pos;
1267   char *fn;
1268   uint32_t trust;
1269   
1270   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1271   cp->pid = GNUNET_PEER_intern (peer);
1272
1273   fn = get_trust_filename (peer);
1274   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1275       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1276     cp->disk_trust = cp->trust = ntohl (trust);
1277   GNUNET_free (fn);
1278
1279   GNUNET_break (GNUNET_OK ==
1280                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1281                                                    &peer->hashPubKey,
1282                                                    cp,
1283                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1284
1285   pos = mig_head;
1286   while (NULL != pos)
1287     {
1288       (void) consider_migration (pos, &peer->hashPubKey, cp);
1289       pos = pos->next;
1290     }
1291 }
1292
1293
1294 /**
1295  * Increase the host credit by a value.
1296  *
1297  * @param host which peer to change the trust value on
1298  * @param value is the int value by which the
1299  *  host credit is to be increased or decreased
1300  * @returns the actual change in trust (positive or negative)
1301  */
1302 static int
1303 change_host_trust (struct ConnectedPeer *host, int value)
1304 {
1305   unsigned int old_trust;
1306
1307   if (value == 0)
1308     return 0;
1309   GNUNET_assert (host != NULL);
1310   old_trust = host->trust;
1311   if (value > 0)
1312     {
1313       if (host->trust + value < host->trust)
1314         {
1315           value = UINT32_MAX - host->trust;
1316           host->trust = UINT32_MAX;
1317         }
1318       else
1319         host->trust += value;
1320     }
1321   else
1322     {
1323       if (host->trust < -value)
1324         {
1325           value = -host->trust;
1326           host->trust = 0;
1327         }
1328       else
1329         host->trust += value;
1330     }
1331   return value;
1332 }
1333
1334
1335 /**
1336  * Write host-trust information to a file - flush the buffer entry!
1337  */
1338 static int
1339 flush_trust (void *cls,
1340              const GNUNET_HashCode *key,
1341              void *value)
1342 {
1343   struct ConnectedPeer *host = value;
1344   char *fn;
1345   uint32_t trust;
1346   struct GNUNET_PeerIdentity pid;
1347
1348   if (host->trust == host->disk_trust)
1349     return GNUNET_OK;                     /* unchanged */
1350   GNUNET_PEER_resolve (host->pid,
1351                        &pid);
1352   fn = get_trust_filename (&pid);
1353   if (host->trust == 0)
1354     {
1355       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1356         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1357                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1358     }
1359   else
1360     {
1361       trust = htonl (host->trust);
1362       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1363                                                     sizeof(uint32_t),
1364                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1365                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1366         host->disk_trust = host->trust;
1367     }
1368   GNUNET_free (fn);
1369   return GNUNET_OK;
1370 }
1371
1372 /**
1373  * Call this method periodically to scan data/hosts for new hosts.
1374  */
1375 static void
1376 cron_flush_trust (void *cls,
1377                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1378 {
1379
1380   if (NULL == connected_peers)
1381     return;
1382   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1383                                          &flush_trust,
1384                                          NULL);
1385   if (NULL == tc)
1386     return;
1387   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1388     return;
1389   GNUNET_SCHEDULER_add_delayed (tc->sched,
1390                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1391 }
1392
1393
1394 /**
1395  * Free (each) request made by the peer.
1396  *
1397  * @param cls closure, points to peer that the request belongs to
1398  * @param key current key code
1399  * @param value value in the hash map
1400  * @return GNUNET_YES (we should continue to iterate)
1401  */
1402 static int
1403 destroy_request (void *cls,
1404                  const GNUNET_HashCode * key,
1405                  void *value)
1406 {
1407   const struct GNUNET_PeerIdentity * peer = cls;
1408   struct PendingRequest *pr = value;
1409   
1410   GNUNET_break (GNUNET_YES ==
1411                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1412                                                       &peer->hashPubKey,
1413                                                       pr));
1414   destroy_pending_request (pr);
1415   return GNUNET_YES;
1416 }
1417
1418
1419 /**
1420  * Method called whenever a peer disconnects.
1421  *
1422  * @param cls closure, not used
1423  * @param peer peer identity this notification is about
1424  */
1425 static void
1426 peer_disconnect_handler (void *cls,
1427                          const struct
1428                          GNUNET_PeerIdentity * peer)
1429 {
1430   struct ConnectedPeer *cp;
1431   struct PendingMessage *pm;
1432   unsigned int i;
1433   struct MigrationReadyBlock *pos;
1434   struct MigrationReadyBlock *next;
1435
1436   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1437                                               &peer->hashPubKey,
1438                                               &destroy_request,
1439                                               (void*) peer);
1440   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1441                                           &peer->hashPubKey);
1442   if (cp == NULL)
1443     return;
1444   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1445     {
1446       if (NULL != cp->last_client_replies[i])
1447         {
1448           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1449           cp->last_client_replies[i] = NULL;
1450         }
1451     }
1452   GNUNET_break (GNUNET_YES ==
1453                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1454                                                       &peer->hashPubKey,
1455                                                       cp));
1456   /* remove this peer from migration considerations; schedule
1457      alternatives */
1458   next = mig_head;
1459   while (NULL != (pos = next))
1460     {
1461       next = pos->next;
1462       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1463         {
1464           if (pos->target_list[i] == cp->pid)
1465             {
1466               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1467               pos->target_list[i] = 0;
1468             }
1469          }
1470       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1471         {
1472           delete_migration_block (pos);
1473           consider_migration_gathering ();
1474           continue;
1475         }
1476       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1477                                              &consider_migration,
1478                                              pos);
1479     }
1480   GNUNET_PEER_change_rc (cp->pid, -1);
1481   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1482   if (NULL != cp->cth)
1483     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1484   while (NULL != (pm = cp->pending_messages_head))
1485     destroy_pending_message (pm, 0 /* delivery failed */);
1486   GNUNET_break (0 == cp->pending_requests);
1487   GNUNET_free (cp);
1488 }
1489
1490
1491 /**
1492  * Iterator over hash map entries that removes all occurences
1493  * of the given 'client' from the 'last_client_replies' of the
1494  * given connected peer.
1495  *
1496  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1497  * @param key current key code (unused)
1498  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1499  * @return GNUNET_YES (we should continue to iterate)
1500  */
1501 static int
1502 remove_client_from_last_client_replies (void *cls,
1503                                         const GNUNET_HashCode * key,
1504                                         void *value)
1505 {
1506   struct GNUNET_SERVER_Client *client = cls;
1507   struct ConnectedPeer *cp = value;
1508   unsigned int i;
1509
1510   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1511     {
1512       if (cp->last_client_replies[i] == client)
1513         {
1514           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1515           cp->last_client_replies[i] = NULL;
1516         }
1517     }  
1518   return GNUNET_YES;
1519 }
1520
1521
1522 /**
1523  * A client disconnected.  Remove all of its pending queries.
1524  *
1525  * @param cls closure, NULL
1526  * @param client identification of the client
1527  */
1528 static void
1529 handle_client_disconnect (void *cls,
1530                           struct GNUNET_SERVER_Client
1531                           * client)
1532 {
1533   struct ClientList *pos;
1534   struct ClientList *prev;
1535   struct ClientRequestList *rcl;
1536   struct ClientResponseMessage *creply;
1537
1538   if (client == NULL)
1539     return;
1540   prev = NULL;
1541   pos = client_list;
1542   while ( (NULL != pos) &&
1543           (pos->client != client) )
1544     {
1545       prev = pos;
1546       pos = pos->next;
1547     }
1548   if (pos == NULL)
1549     return; /* no requests pending for this client */
1550   while (NULL != (rcl = pos->rl_head))
1551     {
1552       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1553                   "Destroying pending request `%s' on disconnect\n",
1554                   GNUNET_h2s (&rcl->req->query));
1555       destroy_pending_request (rcl->req);
1556     }
1557   if (prev == NULL)
1558     client_list = pos->next;
1559   else
1560     prev->next = pos->next;
1561   if (pos->th != NULL)
1562     {
1563       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1564       pos->th = NULL;
1565     }
1566   while (NULL != (creply = pos->res_head))
1567     {
1568       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1569                                    pos->res_tail,
1570                                    creply);
1571       GNUNET_free (creply);
1572     }    
1573   GNUNET_SERVER_client_drop (pos->client);
1574   GNUNET_free (pos);
1575   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1576                                          &remove_client_from_last_client_replies,
1577                                          client);
1578 }
1579
1580
1581 /**
1582  * Iterator to free peer entries.
1583  *
1584  * @param cls closure, unused
1585  * @param key current key code
1586  * @param value value in the hash map (peer entry)
1587  * @return GNUNET_YES (we should continue to iterate)
1588  */
1589 static int 
1590 clean_peer (void *cls,
1591             const GNUNET_HashCode * key,
1592             void *value)
1593 {
1594   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1595   return GNUNET_YES;
1596 }
1597
1598
1599 /**
1600  * Task run during shutdown.
1601  *
1602  * @param cls unused
1603  * @param tc unused
1604  */
1605 static void
1606 shutdown_task (void *cls,
1607                const struct GNUNET_SCHEDULER_TaskContext *tc)
1608 {
1609   if (mig_qe != NULL)
1610     {
1611       GNUNET_DATASTORE_cancel (mig_qe);
1612       mig_qe = NULL;
1613     }
1614   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1615     {
1616       GNUNET_SCHEDULER_cancel (sched, mig_task);
1617       mig_task = GNUNET_SCHEDULER_NO_TASK;
1618     }
1619   while (client_list != NULL)
1620     handle_client_disconnect (NULL,
1621                               client_list->client);
1622   cron_flush_trust (NULL, NULL);
1623   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1624                                          &clean_peer,
1625                                          NULL);
1626   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1627   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1628   requests_by_expiration_heap = 0;
1629   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1630   connected_peers = NULL;
1631   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1632   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1633   query_request_map = NULL;
1634   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1635   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1636   peer_request_map = NULL;
1637   GNUNET_assert (NULL != core);
1638   GNUNET_CORE_disconnect (core);
1639   core = NULL;
1640   if (stats != NULL)
1641     {
1642       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1643       stats = NULL;
1644     }
1645   if (dsh != NULL)
1646     {
1647       GNUNET_DATASTORE_disconnect (dsh,
1648                                    GNUNET_NO);
1649       dsh = NULL;
1650     }
1651   while (mig_head != NULL)
1652     delete_migration_block (mig_head);
1653   GNUNET_assert (0 == mig_size);
1654   GNUNET_DHT_disconnect (dht_handle);
1655   dht_handle = NULL;
1656   GNUNET_LOAD_value_free (datastore_get_load);
1657   datastore_get_load = NULL;
1658   GNUNET_LOAD_value_free (datastore_put_load);
1659   datastore_put_load = NULL;
1660   GNUNET_BLOCK_context_destroy (block_ctx);
1661   block_ctx = NULL;
1662   GNUNET_CONFIGURATION_destroy (block_cfg);
1663   block_cfg = NULL;
1664   sched = NULL;
1665   cfg = NULL;  
1666   GNUNET_free_non_null (trustDirectory);
1667   trustDirectory = NULL;
1668 }
1669
1670
1671 /* ******************* Utility functions  ******************** */
1672
1673
1674 /**
1675  * Transmit messages by copying it to the target buffer
1676  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1677  * for writing in the meantime.  In that case, do nothing
1678  * (the disconnect or shutdown handler will take care of the rest).
1679  * If we were able to transmit messages and there are still more
1680  * pending, ask core again for further calls to this function.
1681  *
1682  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1683  * @param size number of bytes available in buf
1684  * @param buf where the callee should write the message
1685  * @return number of bytes written to buf
1686  */
1687 static size_t
1688 transmit_to_peer (void *cls,
1689                   size_t size, void *buf)
1690 {
1691   struct ConnectedPeer *cp = cls;
1692   char *cbuf = buf;
1693   struct GNUNET_PeerIdentity pid;
1694   struct PendingMessage *pm;
1695   struct MigrationReadyBlock *mb;
1696   struct MigrationReadyBlock *next;
1697   struct PutMessage migm;
1698   size_t msize;
1699   unsigned int i;
1700  
1701   cp->cth = NULL;
1702   if (NULL == buf)
1703     {
1704 #if DEBUG_FS
1705       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1706                   "Dropping message, core too busy.\n");
1707 #endif
1708       return 0;
1709     }
1710   msize = 0;
1711   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1712           (pm->msize <= size) )
1713     {
1714       memcpy (&cbuf[msize], &pm[1], pm->msize);
1715       msize += pm->msize;
1716       size -= pm->msize;
1717       destroy_pending_message (pm, cp->pid);
1718     }
1719   if (NULL != pm)
1720     {
1721       GNUNET_PEER_resolve (cp->pid,
1722                            &pid);
1723       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1724                                                    pm->priority,
1725                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1726                                                    &pid,
1727                                                    pm->msize,
1728                                                    &transmit_to_peer,
1729                                                    cp);
1730     }
1731   else
1732     {      
1733       next = mig_head;
1734       while (NULL != (mb = next))
1735         {
1736           next = mb->next;
1737           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1738             {
1739               if ( (cp->pid == mb->target_list[i]) &&
1740                    (mb->size + sizeof (migm) <= size) )
1741                 {
1742                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1743                   mb->target_list[i] = 0;
1744                   mb->used_targets++;
1745                   memset (&migm, 0, sizeof (migm));
1746                   migm.header.size = htons (sizeof (migm) + mb->size);
1747                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1748                   migm.type = htonl (mb->type);
1749                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1750                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1751                   msize += sizeof (migm);
1752                   size -= sizeof (migm);
1753                   memcpy (&cbuf[msize], &mb[1], mb->size);
1754                   msize += mb->size;
1755                   size -= mb->size;
1756 #if DEBUG_FS
1757                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1758                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1759                               GNUNET_h2s (&mb->query),
1760                               mb->size,
1761                               GNUNET_i2s (&pid));
1762 #endif    
1763                   break;
1764                 }
1765               else
1766                 {
1767 #if DEBUG_FS
1768                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1769                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1770                               GNUNET_h2s (&mb->query),
1771                               mb->size,
1772                               GNUNET_i2s (&pid));
1773 #endif    
1774                 }
1775             }
1776           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1777                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1778             {
1779               delete_migration_block (mb);
1780               consider_migration_gathering ();
1781             }
1782         }
1783       consider_migration (NULL, 
1784                           &pid.hashPubKey,
1785                           cp);
1786     }
1787 #if DEBUG_FS
1788   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1789               "Transmitting %u bytes to peer %u\n",
1790               msize,
1791               cp->pid);
1792 #endif
1793   return msize;
1794 }
1795
1796
1797 /**
1798  * Add a message to the set of pending messages for the given peer.
1799  *
1800  * @param cp peer to send message to
1801  * @param pm message to queue
1802  * @param pr request on which behalf this message is being queued
1803  */
1804 static void
1805 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1806                                   struct PendingMessage *pm,
1807                                   struct PendingRequest *pr)
1808 {
1809   struct PendingMessage *pos;
1810   struct PendingMessageList *pml;
1811   struct GNUNET_PeerIdentity pid;
1812
1813   GNUNET_assert (pm->next == NULL);
1814   GNUNET_assert (pm->pml == NULL);    
1815   if (pr != NULL)
1816     {
1817       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1818       pml->req = pr;
1819       pml->target = cp;
1820       pml->pm = pm;
1821       pm->pml = pml;  
1822       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1823                                    pr->pending_tail,
1824                                    pml);
1825     }
1826   pos = cp->pending_messages_head;
1827   while ( (pos != NULL) &&
1828           (pm->priority < pos->priority) )
1829     pos = pos->next;    
1830   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1831                                      cp->pending_messages_tail,
1832                                      pos,
1833                                      pm);
1834   cp->pending_requests++;
1835   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1836     destroy_pending_message (cp->pending_messages_tail, 0);  
1837   GNUNET_PEER_resolve (cp->pid, &pid);
1838   if (NULL != cp->cth)
1839     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1840   /* need to schedule transmission */
1841   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1842                                                cp->pending_messages_head->priority,
1843                                                MAX_TRANSMIT_DELAY,
1844                                                &pid,
1845                                                cp->pending_messages_head->msize,
1846                                                &transmit_to_peer,
1847                                                cp);
1848   if (cp->cth == NULL)
1849     {
1850 #if DEBUG_FS
1851       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1852                   "Failed to schedule transmission with core!\n");
1853 #endif
1854       GNUNET_STATISTICS_update (stats,
1855                                 gettext_noop ("# CORE transmission failures"),
1856                                 1,
1857                                 GNUNET_NO);
1858     }
1859 }
1860
1861
1862 /**
1863  * Test if the load on this peer is too high
1864  * to even consider processing the query at
1865  * all.
1866  * 
1867  * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
1868  */
1869 static int
1870 test_load_too_high ()
1871 {
1872   return GNUNET_SYSERR; // FIXME
1873 }
1874
1875
1876 /* ******************* Pending Request Refresh Task ******************** */
1877
1878
1879
1880 /**
1881  * We use a random delay to make the timing of requests less
1882  * predictable.  This function returns such a random delay.  We add a base
1883  * delay of MAX_CORK_DELAY (1s).
1884  *
1885  * FIXME: make schedule dependent on the specifics of the request?
1886  * Or bandwidth and number of connected peers and load?
1887  *
1888  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1889  */
1890 static struct GNUNET_TIME_Relative
1891 get_processing_delay ()
1892 {
1893   return 
1894     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1895                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1896                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1897                                                                                        TTL_DECREMENT)));
1898 }
1899
1900
1901 /**
1902  * We're processing a GET request from another peer and have decided
1903  * to forward it to other peers.  This function is called periodically
1904  * and should forward the request to other peers until we have all
1905  * possible replies.  If we have transmitted the *only* reply to
1906  * the initiator we should destroy the pending request.  If we have
1907  * many replies in the queue to the initiator, we should delay sending
1908  * out more queries until the reply queue has shrunk some.
1909  *
1910  * @param cls our "struct ProcessGetContext *"
1911  * @param tc unused
1912  */
1913 static void
1914 forward_request_task (void *cls,
1915                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1916
1917
1918 /**
1919  * Function called after we either failed or succeeded
1920  * at transmitting a query to a peer.  
1921  *
1922  * @param cls the requests "struct PendingRequest*"
1923  * @param tpid ID of receiving peer, 0 on transmission error
1924  */
1925 static void
1926 transmit_query_continuation (void *cls,
1927                              GNUNET_PEER_Id tpid)
1928 {
1929   struct PendingRequest *pr = cls;
1930
1931   GNUNET_STATISTICS_update (stats,
1932                             gettext_noop ("# queries scheduled for forwarding"),
1933                             -1,
1934                             GNUNET_NO);
1935   if (tpid == 0)   
1936     {
1937 #if DEBUG_FS
1938       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1939                   "Transmission of request failed, will try again later.\n");
1940 #endif
1941       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1942         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1943                                                  get_processing_delay (),
1944                                                  &forward_request_task,
1945                                                  pr); 
1946       return;    
1947     }
1948   GNUNET_STATISTICS_update (stats,
1949                             gettext_noop ("# queries forwarded"),
1950                             1,
1951                             GNUNET_NO);
1952   GNUNET_PEER_change_rc (tpid, 1);
1953   if (pr->used_pids_off == pr->used_pids_size)
1954     GNUNET_array_grow (pr->used_pids,
1955                        pr->used_pids_size,
1956                        pr->used_pids_size * 2 + 2);
1957   pr->used_pids[pr->used_pids_off++] = tpid;
1958   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1959     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1960                                              get_processing_delay (),
1961                                              &forward_request_task,
1962                                              pr);
1963 }
1964
1965
1966 /**
1967  * How many bytes should a bloomfilter be if we have already seen
1968  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1969  * of bits set per entry.  Furthermore, we should not re-size the
1970  * filter too often (to keep it cheap).
1971  *
1972  * Since other peers will also add entries but not resize the filter,
1973  * we should generally pick a slightly larger size than what the
1974  * strict math would suggest.
1975  *
1976  * @return must be a power of two and smaller or equal to 2^15.
1977  */
1978 static size_t
1979 compute_bloomfilter_size (unsigned int entry_count)
1980 {
1981   size_t size;
1982   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1983   uint16_t max = 1 << 15;
1984
1985   if (entry_count > max)
1986     return max;
1987   size = 8;
1988   while ((size < max) && (size < ideal))
1989     size *= 2;
1990   if (size > max)
1991     return max;
1992   return size;
1993 }
1994
1995
1996 /**
1997  * Recalculate our bloom filter for filtering replies.  This function
1998  * will create a new bloom filter from scratch, so it should only be
1999  * called if we have no bloomfilter at all (and hence can create a
2000  * fresh one of minimal size without problems) OR if our peer is the
2001  * initiator (in which case we may resize to larger than mimimum size).
2002  *
2003  * @param pr request for which the BF is to be recomputed
2004  */
2005 static void
2006 refresh_bloomfilter (struct PendingRequest *pr)
2007 {
2008   unsigned int i;
2009   size_t nsize;
2010   GNUNET_HashCode mhash;
2011
2012   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2013   if (nsize == pr->bf_size)
2014     return; /* size not changed */
2015   if (pr->bf != NULL)
2016     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2017   pr->bf_size = nsize;
2018   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2019   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2020                                               pr->bf_size,
2021                                               BLOOMFILTER_K);
2022   for (i=0;i<pr->replies_seen_off;i++)
2023     {
2024       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2025                                 pr->mingle,
2026                                 &mhash);
2027       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2028     }
2029 }
2030
2031
2032 /**
2033  * Function called after we've tried to reserve a certain amount of
2034  * bandwidth for a reply.  Check if we succeeded and if so send our
2035  * query.
2036  *
2037  * @param cls the requests "struct PendingRequest*"
2038  * @param peer identifies the peer
2039  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2040  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2041  * @param amount set to the amount that was actually reserved or unreserved
2042  * @param preference current traffic preference for the given peer
2043  */
2044 static void
2045 target_reservation_cb (void *cls,
2046                        const struct
2047                        GNUNET_PeerIdentity * peer,
2048                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2049                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2050                        int amount,
2051                        uint64_t preference)
2052 {
2053   struct PendingRequest *pr = cls;
2054   struct ConnectedPeer *cp;
2055   struct PendingMessage *pm;
2056   struct GetMessage *gm;
2057   GNUNET_HashCode *ext;
2058   char *bfdata;
2059   size_t msize;
2060   unsigned int k;
2061   int no_route;
2062   uint32_t bm;
2063
2064   pr->irc = NULL;
2065   if (peer == NULL)
2066     {
2067       /* error in communication with core, try again later */
2068       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2069         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2070                                                  get_processing_delay (),
2071                                                  &forward_request_task,
2072                                                  pr);
2073       return;
2074     }
2075   // (3) transmit, update ttl/priority
2076   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2077                                           &peer->hashPubKey);
2078   if (cp == NULL)
2079     {
2080       /* Peer must have just left */
2081 #if DEBUG_FS
2082       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2083                   "Selected peer disconnected!\n");
2084 #endif
2085       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2086         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2087                                                  get_processing_delay (),
2088                                                  &forward_request_task,
2089                                                  pr);
2090       return;
2091     }
2092   no_route = GNUNET_NO;
2093   if (amount == 0)
2094     {
2095       if (pr->cp == NULL)
2096         {
2097 #if DEBUG_FS > 1
2098           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2099                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2100                       amount,
2101                       DBLOCK_SIZE);
2102 #endif
2103           GNUNET_STATISTICS_update (stats,
2104                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2105                                     1,
2106                                     GNUNET_NO);
2107           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2108             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2109                                                      get_processing_delay (),
2110                                                      &forward_request_task,
2111                                                      pr);
2112           return;  /* this target round failed */
2113         }
2114       /* FIXME: if we are "quite" busy, we may still want to skip
2115          this round; need more load detection code! */
2116       no_route = GNUNET_YES;
2117     }
2118   
2119   GNUNET_STATISTICS_update (stats,
2120                             gettext_noop ("# queries scheduled for forwarding"),
2121                             1,
2122                             GNUNET_NO);
2123   /* build message and insert message into priority queue */
2124 #if DEBUG_FS
2125   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2126               "Forwarding request `%s' to `%4s'!\n",
2127               GNUNET_h2s (&pr->query),
2128               GNUNET_i2s (peer));
2129 #endif
2130   k = 0;
2131   bm = 0;
2132   if (GNUNET_YES == no_route)
2133     {
2134       bm |= GET_MESSAGE_BIT_RETURN_TO;
2135       k++;      
2136     }
2137   if (pr->namespace != NULL)
2138     {
2139       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2140       k++;
2141     }
2142   if (pr->target_pid != 0)
2143     {
2144       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2145       k++;
2146     }
2147   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2148   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2149   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2150   pm->msize = msize;
2151   gm = (struct GetMessage*) &pm[1];
2152   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2153   gm->header.size = htons (msize);
2154   gm->type = htonl (pr->type);
2155   pr->remaining_priority /= 2;
2156   gm->priority = htonl (pr->remaining_priority);
2157   gm->ttl = htonl (pr->ttl);
2158   gm->filter_mutator = htonl(pr->mingle); 
2159   gm->hash_bitmap = htonl (bm);
2160   gm->query = pr->query;
2161   ext = (GNUNET_HashCode*) &gm[1];
2162   k = 0;
2163   if (GNUNET_YES == no_route)
2164     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2165   if (pr->namespace != NULL)
2166     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2167   if (pr->target_pid != 0)
2168     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2169   bfdata = (char *) &ext[k];
2170   if (pr->bf != NULL)
2171     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2172                                                bfdata,
2173                                                pr->bf_size);
2174   pm->cont = &transmit_query_continuation;
2175   pm->cont_cls = pr;
2176   add_to_pending_messages_for_peer (cp, pm, pr);
2177 }
2178
2179
2180 /**
2181  * Closure used for "target_peer_select_cb".
2182  */
2183 struct PeerSelectionContext 
2184 {
2185   /**
2186    * The request for which we are selecting
2187    * peers.
2188    */
2189   struct PendingRequest *pr;
2190
2191   /**
2192    * Current "prime" target.
2193    */
2194   struct GNUNET_PeerIdentity target;
2195
2196   /**
2197    * How much do we like this target?
2198    */
2199   double target_score;
2200
2201 };
2202
2203
2204 /**
2205  * Function called for each connected peer to determine
2206  * which one(s) would make good targets for forwarding.
2207  *
2208  * @param cls closure (struct PeerSelectionContext)
2209  * @param key current key code (peer identity)
2210  * @param value value in the hash map (struct ConnectedPeer)
2211  * @return GNUNET_YES if we should continue to
2212  *         iterate,
2213  *         GNUNET_NO if not.
2214  */
2215 static int
2216 target_peer_select_cb (void *cls,
2217                        const GNUNET_HashCode * key,
2218                        void *value)
2219 {
2220   struct PeerSelectionContext *psc = cls;
2221   struct ConnectedPeer *cp = value;
2222   struct PendingRequest *pr = psc->pr;
2223   double score;
2224   unsigned int i;
2225   unsigned int pc;
2226
2227   /* 1) check that this peer is not the initiator */
2228   if (cp == pr->cp)
2229     {
2230 #if DEBUG_FS
2231       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2232                   "Skipping initiator in forwarding selection\n");
2233 #endif
2234       return GNUNET_YES; /* skip */        
2235     }
2236
2237   /* 2) check if we have already (recently) forwarded to this peer */
2238   pc = 0;
2239   for (i=0;i<pr->used_pids_off;i++)
2240     if (pr->used_pids[i] == cp->pid) 
2241       {
2242         pc++;
2243         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2244                                            RETRY_PROBABILITY_INV))
2245           {
2246 #if DEBUG_FS
2247             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2248                         "NOT re-trying query that was previously transmitted %u times\n",
2249                         (unsigned int) pr->used_pids_off);
2250 #endif
2251             return GNUNET_YES; /* skip */
2252           }
2253       }
2254 #if DEBUG_FS
2255   if (0 < pc)
2256     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2257                 "Re-trying query that was previously transmitted %u times to this peer\n",
2258                 (unsigned int) pc);
2259 #endif
2260   /* 3) calculate how much we'd like to forward to this peer,
2261      starting with a random value that is strong enough
2262      to at least give any peer a chance sometimes 
2263      (compared to the other factors that come later) */
2264   /* 3a) count successful (recent) routes from cp for same source */
2265   if (pr->cp != NULL)
2266     {
2267       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2268                                         P2P_SUCCESS_LIST_SIZE);
2269       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2270         if (cp->last_p2p_replies[i] == pr->cp->pid)
2271           score += 1; /* likely successful based on hot path */
2272     }
2273   else
2274     {
2275       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2276                                         CS2P_SUCCESS_LIST_SIZE);
2277       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2278         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2279           score += 1; /* likely successful based on hot path */
2280     }
2281   /* 3b) include latency */
2282   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2283     score += 1; /* likely fast based on latency */
2284   /* 3c) include priorities */
2285   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2286     score += 1; /* likely successful based on priorities */
2287   /* 3d) penalize for queue size */  
2288   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2289   /* 3e) include peer proximity */
2290   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2291                                                     &pr->query)) / (double) UINT32_MAX);
2292   /* 4) super-bonus for being the known target */
2293   if (pr->target_pid == cp->pid)
2294     score += 100.0;
2295   /* store best-fit in closure */
2296 #if DEBUG_FS
2297   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2298               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2299               GNUNET_h2s (key),
2300               score,
2301               psc->target_score);
2302 #endif  
2303   score++; /* avoid zero */
2304   if (score > psc->target_score)
2305     {
2306       psc->target_score = score;
2307       psc->target.hashPubKey = *key; 
2308     }
2309   return GNUNET_YES;
2310 }
2311   
2312
2313 /**
2314  * The priority level imposes a bound on the maximum
2315  * value for the ttl that can be requested.
2316  *
2317  * @param ttl_in requested ttl
2318  * @param prio given priority
2319  * @return ttl_in if ttl_in is below the limit,
2320  *         otherwise the ttl-limit for the given priority
2321  */
2322 static int32_t
2323 bound_ttl (int32_t ttl_in, uint32_t prio)
2324 {
2325   unsigned long long allowed;
2326
2327   if (ttl_in <= 0)
2328     return ttl_in;
2329   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2330   if (ttl_in > allowed)      
2331     {
2332       if (allowed >= (1 << 30))
2333         return 1 << 30;
2334       return allowed;
2335     }
2336   return ttl_in;
2337 }
2338
2339
2340 /**
2341  * We're processing a GET request and have decided
2342  * to forward it to other peers.  This function is called periodically
2343  * and should forward the request to other peers until we have all
2344  * possible replies.  If we have transmitted the *only* reply to
2345  * the initiator we should destroy the pending request.  If we have
2346  * many replies in the queue to the initiator, we should delay sending
2347  * out more queries until the reply queue has shrunk some.
2348  *
2349  * @param cls our "struct ProcessGetContext *"
2350  * @param tc unused
2351  */
2352 static void
2353 forward_request_task (void *cls,
2354                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2355 {
2356   struct PendingRequest *pr = cls;
2357   struct PeerSelectionContext psc;
2358   struct ConnectedPeer *cp; 
2359   struct GNUNET_TIME_Relative delay;
2360
2361   pr->task = GNUNET_SCHEDULER_NO_TASK;
2362   if (pr->irc != NULL)
2363     {
2364 #if DEBUG_FS
2365       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2366                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2367                   GNUNET_h2s (&pr->query));
2368 #endif
2369       return; /* already pending */
2370     }
2371   if (GNUNET_YES == pr->local_only)
2372     return; /* configured to not do P2P search */
2373   /* (0) try DHT */
2374   if (0 == pr->anonymity_level)
2375     {
2376 #if 0      
2377       /* DHT API needs fixing... */
2378       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2379                                           GNUNET_TIME_UNIT_FOREVER_REL,
2380                                           pr->type,
2381                                           &pr->query,
2382                                           &process_dht_reply,
2383                                           pr,
2384                                           FIXME,
2385                                           FIXME);
2386 #endif                                    
2387     }
2388   /* (1) select target */
2389   psc.pr = pr;
2390   psc.target_score = -DBL_MAX;
2391   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2392                                          &target_peer_select_cb,
2393                                          &psc);  
2394   if (psc.target_score == -DBL_MAX)
2395     {
2396       delay = get_processing_delay ();
2397 #if DEBUG_FS 
2398       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2399                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2400                   GNUNET_h2s (&pr->query),
2401                   delay.value);
2402 #endif
2403       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2404                                                delay,
2405                                                &forward_request_task,
2406                                                pr);
2407       return; /* nobody selected */
2408     }
2409   /* (3) update TTL/priority */
2410   if (pr->client_request_list != NULL)
2411     {
2412       /* FIXME: use better algorithm!? */
2413       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2414                                          4))
2415         pr->priority++;
2416       /* bound priority we use by priorities we see from other peers
2417          rounded up (must round up so that we can see non-zero
2418          priorities, but round up as little as possible to make it
2419          plausible that we forwarded another peers request) */
2420       if (pr->priority > current_priorities + 1.0)
2421         pr->priority = (uint32_t) current_priorities + 1.0;
2422       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2423                            pr->priority);
2424 #if DEBUG_FS
2425       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2426                   "Trying query `%s' with priority %u and TTL %d.\n",
2427                   GNUNET_h2s (&pr->query),
2428                   pr->priority,
2429                   pr->ttl);
2430 #endif
2431     }
2432
2433   /* (3) reserve reply bandwidth */
2434   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2435                                           &psc.target.hashPubKey);
2436   GNUNET_assert (NULL != cp);
2437   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2438                                                 &psc.target,
2439                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2440                                                 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2441                                                 DBLOCK_SIZE * 2, 
2442                                                 cp->inc_preference,
2443                                                 &target_reservation_cb,
2444                                                 pr);
2445   cp->inc_preference = 0;
2446 }
2447
2448
2449 /* **************************** P2P PUT Handling ************************ */
2450
2451
2452 /**
2453  * Function called after we either failed or succeeded
2454  * at transmitting a reply to a peer.  
2455  *
2456  * @param cls the requests "struct PendingRequest*"
2457  * @param tpid ID of receiving peer, 0 on transmission error
2458  */
2459 static void
2460 transmit_reply_continuation (void *cls,
2461                              GNUNET_PEER_Id tpid)
2462 {
2463   struct PendingRequest *pr = cls;
2464   
2465   switch (pr->type)
2466     {
2467     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2468     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2469       /* only one reply expected, done with the request! */
2470       destroy_pending_request (pr);
2471       break;
2472     case GNUNET_BLOCK_TYPE_ANY:
2473     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2474     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2475       break;
2476     default:
2477       GNUNET_break (0);
2478       break;
2479     }
2480 }
2481
2482
2483 /**
2484  * Transmit the given message by copying it to the target buffer
2485  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2486  * for writing in the meantime.  In that case, do nothing
2487  * (the disconnect or shutdown handler will take care of the rest).
2488  * If we were able to transmit messages and there are still more
2489  * pending, ask core again for further calls to this function.
2490  *
2491  * @param cls closure, pointer to the 'struct ClientList*'
2492  * @param size number of bytes available in buf
2493  * @param buf where the callee should write the message
2494  * @return number of bytes written to buf
2495  */
2496 static size_t
2497 transmit_to_client (void *cls,
2498                   size_t size, void *buf)
2499 {
2500   struct ClientList *cl = cls;
2501   char *cbuf = buf;
2502   struct ClientResponseMessage *creply;
2503   size_t msize;
2504   
2505   cl->th = NULL;
2506   if (NULL == buf)
2507     {
2508 #if DEBUG_FS
2509       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2510                   "Not sending reply, client communication problem.\n");
2511 #endif
2512       return 0;
2513     }
2514   msize = 0;
2515   while ( (NULL != (creply = cl->res_head) ) &&
2516           (creply->msize <= size) )
2517     {
2518       memcpy (&cbuf[msize], &creply[1], creply->msize);
2519       msize += creply->msize;
2520       size -= creply->msize;
2521       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2522                                    cl->res_tail,
2523                                    creply);
2524       GNUNET_free (creply);
2525     }
2526   if (NULL != creply)
2527     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2528                                                   creply->msize,
2529                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2530                                                   &transmit_to_client,
2531                                                   cl);
2532 #if DEBUG_FS
2533   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2534               "Transmitted %u bytes to client\n",
2535               (unsigned int) msize);
2536 #endif
2537   return msize;
2538 }
2539
2540
2541 /**
2542  * Closure for "process_reply" function.
2543  */
2544 struct ProcessReplyClosure
2545 {
2546   /**
2547    * The data for the reply.
2548    */
2549   const void *data;
2550
2551   /**
2552    * Who gave us this reply? NULL for local host.
2553    */
2554   struct ConnectedPeer *sender;
2555
2556   /**
2557    * When the reply expires.
2558    */
2559   struct GNUNET_TIME_Absolute expiration;
2560
2561   /**
2562    * Size of data.
2563    */
2564   size_t size;
2565
2566   /**
2567    * Type of the block.
2568    */
2569   enum GNUNET_BLOCK_Type type;
2570
2571   /**
2572    * How much was this reply worth to us?
2573    */
2574   uint32_t priority;
2575
2576   /**
2577    * Evaluation result (returned).
2578    */
2579   enum GNUNET_BLOCK_EvaluationResult eval;
2580
2581   /**
2582    * Did we finish processing the associated request?
2583    */ 
2584   int finished;
2585
2586   /**
2587    * Did we find a matching request?
2588    */
2589   int request_found;
2590 };
2591
2592
2593 /**
2594  * We have received a reply; handle it!
2595  *
2596  * @param cls response (struct ProcessReplyClosure)
2597  * @param key our query
2598  * @param value value in the hash map (info about the query)
2599  * @return GNUNET_YES (we should continue to iterate)
2600  */
2601 static int
2602 process_reply (void *cls,
2603                const GNUNET_HashCode * key,
2604                void *value)
2605 {
2606   struct ProcessReplyClosure *prq = cls;
2607   struct PendingRequest *pr = value;
2608   struct PendingMessage *reply;
2609   struct ClientResponseMessage *creply;
2610   struct ClientList *cl;
2611   struct PutMessage *pm;
2612   struct ConnectedPeer *cp;
2613   struct GNUNET_TIME_Relative cur_delay;
2614   size_t msize;
2615
2616 #if DEBUG_FS
2617   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2618               "Matched result (type %u) for query `%s' with pending request\n",
2619               (unsigned int) prq->type,
2620               GNUNET_h2s (key));
2621 #endif  
2622   GNUNET_STATISTICS_update (stats,
2623                             gettext_noop ("# replies received and matched"),
2624                             1,
2625                             GNUNET_NO);
2626   if (prq->sender != NULL)
2627     {
2628       /* FIXME: should we be more precise here and not use
2629          "start_time" but a peer-specific time stamp? */
2630       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2631       prq->sender->avg_delay.value
2632         = (prq->sender->avg_delay.value * 
2633            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2634       prq->sender->avg_priority
2635         = (prq->sender->avg_priority * 
2636            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2637       if (pr->cp != NULL)
2638         {
2639           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2640                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2641                                  -1);
2642           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2643           prq->sender->last_p2p_replies
2644             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2645             = pr->cp->pid;
2646         }
2647       else
2648         {
2649           if (NULL != prq->sender->last_client_replies
2650               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2651             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2652                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2653           prq->sender->last_client_replies
2654             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2655             = pr->client_request_list->client_list->client;
2656           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2657         }
2658     }
2659   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
2660                                      prq->type,
2661                                      key,
2662                                      &pr->bf,
2663                                      pr->mingle,
2664                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2665                                      prq->data,
2666                                      prq->size);
2667   switch (prq->eval)
2668     {
2669     case GNUNET_BLOCK_EVALUATION_OK_MORE:
2670       break;
2671     case GNUNET_BLOCK_EVALUATION_OK_LAST:
2672       while (NULL != pr->pending_head)
2673         destroy_pending_message_list_entry (pr->pending_head);
2674       if (pr->qe != NULL)
2675         {
2676           if (pr->client_request_list != NULL)
2677             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2678                                         GNUNET_YES);
2679           GNUNET_DATASTORE_cancel (pr->qe);
2680           pr->qe = NULL;
2681         }
2682       pr->do_remove = GNUNET_YES;
2683       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2684         {
2685           GNUNET_SCHEDULER_cancel (sched,
2686                                    pr->task);
2687           pr->task = GNUNET_SCHEDULER_NO_TASK;
2688         }
2689       GNUNET_break (GNUNET_YES ==
2690                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2691                                                           key,
2692                                                           pr));
2693       break;
2694     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2695       GNUNET_STATISTICS_update (stats,
2696                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2697                                 1,
2698                                 GNUNET_NO);
2699 #if DEBUG_FS
2700       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2701                   "Duplicate response `%s', discarding.\n",
2702                   GNUNET_h2s (&mhash));
2703 #endif
2704       return GNUNET_YES; /* duplicate */
2705     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2706       return GNUNET_YES; /* wrong namespace */  
2707     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2708       GNUNET_break (0);
2709       return GNUNET_YES;
2710     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2711       GNUNET_break (0);
2712       return GNUNET_YES;
2713     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2714       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2715                   _("Unsupported block type %u\n"),
2716                   prq->type);
2717       return GNUNET_NO;
2718     }
2719   if (pr->client_request_list != NULL)
2720     {
2721       if (pr->replies_seen_size == pr->replies_seen_off)
2722         GNUNET_array_grow (pr->replies_seen,
2723                            pr->replies_seen_size,
2724                            pr->replies_seen_size * 2 + 4);      
2725       GNUNET_CRYPTO_hash (prq->data,
2726                           prq->size,
2727                           &pr->replies_seen[pr->replies_seen_off++]);         
2728       refresh_bloomfilter (pr);
2729     }
2730   if (NULL == prq->sender)
2731     {
2732 #if DEBUG_FS
2733       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2734                   "Found result for query `%s' in local datastore\n",
2735                   GNUNET_h2s (key));
2736 #endif
2737       GNUNET_STATISTICS_update (stats,
2738                                 gettext_noop ("# results found locally"),
2739                                 1,
2740                                 GNUNET_NO);      
2741     }
2742   prq->priority += pr->remaining_priority;
2743   pr->remaining_priority = 0;
2744   pr->results_found++;
2745   prq->request_found = GNUNET_YES;
2746   if (NULL != pr->client_request_list)
2747     {
2748       GNUNET_STATISTICS_update (stats,
2749                                 gettext_noop ("# replies received for local clients"),
2750                                 1,
2751                                 GNUNET_NO);
2752       cl = pr->client_request_list->client_list;
2753       msize = sizeof (struct PutMessage) + prq->size;
2754       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2755       creply->msize = msize;
2756       creply->client_list = cl;
2757       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2758                                          cl->res_tail,
2759                                          cl->res_tail,
2760                                          creply);      
2761       pm = (struct PutMessage*) &creply[1];
2762       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2763       pm->header.size = htons (msize);
2764       pm->type = htonl (prq->type);
2765       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2766       memcpy (&pm[1], prq->data, prq->size);      
2767       if (NULL == cl->th)
2768         {
2769 #if DEBUG_FS
2770           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2771                       "Transmitting result for query `%s' to client\n",
2772                       GNUNET_h2s (key));
2773 #endif  
2774           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2775                                                         msize,
2776                                                         GNUNET_TIME_UNIT_FOREVER_REL,
2777                                                         &transmit_to_client,
2778                                                         cl);
2779         }
2780       GNUNET_break (cl->th != NULL);
2781       if (pr->do_remove)                
2782         {
2783           prq->finished = GNUNET_YES;
2784           destroy_pending_request (pr);         
2785         }
2786     }
2787   else
2788     {
2789       cp = pr->cp;
2790 #if DEBUG_FS
2791       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2792                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
2793                   GNUNET_h2s (key),
2794                   (unsigned int) cp->pid);
2795 #endif  
2796       GNUNET_STATISTICS_update (stats,
2797                                 gettext_noop ("# replies received for other peers"),
2798                                 1,
2799                                 GNUNET_NO);
2800       msize = sizeof (struct PutMessage) + prq->size;
2801       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2802       reply->cont = &transmit_reply_continuation;
2803       reply->cont_cls = pr;
2804       reply->msize = msize;
2805       reply->priority = UINT32_MAX; /* send replies first! */
2806       pm = (struct PutMessage*) &reply[1];
2807       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2808       pm->header.size = htons (msize);
2809       pm->type = htonl (prq->type);
2810       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2811       memcpy (&pm[1], prq->data, prq->size);
2812       add_to_pending_messages_for_peer (cp, reply, pr);
2813     }
2814   return GNUNET_YES;
2815 }
2816
2817
2818 /**
2819  * Continuation called to notify client about result of the
2820  * operation.
2821  *
2822  * @param cls closure
2823  * @param success GNUNET_SYSERR on failure
2824  * @param msg NULL on success, otherwise an error message
2825  */
2826 static void 
2827 put_migration_continuation (void *cls,
2828                             int success,
2829                             const char *msg)
2830 {
2831   struct GNUNET_TIME_Absolute *start = cls;
2832   struct GNUNET_TIME_Relative delay;
2833   
2834   delay = GNUNET_TIME_absolute_get_duration (*start);
2835   GNUNET_free (start);
2836   GNUNET_LOAD_update (datastore_put_load,
2837                       delay.value);
2838   if (GNUNET_OK == success)
2839     return;
2840   GNUNET_STATISTICS_update (stats,
2841                             gettext_noop ("# datastore 'put' failures"),
2842                             1,
2843                             GNUNET_NO);
2844 }
2845
2846
2847 /**
2848  * Handle P2P "PUT" message.
2849  *
2850  * @param cls closure, always NULL
2851  * @param other the other peer involved (sender or receiver, NULL
2852  *        for loopback messages where we are both sender and receiver)
2853  * @param message the actual message
2854  * @param latency reported latency of the connection with 'other'
2855  * @param distance reported distance (DV) to 'other' 
2856  * @return GNUNET_OK to keep the connection open,
2857  *         GNUNET_SYSERR to close it (signal serious error)
2858  */
2859 static int
2860 handle_p2p_put (void *cls,
2861                 const struct GNUNET_PeerIdentity *other,
2862                 const struct GNUNET_MessageHeader *message,
2863                 struct GNUNET_TIME_Relative latency,
2864                 uint32_t distance)
2865 {
2866   const struct PutMessage *put;
2867   uint16_t msize;
2868   size_t dsize;
2869   enum GNUNET_BLOCK_Type type;
2870   struct GNUNET_TIME_Absolute expiration;
2871   GNUNET_HashCode query;
2872   struct ProcessReplyClosure prq;
2873   struct GNUNET_TIME_Absolute *start;
2874   struct GNUNET_TIME_Relative block_time;  
2875   double putl;
2876   struct ConnectedPeer *cp; 
2877   struct PendingMessage *pm;
2878   struct MigrationStopMessage *msm;
2879
2880   msize = ntohs (message->size);
2881   if (msize < sizeof (struct PutMessage))
2882     {
2883       GNUNET_break_op(0);
2884       return GNUNET_SYSERR;
2885     }
2886   put = (const struct PutMessage*) message;
2887   dsize = msize - sizeof (struct PutMessage);
2888   type = ntohl (put->type);
2889   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2890
2891   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
2892     return GNUNET_SYSERR;
2893   if (GNUNET_OK !=
2894       GNUNET_BLOCK_get_key (block_ctx,
2895                             type,
2896                             &put[1],
2897                             dsize,
2898                             &query))
2899     {
2900       GNUNET_break_op (0);
2901       return GNUNET_SYSERR;
2902     }
2903 #if DEBUG_FS
2904   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2905               "Received result for query `%s' from peer `%4s'\n",
2906               GNUNET_h2s (&query),
2907               GNUNET_i2s (other));
2908 #endif
2909   GNUNET_STATISTICS_update (stats,
2910                             gettext_noop ("# replies received (overall)"),
2911                             1,
2912                             GNUNET_NO);
2913   /* now, lookup 'query' */
2914   prq.data = (const void*) &put[1];
2915   if (other != NULL)
2916     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2917                                                     &other->hashPubKey);
2918   else
2919     prq.sender = NULL;
2920   prq.size = dsize;
2921   prq.type = type;
2922   prq.expiration = expiration;
2923   prq.priority = 0;
2924   prq.finished = GNUNET_NO;
2925   prq.request_found = GNUNET_NO;
2926   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2927                                               &query,
2928                                               &process_reply,
2929                                               &prq);
2930   if (prq.sender != NULL)
2931     {
2932       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
2933       prq.sender->trust += prq.priority;
2934     }
2935   if (GNUNET_YES == active_migration)
2936     {
2937 #if DEBUG_FS
2938       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2939                   "Replicating result for query `%s' with priority %u\n",
2940                   GNUNET_h2s (&query),
2941                   prq.priority);
2942 #endif
2943       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
2944       *start = GNUNET_TIME_absolute_get ();
2945       GNUNET_DATASTORE_put (dsh,
2946                             0, &query, dsize, &put[1],
2947                             type, prq.priority, 1 /* anonymity */, 
2948                             expiration, 
2949                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2950                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2951                             &put_migration_continuation, 
2952                             start);
2953     }
2954   putl = GNUNET_LOAD_get_load (datastore_put_load);
2955   if ( (GNUNET_NO == prq.request_found) &&
2956        ( (GNUNET_YES != active_migration) ||
2957          (putl > 2.0) ) )
2958     {
2959       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2960                                               &other->hashPubKey);
2961       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
2962         return GNUNET_OK; /* already blocked */
2963       /* We're too busy; send MigrationStop message! */
2964       if (GNUNET_YES != active_migration) 
2965         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
2966       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2967                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2968                                                                                    (unsigned int) (60000 * putl * putl)));
2969       
2970       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
2971       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
2972                           sizeof (struct MigrationStopMessage));
2973       pm->msize = sizeof (struct MigrationStopMessage);
2974       pm->priority = UINT32_MAX;
2975       msm = (struct MigrationStopMessage*) &pm[1];
2976       msm->header.size = htons (sizeof (struct MigrationStopMessage));
2977       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
2978       msm->duration = GNUNET_TIME_relative_hton (block_time);
2979       add_to_pending_messages_for_peer (cp,
2980                                         pm,
2981                                         NULL);
2982     }
2983   return GNUNET_OK;
2984 }
2985
2986
2987 /**
2988  * Handle P2P "MIGRATION_STOP" message.
2989  *
2990  * @param cls closure, always NULL
2991  * @param other the other peer involved (sender or receiver, NULL
2992  *        for loopback messages where we are both sender and receiver)
2993  * @param message the actual message
2994  * @param latency reported latency of the connection with 'other'
2995  * @param distance reported distance (DV) to 'other' 
2996  * @return GNUNET_OK to keep the connection open,
2997  *         GNUNET_SYSERR to close it (signal serious error)
2998  */
2999 static int
3000 handle_p2p_migration_stop (void *cls,
3001                            const struct GNUNET_PeerIdentity *other,
3002                            const struct GNUNET_MessageHeader *message,
3003                            struct GNUNET_TIME_Relative latency,
3004                            uint32_t distance)
3005 {
3006   struct ConnectedPeer *cp; 
3007   const struct MigrationStopMessage *msm;
3008
3009   msm = (const struct MigrationStopMessage*) message;
3010   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3011                                           &other->hashPubKey);
3012   if (cp == NULL)
3013     {
3014       GNUNET_break (0);
3015       return GNUNET_OK;
3016     }
3017   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3018   return GNUNET_OK;
3019 }
3020
3021
3022
3023 /* **************************** P2P GET Handling ************************ */
3024
3025
3026 /**
3027  * Closure for 'check_duplicate_request_{peer,client}'.
3028  */
3029 struct CheckDuplicateRequestClosure
3030 {
3031   /**
3032    * The new request we should check if it already exists.
3033    */
3034   const struct PendingRequest *pr;
3035
3036   /**
3037    * Existing request found by the checker, NULL if none.
3038    */
3039   struct PendingRequest *have;
3040 };
3041
3042
3043 /**
3044  * Iterator over entries in the 'query_request_map' that
3045  * tries to see if we have the same request pending from
3046  * the same client already.
3047  *
3048  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3049  * @param key current key code (query, ignored, must match)
3050  * @param value value in the hash map (a 'struct PendingRequest' 
3051  *              that already exists)
3052  * @return GNUNET_YES if we should continue to
3053  *         iterate (no match yet)
3054  *         GNUNET_NO if not (match found).
3055  */
3056 static int
3057 check_duplicate_request_client (void *cls,
3058                                 const GNUNET_HashCode * key,
3059                                 void *value)
3060 {
3061   struct CheckDuplicateRequestClosure *cdc = cls;
3062   struct PendingRequest *have = value;
3063
3064   if (have->client_request_list == NULL)
3065     return GNUNET_YES;
3066   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3067        (cdc->pr != have) )
3068     {
3069       cdc->have = have;
3070       return GNUNET_NO;
3071     }
3072   return GNUNET_YES;
3073 }
3074
3075
3076 /**
3077  * We're processing (local) results for a search request
3078  * from another peer.  Pass applicable results to the
3079  * peer and if we are done either clean up (operation
3080  * complete) or forward to other peers (more results possible).
3081  *
3082  * @param cls our closure (struct LocalGetContext)
3083  * @param key key for the content
3084  * @param size number of bytes in data
3085  * @param data content stored
3086  * @param type type of the content
3087  * @param priority priority of the content
3088  * @param anonymity anonymity-level for the content
3089  * @param expiration expiration time for the content
3090  * @param uid unique identifier for the datum;
3091  *        maybe 0 if no unique identifier is available
3092  */
3093 static void
3094 process_local_reply (void *cls,
3095                      const GNUNET_HashCode * key,
3096                      uint32_t size,
3097                      const void *data,
3098                      enum GNUNET_BLOCK_Type type,
3099                      uint32_t priority,
3100                      uint32_t anonymity,
3101                      struct GNUNET_TIME_Absolute
3102                      expiration, 
3103                      uint64_t uid)
3104 {
3105   struct PendingRequest *pr = cls;
3106   struct ProcessReplyClosure prq;
3107   struct CheckDuplicateRequestClosure cdrc;
3108   GNUNET_HashCode query;
3109   unsigned int old_rf;
3110   
3111   if (NULL == key)
3112     {
3113 #if DEBUG_FS > 1
3114       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3115                   "Done processing local replies, forwarding request to other peers.\n");
3116 #endif
3117       pr->qe = NULL;
3118       if (pr->client_request_list != NULL)
3119         {
3120           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3121                                       GNUNET_YES);
3122           /* Figure out if this is a duplicate request and possibly
3123              merge 'struct PendingRequest' entries */
3124           cdrc.have = NULL;
3125           cdrc.pr = pr;
3126           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3127                                                       &pr->query,
3128                                                       &check_duplicate_request_client,
3129                                                       &cdrc);
3130           if (cdrc.have != NULL)
3131             {
3132 #if DEBUG_FS
3133               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3134                           "Received request for block `%s' twice from client, will only request once.\n",
3135                           GNUNET_h2s (&pr->query));
3136 #endif
3137               
3138               destroy_pending_request (pr);
3139               return;
3140             }
3141         }
3142
3143       /* no more results */
3144       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3145         pr->task = GNUNET_SCHEDULER_add_now (sched,
3146                                              &forward_request_task,
3147                                              pr);      
3148       return;
3149     }
3150 #if DEBUG_FS
3151   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3152               "New local response to `%s' of type %u.\n",
3153               GNUNET_h2s (key),
3154               type);
3155 #endif
3156   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3157     {
3158 #if DEBUG_FS
3159       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3160                   "Found ONDEMAND block, performing on-demand encoding\n");
3161 #endif
3162       GNUNET_STATISTICS_update (stats,
3163                                 gettext_noop ("# on-demand blocks matched requests"),
3164                                 1,
3165                                 GNUNET_NO);
3166       if (GNUNET_OK != 
3167           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3168                                             anonymity, expiration, uid, 
3169                                             &process_local_reply,
3170                                             pr))
3171       if (pr->qe != NULL)
3172         {
3173           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3174         }
3175       return;
3176     }
3177   old_rf = pr->results_found;
3178   memset (&prq, 0, sizeof (prq));
3179   prq.data = data;
3180   prq.expiration = expiration;
3181   prq.size = size;  
3182   if (GNUNET_OK != 
3183       GNUNET_BLOCK_get_key (block_ctx,
3184                             type,
3185                             data,
3186                             size,
3187                             &query))
3188     {
3189       GNUNET_break (0);
3190       GNUNET_DATASTORE_remove (dsh,
3191                                key,
3192                                size, data,
3193                                -1, -1, 
3194                                GNUNET_TIME_UNIT_FOREVER_REL,
3195                                NULL, NULL);
3196       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3197       return;
3198     }
3199   prq.type = type;
3200   prq.priority = priority;  
3201   prq.finished = GNUNET_NO;
3202   prq.request_found = GNUNET_NO;
3203   process_reply (&prq, key, pr);
3204   if ( (old_rf == 0) &&
3205        (pr->results_found == 1) )
3206     update_datastore_delays (pr->start_time);
3207   if (prq.finished == GNUNET_YES)
3208     return;
3209   if (pr->qe == NULL)
3210     return; /* done here */
3211   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3212     {
3213       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3214       return;
3215     }
3216   if ( (pr->client_request_list == NULL) &&
3217        ( (GNUNET_YES == test_load_too_high()) ||
3218          (pr->results_found > 5 + 2 * pr->priority) ) )
3219     {
3220 #if DEBUG_FS > 2
3221       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3222                   "Load too high, done with request\n");
3223 #endif
3224       GNUNET_STATISTICS_update (stats,
3225                                 gettext_noop ("# processing result set cut short due to load"),
3226                                 1,
3227                                 GNUNET_NO);
3228       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3229       return;
3230     }
3231   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3232 }
3233
3234
3235 /**
3236  * We've received a request with the specified priority.  Bound it
3237  * according to how much we trust the given peer.
3238  * 
3239  * @param prio_in requested priority
3240  * @param cp the peer making the request
3241  * @return effective priority
3242  */
3243 static uint32_t
3244 bound_priority (uint32_t prio_in,
3245                 struct ConnectedPeer *cp)
3246 {
3247 #define N ((double)128.0)
3248   uint32_t ret;
3249   double rret;
3250   int ld;
3251
3252   ld = test_load_too_high ();
3253   if (ld == GNUNET_SYSERR)
3254     return 0; /* excess resources */
3255   ret = change_host_trust (cp, prio_in);
3256   if (ret > 0)
3257     {
3258       if (ret > current_priorities + N)
3259         rret = current_priorities + N;
3260       else
3261         rret = ret;
3262       current_priorities 
3263         = (current_priorities * (N-1) + rret)/N;
3264     }
3265 #undef N
3266   return ret;
3267 }
3268
3269
3270 /**
3271  * Iterator over entries in the 'query_request_map' that
3272  * tries to see if we have the same request pending from
3273  * the same peer already.
3274  *
3275  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3276  * @param key current key code (query, ignored, must match)
3277  * @param value value in the hash map (a 'struct PendingRequest' 
3278  *              that already exists)
3279  * @return GNUNET_YES if we should continue to
3280  *         iterate (no match yet)
3281  *         GNUNET_NO if not (match found).
3282  */
3283 static int
3284 check_duplicate_request_peer (void *cls,
3285                               const GNUNET_HashCode * key,
3286                               void *value)
3287 {
3288   struct CheckDuplicateRequestClosure *cdc = cls;
3289   struct PendingRequest *have = value;
3290
3291   if (cdc->pr->target_pid == have->target_pid)
3292     {
3293       cdc->have = have;
3294       return GNUNET_NO;
3295     }
3296   return GNUNET_YES;
3297 }
3298
3299
3300 /**
3301  * Handle P2P "GET" request.
3302  *
3303  * @param cls closure, always NULL
3304  * @param other the other peer involved (sender or receiver, NULL
3305  *        for loopback messages where we are both sender and receiver)
3306  * @param message the actual message
3307  * @param latency reported latency of the connection with 'other'
3308  * @param distance reported distance (DV) to 'other' 
3309  * @return GNUNET_OK to keep the connection open,
3310  *         GNUNET_SYSERR to close it (signal serious error)
3311  */
3312 static int
3313 handle_p2p_get (void *cls,
3314                 const struct GNUNET_PeerIdentity *other,
3315                 const struct GNUNET_MessageHeader *message,
3316                 struct GNUNET_TIME_Relative latency,
3317                 uint32_t distance)
3318 {
3319   struct PendingRequest *pr;
3320   struct ConnectedPeer *cp;
3321   struct ConnectedPeer *cps;
3322   struct CheckDuplicateRequestClosure cdc;
3323   struct GNUNET_TIME_Relative timeout;
3324   uint16_t msize;
3325   const struct GetMessage *gm;
3326   unsigned int bits;
3327   const GNUNET_HashCode *opt;
3328   uint32_t bm;
3329   size_t bfsize;
3330   uint32_t ttl_decrement;
3331   enum GNUNET_BLOCK_Type type;
3332   int have_ns;
3333   int ld;
3334
3335   msize = ntohs(message->size);
3336   if (msize < sizeof (struct GetMessage))
3337     {
3338       GNUNET_break_op (0);
3339       return GNUNET_SYSERR;
3340     }
3341   gm = (const struct GetMessage*) message;
3342   type = ntohl (gm->type);
3343   bm = ntohl (gm->hash_bitmap);
3344   bits = 0;
3345   while (bm > 0)
3346     {
3347       if (1 == (bm & 1))
3348         bits++;
3349       bm >>= 1;
3350     }
3351   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3352     {
3353       GNUNET_break_op (0);
3354       return GNUNET_SYSERR;
3355     }  
3356   opt = (const GNUNET_HashCode*) &gm[1];
3357   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3358   bm = ntohl (gm->hash_bitmap);
3359   bits = 0;
3360   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3361                                            &other->hashPubKey);
3362   if (NULL == cps)
3363     {
3364       /* peer must have just disconnected */
3365       GNUNET_STATISTICS_update (stats,
3366                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3367                                 1,
3368                                 GNUNET_NO);
3369       return GNUNET_SYSERR;
3370     }
3371   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3372     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3373                                             &opt[bits++]);
3374   else
3375     cp = cps;
3376   if (cp == NULL)
3377     {
3378 #if DEBUG_FS
3379       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3380         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3381                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3382                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3383       
3384       else
3385         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3386                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3387                     GNUNET_i2s (other));
3388 #endif
3389       GNUNET_STATISTICS_update (stats,
3390                                 gettext_noop ("# requests dropped due to missing reverse route"),
3391                                 1,
3392                                 GNUNET_NO);
3393      /* FIXME: try connect? */
3394       return GNUNET_OK;
3395     }
3396   /* note that we can really only check load here since otherwise
3397      peers could find out that we are overloaded by not being
3398      disconnected after sending us a malformed query... */
3399
3400   /* FIXME: query priority should play
3401      a major role here! */
3402   ld = test_load_too_high ();
3403   if (GNUNET_YES == ld)
3404     {
3405 #if DEBUG_FS
3406       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3407                   "Dropping query from `%s', this peer is too busy.\n",
3408                   GNUNET_i2s (other));
3409 #endif
3410       GNUNET_STATISTICS_update (stats,
3411                                 gettext_noop ("# requests dropped due to high load"),
3412                                 1,
3413                                 GNUNET_NO);
3414       return GNUNET_OK;
3415     }
3416   /* FIXME: if ld == GNUNET_NO, forward
3417      instead of indirecting! */
3418
3419 #if DEBUG_FS 
3420   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3421               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3422               GNUNET_h2s (&gm->query),
3423               (unsigned int) type,
3424               GNUNET_i2s (other),
3425               (unsigned int) bm);
3426 #endif
3427   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3428   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3429                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3430   if (have_ns)
3431     {
3432       pr->namespace = (GNUNET_HashCode*) &pr[1];
3433       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3434     }
3435   pr->type = type;
3436   pr->mingle = ntohl (gm->filter_mutator);
3437   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3438     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3439   pr->anonymity_level = 1;
3440   pr->priority = bound_priority (ntohl (gm->priority), cps);
3441   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3442   pr->query = gm->query;
3443   /* decrement ttl (always) */
3444   ttl_decrement = 2 * TTL_DECREMENT +
3445     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3446                               TTL_DECREMENT);
3447   if ( (pr->ttl < 0) &&
3448        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3449     {
3450 #if DEBUG_FS
3451       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3452                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3453                   GNUNET_i2s (other),
3454                   pr->ttl,
3455                   ttl_decrement);
3456 #endif
3457       GNUNET_STATISTICS_update (stats,
3458                                 gettext_noop ("# requests dropped due TTL underflow"),
3459                                 1,
3460                                 GNUNET_NO);
3461       /* integer underflow => drop (should be very rare)! */      
3462       GNUNET_free (pr);
3463       return GNUNET_OK;
3464     } 
3465   pr->ttl -= ttl_decrement;
3466   pr->start_time = GNUNET_TIME_absolute_get ();
3467
3468   /* get bloom filter */
3469   if (bfsize > 0)
3470     {
3471       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3472                                                   bfsize,
3473                                                   BLOOMFILTER_K);
3474       pr->bf_size = bfsize;
3475     }
3476
3477   cdc.have = NULL;
3478   cdc.pr = pr;
3479   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3480                                               &gm->query,
3481                                               &check_duplicate_request_peer,
3482                                               &cdc);
3483   if (cdc.have != NULL)
3484     {
3485       if (cdc.have->start_time.value + cdc.have->ttl >=
3486           pr->start_time.value + pr->ttl)
3487         {
3488           /* existing request has higher TTL, drop new one! */
3489           cdc.have->priority += pr->priority;
3490           destroy_pending_request (pr);
3491 #if DEBUG_FS
3492           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3493                       "Have existing request with higher TTL, dropping new request.\n",
3494                       GNUNET_i2s (other));
3495 #endif
3496           GNUNET_STATISTICS_update (stats,
3497                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3498                                     1,
3499                                     GNUNET_NO);
3500           return GNUNET_OK;
3501         }
3502       else
3503         {
3504           /* existing request has lower TTL, drop old one! */
3505           pr->priority += cdc.have->priority;
3506           /* Possible optimization: if we have applicable pending
3507              replies in 'cdc.have', we might want to move those over
3508              (this is a really rare special-case, so it is not clear
3509              that this would be worth it) */
3510           destroy_pending_request (cdc.have);
3511           /* keep processing 'pr'! */
3512         }
3513     }
3514
3515   pr->cp = cp;
3516   GNUNET_break (GNUNET_OK ==
3517                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3518                                                    &gm->query,
3519                                                    pr,
3520                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3521   GNUNET_break (GNUNET_OK ==
3522                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3523                                                    &other->hashPubKey,
3524                                                    pr,
3525                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3526   
3527   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3528                                             pr,
3529                                             pr->start_time.value + pr->ttl);
3530
3531   GNUNET_STATISTICS_update (stats,
3532                             gettext_noop ("# P2P searches received"),
3533                             1,
3534                             GNUNET_NO);
3535   GNUNET_STATISTICS_update (stats,
3536                             gettext_noop ("# P2P searches active"),
3537                             1,
3538                             GNUNET_NO);
3539
3540   /* calculate change in traffic preference */
3541   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3542   /* process locally */
3543   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3544     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3545   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3546                                            (pr->priority + 1)); 
3547   pr->qe = GNUNET_DATASTORE_get (dsh,
3548                                  &gm->query,
3549                                  type,                         
3550                                  pr->priority + 1,
3551                                  MAX_DATASTORE_QUEUE,                            
3552                                  timeout,
3553                                  &process_local_reply,
3554                                  pr);
3555
3556   /* Are multiple results possible?  If so, start processing remotely now! */
3557   switch (pr->type)
3558     {
3559     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3560     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3561       /* only one result, wait for datastore */
3562       break;
3563     default:
3564       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3565         pr->task = GNUNET_SCHEDULER_add_now (sched,
3566                                              &forward_request_task,
3567                                              pr);
3568     }
3569
3570   /* make sure we don't track too many requests */
3571   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3572     {
3573       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3574       GNUNET_assert (pr != NULL);
3575       destroy_pending_request (pr);
3576     }
3577   return GNUNET_OK;
3578 }
3579
3580
3581 /* **************************** CS GET Handling ************************ */
3582
3583
3584 /**
3585  * Handle START_SEARCH-message (search request from client).
3586  *
3587  * @param cls closure
3588  * @param client identification of the client
3589  * @param message the actual message
3590  */
3591 static void
3592 handle_start_search (void *cls,
3593                      struct GNUNET_SERVER_Client *client,
3594                      const struct GNUNET_MessageHeader *message)
3595 {
3596   static GNUNET_HashCode all_zeros;
3597   const struct SearchMessage *sm;
3598   struct ClientList *cl;
3599   struct ClientRequestList *crl;
3600   struct PendingRequest *pr;
3601   uint16_t msize;
3602   unsigned int sc;
3603   enum GNUNET_BLOCK_Type type;
3604
3605   msize = ntohs (message->size);
3606   if ( (msize < sizeof (struct SearchMessage)) ||
3607        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3608     {
3609       GNUNET_break (0);
3610       GNUNET_SERVER_receive_done (client,
3611                                   GNUNET_SYSERR);
3612       return;
3613     }
3614   GNUNET_STATISTICS_update (stats,
3615                             gettext_noop ("# client searches received"),
3616                             1,
3617                             GNUNET_NO);
3618   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3619   sm = (const struct SearchMessage*) message;
3620   type = ntohl (sm->type);
3621 #if DEBUG_FS
3622   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3623               "Received request for `%s' of type %u from local client\n",
3624               GNUNET_h2s (&sm->query),
3625               (unsigned int) type);
3626 #endif
3627   cl = client_list;
3628   while ( (cl != NULL) &&
3629           (cl->client != client) )
3630     cl = cl->next;
3631   if (cl == NULL)
3632     {
3633       cl = GNUNET_malloc (sizeof (struct ClientList));
3634       cl->client = client;
3635       GNUNET_SERVER_client_keep (client);
3636       cl->next = client_list;
3637       client_list = cl;
3638     }
3639   /* detect duplicate KBLOCK requests */
3640   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
3641        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
3642        (type == GNUNET_BLOCK_TYPE_ANY) )
3643     {
3644       crl = cl->rl_head;
3645       while ( (crl != NULL) &&
3646               ( (0 != memcmp (&crl->req->query,
3647                               &sm->query,
3648                               sizeof (GNUNET_HashCode))) ||
3649                 (crl->req->type != type) ) )
3650         crl = crl->next;
3651       if (crl != NULL)  
3652         { 
3653 #if DEBUG_FS
3654           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3655                       "Have existing request, merging content-seen lists.\n");
3656 #endif
3657           pr = crl->req;
3658           /* Duplicate request (used to send long list of
3659              known/blocked results); merge 'pr->replies_seen'
3660              and update bloom filter */
3661           GNUNET_array_grow (pr->replies_seen,
3662                              pr->replies_seen_size,
3663                              pr->replies_seen_off + sc);
3664           memcpy (&pr->replies_seen[pr->replies_seen_off],
3665                   &sm[1],
3666                   sc * sizeof (GNUNET_HashCode));
3667           pr->replies_seen_off += sc;
3668           refresh_bloomfilter (pr);
3669           GNUNET_STATISTICS_update (stats,
3670                                     gettext_noop ("# client searches updated (merged content seen list)"),
3671                                     1,
3672                                     GNUNET_NO);
3673           GNUNET_SERVER_receive_done (client,
3674                                       GNUNET_OK);
3675           return;
3676         }
3677     }
3678   GNUNET_STATISTICS_update (stats,
3679                             gettext_noop ("# client searches active"),
3680                             1,
3681                             GNUNET_NO);
3682   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3683                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3684   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3685   memset (crl, 0, sizeof (struct ClientRequestList));
3686   crl->client_list = cl;
3687   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3688                                cl->rl_tail,
3689                                crl);  
3690   crl->req = pr;
3691   pr->type = type;
3692   pr->client_request_list = crl;
3693   GNUNET_array_grow (pr->replies_seen,
3694                      pr->replies_seen_size,
3695                      sc);
3696   memcpy (pr->replies_seen,
3697           &sm[1],
3698           sc * sizeof (GNUNET_HashCode));
3699   pr->replies_seen_off = sc;
3700   pr->anonymity_level = ntohl (sm->anonymity_level); 
3701   pr->start_time = GNUNET_TIME_absolute_get ();
3702   refresh_bloomfilter (pr);
3703   pr->query = sm->query;
3704   if (0 == (1 & ntohl (sm->options)))
3705     pr->local_only = GNUNET_NO;
3706   else
3707     pr->local_only = GNUNET_YES;
3708   switch (type)
3709     {
3710     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3711     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3712       if (0 != memcmp (&sm->target,
3713                        &all_zeros,
3714                        sizeof (GNUNET_HashCode)))
3715         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3716       break;
3717     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3718       pr->namespace = (GNUNET_HashCode*) &pr[1];
3719       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3720       break;
3721     default:
3722       break;
3723     }
3724   GNUNET_break (GNUNET_OK ==
3725                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3726                                                    &sm->query,
3727                                                    pr,
3728                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3729   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3730     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3731   pr->qe = GNUNET_DATASTORE_get (dsh,
3732                                  &sm->query,
3733                                  type,
3734                                  -3, -1,
3735                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
3736                                  &process_local_reply,
3737                                  pr);
3738 }
3739
3740
3741 /* **************************** Startup ************************ */
3742
3743 /**
3744  * Process fs requests.
3745  *
3746  * @param s scheduler to use
3747  * @param server the initialized server
3748  * @param c configuration to use
3749  */
3750 static int
3751 main_init (struct GNUNET_SCHEDULER_Handle *s,
3752            struct GNUNET_SERVER_Handle *server,
3753            const struct GNUNET_CONFIGURATION_Handle *c)
3754 {
3755   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3756     {
3757       { &handle_p2p_get, 
3758         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3759       { &handle_p2p_put, 
3760         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3761       { &handle_p2p_migration_stop, 
3762         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
3763         sizeof (struct MigrationStopMessage) },
3764       { NULL, 0, 0 }
3765     };
3766   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
3767     {&GNUNET_FS_handle_index_start, NULL, 
3768      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
3769     {&GNUNET_FS_handle_index_list_get, NULL, 
3770      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
3771     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
3772      sizeof (struct UnindexMessage) },
3773     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
3774      0 },
3775     {NULL, NULL, 0, 0}
3776   };
3777   unsigned long long enc = 128;
3778
3779   sched = s;
3780   cfg = c;
3781   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
3782   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
3783   if ( (GNUNET_OK !=
3784         GNUNET_CONFIGURATION_get_value_number (cfg,
3785                                                "fs",
3786                                                "MAX_PENDING_REQUESTS",
3787                                                &max_pending_requests)) ||
3788        (GNUNET_OK !=
3789         GNUNET_CONFIGURATION_get_value_number (cfg,
3790                                                "fs",
3791                                                "EXPECTED_NEIGHBOUR_COUNT",
3792                                                &enc)) ||
3793        (GNUNET_OK != 
3794         GNUNET_CONFIGURATION_get_value_time (cfg,
3795                                              "fs",
3796                                              "MIN_MIGRATION_DELAY",
3797                                              &min_migration_delay)) )
3798     {
3799       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3800                   _("Configuration fails to specify certain parameters, assuming default values."));
3801     }
3802   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
3803   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
3804   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
3805   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3806   core = GNUNET_CORE_connect (sched,
3807                               cfg,
3808                               GNUNET_TIME_UNIT_FOREVER_REL,
3809                               NULL,
3810                               NULL,
3811                               &peer_connect_handler,
3812                               &peer_disconnect_handler,
3813                               NULL,
3814                               NULL, GNUNET_NO,
3815                               NULL, GNUNET_NO,
3816                               p2p_handlers);
3817   if (NULL == core)
3818     {
3819       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3820                   _("Failed to connect to `%s' service.\n"),
3821                   "core");
3822       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
3823       connected_peers = NULL;
3824       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
3825       query_request_map = NULL;
3826       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
3827       requests_by_expiration_heap = NULL;
3828       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
3829       peer_request_map = NULL;
3830       if (dsh != NULL)
3831         {
3832           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3833           dsh = NULL;
3834         }
3835       return GNUNET_SYSERR;
3836     }
3837   /* FIXME: distinguish between sending and storing in options? */
3838   if (active_migration) 
3839     {
3840       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3841                   _("Content migration is enabled, will start to gather data\n"));
3842       consider_migration_gathering ();
3843     }
3844   GNUNET_SERVER_disconnect_notify (server, 
3845                                    &handle_client_disconnect,
3846                                    NULL);
3847   GNUNET_assert (GNUNET_OK ==
3848                  GNUNET_CONFIGURATION_get_value_filename (cfg,
3849                                                           "fs",
3850                                                           "TRUST",
3851                                                           &trustDirectory));
3852   GNUNET_DISK_directory_create (trustDirectory);
3853   GNUNET_SCHEDULER_add_with_priority (sched,
3854                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
3855                                       &cron_flush_trust, NULL);
3856
3857
3858   GNUNET_SERVER_add_handlers (server, handlers);
3859   GNUNET_SCHEDULER_add_delayed (sched,
3860                                 GNUNET_TIME_UNIT_FOREVER_REL,
3861                                 &shutdown_task,
3862                                 NULL);
3863   return GNUNET_OK;
3864 }
3865
3866
3867 /**
3868  * Process fs requests.
3869  *
3870  * @param cls closure
3871  * @param sched scheduler to use
3872  * @param server the initialized server
3873  * @param cfg configuration to use
3874  */
3875 static void
3876 run (void *cls,
3877      struct GNUNET_SCHEDULER_Handle *sched,
3878      struct GNUNET_SERVER_Handle *server,
3879      const struct GNUNET_CONFIGURATION_Handle *cfg)
3880 {
3881   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
3882                                                            "FS",
3883                                                            "ACTIVEMIGRATION");
3884   dsh = GNUNET_DATASTORE_connect (cfg,
3885                                   sched);
3886   if (dsh == NULL)
3887     {
3888       GNUNET_SCHEDULER_shutdown (sched);
3889       return;
3890     }
3891   datastore_get_load = GNUNET_LOAD_value_init ();
3892   datastore_put_load = GNUNET_LOAD_value_init ();
3893   block_cfg = GNUNET_CONFIGURATION_create ();
3894   GNUNET_CONFIGURATION_set_value_string (block_cfg,
3895                                          "block",
3896                                          "PLUGINS",
3897                                          "fs");
3898   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
3899   GNUNET_assert (NULL != block_ctx);
3900   dht_handle = GNUNET_DHT_connect (sched,
3901                                    cfg,
3902                                    FS_DHT_HT_SIZE);
3903   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3904        (GNUNET_OK != main_init (sched, server, cfg)) )
3905     {    
3906       GNUNET_SCHEDULER_shutdown (sched);
3907       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3908       dsh = NULL;
3909       GNUNET_DHT_disconnect (dht_handle);
3910       dht_handle = NULL;
3911       GNUNET_BLOCK_context_destroy (block_ctx);
3912       block_ctx = NULL;
3913       GNUNET_CONFIGURATION_destroy (block_cfg);
3914       block_cfg = NULL;
3915       GNUNET_LOAD_value_free (datastore_get_load);
3916       datastore_get_load = NULL;
3917       GNUNET_LOAD_value_free (datastore_put_load);
3918       datastore_put_load = NULL;
3919       return;   
3920     }
3921 }
3922
3923
3924 /**
3925  * The main function for the fs service.
3926  *
3927  * @param argc number of arguments from the command line
3928  * @param argv command line arguments
3929  * @return 0 ok, 1 on error
3930  */
3931 int
3932 main (int argc, char *const *argv)
3933 {
3934   return (GNUNET_OK ==
3935           GNUNET_SERVICE_run (argc,
3936                               argv,
3937                               "fs",
3938                               GNUNET_SERVICE_OPTION_NONE,
3939                               &run, NULL)) ? 0 : 1;
3940 }
3941
3942 /* end of gnunet-service-fs.c */