plane hacking
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - track per-peer request latency (using new load API)
28  * - consider more precise latency estimation (per-peer & request) -- again load API?
29  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
30  * - introduce random latency in processing
31  * - tell other peers to stop migration if our PUTs fail (or if
32  *   we don't support migration per configuration?)
33  * - more statistics
34  */
35 #include "platform.h"
36 #include <float.h>
37 #include "gnunet_constants.h"
38 #include "gnunet_core_service.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_datastore_service.h"
41 #include "gnunet_load_lib.h"
42 #include "gnunet_peer_lib.h"
43 #include "gnunet_protocols.h"
44 #include "gnunet_signatures.h"
45 #include "gnunet_statistics_service.h"
46 #include "gnunet_util_lib.h"
47 #include "gnunet-service-fs_indexing.h"
48 #include "fs.h"
49
50 #define DEBUG_FS GNUNET_NO
51
52 /**
53  * Maximum number of outgoing messages we queue per peer.
54  */
55 #define MAX_QUEUE_PER_PEER 16
56
57 /**
58  * Size for the hash map for DHT requests from the FS
59  * service.  Should be about the number of concurrent
60  * DHT requests we plan to make.
61  */
62 #define FS_DHT_HT_SIZE 1024
63
64 /**
65  * How often do we flush trust values to disk?
66  */
67 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
68
69 /**
70  * Inverse of the probability that we will submit the same query
71  * to the same peer again.  If the same peer already got the query
72  * repeatedly recently, the probability is multiplied by the inverse
73  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
74  * plus MAX_CORK_DELAY (so roughly every 3.5s).
75  */
76 #define RETRY_PROBABILITY_INV 3
77
78 /**
79  * What is the maximum delay for a P2P FS message (in our interaction
80  * with core)?  FS-internal delays are another story.  The value is
81  * chosen based on the 32k block size.  Given that peers typcially
82  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
83  * transmit one message even to the lowest-bandwidth peers.
84  */
85 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
86
87 /**
88  * Maximum number of requests (from other peers) that we're
89  * willing to have pending at any given point in time.
90  */
91 static unsigned long long max_pending_requests = (32 * 1024);
92
93
94 /**
95  * Information we keep for each pending reply.  The
96  * actual message follows at the end of this struct.
97  */
98 struct PendingMessage;
99
100 /**
101  * Function called upon completion of a transmission.
102  *
103  * @param cls closure
104  * @param pid ID of receiving peer, 0 on transmission error
105  */
106 typedef void (*TransmissionContinuation)(void * cls, 
107                                          GNUNET_PEER_Id tpid);
108
109
110 /**
111  * Information we keep for each pending message (GET/PUT).  The
112  * actual message follows at the end of this struct.
113  */
114 struct PendingMessage
115 {
116   /**
117    * This is a doubly-linked list of messages to the same peer.
118    */
119   struct PendingMessage *next;
120
121   /**
122    * This is a doubly-linked list of messages to the same peer.
123    */
124   struct PendingMessage *prev;
125
126   /**
127    * Entry in pending message list for this pending message.
128    */ 
129   struct PendingMessageList *pml;  
130
131   /**
132    * Function to call immediately once we have transmitted this
133    * message.
134    */
135   TransmissionContinuation cont;
136
137   /**
138    * Closure for cont.
139    */
140   void *cont_cls;
141
142   /**
143    * Size of the reply; actual reply message follows
144    * at the end of this struct.
145    */
146   size_t msize;
147   
148   /**
149    * How important is this message for us?
150    */
151   uint32_t priority;
152  
153 };
154
155
156 /**
157  * Information about a peer that we are connected to.
158  * We track data that is useful for determining which
159  * peers should receive our requests.  We also keep
160  * a list of messages to transmit to this peer.
161  */
162 struct ConnectedPeer
163 {
164
165   /**
166    * List of the last clients for which this peer successfully
167    * answered a query.
168    */
169   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
170
171   /**
172    * List of the last PIDs for which
173    * this peer successfully answered a query;
174    * We use 0 to indicate no successful reply.
175    */
176   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
177
178   /**
179    * Average delay between sending the peer a request and
180    * getting a reply (only calculated over the requests for
181    * which we actually got a reply).   Calculated
182    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
183    */ 
184   struct GNUNET_TIME_Relative avg_delay;
185
186   /**
187    * Point in time until which this peer does not want us to migrate content
188    * to it.
189    */
190   struct GNUNET_TIME_Absolute migration_blocked;
191
192   /**
193    * Handle for an active request for transmission to this
194    * peer, or NULL.
195    */
196   struct GNUNET_CORE_TransmitHandle *cth;
197
198   /**
199    * Messages (replies, queries, content migration) we would like to
200    * send to this peer in the near future.  Sorted by priority, head.
201    */
202   struct PendingMessage *pending_messages_head;
203
204   /**
205    * Messages (replies, queries, content migration) we would like to
206    * send to this peer in the near future.  Sorted by priority, tail.
207    */
208   struct PendingMessage *pending_messages_tail;
209
210   /**
211    * Average priority of successful replies.  Calculated
212    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
213    */
214   double avg_priority;
215
216   /**
217    * Increase in traffic preference still to be submitted
218    * to the core service for this peer.
219    */
220   uint64_t inc_preference;
221
222   /**
223    * Trust rating for this peer
224    */
225   uint32_t trust;
226
227   /**
228    * Trust rating for this peer on disk.
229    */
230   uint32_t disk_trust;
231
232   /**
233    * The peer's identity.
234    */
235   GNUNET_PEER_Id pid;  
236
237   /**
238    * Size of the linked list of 'pending_messages'.
239    */
240   unsigned int pending_requests;
241
242   /**
243    * Which offset in "last_p2p_replies" will be updated next?
244    * (we go round-robin).
245    */
246   unsigned int last_p2p_replies_woff;
247
248   /**
249    * Which offset in "last_client_replies" will be updated next?
250    * (we go round-robin).
251    */
252   unsigned int last_client_replies_woff;
253
254 };
255
256
257 /**
258  * Information we keep for each pending request.  We should try to
259  * keep this struct as small as possible since its memory consumption
260  * is key to how many requests we can have pending at once.
261  */
262 struct PendingRequest;
263
264
265 /**
266  * Doubly-linked list of requests we are performing
267  * on behalf of the same client.
268  */
269 struct ClientRequestList
270 {
271
272   /**
273    * This is a doubly-linked list.
274    */
275   struct ClientRequestList *next;
276
277   /**
278    * This is a doubly-linked list.
279    */
280   struct ClientRequestList *prev;
281
282   /**
283    * Request this entry represents.
284    */
285   struct PendingRequest *req;
286
287   /**
288    * Client list this request belongs to.
289    */
290   struct ClientList *client_list;
291
292 };
293
294
295 /**
296  * Replies to be transmitted to the client.  The actual
297  * response message is allocated after this struct.
298  */
299 struct ClientResponseMessage
300 {
301   /**
302    * This is a doubly-linked list.
303    */
304   struct ClientResponseMessage *next;
305
306   /**
307    * This is a doubly-linked list.
308    */
309   struct ClientResponseMessage *prev;
310
311   /**
312    * Client list entry this response belongs to.
313    */
314   struct ClientList *client_list;
315
316   /**
317    * Number of bytes in the response.
318    */
319   size_t msize;
320 };
321
322
323 /**
324  * Linked list of clients we are performing requests
325  * for right now.
326  */
327 struct ClientList
328 {
329   /**
330    * This is a linked list.
331    */
332   struct ClientList *next;
333
334   /**
335    * ID of a client making a request, NULL if this entry is for a
336    * peer.
337    */
338   struct GNUNET_SERVER_Client *client;
339
340   /**
341    * Head of list of requests performed on behalf
342    * of this client right now.
343    */
344   struct ClientRequestList *rl_head;
345
346   /**
347    * Tail of list of requests performed on behalf
348    * of this client right now.
349    */
350   struct ClientRequestList *rl_tail;
351
352   /**
353    * Head of linked list of responses.
354    */
355   struct ClientResponseMessage *res_head;
356
357   /**
358    * Tail of linked list of responses.
359    */
360   struct ClientResponseMessage *res_tail;
361
362   /**
363    * Context for sending replies.
364    */
365   struct GNUNET_CONNECTION_TransmitHandle *th;
366
367 };
368
369
370 /**
371  * Doubly-linked list of messages we are performing
372  * due to a pending request.
373  */
374 struct PendingMessageList
375 {
376
377   /**
378    * This is a doubly-linked list of messages on behalf of the same request.
379    */
380   struct PendingMessageList *next;
381
382   /**
383    * This is a doubly-linked list of messages on behalf of the same request.
384    */
385   struct PendingMessageList *prev;
386
387   /**
388    * Message this entry represents.
389    */
390   struct PendingMessage *pm;
391
392   /**
393    * Request this entry belongs to.
394    */
395   struct PendingRequest *req;
396
397   /**
398    * Peer this message is targeted for.
399    */
400   struct ConnectedPeer *target;
401
402 };
403
404
405 /**
406  * Information we keep for each pending request.  We should try to
407  * keep this struct as small as possible since its memory consumption
408  * is key to how many requests we can have pending at once.
409  */
410 struct PendingRequest
411 {
412
413   /**
414    * If this request was made by a client, this is our entry in the
415    * client request list; otherwise NULL.
416    */
417   struct ClientRequestList *client_request_list;
418
419   /**
420    * Entry of peer responsible for this entry (if this request
421    * was made by a peer).
422    */
423   struct ConnectedPeer *cp;
424
425   /**
426    * If this is a namespace query, pointer to the hash of the public
427    * key of the namespace; otherwise NULL.  Pointer will be to the 
428    * end of this struct (so no need to free it).
429    */
430   const GNUNET_HashCode *namespace;
431
432   /**
433    * Bloomfilter we use to filter out replies that we don't care about
434    * (anymore).  NULL as long as we are interested in all replies.
435    */
436   struct GNUNET_CONTAINER_BloomFilter *bf;
437
438   /**
439    * Context of our GNUNET_CORE_peer_change_preference call.
440    */
441   struct GNUNET_CORE_InformationRequestContext *irc;
442
443   /**
444    * Hash code of all replies that we have seen so far (only valid
445    * if client is not NULL since we only track replies like this for
446    * our own clients).
447    */
448   GNUNET_HashCode *replies_seen;
449
450   /**
451    * Node in the heap representing this entry; NULL
452    * if we have no heap node.
453    */
454   struct GNUNET_CONTAINER_HeapNode *hnode;
455
456   /**
457    * Head of list of messages being performed on behalf of this
458    * request.
459    */
460   struct PendingMessageList *pending_head;
461
462   /**
463    * Tail of list of messages being performed on behalf of this
464    * request.
465    */
466   struct PendingMessageList *pending_tail;
467
468   /**
469    * When did we first see this request (form this peer), or, if our
470    * client is initiating, when did we last initiate a search?
471    */
472   struct GNUNET_TIME_Absolute start_time;
473
474   /**
475    * The query that this request is for.
476    */
477   GNUNET_HashCode query;
478
479   /**
480    * The task responsible for transmitting queries
481    * for this request.
482    */
483   GNUNET_SCHEDULER_TaskIdentifier task;
484
485   /**
486    * (Interned) Peer identifier that identifies a preferred target
487    * for requests.
488    */
489   GNUNET_PEER_Id target_pid;
490
491   /**
492    * (Interned) Peer identifiers of peers that have already
493    * received our query for this content.
494    */
495   GNUNET_PEER_Id *used_pids;
496   
497   /**
498    * Our entry in the queue (non-NULL while we wait for our
499    * turn to interact with the local database).
500    */
501   struct GNUNET_DATASTORE_QueueEntry *qe;
502
503   /**
504    * Size of the 'bf' (in bytes).
505    */
506   size_t bf_size;
507
508   /**
509    * Desired anonymity level; only valid for requests from a local client.
510    */
511   uint32_t anonymity_level;
512
513   /**
514    * How many entries in "used_pids" are actually valid?
515    */
516   unsigned int used_pids_off;
517
518   /**
519    * How long is the "used_pids" array?
520    */
521   unsigned int used_pids_size;
522
523   /**
524    * Number of results found for this request.
525    */
526   unsigned int results_found;
527
528   /**
529    * How many entries in "replies_seen" are actually valid?
530    */
531   unsigned int replies_seen_off;
532
533   /**
534    * How long is the "replies_seen" array?
535    */
536   unsigned int replies_seen_size;
537   
538   /**
539    * Priority with which this request was made.  If one of our clients
540    * made the request, then this is the current priority that we are
541    * using when initiating the request.  This value is used when
542    * we decide to reward other peers with trust for providing a reply.
543    */
544   uint32_t priority;
545
546   /**
547    * Priority points left for us to spend when forwarding this request
548    * to other peers.
549    */
550   uint32_t remaining_priority;
551
552   /**
553    * Number to mingle hashes for bloom-filter tests with.
554    */
555   int32_t mingle;
556
557   /**
558    * TTL with which we saw this request (or, if we initiated, TTL that
559    * we used for the request).
560    */
561   int32_t ttl;
562   
563   /**
564    * Type of the content that this request is for.
565    */
566   enum GNUNET_BLOCK_Type type;
567
568   /**
569    * Remove this request after transmission of the current response.
570    */
571   int16_t do_remove;
572
573   /**
574    * GNUNET_YES if we should not forward this request to other peers.
575    */
576   int16_t local_only;
577
578 };
579
580
581 /**
582  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
583  */
584 struct MigrationReadyBlock
585 {
586
587   /**
588    * This is a doubly-linked list.
589    */
590   struct MigrationReadyBlock *next;
591
592   /**
593    * This is a doubly-linked list.
594    */
595   struct MigrationReadyBlock *prev;
596
597   /**
598    * Query for the block.
599    */
600   GNUNET_HashCode query;
601
602   /**
603    * When does this block expire? 
604    */
605   struct GNUNET_TIME_Absolute expiration;
606
607   /**
608    * Peers we would consider forwarding this
609    * block to.  Zero for empty entries.
610    */
611   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
612
613   /**
614    * Size of the block.
615    */
616   size_t size;
617
618   /**
619    *  Number of targets already used.
620    */
621   unsigned int used_targets;
622
623   /**
624    * Type of the block.
625    */
626   enum GNUNET_BLOCK_Type type;
627 };
628
629
630 /**
631  * Our connection to the datastore.
632  */
633 static struct GNUNET_DATASTORE_Handle *dsh;
634
635 /**
636  * Our block context.
637  */
638 static struct GNUNET_BLOCK_Context *block_ctx;
639
640 /**
641  * Our block configuration.
642  */
643 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
644
645 /**
646  * Our scheduler.
647  */
648 static struct GNUNET_SCHEDULER_Handle *sched;
649
650 /**
651  * Our configuration.
652  */
653 static const struct GNUNET_CONFIGURATION_Handle *cfg;
654
655 /**
656  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
657  */
658 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
659
660 /**
661  * Map of peer identifiers to "struct PendingRequest" (for that peer).
662  */
663 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
664
665 /**
666  * Map of query identifiers to "struct PendingRequest" (for that query).
667  */
668 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
669
670 /**
671  * Heap with the request that will expire next at the top.  Contains
672  * pointers of type "struct PendingRequest*"; these will *also* be
673  * aliased from the "requests_by_peer" data structures and the
674  * "requests_by_query" table.  Note that requests from our clients
675  * don't expire and are thus NOT in the "requests_by_expiration"
676  * (or the "requests_by_peer" tables).
677  */
678 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
679
680 /**
681  * Handle for reporting statistics.
682  */
683 static struct GNUNET_STATISTICS_Handle *stats;
684
685 /**
686  * Linked list of clients we are currently processing requests for.
687  */
688 static struct ClientList *client_list;
689
690 /**
691  * Pointer to handle to the core service (points to NULL until we've
692  * connected to it).
693  */
694 static struct GNUNET_CORE_Handle *core;
695
696 /**
697  * Head of linked list of blocks that can be migrated.
698  */
699 static struct MigrationReadyBlock *mig_head;
700
701 /**
702  * Tail of linked list of blocks that can be migrated.
703  */
704 static struct MigrationReadyBlock *mig_tail;
705
706 /**
707  * Request to datastore for migration (or NULL).
708  */
709 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
710
711 /**
712  * Where do we store trust information?
713  */
714 static char *trustDirectory;
715
716 /**
717  * ID of task that collects blocks for migration.
718  */
719 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
720
721 /**
722  * What is the maximum frequency at which we are allowed to
723  * poll the datastore for migration content?
724  */
725 static struct GNUNET_TIME_Relative min_migration_delay;
726
727 /**
728  * Handle for DHT operations.
729  */
730 static struct GNUNET_DHT_Handle *dht_handle;
731
732 /**
733  * Size of the doubly-linked list of migration blocks.
734  */
735 static unsigned int mig_size;
736
737 /**
738  * Are we allowed to migrate content to this peer.
739  */
740 static int active_migration;
741
742 /**
743  * Typical priorities we're seeing from other peers right now.  Since
744  * most priorities will be zero, this value is the weighted average of
745  * non-zero priorities seen "recently".  In order to ensure that new
746  * values do not dramatically change the ratio, values are first
747  * "capped" to a reasonable range (+N of the current value) and then
748  * averaged into the existing value by a ratio of 1:N.  Hence
749  * receiving the largest possible priority can still only raise our
750  * "current_priorities" by at most 1.
751  */
752 static double current_priorities;
753
754 /**
755  * Datastore load tracking.
756  */
757 static struct GNUNET_LOAD_Value *datastore_load;
758
759
760 /**
761  * We've just now completed a datastore request.  Update our
762  * datastore load calculations.
763  *
764  * @param start time when the datastore request was issued
765  */
766 static void
767 update_datastore_delays (struct GNUNET_TIME_Absolute start)
768 {
769   struct GNUNET_TIME_Relative delay;
770
771   delay = GNUNET_TIME_absolute_get_duration (start);
772   GNUNET_LOAD_update (datastore_load,
773                       delay.value);
774 }
775
776
777 /**
778  * Get the filename under which we would store the GNUNET_HELLO_Message
779  * for the given host and protocol.
780  * @return filename of the form DIRECTORY/HOSTID
781  */
782 static char *
783 get_trust_filename (const struct GNUNET_PeerIdentity *id)
784 {
785   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
786   char *fn;
787
788   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
789   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
790   return fn;
791 }
792
793
794
795 /**
796  * Transmit messages by copying it to the target buffer
797  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
798  * for writing in the meantime.  In that case, do nothing
799  * (the disconnect or shutdown handler will take care of the rest).
800  * If we were able to transmit messages and there are still more
801  * pending, ask core again for further calls to this function.
802  *
803  * @param cls closure, pointer to the 'struct ConnectedPeer*'
804  * @param size number of bytes available in buf
805  * @param buf where the callee should write the message
806  * @return number of bytes written to buf
807  */
808 static size_t
809 transmit_to_peer (void *cls,
810                   size_t size, void *buf);
811
812
813 /* ******************* clean up functions ************************ */
814
815 /**
816  * Delete the given migration block.
817  *
818  * @param mb block to delete
819  */
820 static void
821 delete_migration_block (struct MigrationReadyBlock *mb)
822 {
823   GNUNET_CONTAINER_DLL_remove (mig_head,
824                                mig_tail,
825                                mb);
826   GNUNET_PEER_decrement_rcs (mb->target_list,
827                              MIGRATION_LIST_SIZE);
828   mig_size--;
829   GNUNET_free (mb);
830 }
831
832
833 /**
834  * Compare the distance of two peers to a key.
835  *
836  * @param key key
837  * @param p1 first peer
838  * @param p2 second peer
839  * @return GNUNET_YES if P1 is closer to key than P2
840  */
841 static int
842 is_closer (const GNUNET_HashCode *key,
843            const struct GNUNET_PeerIdentity *p1,
844            const struct GNUNET_PeerIdentity *p2)
845 {
846   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
847                                     &p2->hashPubKey,
848                                     key);
849 }
850
851
852 /**
853  * Consider migrating content to a given peer.
854  *
855  * @param cls 'struct MigrationReadyBlock*' to select
856  *            targets for (or NULL for none)
857  * @param key ID of the peer 
858  * @param value 'struct ConnectedPeer' of the peer
859  * @return GNUNET_YES (always continue iteration)
860  */
861 static int
862 consider_migration (void *cls,
863                     const GNUNET_HashCode *key,
864                     void *value)
865 {
866   struct MigrationReadyBlock *mb = cls;
867   struct ConnectedPeer *cp = value;
868   struct MigrationReadyBlock *pos;
869   struct GNUNET_PeerIdentity cppid;
870   struct GNUNET_PeerIdentity otherpid;
871   struct GNUNET_PeerIdentity worstpid;
872   size_t msize;
873   unsigned int i;
874   unsigned int repl;
875   
876   /* consider 'cp' as a migration target for mb */
877   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
878     return GNUNET_YES; /* peer has requested no migration! */
879   if (mb != NULL)
880     {
881       GNUNET_PEER_resolve (cp->pid,
882                            &cppid);
883       repl = MIGRATION_LIST_SIZE;
884       for (i=0;i<MIGRATION_LIST_SIZE;i++)
885         {
886           if (mb->target_list[i] == 0)
887             {
888               mb->target_list[i] = cp->pid;
889               GNUNET_PEER_change_rc (mb->target_list[i], 1);
890               repl = MIGRATION_LIST_SIZE;
891               break;
892             }
893           GNUNET_PEER_resolve (mb->target_list[i],
894                                &otherpid);
895           if ( (repl == MIGRATION_LIST_SIZE) &&
896                is_closer (&mb->query,
897                           &cppid,
898                           &otherpid)) 
899             {
900               repl = i;
901               worstpid = otherpid;
902             }
903           else if ( (repl != MIGRATION_LIST_SIZE) &&
904                     (is_closer (&mb->query,
905                                 &worstpid,
906                                 &otherpid) ) )
907             {
908               repl = i;
909               worstpid = otherpid;
910             }       
911         }
912       if (repl != MIGRATION_LIST_SIZE) 
913         {
914           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
915           mb->target_list[repl] = cp->pid;
916           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
917         }
918     }
919
920   /* consider scheduling transmission to cp for content migration */
921   if (cp->cth != NULL)
922     return GNUNET_YES; 
923   msize = 0;
924   pos = mig_head;
925   while (pos != NULL)
926     {
927       for (i=0;i<MIGRATION_LIST_SIZE;i++)
928         {
929           if (cp->pid == pos->target_list[i])
930             {
931               if (msize == 0)
932                 msize = pos->size;
933               else
934                 msize = GNUNET_MIN (msize,
935                                     pos->size);
936               break;
937             }
938         }
939       pos = pos->next;
940     }
941   if (msize == 0)
942     return GNUNET_YES; /* no content available */
943 #if DEBUG_FS
944   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945               "Trying to migrate at least %u bytes to peer `%s'\n",
946               msize,
947               GNUNET_h2s (key));
948 #endif
949   cp->cth 
950     = GNUNET_CORE_notify_transmit_ready (core,
951                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
952                                          (const struct GNUNET_PeerIdentity*) key,
953                                          msize + sizeof (struct PutMessage),
954                                          &transmit_to_peer,
955                                          cp);
956   return GNUNET_YES;
957 }
958
959
960 /**
961  * Task that is run periodically to obtain blocks for content
962  * migration
963  * 
964  * @param cls unused
965  * @param tc scheduler context (also unused)
966  */
967 static void
968 gather_migration_blocks (void *cls,
969                          const struct GNUNET_SCHEDULER_TaskContext *tc);
970
971
972 /**
973  * If the migration task is not currently running, consider
974  * (re)scheduling it with the appropriate delay.
975  */
976 static void
977 consider_migration_gathering ()
978 {
979   struct GNUNET_TIME_Relative delay;
980
981   if (dsh == NULL)
982     return;
983   if (mig_qe != NULL)
984     return;
985   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
986     return;
987   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
988                                          mig_size);
989   delay = GNUNET_TIME_relative_divide (delay,
990                                        MAX_MIGRATION_QUEUE);
991   delay = GNUNET_TIME_relative_max (delay,
992                                     min_migration_delay);
993   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
994                                            delay,
995                                            &gather_migration_blocks,
996                                            NULL);
997 }
998
999
1000 /**
1001  * Process content offered for migration.
1002  *
1003  * @param cls closure
1004  * @param key key for the content
1005  * @param size number of bytes in data
1006  * @param data content stored
1007  * @param type type of the content
1008  * @param priority priority of the content
1009  * @param anonymity anonymity-level for the content
1010  * @param expiration expiration time for the content
1011  * @param uid unique identifier for the datum;
1012  *        maybe 0 if no unique identifier is available
1013  */
1014 static void
1015 process_migration_content (void *cls,
1016                            const GNUNET_HashCode * key,
1017                            uint32_t size,
1018                            const void *data,
1019                            enum GNUNET_BLOCK_Type type,
1020                            uint32_t priority,
1021                            uint32_t anonymity,
1022                            struct GNUNET_TIME_Absolute
1023                            expiration, uint64_t uid)
1024 {
1025   struct MigrationReadyBlock *mb;
1026   
1027   if (key == NULL)
1028     {
1029       mig_qe = NULL;
1030       if (mig_size < MAX_MIGRATION_QUEUE)  
1031         consider_migration_gathering ();
1032       return;
1033     }
1034   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1035     {
1036       if (GNUNET_OK !=
1037           GNUNET_FS_handle_on_demand_block (key, size, data,
1038                                             type, priority, anonymity,
1039                                             expiration, uid, 
1040                                             &process_migration_content,
1041                                             NULL))
1042         {
1043           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1044         }
1045       return;
1046     }
1047 #if DEBUG_FS
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049               "Retrieved block `%s' of type %u for migration\n",
1050               GNUNET_h2s (key),
1051               type);
1052 #endif
1053   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1054   mb->query = *key;
1055   mb->expiration = expiration;
1056   mb->size = size;
1057   mb->type = type;
1058   memcpy (&mb[1], data, size);
1059   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1060                                      mig_tail,
1061                                      mig_tail,
1062                                      mb);
1063   mig_size++;
1064   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1065                                          &consider_migration,
1066                                          mb);
1067   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1068 }
1069
1070
1071 /**
1072  * Task that is run periodically to obtain blocks for content
1073  * migration
1074  * 
1075  * @param cls unused
1076  * @param tc scheduler context (also unused)
1077  */
1078 static void
1079 gather_migration_blocks (void *cls,
1080                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1081 {
1082   mig_task = GNUNET_SCHEDULER_NO_TASK;
1083   if (dsh != NULL)
1084     {
1085       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
1086                                             GNUNET_TIME_UNIT_FOREVER_REL,
1087                                             &process_migration_content, NULL);
1088       GNUNET_assert (mig_qe != NULL);
1089     }
1090 }
1091
1092
1093 /**
1094  * We're done with a particular message list entry.
1095  * Free all associated resources.
1096  * 
1097  * @param pml entry to destroy
1098  */
1099 static void
1100 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1101 {
1102   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1103                                pml->req->pending_tail,
1104                                pml);
1105   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1106                                pml->target->pending_messages_tail,
1107                                pml->pm);
1108   pml->target->pending_requests--;
1109   GNUNET_free (pml->pm);
1110   GNUNET_free (pml);
1111 }
1112
1113
1114 /**
1115  * Destroy the given pending message (and call the respective
1116  * continuation).
1117  *
1118  * @param pm message to destroy
1119  * @param tpid id of peer that the message was delivered to, or 0 for none
1120  */
1121 static void
1122 destroy_pending_message (struct PendingMessage *pm,
1123                          GNUNET_PEER_Id tpid)
1124 {
1125   struct PendingMessageList *pml = pm->pml;
1126   TransmissionContinuation cont;
1127   void *cont_cls;
1128
1129   GNUNET_assert (pml->pm == pm);
1130   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1131   cont = pm->cont;
1132   cont_cls = pm->cont_cls;
1133   destroy_pending_message_list_entry (pml);
1134   cont (cont_cls, tpid);  
1135 }
1136
1137
1138 /**
1139  * We're done processing a particular request.
1140  * Free all associated resources.
1141  *
1142  * @param pr request to destroy
1143  */
1144 static void
1145 destroy_pending_request (struct PendingRequest *pr)
1146 {
1147   struct GNUNET_PeerIdentity pid;
1148
1149   if (pr->hnode != NULL)
1150     {
1151       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1152                                          pr->hnode);
1153       pr->hnode = NULL;
1154     }
1155   if (NULL == pr->client_request_list)
1156     {
1157       GNUNET_STATISTICS_update (stats,
1158                                 gettext_noop ("# P2P searches active"),
1159                                 -1,
1160                                 GNUNET_NO);
1161     }
1162   else
1163     {
1164       GNUNET_STATISTICS_update (stats,
1165                                 gettext_noop ("# client searches active"),
1166                                 -1,
1167                                 GNUNET_NO);
1168     }
1169   /* might have already been removed from map in 'process_reply' (if
1170      there was a unique reply) or never inserted if it was a
1171      duplicate; hence ignore the return value here */
1172   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1173                                                &pr->query,
1174                                                pr);
1175   if (pr->qe != NULL)
1176      {
1177       GNUNET_DATASTORE_cancel (pr->qe);
1178       pr->qe = NULL;
1179     }
1180   if (pr->client_request_list != NULL)
1181     {
1182       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1183                                    pr->client_request_list->client_list->rl_tail,
1184                                    pr->client_request_list);
1185       GNUNET_free (pr->client_request_list);
1186       pr->client_request_list = NULL;
1187     }
1188   if (pr->cp != NULL)
1189     {
1190       GNUNET_PEER_resolve (pr->cp->pid,
1191                            &pid);
1192       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1193                                                    &pid.hashPubKey,
1194                                                    pr);
1195       pr->cp = NULL;
1196     }
1197   if (pr->bf != NULL)
1198     {
1199       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1200       pr->bf = NULL;
1201     }
1202   if (pr->irc != NULL)
1203     {
1204       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1205       pr->irc = NULL;
1206     }
1207   if (pr->replies_seen != NULL)
1208     {
1209       GNUNET_free (pr->replies_seen);
1210       pr->replies_seen = NULL;
1211     }
1212   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1213     {
1214       GNUNET_SCHEDULER_cancel (sched,
1215                                pr->task);
1216       pr->task = GNUNET_SCHEDULER_NO_TASK;
1217     }
1218   while (NULL != pr->pending_head)    
1219     destroy_pending_message_list_entry (pr->pending_head);
1220   GNUNET_PEER_change_rc (pr->target_pid, -1);
1221   if (pr->used_pids != NULL)
1222     {
1223       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1224       GNUNET_free (pr->used_pids);
1225       pr->used_pids_off = 0;
1226       pr->used_pids_size = 0;
1227       pr->used_pids = NULL;
1228     }
1229   GNUNET_free (pr);
1230 }
1231
1232
1233 /**
1234  * Method called whenever a given peer connects.
1235  *
1236  * @param cls closure, not used
1237  * @param peer peer identity this notification is about
1238  * @param latency reported latency of the connection with 'other'
1239  * @param distance reported distance (DV) to 'other' 
1240  */
1241 static void 
1242 peer_connect_handler (void *cls,
1243                       const struct
1244                       GNUNET_PeerIdentity * peer,
1245                       struct GNUNET_TIME_Relative latency,
1246                       uint32_t distance)
1247 {
1248   struct ConnectedPeer *cp;
1249   struct MigrationReadyBlock *pos;
1250   char *fn;
1251   uint32_t trust;
1252   
1253   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1254   cp->pid = GNUNET_PEER_intern (peer);
1255
1256   fn = get_trust_filename (peer);
1257   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1258       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1259     cp->disk_trust = cp->trust = ntohl (trust);
1260   GNUNET_free (fn);
1261
1262   GNUNET_break (GNUNET_OK ==
1263                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1264                                                    &peer->hashPubKey,
1265                                                    cp,
1266                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1267
1268   pos = mig_head;
1269   while (NULL != pos)
1270     {
1271       (void) consider_migration (pos, &peer->hashPubKey, cp);
1272       pos = pos->next;
1273     }
1274 }
1275
1276
1277 /**
1278  * Increase the host credit by a value.
1279  *
1280  * @param host which peer to change the trust value on
1281  * @param value is the int value by which the
1282  *  host credit is to be increased or decreased
1283  * @returns the actual change in trust (positive or negative)
1284  */
1285 static int
1286 change_host_trust (struct ConnectedPeer *host, int value)
1287 {
1288   unsigned int old_trust;
1289
1290   if (value == 0)
1291     return 0;
1292   GNUNET_assert (host != NULL);
1293   old_trust = host->trust;
1294   if (value > 0)
1295     {
1296       if (host->trust + value < host->trust)
1297         {
1298           value = UINT32_MAX - host->trust;
1299           host->trust = UINT32_MAX;
1300         }
1301       else
1302         host->trust += value;
1303     }
1304   else
1305     {
1306       if (host->trust < -value)
1307         {
1308           value = -host->trust;
1309           host->trust = 0;
1310         }
1311       else
1312         host->trust += value;
1313     }
1314   return value;
1315 }
1316
1317
1318 /**
1319  * Write host-trust information to a file - flush the buffer entry!
1320  */
1321 static int
1322 flush_trust (void *cls,
1323              const GNUNET_HashCode *key,
1324              void *value)
1325 {
1326   struct ConnectedPeer *host = value;
1327   char *fn;
1328   uint32_t trust;
1329   struct GNUNET_PeerIdentity pid;
1330
1331   if (host->trust == host->disk_trust)
1332     return GNUNET_OK;                     /* unchanged */
1333   GNUNET_PEER_resolve (host->pid,
1334                        &pid);
1335   fn = get_trust_filename (&pid);
1336   if (host->trust == 0)
1337     {
1338       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1339         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1340                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1341     }
1342   else
1343     {
1344       trust = htonl (host->trust);
1345       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1346                                                     sizeof(uint32_t),
1347                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1348                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1349         host->disk_trust = host->trust;
1350     }
1351   GNUNET_free (fn);
1352   return GNUNET_OK;
1353 }
1354
1355 /**
1356  * Call this method periodically to scan data/hosts for new hosts.
1357  */
1358 static void
1359 cron_flush_trust (void *cls,
1360                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1361 {
1362
1363   if (NULL == connected_peers)
1364     return;
1365   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1366                                          &flush_trust,
1367                                          NULL);
1368   if (NULL == tc)
1369     return;
1370   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1371     return;
1372   GNUNET_SCHEDULER_add_delayed (tc->sched,
1373                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1374 }
1375
1376
1377 /**
1378  * Free (each) request made by the peer.
1379  *
1380  * @param cls closure, points to peer that the request belongs to
1381  * @param key current key code
1382  * @param value value in the hash map
1383  * @return GNUNET_YES (we should continue to iterate)
1384  */
1385 static int
1386 destroy_request (void *cls,
1387                  const GNUNET_HashCode * key,
1388                  void *value)
1389 {
1390   const struct GNUNET_PeerIdentity * peer = cls;
1391   struct PendingRequest *pr = value;
1392   
1393   GNUNET_break (GNUNET_YES ==
1394                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1395                                                       &peer->hashPubKey,
1396                                                       pr));
1397   destroy_pending_request (pr);
1398   return GNUNET_YES;
1399 }
1400
1401
1402 /**
1403  * Method called whenever a peer disconnects.
1404  *
1405  * @param cls closure, not used
1406  * @param peer peer identity this notification is about
1407  */
1408 static void
1409 peer_disconnect_handler (void *cls,
1410                          const struct
1411                          GNUNET_PeerIdentity * peer)
1412 {
1413   struct ConnectedPeer *cp;
1414   struct PendingMessage *pm;
1415   unsigned int i;
1416   struct MigrationReadyBlock *pos;
1417   struct MigrationReadyBlock *next;
1418
1419   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1420                                               &peer->hashPubKey,
1421                                               &destroy_request,
1422                                               (void*) peer);
1423   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1424                                           &peer->hashPubKey);
1425   if (cp == NULL)
1426     return;
1427   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1428     {
1429       if (NULL != cp->last_client_replies[i])
1430         {
1431           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1432           cp->last_client_replies[i] = NULL;
1433         }
1434     }
1435   GNUNET_break (GNUNET_YES ==
1436                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1437                                                       &peer->hashPubKey,
1438                                                       cp));
1439   /* remove this peer from migration considerations; schedule
1440      alternatives */
1441   next = mig_head;
1442   while (NULL != (pos = next))
1443     {
1444       next = pos->next;
1445       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1446         {
1447           if (pos->target_list[i] == cp->pid)
1448             {
1449               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1450               pos->target_list[i] = 0;
1451             }
1452          }
1453       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1454         {
1455           delete_migration_block (pos);
1456           consider_migration_gathering ();
1457           continue;
1458         }
1459       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1460                                              &consider_migration,
1461                                              pos);
1462     }
1463   GNUNET_PEER_change_rc (cp->pid, -1);
1464   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1465   if (NULL != cp->cth)
1466     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1467   while (NULL != (pm = cp->pending_messages_head))
1468     destroy_pending_message (pm, 0 /* delivery failed */);
1469   GNUNET_break (0 == cp->pending_requests);
1470   GNUNET_free (cp);
1471 }
1472
1473
1474 /**
1475  * Iterator over hash map entries that removes all occurences
1476  * of the given 'client' from the 'last_client_replies' of the
1477  * given connected peer.
1478  *
1479  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1480  * @param key current key code (unused)
1481  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1482  * @return GNUNET_YES (we should continue to iterate)
1483  */
1484 static int
1485 remove_client_from_last_client_replies (void *cls,
1486                                         const GNUNET_HashCode * key,
1487                                         void *value)
1488 {
1489   struct GNUNET_SERVER_Client *client = cls;
1490   struct ConnectedPeer *cp = value;
1491   unsigned int i;
1492
1493   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1494     {
1495       if (cp->last_client_replies[i] == client)
1496         {
1497           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1498           cp->last_client_replies[i] = NULL;
1499         }
1500     }  
1501   return GNUNET_YES;
1502 }
1503
1504
1505 /**
1506  * A client disconnected.  Remove all of its pending queries.
1507  *
1508  * @param cls closure, NULL
1509  * @param client identification of the client
1510  */
1511 static void
1512 handle_client_disconnect (void *cls,
1513                           struct GNUNET_SERVER_Client
1514                           * client)
1515 {
1516   struct ClientList *pos;
1517   struct ClientList *prev;
1518   struct ClientRequestList *rcl;
1519   struct ClientResponseMessage *creply;
1520
1521   if (client == NULL)
1522     return;
1523   prev = NULL;
1524   pos = client_list;
1525   while ( (NULL != pos) &&
1526           (pos->client != client) )
1527     {
1528       prev = pos;
1529       pos = pos->next;
1530     }
1531   if (pos == NULL)
1532     return; /* no requests pending for this client */
1533   while (NULL != (rcl = pos->rl_head))
1534     {
1535       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1536                   "Destroying pending request `%s' on disconnect\n",
1537                   GNUNET_h2s (&rcl->req->query));
1538       destroy_pending_request (rcl->req);
1539     }
1540   if (prev == NULL)
1541     client_list = pos->next;
1542   else
1543     prev->next = pos->next;
1544   if (pos->th != NULL)
1545     {
1546       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1547       pos->th = NULL;
1548     }
1549   while (NULL != (creply = pos->res_head))
1550     {
1551       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1552                                    pos->res_tail,
1553                                    creply);
1554       GNUNET_free (creply);
1555     }    
1556   GNUNET_SERVER_client_drop (pos->client);
1557   GNUNET_free (pos);
1558   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1559                                          &remove_client_from_last_client_replies,
1560                                          client);
1561 }
1562
1563
1564 /**
1565  * Iterator to free peer entries.
1566  *
1567  * @param cls closure, unused
1568  * @param key current key code
1569  * @param value value in the hash map (peer entry)
1570  * @return GNUNET_YES (we should continue to iterate)
1571  */
1572 static int 
1573 clean_peer (void *cls,
1574             const GNUNET_HashCode * key,
1575             void *value)
1576 {
1577   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1578   return GNUNET_YES;
1579 }
1580
1581
1582 /**
1583  * Task run during shutdown.
1584  *
1585  * @param cls unused
1586  * @param tc unused
1587  */
1588 static void
1589 shutdown_task (void *cls,
1590                const struct GNUNET_SCHEDULER_TaskContext *tc)
1591 {
1592   if (mig_qe != NULL)
1593     {
1594       GNUNET_DATASTORE_cancel (mig_qe);
1595       mig_qe = NULL;
1596     }
1597   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1598     {
1599       GNUNET_SCHEDULER_cancel (sched, mig_task);
1600       mig_task = GNUNET_SCHEDULER_NO_TASK;
1601     }
1602   while (client_list != NULL)
1603     handle_client_disconnect (NULL,
1604                               client_list->client);
1605   cron_flush_trust (NULL, NULL);
1606   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1607                                          &clean_peer,
1608                                          NULL);
1609   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1610   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1611   requests_by_expiration_heap = 0;
1612   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1613   connected_peers = NULL;
1614   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1615   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1616   query_request_map = NULL;
1617   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1618   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1619   peer_request_map = NULL;
1620   GNUNET_assert (NULL != core);
1621   GNUNET_CORE_disconnect (core);
1622   core = NULL;
1623   if (stats != NULL)
1624     {
1625       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1626       stats = NULL;
1627     }
1628   if (dsh != NULL)
1629     {
1630       GNUNET_DATASTORE_disconnect (dsh,
1631                                    GNUNET_NO);
1632       dsh = NULL;
1633     }
1634   while (mig_head != NULL)
1635     delete_migration_block (mig_head);
1636   GNUNET_assert (0 == mig_size);
1637   GNUNET_DHT_disconnect (dht_handle);
1638   dht_handle = NULL;
1639   GNUNET_LOAD_value_free (datastore_load);
1640   datastore_load = NULL;
1641   GNUNET_BLOCK_context_destroy (block_ctx);
1642   block_ctx = NULL;
1643   GNUNET_CONFIGURATION_destroy (block_cfg);
1644   block_cfg = NULL;
1645   sched = NULL;
1646   cfg = NULL;  
1647   GNUNET_free_non_null (trustDirectory);
1648   trustDirectory = NULL;
1649 }
1650
1651
1652 /* ******************* Utility functions  ******************** */
1653
1654
1655 /**
1656  * Transmit messages by copying it to the target buffer
1657  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1658  * for writing in the meantime.  In that case, do nothing
1659  * (the disconnect or shutdown handler will take care of the rest).
1660  * If we were able to transmit messages and there are still more
1661  * pending, ask core again for further calls to this function.
1662  *
1663  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1664  * @param size number of bytes available in buf
1665  * @param buf where the callee should write the message
1666  * @return number of bytes written to buf
1667  */
1668 static size_t
1669 transmit_to_peer (void *cls,
1670                   size_t size, void *buf)
1671 {
1672   struct ConnectedPeer *cp = cls;
1673   char *cbuf = buf;
1674   struct GNUNET_PeerIdentity pid;
1675   struct PendingMessage *pm;
1676   struct MigrationReadyBlock *mb;
1677   struct MigrationReadyBlock *next;
1678   struct PutMessage migm;
1679   size_t msize;
1680   unsigned int i;
1681  
1682   cp->cth = NULL;
1683   if (NULL == buf)
1684     {
1685 #if DEBUG_FS
1686       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1687                   "Dropping message, core too busy.\n");
1688 #endif
1689       return 0;
1690     }
1691   msize = 0;
1692   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1693           (pm->msize <= size) )
1694     {
1695       memcpy (&cbuf[msize], &pm[1], pm->msize);
1696       msize += pm->msize;
1697       size -= pm->msize;
1698       destroy_pending_message (pm, cp->pid);
1699     }
1700   if (NULL != pm)
1701     {
1702       GNUNET_PEER_resolve (cp->pid,
1703                            &pid);
1704       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1705                                                    pm->priority,
1706                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1707                                                    &pid,
1708                                                    pm->msize,
1709                                                    &transmit_to_peer,
1710                                                    cp);
1711     }
1712   else
1713     {      
1714       next = mig_head;
1715       while (NULL != (mb = next))
1716         {
1717           next = mb->next;
1718           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1719             {
1720               if ( (cp->pid == mb->target_list[i]) &&
1721                    (mb->size + sizeof (migm) <= size) )
1722                 {
1723                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1724                   mb->target_list[i] = 0;
1725                   mb->used_targets++;
1726                   memset (&migm, 0, sizeof (migm));
1727                   migm.header.size = htons (sizeof (migm) + mb->size);
1728                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1729                   migm.type = htonl (mb->type);
1730                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1731                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1732                   msize += sizeof (migm);
1733                   size -= sizeof (migm);
1734                   memcpy (&cbuf[msize], &mb[1], mb->size);
1735                   msize += mb->size;
1736                   size -= mb->size;
1737 #if DEBUG_FS
1738                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1740                               GNUNET_h2s (&mb->query),
1741                               mb->size,
1742                               GNUNET_i2s (&pid));
1743 #endif    
1744                   break;
1745                 }
1746               else
1747                 {
1748 #if DEBUG_FS
1749                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1750                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1751                               GNUNET_h2s (&mb->query),
1752                               mb->size,
1753                               GNUNET_i2s (&pid));
1754 #endif    
1755                 }
1756             }
1757           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1758                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1759             {
1760               delete_migration_block (mb);
1761               consider_migration_gathering ();
1762             }
1763         }
1764       consider_migration (NULL, 
1765                           &pid.hashPubKey,
1766                           cp);
1767     }
1768 #if DEBUG_FS
1769   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1770               "Transmitting %u bytes to peer %u\n",
1771               msize,
1772               cp->pid);
1773 #endif
1774   return msize;
1775 }
1776
1777
1778 /**
1779  * Add a message to the set of pending messages for the given peer.
1780  *
1781  * @param cp peer to send message to
1782  * @param pm message to queue
1783  * @param pr request on which behalf this message is being queued
1784  */
1785 static void
1786 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1787                                   struct PendingMessage *pm,
1788                                   struct PendingRequest *pr)
1789 {
1790   struct PendingMessage *pos;
1791   struct PendingMessageList *pml;
1792   struct GNUNET_PeerIdentity pid;
1793
1794   GNUNET_assert (pm->next == NULL);
1795   GNUNET_assert (pm->pml == NULL);    
1796   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1797   pml->req = pr;
1798   pml->target = cp;
1799   pml->pm = pm;
1800   pm->pml = pml;  
1801   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1802                                pr->pending_tail,
1803                                pml);
1804   pos = cp->pending_messages_head;
1805   while ( (pos != NULL) &&
1806           (pm->priority < pos->priority) )
1807     pos = pos->next;    
1808   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1809                                      cp->pending_messages_tail,
1810                                      pos,
1811                                      pm);
1812   cp->pending_requests++;
1813   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1814     destroy_pending_message (cp->pending_messages_tail, 0);  
1815   GNUNET_PEER_resolve (cp->pid, &pid);
1816   if (NULL != cp->cth)
1817     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1818   /* need to schedule transmission */
1819   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1820                                                cp->pending_messages_head->priority,
1821                                                MAX_TRANSMIT_DELAY,
1822                                                &pid,
1823                                                cp->pending_messages_head->msize,
1824                                                &transmit_to_peer,
1825                                                cp);
1826   if (cp->cth == NULL)
1827     {
1828 #if DEBUG_FS
1829       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1830                   "Failed to schedule transmission with core!\n");
1831 #endif
1832       GNUNET_STATISTICS_update (stats,
1833                                 gettext_noop ("# CORE transmission failures"),
1834                                 1,
1835                                 GNUNET_NO);
1836     }
1837 }
1838
1839
1840 /**
1841  * Test if the load on this peer is too high
1842  * to even consider processing the query at
1843  * all.
1844  * 
1845  * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
1846  */
1847 static int
1848 test_load_too_high ()
1849 {
1850   return GNUNET_SYSERR; // FIXME
1851 }
1852
1853
1854 /* ******************* Pending Request Refresh Task ******************** */
1855
1856
1857
1858 /**
1859  * We use a random delay to make the timing of requests less
1860  * predictable.  This function returns such a random delay.  We add a base
1861  * delay of MAX_CORK_DELAY (1s).
1862  *
1863  * FIXME: make schedule dependent on the specifics of the request?
1864  * Or bandwidth and number of connected peers and load?
1865  *
1866  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1867  */
1868 static struct GNUNET_TIME_Relative
1869 get_processing_delay ()
1870 {
1871   return 
1872     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1873                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1874                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1875                                                                                        TTL_DECREMENT)));
1876 }
1877
1878
1879 /**
1880  * We're processing a GET request from another peer and have decided
1881  * to forward it to other peers.  This function is called periodically
1882  * and should forward the request to other peers until we have all
1883  * possible replies.  If we have transmitted the *only* reply to
1884  * the initiator we should destroy the pending request.  If we have
1885  * many replies in the queue to the initiator, we should delay sending
1886  * out more queries until the reply queue has shrunk some.
1887  *
1888  * @param cls our "struct ProcessGetContext *"
1889  * @param tc unused
1890  */
1891 static void
1892 forward_request_task (void *cls,
1893                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1894
1895
1896 /**
1897  * Function called after we either failed or succeeded
1898  * at transmitting a query to a peer.  
1899  *
1900  * @param cls the requests "struct PendingRequest*"
1901  * @param tpid ID of receiving peer, 0 on transmission error
1902  */
1903 static void
1904 transmit_query_continuation (void *cls,
1905                              GNUNET_PEER_Id tpid)
1906 {
1907   struct PendingRequest *pr = cls;
1908
1909   GNUNET_STATISTICS_update (stats,
1910                             gettext_noop ("# queries scheduled for forwarding"),
1911                             -1,
1912                             GNUNET_NO);
1913   if (tpid == 0)   
1914     {
1915 #if DEBUG_FS
1916       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917                   "Transmission of request failed, will try again later.\n");
1918 #endif
1919       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1920         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1921                                                  get_processing_delay (),
1922                                                  &forward_request_task,
1923                                                  pr); 
1924       return;    
1925     }
1926   GNUNET_STATISTICS_update (stats,
1927                             gettext_noop ("# queries forwarded"),
1928                             1,
1929                             GNUNET_NO);
1930   GNUNET_PEER_change_rc (tpid, 1);
1931   if (pr->used_pids_off == pr->used_pids_size)
1932     GNUNET_array_grow (pr->used_pids,
1933                        pr->used_pids_size,
1934                        pr->used_pids_size * 2 + 2);
1935   pr->used_pids[pr->used_pids_off++] = tpid;
1936   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1937     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1938                                              get_processing_delay (),
1939                                              &forward_request_task,
1940                                              pr);
1941 }
1942
1943
1944 /**
1945  * How many bytes should a bloomfilter be if we have already seen
1946  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1947  * of bits set per entry.  Furthermore, we should not re-size the
1948  * filter too often (to keep it cheap).
1949  *
1950  * Since other peers will also add entries but not resize the filter,
1951  * we should generally pick a slightly larger size than what the
1952  * strict math would suggest.
1953  *
1954  * @return must be a power of two and smaller or equal to 2^15.
1955  */
1956 static size_t
1957 compute_bloomfilter_size (unsigned int entry_count)
1958 {
1959   size_t size;
1960   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1961   uint16_t max = 1 << 15;
1962
1963   if (entry_count > max)
1964     return max;
1965   size = 8;
1966   while ((size < max) && (size < ideal))
1967     size *= 2;
1968   if (size > max)
1969     return max;
1970   return size;
1971 }
1972
1973
1974 /**
1975  * Recalculate our bloom filter for filtering replies.  This function
1976  * will create a new bloom filter from scratch, so it should only be
1977  * called if we have no bloomfilter at all (and hence can create a
1978  * fresh one of minimal size without problems) OR if our peer is the
1979  * initiator (in which case we may resize to larger than mimimum size).
1980  *
1981  * @param pr request for which the BF is to be recomputed
1982  */
1983 static void
1984 refresh_bloomfilter (struct PendingRequest *pr)
1985 {
1986   unsigned int i;
1987   size_t nsize;
1988   GNUNET_HashCode mhash;
1989
1990   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1991   if (nsize == pr->bf_size)
1992     return; /* size not changed */
1993   if (pr->bf != NULL)
1994     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1995   pr->bf_size = nsize;
1996   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1997   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1998                                               pr->bf_size,
1999                                               BLOOMFILTER_K);
2000   for (i=0;i<pr->replies_seen_off;i++)
2001     {
2002       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2003                                 pr->mingle,
2004                                 &mhash);
2005       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2006     }
2007 }
2008
2009
2010 /**
2011  * Function called after we've tried to reserve a certain amount of
2012  * bandwidth for a reply.  Check if we succeeded and if so send our
2013  * query.
2014  *
2015  * @param cls the requests "struct PendingRequest*"
2016  * @param peer identifies the peer
2017  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2018  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2019  * @param amount set to the amount that was actually reserved or unreserved
2020  * @param preference current traffic preference for the given peer
2021  */
2022 static void
2023 target_reservation_cb (void *cls,
2024                        const struct
2025                        GNUNET_PeerIdentity * peer,
2026                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2027                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2028                        int amount,
2029                        uint64_t preference)
2030 {
2031   struct PendingRequest *pr = cls;
2032   struct ConnectedPeer *cp;
2033   struct PendingMessage *pm;
2034   struct GetMessage *gm;
2035   GNUNET_HashCode *ext;
2036   char *bfdata;
2037   size_t msize;
2038   unsigned int k;
2039   int no_route;
2040   uint32_t bm;
2041
2042   pr->irc = NULL;
2043   if (peer == NULL)
2044     {
2045       /* error in communication with core, try again later */
2046       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2047         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2048                                                  get_processing_delay (),
2049                                                  &forward_request_task,
2050                                                  pr);
2051       return;
2052     }
2053   // (3) transmit, update ttl/priority
2054   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2055                                           &peer->hashPubKey);
2056   if (cp == NULL)
2057     {
2058       /* Peer must have just left */
2059 #if DEBUG_FS
2060       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2061                   "Selected peer disconnected!\n");
2062 #endif
2063       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2064         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2065                                                  get_processing_delay (),
2066                                                  &forward_request_task,
2067                                                  pr);
2068       return;
2069     }
2070   no_route = GNUNET_NO;
2071   if (amount == 0)
2072     {
2073       if (pr->cp == NULL)
2074         {
2075 #if DEBUG_FS > 1
2076           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2077                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2078                       amount,
2079                       DBLOCK_SIZE);
2080 #endif
2081           GNUNET_STATISTICS_update (stats,
2082                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2083                                     1,
2084                                     GNUNET_NO);
2085           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2086             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2087                                                      get_processing_delay (),
2088                                                      &forward_request_task,
2089                                                      pr);
2090           return;  /* this target round failed */
2091         }
2092       /* FIXME: if we are "quite" busy, we may still want to skip
2093          this round; need more load detection code! */
2094       no_route = GNUNET_YES;
2095     }
2096   
2097   GNUNET_STATISTICS_update (stats,
2098                             gettext_noop ("# queries scheduled for forwarding"),
2099                             1,
2100                             GNUNET_NO);
2101   /* build message and insert message into priority queue */
2102 #if DEBUG_FS
2103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2104               "Forwarding request `%s' to `%4s'!\n",
2105               GNUNET_h2s (&pr->query),
2106               GNUNET_i2s (peer));
2107 #endif
2108   k = 0;
2109   bm = 0;
2110   if (GNUNET_YES == no_route)
2111     {
2112       bm |= GET_MESSAGE_BIT_RETURN_TO;
2113       k++;      
2114     }
2115   if (pr->namespace != NULL)
2116     {
2117       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2118       k++;
2119     }
2120   if (pr->target_pid != 0)
2121     {
2122       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2123       k++;
2124     }
2125   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2126   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2127   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2128   pm->msize = msize;
2129   gm = (struct GetMessage*) &pm[1];
2130   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2131   gm->header.size = htons (msize);
2132   gm->type = htonl (pr->type);
2133   pr->remaining_priority /= 2;
2134   gm->priority = htonl (pr->remaining_priority);
2135   gm->ttl = htonl (pr->ttl);
2136   gm->filter_mutator = htonl(pr->mingle); 
2137   gm->hash_bitmap = htonl (bm);
2138   gm->query = pr->query;
2139   ext = (GNUNET_HashCode*) &gm[1];
2140   k = 0;
2141   if (GNUNET_YES == no_route)
2142     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2143   if (pr->namespace != NULL)
2144     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2145   if (pr->target_pid != 0)
2146     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2147   bfdata = (char *) &ext[k];
2148   if (pr->bf != NULL)
2149     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2150                                                bfdata,
2151                                                pr->bf_size);
2152   pm->cont = &transmit_query_continuation;
2153   pm->cont_cls = pr;
2154   add_to_pending_messages_for_peer (cp, pm, pr);
2155 }
2156
2157
2158 /**
2159  * Closure used for "target_peer_select_cb".
2160  */
2161 struct PeerSelectionContext 
2162 {
2163   /**
2164    * The request for which we are selecting
2165    * peers.
2166    */
2167   struct PendingRequest *pr;
2168
2169   /**
2170    * Current "prime" target.
2171    */
2172   struct GNUNET_PeerIdentity target;
2173
2174   /**
2175    * How much do we like this target?
2176    */
2177   double target_score;
2178
2179 };
2180
2181
2182 /**
2183  * Function called for each connected peer to determine
2184  * which one(s) would make good targets for forwarding.
2185  *
2186  * @param cls closure (struct PeerSelectionContext)
2187  * @param key current key code (peer identity)
2188  * @param value value in the hash map (struct ConnectedPeer)
2189  * @return GNUNET_YES if we should continue to
2190  *         iterate,
2191  *         GNUNET_NO if not.
2192  */
2193 static int
2194 target_peer_select_cb (void *cls,
2195                        const GNUNET_HashCode * key,
2196                        void *value)
2197 {
2198   struct PeerSelectionContext *psc = cls;
2199   struct ConnectedPeer *cp = value;
2200   struct PendingRequest *pr = psc->pr;
2201   double score;
2202   unsigned int i;
2203   unsigned int pc;
2204
2205   /* 1) check that this peer is not the initiator */
2206   if (cp == pr->cp)
2207     {
2208 #if DEBUG_FS
2209       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2210                   "Skipping initiator in forwarding selection\n");
2211 #endif
2212       return GNUNET_YES; /* skip */        
2213     }
2214
2215   /* 2) check if we have already (recently) forwarded to this peer */
2216   pc = 0;
2217   for (i=0;i<pr->used_pids_off;i++)
2218     if (pr->used_pids[i] == cp->pid) 
2219       {
2220         pc++;
2221         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2222                                            RETRY_PROBABILITY_INV))
2223           {
2224 #if DEBUG_FS
2225             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2226                         "NOT re-trying query that was previously transmitted %u times\n",
2227                         (unsigned int) pr->used_pids_off);
2228 #endif
2229             return GNUNET_YES; /* skip */
2230           }
2231       }
2232 #if DEBUG_FS
2233   if (0 < pc)
2234     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2235                 "Re-trying query that was previously transmitted %u times to this peer\n",
2236                 (unsigned int) pc);
2237 #endif
2238   /* 3) calculate how much we'd like to forward to this peer,
2239      starting with a random value that is strong enough
2240      to at least give any peer a chance sometimes 
2241      (compared to the other factors that come later) */
2242   /* 3a) count successful (recent) routes from cp for same source */
2243   if (pr->cp != NULL)
2244     {
2245       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2246                                         P2P_SUCCESS_LIST_SIZE);
2247       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2248         if (cp->last_p2p_replies[i] == pr->cp->pid)
2249           score += 1; /* likely successful based on hot path */
2250     }
2251   else
2252     {
2253       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2254                                         CS2P_SUCCESS_LIST_SIZE);
2255       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2256         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2257           score += 1; /* likely successful based on hot path */
2258     }
2259   /* 3b) include latency */
2260   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2261     score += 1; /* likely fast based on latency */
2262   /* 3c) include priorities */
2263   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2264     score += 1; /* likely successful based on priorities */
2265   /* 3d) penalize for queue size */  
2266   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2267   /* 3e) include peer proximity */
2268   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2269                                                     &pr->query)) / (double) UINT32_MAX);
2270   /* 4) super-bonus for being the known target */
2271   if (pr->target_pid == cp->pid)
2272     score += 100.0;
2273   /* store best-fit in closure */
2274 #if DEBUG_FS
2275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2276               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2277               GNUNET_h2s (key),
2278               score,
2279               psc->target_score);
2280 #endif  
2281   score++; /* avoid zero */
2282   if (score > psc->target_score)
2283     {
2284       psc->target_score = score;
2285       psc->target.hashPubKey = *key; 
2286     }
2287   return GNUNET_YES;
2288 }
2289   
2290
2291 /**
2292  * The priority level imposes a bound on the maximum
2293  * value for the ttl that can be requested.
2294  *
2295  * @param ttl_in requested ttl
2296  * @param prio given priority
2297  * @return ttl_in if ttl_in is below the limit,
2298  *         otherwise the ttl-limit for the given priority
2299  */
2300 static int32_t
2301 bound_ttl (int32_t ttl_in, uint32_t prio)
2302 {
2303   unsigned long long allowed;
2304
2305   if (ttl_in <= 0)
2306     return ttl_in;
2307   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2308   if (ttl_in > allowed)      
2309     {
2310       if (allowed >= (1 << 30))
2311         return 1 << 30;
2312       return allowed;
2313     }
2314   return ttl_in;
2315 }
2316
2317
2318 /**
2319  * We're processing a GET request and have decided
2320  * to forward it to other peers.  This function is called periodically
2321  * and should forward the request to other peers until we have all
2322  * possible replies.  If we have transmitted the *only* reply to
2323  * the initiator we should destroy the pending request.  If we have
2324  * many replies in the queue to the initiator, we should delay sending
2325  * out more queries until the reply queue has shrunk some.
2326  *
2327  * @param cls our "struct ProcessGetContext *"
2328  * @param tc unused
2329  */
2330 static void
2331 forward_request_task (void *cls,
2332                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2333 {
2334   struct PendingRequest *pr = cls;
2335   struct PeerSelectionContext psc;
2336   struct ConnectedPeer *cp; 
2337   struct GNUNET_TIME_Relative delay;
2338
2339   pr->task = GNUNET_SCHEDULER_NO_TASK;
2340   if (pr->irc != NULL)
2341     {
2342 #if DEBUG_FS
2343       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2344                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2345                   GNUNET_h2s (&pr->query));
2346 #endif
2347       return; /* already pending */
2348     }
2349   if (GNUNET_YES == pr->local_only)
2350     return; /* configured to not do P2P search */
2351   /* (0) try DHT */
2352   if (0 == pr->anonymity_level)
2353     {
2354 #if 0      
2355       /* DHT API needs fixing... */
2356       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2357                                           GNUNET_TIME_UNIT_FOREVER_REL,
2358                                           pr->type,
2359                                           &pr->query,
2360                                           &process_dht_reply,
2361                                           pr,
2362                                           FIXME,
2363                                           FIXME);
2364 #endif                                    
2365     }
2366   /* (1) select target */
2367   psc.pr = pr;
2368   psc.target_score = -DBL_MAX;
2369   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2370                                          &target_peer_select_cb,
2371                                          &psc);  
2372   if (psc.target_score == -DBL_MAX)
2373     {
2374       delay = get_processing_delay ();
2375 #if DEBUG_FS 
2376       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2377                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2378                   GNUNET_h2s (&pr->query),
2379                   delay.value);
2380 #endif
2381       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2382                                                delay,
2383                                                &forward_request_task,
2384                                                pr);
2385       return; /* nobody selected */
2386     }
2387   /* (3) update TTL/priority */
2388   if (pr->client_request_list != NULL)
2389     {
2390       /* FIXME: use better algorithm!? */
2391       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2392                                          4))
2393         pr->priority++;
2394       /* bound priority we use by priorities we see from other peers
2395          rounded up (must round up so that we can see non-zero
2396          priorities, but round up as little as possible to make it
2397          plausible that we forwarded another peers request) */
2398       if (pr->priority > current_priorities + 1.0)
2399         pr->priority = (uint32_t) current_priorities + 1.0;
2400       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2401                            pr->priority);
2402 #if DEBUG_FS
2403       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2404                   "Trying query `%s' with priority %u and TTL %d.\n",
2405                   GNUNET_h2s (&pr->query),
2406                   pr->priority,
2407                   pr->ttl);
2408 #endif
2409     }
2410
2411   /* (3) reserve reply bandwidth */
2412   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2413                                           &psc.target.hashPubKey);
2414   GNUNET_assert (NULL != cp);
2415   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2416                                                 &psc.target,
2417                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2418                                                 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2419                                                 DBLOCK_SIZE * 2, 
2420                                                 cp->inc_preference,
2421                                                 &target_reservation_cb,
2422                                                 pr);
2423   cp->inc_preference = 0;
2424 }
2425
2426
2427 /* **************************** P2P PUT Handling ************************ */
2428
2429
2430 /**
2431  * Function called after we either failed or succeeded
2432  * at transmitting a reply to a peer.  
2433  *
2434  * @param cls the requests "struct PendingRequest*"
2435  * @param tpid ID of receiving peer, 0 on transmission error
2436  */
2437 static void
2438 transmit_reply_continuation (void *cls,
2439                              GNUNET_PEER_Id tpid)
2440 {
2441   struct PendingRequest *pr = cls;
2442   
2443   switch (pr->type)
2444     {
2445     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2446     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2447       /* only one reply expected, done with the request! */
2448       destroy_pending_request (pr);
2449       break;
2450     case GNUNET_BLOCK_TYPE_ANY:
2451     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2452     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2453       break;
2454     default:
2455       GNUNET_break (0);
2456       break;
2457     }
2458 }
2459
2460
2461 /**
2462  * Transmit the given message by copying it to the target buffer
2463  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2464  * for writing in the meantime.  In that case, do nothing
2465  * (the disconnect or shutdown handler will take care of the rest).
2466  * If we were able to transmit messages and there are still more
2467  * pending, ask core again for further calls to this function.
2468  *
2469  * @param cls closure, pointer to the 'struct ClientList*'
2470  * @param size number of bytes available in buf
2471  * @param buf where the callee should write the message
2472  * @return number of bytes written to buf
2473  */
2474 static size_t
2475 transmit_to_client (void *cls,
2476                   size_t size, void *buf)
2477 {
2478   struct ClientList *cl = cls;
2479   char *cbuf = buf;
2480   struct ClientResponseMessage *creply;
2481   size_t msize;
2482   
2483   cl->th = NULL;
2484   if (NULL == buf)
2485     {
2486 #if DEBUG_FS
2487       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2488                   "Not sending reply, client communication problem.\n");
2489 #endif
2490       return 0;
2491     }
2492   msize = 0;
2493   while ( (NULL != (creply = cl->res_head) ) &&
2494           (creply->msize <= size) )
2495     {
2496       memcpy (&cbuf[msize], &creply[1], creply->msize);
2497       msize += creply->msize;
2498       size -= creply->msize;
2499       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2500                                    cl->res_tail,
2501                                    creply);
2502       GNUNET_free (creply);
2503     }
2504   if (NULL != creply)
2505     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2506                                                   creply->msize,
2507                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2508                                                   &transmit_to_client,
2509                                                   cl);
2510 #if DEBUG_FS
2511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2512               "Transmitted %u bytes to client\n",
2513               (unsigned int) msize);
2514 #endif
2515   return msize;
2516 }
2517
2518
2519 /**
2520  * Closure for "process_reply" function.
2521  */
2522 struct ProcessReplyClosure
2523 {
2524   /**
2525    * The data for the reply.
2526    */
2527   const void *data;
2528
2529   /**
2530    * Who gave us this reply? NULL for local host.
2531    */
2532   struct ConnectedPeer *sender;
2533
2534   /**
2535    * When the reply expires.
2536    */
2537   struct GNUNET_TIME_Absolute expiration;
2538
2539   /**
2540    * Size of data.
2541    */
2542   size_t size;
2543
2544   /**
2545    * Type of the block.
2546    */
2547   enum GNUNET_BLOCK_Type type;
2548
2549   /**
2550    * How much was this reply worth to us?
2551    */
2552   uint32_t priority;
2553
2554   /**
2555    * Evaluation result (returned).
2556    */
2557   enum GNUNET_BLOCK_EvaluationResult eval;
2558
2559   /**
2560    * Did we finish processing the associated request?
2561    */ 
2562   int finished;
2563 };
2564
2565
2566 /**
2567  * We have received a reply; handle it!
2568  *
2569  * @param cls response (struct ProcessReplyClosure)
2570  * @param key our query
2571  * @param value value in the hash map (info about the query)
2572  * @return GNUNET_YES (we should continue to iterate)
2573  */
2574 static int
2575 process_reply (void *cls,
2576                const GNUNET_HashCode * key,
2577                void *value)
2578 {
2579   struct ProcessReplyClosure *prq = cls;
2580   struct PendingRequest *pr = value;
2581   struct PendingMessage *reply;
2582   struct ClientResponseMessage *creply;
2583   struct ClientList *cl;
2584   struct PutMessage *pm;
2585   struct ConnectedPeer *cp;
2586   struct GNUNET_TIME_Relative cur_delay;
2587   size_t msize;
2588
2589 #if DEBUG_FS
2590   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2591               "Matched result (type %u) for query `%s' with pending request\n",
2592               (unsigned int) prq->type,
2593               GNUNET_h2s (key));
2594 #endif  
2595   GNUNET_STATISTICS_update (stats,
2596                             gettext_noop ("# replies received and matched"),
2597                             1,
2598                             GNUNET_NO);
2599   if (prq->sender != NULL)
2600     {
2601       /* FIXME: should we be more precise here and not use
2602          "start_time" but a peer-specific time stamp? */
2603       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2604       prq->sender->avg_delay.value
2605         = (prq->sender->avg_delay.value * 
2606            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2607       prq->sender->avg_priority
2608         = (prq->sender->avg_priority * 
2609            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2610       if (pr->cp != NULL)
2611         {
2612           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2613                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2614                                  -1);
2615           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2616           prq->sender->last_p2p_replies
2617             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2618             = pr->cp->pid;
2619         }
2620       else
2621         {
2622           if (NULL != prq->sender->last_client_replies
2623               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2624             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2625                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2626           prq->sender->last_client_replies
2627             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2628             = pr->client_request_list->client_list->client;
2629           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2630         }
2631     }
2632   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
2633                                      prq->type,
2634                                      key,
2635                                      &pr->bf,
2636                                      pr->mingle,
2637                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2638                                      prq->data,
2639                                      prq->size);
2640   switch (prq->eval)
2641     {
2642     case GNUNET_BLOCK_EVALUATION_OK_MORE:
2643       break;
2644     case GNUNET_BLOCK_EVALUATION_OK_LAST:
2645       while (NULL != pr->pending_head)
2646         destroy_pending_message_list_entry (pr->pending_head);
2647       if (pr->qe != NULL)
2648         {
2649           if (pr->client_request_list != NULL)
2650             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2651                                         GNUNET_YES);
2652           GNUNET_DATASTORE_cancel (pr->qe);
2653           pr->qe = NULL;
2654         }
2655       pr->do_remove = GNUNET_YES;
2656       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2657         {
2658           GNUNET_SCHEDULER_cancel (sched,
2659                                    pr->task);
2660           pr->task = GNUNET_SCHEDULER_NO_TASK;
2661         }
2662       GNUNET_break (GNUNET_YES ==
2663                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2664                                                           key,
2665                                                           pr));
2666       break;
2667     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2668       GNUNET_STATISTICS_update (stats,
2669                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2670                                 1,
2671                                 GNUNET_NO);
2672 #if DEBUG_FS
2673       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2674                   "Duplicate response `%s', discarding.\n",
2675                   GNUNET_h2s (&mhash));
2676 #endif
2677       return GNUNET_YES; /* duplicate */
2678     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2679       return GNUNET_YES; /* wrong namespace */  
2680     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2681       GNUNET_break (0);
2682       return GNUNET_YES;
2683     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2684       GNUNET_break (0);
2685       return GNUNET_YES;
2686     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2687       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2688                   _("Unsupported block type %u\n"),
2689                   prq->type);
2690       return GNUNET_NO;
2691     }
2692   if (pr->client_request_list != NULL)
2693     {
2694       if (pr->replies_seen_size == pr->replies_seen_off)
2695         GNUNET_array_grow (pr->replies_seen,
2696                            pr->replies_seen_size,
2697                            pr->replies_seen_size * 2 + 4);      
2698       GNUNET_CRYPTO_hash (prq->data,
2699                           prq->size,
2700                           &pr->replies_seen[pr->replies_seen_off++]);         
2701       refresh_bloomfilter (pr);
2702     }
2703   if (NULL == prq->sender)
2704     {
2705 #if DEBUG_FS
2706       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2707                   "Found result for query `%s' in local datastore\n",
2708                   GNUNET_h2s (key));
2709 #endif
2710       GNUNET_STATISTICS_update (stats,
2711                                 gettext_noop ("# results found locally"),
2712                                 1,
2713                                 GNUNET_NO);      
2714     }
2715   prq->priority += pr->remaining_priority;
2716   pr->remaining_priority = 0;
2717   pr->results_found++;
2718   if (NULL != pr->client_request_list)
2719     {
2720       GNUNET_STATISTICS_update (stats,
2721                                 gettext_noop ("# replies received for local clients"),
2722                                 1,
2723                                 GNUNET_NO);
2724       cl = pr->client_request_list->client_list;
2725       msize = sizeof (struct PutMessage) + prq->size;
2726       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2727       creply->msize = msize;
2728       creply->client_list = cl;
2729       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2730                                          cl->res_tail,
2731                                          cl->res_tail,
2732                                          creply);      
2733       pm = (struct PutMessage*) &creply[1];
2734       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2735       pm->header.size = htons (msize);
2736       pm->type = htonl (prq->type);
2737       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2738       memcpy (&pm[1], prq->data, prq->size);      
2739       if (NULL == cl->th)
2740         {
2741 #if DEBUG_FS
2742           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2743                       "Transmitting result for query `%s' to client\n",
2744                       GNUNET_h2s (key));
2745 #endif  
2746           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2747                                                         msize,
2748                                                         GNUNET_TIME_UNIT_FOREVER_REL,
2749                                                         &transmit_to_client,
2750                                                         cl);
2751         }
2752       GNUNET_break (cl->th != NULL);
2753       if (pr->do_remove)                
2754         {
2755           prq->finished = GNUNET_YES;
2756           destroy_pending_request (pr);         
2757         }
2758     }
2759   else
2760     {
2761       cp = pr->cp;
2762 #if DEBUG_FS
2763       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2764                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
2765                   GNUNET_h2s (key),
2766                   (unsigned int) cp->pid);
2767 #endif  
2768       GNUNET_STATISTICS_update (stats,
2769                                 gettext_noop ("# replies received for other peers"),
2770                                 1,
2771                                 GNUNET_NO);
2772       msize = sizeof (struct PutMessage) + prq->size;
2773       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2774       reply->cont = &transmit_reply_continuation;
2775       reply->cont_cls = pr;
2776       reply->msize = msize;
2777       reply->priority = UINT32_MAX; /* send replies first! */
2778       pm = (struct PutMessage*) &reply[1];
2779       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2780       pm->header.size = htons (msize);
2781       pm->type = htonl (prq->type);
2782       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2783       memcpy (&pm[1], prq->data, prq->size);
2784       add_to_pending_messages_for_peer (cp, reply, pr);
2785     }
2786   return GNUNET_YES;
2787 }
2788
2789
2790 /**
2791  * Continuation called to notify client about result of the
2792  * operation.
2793  *
2794  * @param cls closure
2795  * @param success GNUNET_SYSERR on failure
2796  * @param msg NULL on success, otherwise an error message
2797  */
2798 static void 
2799 put_migration_continuation (void *cls,
2800                             int success,
2801                             const char *msg)
2802 {
2803   /* FIXME */
2804 }
2805
2806
2807 /**
2808  * Handle P2P "PUT" message.
2809  *
2810  * @param cls closure, always NULL
2811  * @param other the other peer involved (sender or receiver, NULL
2812  *        for loopback messages where we are both sender and receiver)
2813  * @param message the actual message
2814  * @param latency reported latency of the connection with 'other'
2815  * @param distance reported distance (DV) to 'other' 
2816  * @return GNUNET_OK to keep the connection open,
2817  *         GNUNET_SYSERR to close it (signal serious error)
2818  */
2819 static int
2820 handle_p2p_put (void *cls,
2821                 const struct GNUNET_PeerIdentity *other,
2822                 const struct GNUNET_MessageHeader *message,
2823                 struct GNUNET_TIME_Relative latency,
2824                 uint32_t distance)
2825 {
2826   const struct PutMessage *put;
2827   uint16_t msize;
2828   size_t dsize;
2829   enum GNUNET_BLOCK_Type type;
2830   struct GNUNET_TIME_Absolute expiration;
2831   GNUNET_HashCode query;
2832   struct ProcessReplyClosure prq;
2833
2834   msize = ntohs (message->size);
2835   if (msize < sizeof (struct PutMessage))
2836     {
2837       GNUNET_break_op(0);
2838       return GNUNET_SYSERR;
2839     }
2840   put = (const struct PutMessage*) message;
2841   dsize = msize - sizeof (struct PutMessage);
2842   type = ntohl (put->type);
2843   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2844
2845   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
2846     return GNUNET_SYSERR;
2847   if (GNUNET_OK !=
2848       GNUNET_BLOCK_get_key (block_ctx,
2849                             type,
2850                             &put[1],
2851                             dsize,
2852                             &query))
2853     {
2854       GNUNET_break_op (0);
2855       return GNUNET_SYSERR;
2856     }
2857 #if DEBUG_FS
2858   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2859               "Received result for query `%s' from peer `%4s'\n",
2860               GNUNET_h2s (&query),
2861               GNUNET_i2s (other));
2862 #endif
2863   GNUNET_STATISTICS_update (stats,
2864                             gettext_noop ("# replies received (overall)"),
2865                             1,
2866                             GNUNET_NO);
2867   /* now, lookup 'query' */
2868   prq.data = (const void*) &put[1];
2869   if (other != NULL)
2870     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2871                                                     &other->hashPubKey);
2872   else
2873     prq.sender = NULL;
2874   prq.size = dsize;
2875   prq.type = type;
2876   prq.expiration = expiration;
2877   prq.priority = 0;
2878   prq.finished = GNUNET_NO;
2879   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2880                                               &query,
2881                                               &process_reply,
2882                                               &prq);
2883   if (prq.sender != NULL)
2884     {
2885       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
2886       prq.sender->trust += prq.priority;
2887     }
2888   if (GNUNET_YES == active_migration)
2889     {
2890 #if DEBUG_FS
2891       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2892                   "Replicating result for query `%s' with priority %u\n",
2893                   GNUNET_h2s (&query),
2894                   prq.priority);
2895 #endif
2896       GNUNET_DATASTORE_put (dsh,
2897                             0, &query, dsize, &put[1],
2898                             type, prq.priority, 1 /* anonymity */, 
2899                             expiration, 
2900                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2901                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2902                             &put_migration_continuation, 
2903                             NULL);
2904     }
2905   return GNUNET_OK;
2906 }
2907
2908
2909 /**
2910  * Handle P2P "MIGRATION_STOP" message.
2911  *
2912  * @param cls closure, always NULL
2913  * @param other the other peer involved (sender or receiver, NULL
2914  *        for loopback messages where we are both sender and receiver)
2915  * @param message the actual message
2916  * @param latency reported latency of the connection with 'other'
2917  * @param distance reported distance (DV) to 'other' 
2918  * @return GNUNET_OK to keep the connection open,
2919  *         GNUNET_SYSERR to close it (signal serious error)
2920  */
2921 static int
2922 handle_p2p_migration_stop (void *cls,
2923                            const struct GNUNET_PeerIdentity *other,
2924                            const struct GNUNET_MessageHeader *message,
2925                            struct GNUNET_TIME_Relative latency,
2926                            uint32_t distance)
2927 {
2928   // FIXME!
2929 }
2930
2931
2932
2933 /* **************************** P2P GET Handling ************************ */
2934
2935
2936 /**
2937  * Closure for 'check_duplicate_request_{peer,client}'.
2938  */
2939 struct CheckDuplicateRequestClosure
2940 {
2941   /**
2942    * The new request we should check if it already exists.
2943    */
2944   const struct PendingRequest *pr;
2945
2946   /**
2947    * Existing request found by the checker, NULL if none.
2948    */
2949   struct PendingRequest *have;
2950 };
2951
2952
2953 /**
2954  * Iterator over entries in the 'query_request_map' that
2955  * tries to see if we have the same request pending from
2956  * the same client already.
2957  *
2958  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2959  * @param key current key code (query, ignored, must match)
2960  * @param value value in the hash map (a 'struct PendingRequest' 
2961  *              that already exists)
2962  * @return GNUNET_YES if we should continue to
2963  *         iterate (no match yet)
2964  *         GNUNET_NO if not (match found).
2965  */
2966 static int
2967 check_duplicate_request_client (void *cls,
2968                                 const GNUNET_HashCode * key,
2969                                 void *value)
2970 {
2971   struct CheckDuplicateRequestClosure *cdc = cls;
2972   struct PendingRequest *have = value;
2973
2974   if (have->client_request_list == NULL)
2975     return GNUNET_YES;
2976   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2977        (cdc->pr != have) )
2978     {
2979       cdc->have = have;
2980       return GNUNET_NO;
2981     }
2982   return GNUNET_YES;
2983 }
2984
2985
2986 /**
2987  * We're processing (local) results for a search request
2988  * from another peer.  Pass applicable results to the
2989  * peer and if we are done either clean up (operation
2990  * complete) or forward to other peers (more results possible).
2991  *
2992  * @param cls our closure (struct LocalGetContext)
2993  * @param key key for the content
2994  * @param size number of bytes in data
2995  * @param data content stored
2996  * @param type type of the content
2997  * @param priority priority of the content
2998  * @param anonymity anonymity-level for the content
2999  * @param expiration expiration time for the content
3000  * @param uid unique identifier for the datum;
3001  *        maybe 0 if no unique identifier is available
3002  */
3003 static void
3004 process_local_reply (void *cls,
3005                      const GNUNET_HashCode * key,
3006                      uint32_t size,
3007                      const void *data,
3008                      enum GNUNET_BLOCK_Type type,
3009                      uint32_t priority,
3010                      uint32_t anonymity,
3011                      struct GNUNET_TIME_Absolute
3012                      expiration, 
3013                      uint64_t uid)
3014 {
3015   struct PendingRequest *pr = cls;
3016   struct ProcessReplyClosure prq;
3017   struct CheckDuplicateRequestClosure cdrc;
3018   GNUNET_HashCode query;
3019   unsigned int old_rf;
3020   
3021   if (NULL == key)
3022     {
3023 #if DEBUG_FS > 1
3024       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3025                   "Done processing local replies, forwarding request to other peers.\n");
3026 #endif
3027       pr->qe = NULL;
3028       if (pr->client_request_list != NULL)
3029         {
3030           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3031                                       GNUNET_YES);
3032           /* Figure out if this is a duplicate request and possibly
3033              merge 'struct PendingRequest' entries */
3034           cdrc.have = NULL;
3035           cdrc.pr = pr;
3036           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3037                                                       &pr->query,
3038                                                       &check_duplicate_request_client,
3039                                                       &cdrc);
3040           if (cdrc.have != NULL)
3041             {
3042 #if DEBUG_FS
3043               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3044                           "Received request for block `%s' twice from client, will only request once.\n",
3045                           GNUNET_h2s (&pr->query));
3046 #endif
3047               
3048               destroy_pending_request (pr);
3049               return;
3050             }
3051         }
3052
3053       /* no more results */
3054       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3055         pr->task = GNUNET_SCHEDULER_add_now (sched,
3056                                              &forward_request_task,
3057                                              pr);      
3058       return;
3059     }
3060 #if DEBUG_FS
3061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3062               "New local response to `%s' of type %u.\n",
3063               GNUNET_h2s (key),
3064               type);
3065 #endif
3066   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3067     {
3068 #if DEBUG_FS
3069       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3070                   "Found ONDEMAND block, performing on-demand encoding\n");
3071 #endif
3072       GNUNET_STATISTICS_update (stats,
3073                                 gettext_noop ("# on-demand blocks matched requests"),
3074                                 1,
3075                                 GNUNET_NO);
3076       if (GNUNET_OK != 
3077           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3078                                             anonymity, expiration, uid, 
3079                                             &process_local_reply,
3080                                             pr))
3081       if (pr->qe != NULL)
3082         {
3083           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3084         }
3085       return;
3086     }
3087   old_rf = pr->results_found;
3088   memset (&prq, 0, sizeof (prq));
3089   prq.data = data;
3090   prq.expiration = expiration;
3091   prq.size = size;  
3092   if (GNUNET_OK != 
3093       GNUNET_BLOCK_get_key (block_ctx,
3094                             type,
3095                             data,
3096                             size,
3097                             &query))
3098     {
3099       GNUNET_break (0);
3100       GNUNET_DATASTORE_remove (dsh,
3101                                key,
3102                                size, data,
3103                                -1, -1, 
3104                                GNUNET_TIME_UNIT_FOREVER_REL,
3105                                NULL, NULL);
3106       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3107       return;
3108     }
3109   prq.type = type;
3110   prq.priority = priority;  
3111   prq.finished = GNUNET_NO;
3112   process_reply (&prq, key, pr);
3113   if ( (old_rf == 0) &&
3114        (pr->results_found == 1) )
3115     update_datastore_delays (pr->start_time);
3116   if (prq.finished == GNUNET_YES)
3117     return;
3118   if (pr->qe == NULL)
3119     return; /* done here */
3120   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3121     {
3122       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3123       return;
3124     }
3125   if ( (pr->client_request_list == NULL) &&
3126        ( (GNUNET_YES == test_load_too_high()) ||
3127          (pr->results_found > 5 + 2 * pr->priority) ) )
3128     {
3129 #if DEBUG_FS > 2
3130       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3131                   "Load too high, done with request\n");
3132 #endif
3133       GNUNET_STATISTICS_update (stats,
3134                                 gettext_noop ("# processing result set cut short due to load"),
3135                                 1,
3136                                 GNUNET_NO);
3137       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3138       return;
3139     }
3140   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3141 }
3142
3143
3144 /**
3145  * We've received a request with the specified priority.  Bound it
3146  * according to how much we trust the given peer.
3147  * 
3148  * @param prio_in requested priority
3149  * @param cp the peer making the request
3150  * @return effective priority
3151  */
3152 static uint32_t
3153 bound_priority (uint32_t prio_in,
3154                 struct ConnectedPeer *cp)
3155 {
3156 #define N ((double)128.0)
3157   uint32_t ret;
3158   double rret;
3159   int ld;
3160
3161   ld = test_load_too_high ();
3162   if (ld == GNUNET_SYSERR)
3163     return 0; /* excess resources */
3164   ret = change_host_trust (cp, prio_in);
3165   if (ret > 0)
3166     {
3167       if (ret > current_priorities + N)
3168         rret = current_priorities + N;
3169       else
3170         rret = ret;
3171       current_priorities 
3172         = (current_priorities * (N-1) + rret)/N;
3173     }
3174 #undef N
3175   return ret;
3176 }
3177
3178
3179 /**
3180  * Iterator over entries in the 'query_request_map' that
3181  * tries to see if we have the same request pending from
3182  * the same peer already.
3183  *
3184  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3185  * @param key current key code (query, ignored, must match)
3186  * @param value value in the hash map (a 'struct PendingRequest' 
3187  *              that already exists)
3188  * @return GNUNET_YES if we should continue to
3189  *         iterate (no match yet)
3190  *         GNUNET_NO if not (match found).
3191  */
3192 static int
3193 check_duplicate_request_peer (void *cls,
3194                               const GNUNET_HashCode * key,
3195                               void *value)
3196 {
3197   struct CheckDuplicateRequestClosure *cdc = cls;
3198   struct PendingRequest *have = value;
3199
3200   if (cdc->pr->target_pid == have->target_pid)
3201     {
3202       cdc->have = have;
3203       return GNUNET_NO;
3204     }
3205   return GNUNET_YES;
3206 }
3207
3208
3209 /**
3210  * Handle P2P "GET" request.
3211  *
3212  * @param cls closure, always NULL
3213  * @param other the other peer involved (sender or receiver, NULL
3214  *        for loopback messages where we are both sender and receiver)
3215  * @param message the actual message
3216  * @param latency reported latency of the connection with 'other'
3217  * @param distance reported distance (DV) to 'other' 
3218  * @return GNUNET_OK to keep the connection open,
3219  *         GNUNET_SYSERR to close it (signal serious error)
3220  */
3221 static int
3222 handle_p2p_get (void *cls,
3223                 const struct GNUNET_PeerIdentity *other,
3224                 const struct GNUNET_MessageHeader *message,
3225                 struct GNUNET_TIME_Relative latency,
3226                 uint32_t distance)
3227 {
3228   struct PendingRequest *pr;
3229   struct ConnectedPeer *cp;
3230   struct ConnectedPeer *cps;
3231   struct CheckDuplicateRequestClosure cdc;
3232   struct GNUNET_TIME_Relative timeout;
3233   uint16_t msize;
3234   const struct GetMessage *gm;
3235   unsigned int bits;
3236   const GNUNET_HashCode *opt;
3237   uint32_t bm;
3238   size_t bfsize;
3239   uint32_t ttl_decrement;
3240   enum GNUNET_BLOCK_Type type;
3241   int have_ns;
3242   int ld;
3243
3244   msize = ntohs(message->size);
3245   if (msize < sizeof (struct GetMessage))
3246     {
3247       GNUNET_break_op (0);
3248       return GNUNET_SYSERR;
3249     }
3250   gm = (const struct GetMessage*) message;
3251   type = ntohl (gm->type);
3252   bm = ntohl (gm->hash_bitmap);
3253   bits = 0;
3254   while (bm > 0)
3255     {
3256       if (1 == (bm & 1))
3257         bits++;
3258       bm >>= 1;
3259     }
3260   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3261     {
3262       GNUNET_break_op (0);
3263       return GNUNET_SYSERR;
3264     }  
3265   opt = (const GNUNET_HashCode*) &gm[1];
3266   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3267   bm = ntohl (gm->hash_bitmap);
3268   bits = 0;
3269   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3270                                            &other->hashPubKey);
3271   if (NULL == cps)
3272     {
3273       /* peer must have just disconnected */
3274       GNUNET_STATISTICS_update (stats,
3275                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3276                                 1,
3277                                 GNUNET_NO);
3278       return GNUNET_SYSERR;
3279     }
3280   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3281     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3282                                             &opt[bits++]);
3283   else
3284     cp = cps;
3285   if (cp == NULL)
3286     {
3287 #if DEBUG_FS
3288       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3289         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3290                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3291                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3292       
3293       else
3294         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3295                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3296                     GNUNET_i2s (other));
3297 #endif
3298       GNUNET_STATISTICS_update (stats,
3299                                 gettext_noop ("# requests dropped due to missing reverse route"),
3300                                 1,
3301                                 GNUNET_NO);
3302      /* FIXME: try connect? */
3303       return GNUNET_OK;
3304     }
3305   /* note that we can really only check load here since otherwise
3306      peers could find out that we are overloaded by not being
3307      disconnected after sending us a malformed query... */
3308
3309   /* FIXME: query priority should play
3310      a major role here! */
3311   ld = test_load_too_high ();
3312   if (GNUNET_YES == ld)
3313     {
3314 #if DEBUG_FS
3315       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3316                   "Dropping query from `%s', this peer is too busy.\n",
3317                   GNUNET_i2s (other));
3318 #endif
3319       GNUNET_STATISTICS_update (stats,
3320                                 gettext_noop ("# requests dropped due to high load"),
3321                                 1,
3322                                 GNUNET_NO);
3323       return GNUNET_OK;
3324     }
3325   /* FIXME: if ld == GNUNET_NO, forward
3326      instead of indirecting! */
3327
3328 #if DEBUG_FS 
3329   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3330               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3331               GNUNET_h2s (&gm->query),
3332               (unsigned int) type,
3333               GNUNET_i2s (other),
3334               (unsigned int) bm);
3335 #endif
3336   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3337   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3338                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3339   if (have_ns)
3340     {
3341       pr->namespace = (GNUNET_HashCode*) &pr[1];
3342       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3343     }
3344   pr->type = type;
3345   pr->mingle = ntohl (gm->filter_mutator);
3346   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3347     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3348   pr->anonymity_level = 1;
3349   pr->priority = bound_priority (ntohl (gm->priority), cps);
3350   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3351   pr->query = gm->query;
3352   /* decrement ttl (always) */
3353   ttl_decrement = 2 * TTL_DECREMENT +
3354     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3355                               TTL_DECREMENT);
3356   if ( (pr->ttl < 0) &&
3357        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3358     {
3359 #if DEBUG_FS
3360       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3361                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3362                   GNUNET_i2s (other),
3363                   pr->ttl,
3364                   ttl_decrement);
3365 #endif
3366       GNUNET_STATISTICS_update (stats,
3367                                 gettext_noop ("# requests dropped due TTL underflow"),
3368                                 1,
3369                                 GNUNET_NO);
3370       /* integer underflow => drop (should be very rare)! */      
3371       GNUNET_free (pr);
3372       return GNUNET_OK;
3373     } 
3374   pr->ttl -= ttl_decrement;
3375   pr->start_time = GNUNET_TIME_absolute_get ();
3376
3377   /* get bloom filter */
3378   if (bfsize > 0)
3379     {
3380       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3381                                                   bfsize,
3382                                                   BLOOMFILTER_K);
3383       pr->bf_size = bfsize;
3384     }
3385
3386   cdc.have = NULL;
3387   cdc.pr = pr;
3388   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3389                                               &gm->query,
3390                                               &check_duplicate_request_peer,
3391                                               &cdc);
3392   if (cdc.have != NULL)
3393     {
3394       if (cdc.have->start_time.value + cdc.have->ttl >=
3395           pr->start_time.value + pr->ttl)
3396         {
3397           /* existing request has higher TTL, drop new one! */
3398           cdc.have->priority += pr->priority;
3399           destroy_pending_request (pr);
3400 #if DEBUG_FS
3401           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3402                       "Have existing request with higher TTL, dropping new request.\n",
3403                       GNUNET_i2s (other));
3404 #endif
3405           GNUNET_STATISTICS_update (stats,
3406                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3407                                     1,
3408                                     GNUNET_NO);
3409           return GNUNET_OK;
3410         }
3411       else
3412         {
3413           /* existing request has lower TTL, drop old one! */
3414           pr->priority += cdc.have->priority;
3415           /* Possible optimization: if we have applicable pending
3416              replies in 'cdc.have', we might want to move those over
3417              (this is a really rare special-case, so it is not clear
3418              that this would be worth it) */
3419           destroy_pending_request (cdc.have);
3420           /* keep processing 'pr'! */
3421         }
3422     }
3423
3424   pr->cp = cp;
3425   GNUNET_break (GNUNET_OK ==
3426                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3427                                                    &gm->query,
3428                                                    pr,
3429                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3430   GNUNET_break (GNUNET_OK ==
3431                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3432                                                    &other->hashPubKey,
3433                                                    pr,
3434                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3435   
3436   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3437                                             pr,
3438                                             pr->start_time.value + pr->ttl);
3439
3440   GNUNET_STATISTICS_update (stats,
3441                             gettext_noop ("# P2P searches received"),
3442                             1,
3443                             GNUNET_NO);
3444   GNUNET_STATISTICS_update (stats,
3445                             gettext_noop ("# P2P searches active"),
3446                             1,
3447                             GNUNET_NO);
3448
3449   /* calculate change in traffic preference */
3450   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3451   /* process locally */
3452   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3453     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3454   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3455                                            (pr->priority + 1)); 
3456   pr->qe = GNUNET_DATASTORE_get (dsh,
3457                                  &gm->query,
3458                                  type,                         
3459                                  pr->priority + 1,
3460                                  MAX_DATASTORE_QUEUE,                            
3461                                  timeout,
3462                                  &process_local_reply,
3463                                  pr);
3464
3465   /* Are multiple results possible?  If so, start processing remotely now! */
3466   switch (pr->type)
3467     {
3468     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3469     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3470       /* only one result, wait for datastore */
3471       break;
3472     default:
3473       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3474         pr->task = GNUNET_SCHEDULER_add_now (sched,
3475                                              &forward_request_task,
3476                                              pr);
3477     }
3478
3479   /* make sure we don't track too many requests */
3480   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3481     {
3482       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3483       GNUNET_assert (pr != NULL);
3484       destroy_pending_request (pr);
3485     }
3486   return GNUNET_OK;
3487 }
3488
3489
3490 /* **************************** CS GET Handling ************************ */
3491
3492
3493 /**
3494  * Handle START_SEARCH-message (search request from client).
3495  *
3496  * @param cls closure
3497  * @param client identification of the client
3498  * @param message the actual message
3499  */
3500 static void
3501 handle_start_search (void *cls,
3502                      struct GNUNET_SERVER_Client *client,
3503                      const struct GNUNET_MessageHeader *message)
3504 {
3505   static GNUNET_HashCode all_zeros;
3506   const struct SearchMessage *sm;
3507   struct ClientList *cl;
3508   struct ClientRequestList *crl;
3509   struct PendingRequest *pr;
3510   uint16_t msize;
3511   unsigned int sc;
3512   enum GNUNET_BLOCK_Type type;
3513
3514   msize = ntohs (message->size);
3515   if ( (msize < sizeof (struct SearchMessage)) ||
3516        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3517     {
3518       GNUNET_break (0);
3519       GNUNET_SERVER_receive_done (client,
3520                                   GNUNET_SYSERR);
3521       return;
3522     }
3523   GNUNET_STATISTICS_update (stats,
3524                             gettext_noop ("# client searches received"),
3525                             1,
3526                             GNUNET_NO);
3527   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3528   sm = (const struct SearchMessage*) message;
3529   type = ntohl (sm->type);
3530 #if DEBUG_FS
3531   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3532               "Received request for `%s' of type %u from local client\n",
3533               GNUNET_h2s (&sm->query),
3534               (unsigned int) type);
3535 #endif
3536   cl = client_list;
3537   while ( (cl != NULL) &&
3538           (cl->client != client) )
3539     cl = cl->next;
3540   if (cl == NULL)
3541     {
3542       cl = GNUNET_malloc (sizeof (struct ClientList));
3543       cl->client = client;
3544       GNUNET_SERVER_client_keep (client);
3545       cl->next = client_list;
3546       client_list = cl;
3547     }
3548   /* detect duplicate KBLOCK requests */
3549   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
3550        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
3551        (type == GNUNET_BLOCK_TYPE_ANY) )
3552     {
3553       crl = cl->rl_head;
3554       while ( (crl != NULL) &&
3555               ( (0 != memcmp (&crl->req->query,
3556                               &sm->query,
3557                               sizeof (GNUNET_HashCode))) ||
3558                 (crl->req->type != type) ) )
3559         crl = crl->next;
3560       if (crl != NULL)  
3561         { 
3562 #if DEBUG_FS
3563           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3564                       "Have existing request, merging content-seen lists.\n");
3565 #endif
3566           pr = crl->req;
3567           /* Duplicate request (used to send long list of
3568              known/blocked results); merge 'pr->replies_seen'
3569              and update bloom filter */
3570           GNUNET_array_grow (pr->replies_seen,
3571                              pr->replies_seen_size,
3572                              pr->replies_seen_off + sc);
3573           memcpy (&pr->replies_seen[pr->replies_seen_off],
3574                   &sm[1],
3575                   sc * sizeof (GNUNET_HashCode));
3576           pr->replies_seen_off += sc;
3577           refresh_bloomfilter (pr);
3578           GNUNET_STATISTICS_update (stats,
3579                                     gettext_noop ("# client searches updated (merged content seen list)"),
3580                                     1,
3581                                     GNUNET_NO);
3582           GNUNET_SERVER_receive_done (client,
3583                                       GNUNET_OK);
3584           return;
3585         }
3586     }
3587   GNUNET_STATISTICS_update (stats,
3588                             gettext_noop ("# client searches active"),
3589                             1,
3590                             GNUNET_NO);
3591   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3592                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3593   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3594   memset (crl, 0, sizeof (struct ClientRequestList));
3595   crl->client_list = cl;
3596   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3597                                cl->rl_tail,
3598                                crl);  
3599   crl->req = pr;
3600   pr->type = type;
3601   pr->client_request_list = crl;
3602   GNUNET_array_grow (pr->replies_seen,
3603                      pr->replies_seen_size,
3604                      sc);
3605   memcpy (pr->replies_seen,
3606           &sm[1],
3607           sc * sizeof (GNUNET_HashCode));
3608   pr->replies_seen_off = sc;
3609   pr->anonymity_level = ntohl (sm->anonymity_level); 
3610   pr->start_time = GNUNET_TIME_absolute_get ();
3611   refresh_bloomfilter (pr);
3612   pr->query = sm->query;
3613   if (0 == (1 & ntohl (sm->options)))
3614     pr->local_only = GNUNET_NO;
3615   else
3616     pr->local_only = GNUNET_YES;
3617   switch (type)
3618     {
3619     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3620     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3621       if (0 != memcmp (&sm->target,
3622                        &all_zeros,
3623                        sizeof (GNUNET_HashCode)))
3624         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3625       break;
3626     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3627       pr->namespace = (GNUNET_HashCode*) &pr[1];
3628       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3629       break;
3630     default:
3631       break;
3632     }
3633   GNUNET_break (GNUNET_OK ==
3634                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3635                                                    &sm->query,
3636                                                    pr,
3637                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3638   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3639     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3640   pr->qe = GNUNET_DATASTORE_get (dsh,
3641                                  &sm->query,
3642                                  type,
3643                                  -3, -1,
3644                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
3645                                  &process_local_reply,
3646                                  pr);
3647 }
3648
3649
3650 /* **************************** Startup ************************ */
3651
3652 /**
3653  * Process fs requests.
3654  *
3655  * @param s scheduler to use
3656  * @param server the initialized server
3657  * @param c configuration to use
3658  */
3659 static int
3660 main_init (struct GNUNET_SCHEDULER_Handle *s,
3661            struct GNUNET_SERVER_Handle *server,
3662            const struct GNUNET_CONFIGURATION_Handle *c)
3663 {
3664   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3665     {
3666       { &handle_p2p_get, 
3667         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3668       { &handle_p2p_put, 
3669         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3670       { &handle_p2p_migration_stop, 
3671         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
3672         sizeof (struct MigrationStopMessage) },
3673       { NULL, 0, 0 }
3674     };
3675   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
3676     {&GNUNET_FS_handle_index_start, NULL, 
3677      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
3678     {&GNUNET_FS_handle_index_list_get, NULL, 
3679      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
3680     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
3681      sizeof (struct UnindexMessage) },
3682     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
3683      0 },
3684     {NULL, NULL, 0, 0}
3685   };
3686   unsigned long long enc = 128;
3687
3688   sched = s;
3689   cfg = c;
3690   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
3691   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
3692   if ( (GNUNET_OK !=
3693         GNUNET_CONFIGURATION_get_value_number (cfg,
3694                                                "fs",
3695                                                "MAX_PENDING_REQUESTS",
3696                                                &max_pending_requests)) ||
3697        (GNUNET_OK !=
3698         GNUNET_CONFIGURATION_get_value_number (cfg,
3699                                                "fs",
3700                                                "EXPECTED_NEIGHBOUR_COUNT",
3701                                                &enc)) ||
3702        (GNUNET_OK != 
3703         GNUNET_CONFIGURATION_get_value_time (cfg,
3704                                              "fs",
3705                                              "MIN_MIGRATION_DELAY",
3706                                              &min_migration_delay)) )
3707     {
3708       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3709                   _("Configuration fails to specify certain parameters, assuming default values."));
3710     }
3711   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
3712   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
3713   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
3714   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3715   core = GNUNET_CORE_connect (sched,
3716                               cfg,
3717                               GNUNET_TIME_UNIT_FOREVER_REL,
3718                               NULL,
3719                               NULL,
3720                               &peer_connect_handler,
3721                               &peer_disconnect_handler,
3722                               NULL,
3723                               NULL, GNUNET_NO,
3724                               NULL, GNUNET_NO,
3725                               p2p_handlers);
3726   if (NULL == core)
3727     {
3728       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3729                   _("Failed to connect to `%s' service.\n"),
3730                   "core");
3731       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
3732       connected_peers = NULL;
3733       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
3734       query_request_map = NULL;
3735       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
3736       requests_by_expiration_heap = NULL;
3737       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
3738       peer_request_map = NULL;
3739       if (dsh != NULL)
3740         {
3741           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3742           dsh = NULL;
3743         }
3744       return GNUNET_SYSERR;
3745     }
3746   /* FIXME: distinguish between sending and storing in options? */
3747   if (active_migration) 
3748     {
3749       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3750                   _("Content migration is enabled, will start to gather data\n"));
3751       consider_migration_gathering ();
3752     }
3753   GNUNET_SERVER_disconnect_notify (server, 
3754                                    &handle_client_disconnect,
3755                                    NULL);
3756   GNUNET_assert (GNUNET_OK ==
3757                  GNUNET_CONFIGURATION_get_value_filename (cfg,
3758                                                           "fs",
3759                                                           "TRUST",
3760                                                           &trustDirectory));
3761   GNUNET_DISK_directory_create (trustDirectory);
3762   GNUNET_SCHEDULER_add_with_priority (sched,
3763                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
3764                                       &cron_flush_trust, NULL);
3765
3766
3767   GNUNET_SERVER_add_handlers (server, handlers);
3768   GNUNET_SCHEDULER_add_delayed (sched,
3769                                 GNUNET_TIME_UNIT_FOREVER_REL,
3770                                 &shutdown_task,
3771                                 NULL);
3772   return GNUNET_OK;
3773 }
3774
3775
3776 /**
3777  * Process fs requests.
3778  *
3779  * @param cls closure
3780  * @param sched scheduler to use
3781  * @param server the initialized server
3782  * @param cfg configuration to use
3783  */
3784 static void
3785 run (void *cls,
3786      struct GNUNET_SCHEDULER_Handle *sched,
3787      struct GNUNET_SERVER_Handle *server,
3788      const struct GNUNET_CONFIGURATION_Handle *cfg)
3789 {
3790   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
3791                                                            "FS",
3792                                                            "ACTIVEMIGRATION");
3793   dsh = GNUNET_DATASTORE_connect (cfg,
3794                                   sched);
3795   if (dsh == NULL)
3796     {
3797       GNUNET_SCHEDULER_shutdown (sched);
3798       return;
3799     }
3800   datastore_load = GNUNET_LOAD_value_init ();
3801   block_cfg = GNUNET_CONFIGURATION_create ();
3802   GNUNET_CONFIGURATION_set_value_string (block_cfg,
3803                                          "block",
3804                                          "PLUGINS",
3805                                          "fs");
3806   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
3807   GNUNET_assert (NULL != block_ctx);
3808   dht_handle = GNUNET_DHT_connect (sched,
3809                                    cfg,
3810                                    FS_DHT_HT_SIZE);
3811   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3812        (GNUNET_OK != main_init (sched, server, cfg)) )
3813     {    
3814       GNUNET_SCHEDULER_shutdown (sched);
3815       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3816       dsh = NULL;
3817       GNUNET_DHT_disconnect (dht_handle);
3818       dht_handle = NULL;
3819       GNUNET_BLOCK_context_destroy (block_ctx);
3820       block_ctx = NULL;
3821       GNUNET_CONFIGURATION_destroy (block_cfg);
3822       block_cfg = NULL;
3823       GNUNET_LOAD_value_free (datastore_load);
3824       datastore_load = NULL;
3825       return;   
3826     }
3827 }
3828
3829
3830 /**
3831  * The main function for the fs service.
3832  *
3833  * @param argc number of arguments from the command line
3834  * @param argv command line arguments
3835  * @return 0 ok, 1 on error
3836  */
3837 int
3838 main (int argc, char *const *argv)
3839 {
3840   return (GNUNET_OK ==
3841           GNUNET_SERVICE_run (argc,
3842                               argv,
3843                               "fs",
3844                               GNUNET_SERVICE_OPTION_NONE,
3845                               &run, NULL)) ? 0 : 1;
3846 }
3847
3848 /* end of gnunet-service-fs.c */