fix compile warning
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - track per-peer request latency (using new load API)
28  * - consider more precise latency estimation (per-peer & request) -- again load API?
29  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
30  * - introduce random latency in processing
31  * - tell other peers to stop migration if our PUTs fail (or if
32  *   we don't support migration per configuration?)
33  * - more statistics
34  */
35 #include "platform.h"
36 #include <float.h>
37 #include "gnunet_constants.h"
38 #include "gnunet_core_service.h"
39 #include "gnunet_dht_service.h"
40 #include "gnunet_datastore_service.h"
41 #include "gnunet_load_lib.h"
42 #include "gnunet_peer_lib.h"
43 #include "gnunet_protocols.h"
44 #include "gnunet_signatures.h"
45 #include "gnunet_statistics_service.h"
46 #include "gnunet_util_lib.h"
47 #include "gnunet-service-fs_indexing.h"
48 #include "fs.h"
49
50 #define DEBUG_FS GNUNET_NO
51
52 /**
53  * Maximum number of outgoing messages we queue per peer.
54  */
55 #define MAX_QUEUE_PER_PEER 16
56
57 /**
58  * Size for the hash map for DHT requests from the FS
59  * service.  Should be about the number of concurrent
60  * DHT requests we plan to make.
61  */
62 #define FS_DHT_HT_SIZE 1024
63
64 /**
65  * How often do we flush trust values to disk?
66  */
67 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
68
69 /**
70  * Inverse of the probability that we will submit the same query
71  * to the same peer again.  If the same peer already got the query
72  * repeatedly recently, the probability is multiplied by the inverse
73  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
74  * plus MAX_CORK_DELAY (so roughly every 3.5s).
75  */
76 #define RETRY_PROBABILITY_INV 3
77
78 /**
79  * What is the maximum delay for a P2P FS message (in our interaction
80  * with core)?  FS-internal delays are another story.  The value is
81  * chosen based on the 32k block size.  Given that peers typcially
82  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
83  * transmit one message even to the lowest-bandwidth peers.
84  */
85 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
86
87 /**
88  * Maximum number of requests (from other peers) that we're
89  * willing to have pending at any given point in time.
90  */
91 static unsigned long long max_pending_requests = (32 * 1024);
92
93
94 /**
95  * Information we keep for each pending reply.  The
96  * actual message follows at the end of this struct.
97  */
98 struct PendingMessage;
99
100 /**
101  * Function called upon completion of a transmission.
102  *
103  * @param cls closure
104  * @param pid ID of receiving peer, 0 on transmission error
105  */
106 typedef void (*TransmissionContinuation)(void * cls, 
107                                          GNUNET_PEER_Id tpid);
108
109
110 /**
111  * Information we keep for each pending message (GET/PUT).  The
112  * actual message follows at the end of this struct.
113  */
114 struct PendingMessage
115 {
116   /**
117    * This is a doubly-linked list of messages to the same peer.
118    */
119   struct PendingMessage *next;
120
121   /**
122    * This is a doubly-linked list of messages to the same peer.
123    */
124   struct PendingMessage *prev;
125
126   /**
127    * Entry in pending message list for this pending message.
128    */ 
129   struct PendingMessageList *pml;  
130
131   /**
132    * Function to call immediately once we have transmitted this
133    * message.
134    */
135   TransmissionContinuation cont;
136
137   /**
138    * Closure for cont.
139    */
140   void *cont_cls;
141
142   /**
143    * Size of the reply; actual reply message follows
144    * at the end of this struct.
145    */
146   size_t msize;
147   
148   /**
149    * How important is this message for us?
150    */
151   uint32_t priority;
152  
153 };
154
155
156 /**
157  * Information about a peer that we are connected to.
158  * We track data that is useful for determining which
159  * peers should receive our requests.  We also keep
160  * a list of messages to transmit to this peer.
161  */
162 struct ConnectedPeer
163 {
164
165   /**
166    * List of the last clients for which this peer successfully
167    * answered a query.
168    */
169   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
170
171   /**
172    * List of the last PIDs for which
173    * this peer successfully answered a query;
174    * We use 0 to indicate no successful reply.
175    */
176   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
177
178   /**
179    * Average delay between sending the peer a request and
180    * getting a reply (only calculated over the requests for
181    * which we actually got a reply).   Calculated
182    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
183    */ 
184   struct GNUNET_TIME_Relative avg_delay;
185
186   /**
187    * Point in time until which this peer does not want us to migrate content
188    * to it.
189    */
190   struct GNUNET_TIME_Absolute migration_blocked;
191
192   /**
193    * Handle for an active request for transmission to this
194    * peer, or NULL.
195    */
196   struct GNUNET_CORE_TransmitHandle *cth;
197
198   /**
199    * Messages (replies, queries, content migration) we would like to
200    * send to this peer in the near future.  Sorted by priority, head.
201    */
202   struct PendingMessage *pending_messages_head;
203
204   /**
205    * Messages (replies, queries, content migration) we would like to
206    * send to this peer in the near future.  Sorted by priority, tail.
207    */
208   struct PendingMessage *pending_messages_tail;
209
210   /**
211    * Average priority of successful replies.  Calculated
212    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
213    */
214   double avg_priority;
215
216   /**
217    * Increase in traffic preference still to be submitted
218    * to the core service for this peer.
219    */
220   uint64_t inc_preference;
221
222   /**
223    * Trust rating for this peer
224    */
225   uint32_t trust;
226
227   /**
228    * Trust rating for this peer on disk.
229    */
230   uint32_t disk_trust;
231
232   /**
233    * The peer's identity.
234    */
235   GNUNET_PEER_Id pid;  
236
237   /**
238    * Size of the linked list of 'pending_messages'.
239    */
240   unsigned int pending_requests;
241
242   /**
243    * Which offset in "last_p2p_replies" will be updated next?
244    * (we go round-robin).
245    */
246   unsigned int last_p2p_replies_woff;
247
248   /**
249    * Which offset in "last_client_replies" will be updated next?
250    * (we go round-robin).
251    */
252   unsigned int last_client_replies_woff;
253
254 };
255
256
257 /**
258  * Information we keep for each pending request.  We should try to
259  * keep this struct as small as possible since its memory consumption
260  * is key to how many requests we can have pending at once.
261  */
262 struct PendingRequest;
263
264
265 /**
266  * Doubly-linked list of requests we are performing
267  * on behalf of the same client.
268  */
269 struct ClientRequestList
270 {
271
272   /**
273    * This is a doubly-linked list.
274    */
275   struct ClientRequestList *next;
276
277   /**
278    * This is a doubly-linked list.
279    */
280   struct ClientRequestList *prev;
281
282   /**
283    * Request this entry represents.
284    */
285   struct PendingRequest *req;
286
287   /**
288    * Client list this request belongs to.
289    */
290   struct ClientList *client_list;
291
292 };
293
294
295 /**
296  * Replies to be transmitted to the client.  The actual
297  * response message is allocated after this struct.
298  */
299 struct ClientResponseMessage
300 {
301   /**
302    * This is a doubly-linked list.
303    */
304   struct ClientResponseMessage *next;
305
306   /**
307    * This is a doubly-linked list.
308    */
309   struct ClientResponseMessage *prev;
310
311   /**
312    * Client list entry this response belongs to.
313    */
314   struct ClientList *client_list;
315
316   /**
317    * Number of bytes in the response.
318    */
319   size_t msize;
320 };
321
322
323 /**
324  * Linked list of clients we are performing requests
325  * for right now.
326  */
327 struct ClientList
328 {
329   /**
330    * This is a linked list.
331    */
332   struct ClientList *next;
333
334   /**
335    * ID of a client making a request, NULL if this entry is for a
336    * peer.
337    */
338   struct GNUNET_SERVER_Client *client;
339
340   /**
341    * Head of list of requests performed on behalf
342    * of this client right now.
343    */
344   struct ClientRequestList *rl_head;
345
346   /**
347    * Tail of list of requests performed on behalf
348    * of this client right now.
349    */
350   struct ClientRequestList *rl_tail;
351
352   /**
353    * Head of linked list of responses.
354    */
355   struct ClientResponseMessage *res_head;
356
357   /**
358    * Tail of linked list of responses.
359    */
360   struct ClientResponseMessage *res_tail;
361
362   /**
363    * Context for sending replies.
364    */
365   struct GNUNET_CONNECTION_TransmitHandle *th;
366
367 };
368
369
370 /**
371  * Doubly-linked list of messages we are performing
372  * due to a pending request.
373  */
374 struct PendingMessageList
375 {
376
377   /**
378    * This is a doubly-linked list of messages on behalf of the same request.
379    */
380   struct PendingMessageList *next;
381
382   /**
383    * This is a doubly-linked list of messages on behalf of the same request.
384    */
385   struct PendingMessageList *prev;
386
387   /**
388    * Message this entry represents.
389    */
390   struct PendingMessage *pm;
391
392   /**
393    * Request this entry belongs to.
394    */
395   struct PendingRequest *req;
396
397   /**
398    * Peer this message is targeted for.
399    */
400   struct ConnectedPeer *target;
401
402 };
403
404
405 /**
406  * Information we keep for each pending request.  We should try to
407  * keep this struct as small as possible since its memory consumption
408  * is key to how many requests we can have pending at once.
409  */
410 struct PendingRequest
411 {
412
413   /**
414    * If this request was made by a client, this is our entry in the
415    * client request list; otherwise NULL.
416    */
417   struct ClientRequestList *client_request_list;
418
419   /**
420    * Entry of peer responsible for this entry (if this request
421    * was made by a peer).
422    */
423   struct ConnectedPeer *cp;
424
425   /**
426    * If this is a namespace query, pointer to the hash of the public
427    * key of the namespace; otherwise NULL.  Pointer will be to the 
428    * end of this struct (so no need to free it).
429    */
430   const GNUNET_HashCode *namespace;
431
432   /**
433    * Bloomfilter we use to filter out replies that we don't care about
434    * (anymore).  NULL as long as we are interested in all replies.
435    */
436   struct GNUNET_CONTAINER_BloomFilter *bf;
437
438   /**
439    * Context of our GNUNET_CORE_peer_change_preference call.
440    */
441   struct GNUNET_CORE_InformationRequestContext *irc;
442
443   /**
444    * Hash code of all replies that we have seen so far (only valid
445    * if client is not NULL since we only track replies like this for
446    * our own clients).
447    */
448   GNUNET_HashCode *replies_seen;
449
450   /**
451    * Node in the heap representing this entry; NULL
452    * if we have no heap node.
453    */
454   struct GNUNET_CONTAINER_HeapNode *hnode;
455
456   /**
457    * Head of list of messages being performed on behalf of this
458    * request.
459    */
460   struct PendingMessageList *pending_head;
461
462   /**
463    * Tail of list of messages being performed on behalf of this
464    * request.
465    */
466   struct PendingMessageList *pending_tail;
467
468   /**
469    * When did we first see this request (form this peer), or, if our
470    * client is initiating, when did we last initiate a search?
471    */
472   struct GNUNET_TIME_Absolute start_time;
473
474   /**
475    * The query that this request is for.
476    */
477   GNUNET_HashCode query;
478
479   /**
480    * The task responsible for transmitting queries
481    * for this request.
482    */
483   GNUNET_SCHEDULER_TaskIdentifier task;
484
485   /**
486    * (Interned) Peer identifier that identifies a preferred target
487    * for requests.
488    */
489   GNUNET_PEER_Id target_pid;
490
491   /**
492    * (Interned) Peer identifiers of peers that have already
493    * received our query for this content.
494    */
495   GNUNET_PEER_Id *used_pids;
496   
497   /**
498    * Our entry in the queue (non-NULL while we wait for our
499    * turn to interact with the local database).
500    */
501   struct GNUNET_DATASTORE_QueueEntry *qe;
502
503   /**
504    * Size of the 'bf' (in bytes).
505    */
506   size_t bf_size;
507
508   /**
509    * Desired anonymity level; only valid for requests from a local client.
510    */
511   uint32_t anonymity_level;
512
513   /**
514    * How many entries in "used_pids" are actually valid?
515    */
516   unsigned int used_pids_off;
517
518   /**
519    * How long is the "used_pids" array?
520    */
521   unsigned int used_pids_size;
522
523   /**
524    * Number of results found for this request.
525    */
526   unsigned int results_found;
527
528   /**
529    * How many entries in "replies_seen" are actually valid?
530    */
531   unsigned int replies_seen_off;
532
533   /**
534    * How long is the "replies_seen" array?
535    */
536   unsigned int replies_seen_size;
537   
538   /**
539    * Priority with which this request was made.  If one of our clients
540    * made the request, then this is the current priority that we are
541    * using when initiating the request.  This value is used when
542    * we decide to reward other peers with trust for providing a reply.
543    */
544   uint32_t priority;
545
546   /**
547    * Priority points left for us to spend when forwarding this request
548    * to other peers.
549    */
550   uint32_t remaining_priority;
551
552   /**
553    * Number to mingle hashes for bloom-filter tests with.
554    */
555   int32_t mingle;
556
557   /**
558    * TTL with which we saw this request (or, if we initiated, TTL that
559    * we used for the request).
560    */
561   int32_t ttl;
562   
563   /**
564    * Type of the content that this request is for.
565    */
566   enum GNUNET_BLOCK_Type type;
567
568   /**
569    * Remove this request after transmission of the current response.
570    */
571   int16_t do_remove;
572
573   /**
574    * GNUNET_YES if we should not forward this request to other peers.
575    */
576   int16_t local_only;
577
578 };
579
580
581 /**
582  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
583  */
584 struct MigrationReadyBlock
585 {
586
587   /**
588    * This is a doubly-linked list.
589    */
590   struct MigrationReadyBlock *next;
591
592   /**
593    * This is a doubly-linked list.
594    */
595   struct MigrationReadyBlock *prev;
596
597   /**
598    * Query for the block.
599    */
600   GNUNET_HashCode query;
601
602   /**
603    * When does this block expire? 
604    */
605   struct GNUNET_TIME_Absolute expiration;
606
607   /**
608    * Peers we would consider forwarding this
609    * block to.  Zero for empty entries.
610    */
611   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
612
613   /**
614    * Size of the block.
615    */
616   size_t size;
617
618   /**
619    *  Number of targets already used.
620    */
621   unsigned int used_targets;
622
623   /**
624    * Type of the block.
625    */
626   enum GNUNET_BLOCK_Type type;
627 };
628
629
630 /**
631  * Our connection to the datastore.
632  */
633 static struct GNUNET_DATASTORE_Handle *dsh;
634
635 /**
636  * Our block context.
637  */
638 static struct GNUNET_BLOCK_Context *block_ctx;
639
640 /**
641  * Our block configuration.
642  */
643 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
644
645 /**
646  * Our scheduler.
647  */
648 static struct GNUNET_SCHEDULER_Handle *sched;
649
650 /**
651  * Our configuration.
652  */
653 static const struct GNUNET_CONFIGURATION_Handle *cfg;
654
655 /**
656  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
657  */
658 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
659
660 /**
661  * Map of peer identifiers to "struct PendingRequest" (for that peer).
662  */
663 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
664
665 /**
666  * Map of query identifiers to "struct PendingRequest" (for that query).
667  */
668 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
669
670 /**
671  * Heap with the request that will expire next at the top.  Contains
672  * pointers of type "struct PendingRequest*"; these will *also* be
673  * aliased from the "requests_by_peer" data structures and the
674  * "requests_by_query" table.  Note that requests from our clients
675  * don't expire and are thus NOT in the "requests_by_expiration"
676  * (or the "requests_by_peer" tables).
677  */
678 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
679
680 /**
681  * Handle for reporting statistics.
682  */
683 static struct GNUNET_STATISTICS_Handle *stats;
684
685 /**
686  * Linked list of clients we are currently processing requests for.
687  */
688 static struct ClientList *client_list;
689
690 /**
691  * Pointer to handle to the core service (points to NULL until we've
692  * connected to it).
693  */
694 static struct GNUNET_CORE_Handle *core;
695
696 /**
697  * Head of linked list of blocks that can be migrated.
698  */
699 static struct MigrationReadyBlock *mig_head;
700
701 /**
702  * Tail of linked list of blocks that can be migrated.
703  */
704 static struct MigrationReadyBlock *mig_tail;
705
706 /**
707  * Request to datastore for migration (or NULL).
708  */
709 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
710
711 /**
712  * Where do we store trust information?
713  */
714 static char *trustDirectory;
715
716 /**
717  * ID of task that collects blocks for migration.
718  */
719 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
720
721 /**
722  * What is the maximum frequency at which we are allowed to
723  * poll the datastore for migration content?
724  */
725 static struct GNUNET_TIME_Relative min_migration_delay;
726
727 /**
728  * Handle for DHT operations.
729  */
730 static struct GNUNET_DHT_Handle *dht_handle;
731
732 /**
733  * Size of the doubly-linked list of migration blocks.
734  */
735 static unsigned int mig_size;
736
737 /**
738  * Are we allowed to migrate content to this peer.
739  */
740 static int active_migration;
741
742 /**
743  * Typical priorities we're seeing from other peers right now.  Since
744  * most priorities will be zero, this value is the weighted average of
745  * non-zero priorities seen "recently".  In order to ensure that new
746  * values do not dramatically change the ratio, values are first
747  * "capped" to a reasonable range (+N of the current value) and then
748  * averaged into the existing value by a ratio of 1:N.  Hence
749  * receiving the largest possible priority can still only raise our
750  * "current_priorities" by at most 1.
751  */
752 static double current_priorities;
753
754 /**
755  * Datastore load tracking.
756  */
757 static struct GNUNET_LOAD_Value *datastore_load;
758
759
760 /**
761  * We've just now completed a datastore request.  Update our
762  * datastore load calculations.
763  *
764  * @param start time when the datastore request was issued
765  */
766 static void
767 update_datastore_delays (struct GNUNET_TIME_Absolute start)
768 {
769   struct GNUNET_TIME_Relative delay;
770
771   delay = GNUNET_TIME_absolute_get_duration (start);
772   GNUNET_LOAD_update (datastore_load,
773                       delay.value);
774 }
775
776
777 /**
778  * Get the filename under which we would store the GNUNET_HELLO_Message
779  * for the given host and protocol.
780  * @return filename of the form DIRECTORY/HOSTID
781  */
782 static char *
783 get_trust_filename (const struct GNUNET_PeerIdentity *id)
784 {
785   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
786   char *fn;
787
788   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
789   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
790   return fn;
791 }
792
793
794
795 /**
796  * Transmit messages by copying it to the target buffer
797  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
798  * for writing in the meantime.  In that case, do nothing
799  * (the disconnect or shutdown handler will take care of the rest).
800  * If we were able to transmit messages and there are still more
801  * pending, ask core again for further calls to this function.
802  *
803  * @param cls closure, pointer to the 'struct ConnectedPeer*'
804  * @param size number of bytes available in buf
805  * @param buf where the callee should write the message
806  * @return number of bytes written to buf
807  */
808 static size_t
809 transmit_to_peer (void *cls,
810                   size_t size, void *buf);
811
812
813 /* ******************* clean up functions ************************ */
814
815 /**
816  * Delete the given migration block.
817  *
818  * @param mb block to delete
819  */
820 static void
821 delete_migration_block (struct MigrationReadyBlock *mb)
822 {
823   GNUNET_CONTAINER_DLL_remove (mig_head,
824                                mig_tail,
825                                mb);
826   GNUNET_PEER_decrement_rcs (mb->target_list,
827                              MIGRATION_LIST_SIZE);
828   mig_size--;
829   GNUNET_free (mb);
830 }
831
832
833 /**
834  * Compare the distance of two peers to a key.
835  *
836  * @param key key
837  * @param p1 first peer
838  * @param p2 second peer
839  * @return GNUNET_YES if P1 is closer to key than P2
840  */
841 static int
842 is_closer (const GNUNET_HashCode *key,
843            const struct GNUNET_PeerIdentity *p1,
844            const struct GNUNET_PeerIdentity *p2)
845 {
846   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
847                                     &p2->hashPubKey,
848                                     key);
849 }
850
851
852 /**
853  * Consider migrating content to a given peer.
854  *
855  * @param cls 'struct MigrationReadyBlock*' to select
856  *            targets for (or NULL for none)
857  * @param key ID of the peer 
858  * @param value 'struct ConnectedPeer' of the peer
859  * @return GNUNET_YES (always continue iteration)
860  */
861 static int
862 consider_migration (void *cls,
863                     const GNUNET_HashCode *key,
864                     void *value)
865 {
866   struct MigrationReadyBlock *mb = cls;
867   struct ConnectedPeer *cp = value;
868   struct MigrationReadyBlock *pos;
869   struct GNUNET_PeerIdentity cppid;
870   struct GNUNET_PeerIdentity otherpid;
871   struct GNUNET_PeerIdentity worstpid;
872   size_t msize;
873   unsigned int i;
874   unsigned int repl;
875   
876   /* consider 'cp' as a migration target for mb */
877   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
878     return GNUNET_YES; /* peer has requested no migration! */
879   if (mb != NULL)
880     {
881       GNUNET_PEER_resolve (cp->pid,
882                            &cppid);
883       repl = MIGRATION_LIST_SIZE;
884       for (i=0;i<MIGRATION_LIST_SIZE;i++)
885         {
886           if (mb->target_list[i] == 0)
887             {
888               mb->target_list[i] = cp->pid;
889               GNUNET_PEER_change_rc (mb->target_list[i], 1);
890               repl = MIGRATION_LIST_SIZE;
891               break;
892             }
893           GNUNET_PEER_resolve (mb->target_list[i],
894                                &otherpid);
895           if ( (repl == MIGRATION_LIST_SIZE) &&
896                is_closer (&mb->query,
897                           &cppid,
898                           &otherpid)) 
899             {
900               repl = i;
901               worstpid = otherpid;
902             }
903           else if ( (repl != MIGRATION_LIST_SIZE) &&
904                     (is_closer (&mb->query,
905                                 &worstpid,
906                                 &otherpid) ) )
907             {
908               repl = i;
909               worstpid = otherpid;
910             }       
911         }
912       if (repl != MIGRATION_LIST_SIZE) 
913         {
914           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
915           mb->target_list[repl] = cp->pid;
916           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
917         }
918     }
919
920   /* consider scheduling transmission to cp for content migration */
921   if (cp->cth != NULL)
922     return GNUNET_YES; 
923   msize = 0;
924   pos = mig_head;
925   while (pos != NULL)
926     {
927       for (i=0;i<MIGRATION_LIST_SIZE;i++)
928         {
929           if (cp->pid == pos->target_list[i])
930             {
931               if (msize == 0)
932                 msize = pos->size;
933               else
934                 msize = GNUNET_MIN (msize,
935                                     pos->size);
936               break;
937             }
938         }
939       pos = pos->next;
940     }
941   if (msize == 0)
942     return GNUNET_YES; /* no content available */
943 #if DEBUG_FS
944   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
945               "Trying to migrate at least %u bytes to peer `%s'\n",
946               msize,
947               GNUNET_h2s (key));
948 #endif
949   cp->cth 
950     = GNUNET_CORE_notify_transmit_ready (core,
951                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
952                                          (const struct GNUNET_PeerIdentity*) key,
953                                          msize + sizeof (struct PutMessage),
954                                          &transmit_to_peer,
955                                          cp);
956   return GNUNET_YES;
957 }
958
959
960 /**
961  * Task that is run periodically to obtain blocks for content
962  * migration
963  * 
964  * @param cls unused
965  * @param tc scheduler context (also unused)
966  */
967 static void
968 gather_migration_blocks (void *cls,
969                          const struct GNUNET_SCHEDULER_TaskContext *tc);
970
971
972 /**
973  * If the migration task is not currently running, consider
974  * (re)scheduling it with the appropriate delay.
975  */
976 static void
977 consider_migration_gathering ()
978 {
979   struct GNUNET_TIME_Relative delay;
980
981   if (dsh == NULL)
982     return;
983   if (mig_qe != NULL)
984     return;
985   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
986     return;
987   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
988                                          mig_size);
989   delay = GNUNET_TIME_relative_divide (delay,
990                                        MAX_MIGRATION_QUEUE);
991   delay = GNUNET_TIME_relative_max (delay,
992                                     min_migration_delay);
993   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
994                                            delay,
995                                            &gather_migration_blocks,
996                                            NULL);
997 }
998
999
1000 /**
1001  * Process content offered for migration.
1002  *
1003  * @param cls closure
1004  * @param key key for the content
1005  * @param size number of bytes in data
1006  * @param data content stored
1007  * @param type type of the content
1008  * @param priority priority of the content
1009  * @param anonymity anonymity-level for the content
1010  * @param expiration expiration time for the content
1011  * @param uid unique identifier for the datum;
1012  *        maybe 0 if no unique identifier is available
1013  */
1014 static void
1015 process_migration_content (void *cls,
1016                            const GNUNET_HashCode * key,
1017                            uint32_t size,
1018                            const void *data,
1019                            enum GNUNET_BLOCK_Type type,
1020                            uint32_t priority,
1021                            uint32_t anonymity,
1022                            struct GNUNET_TIME_Absolute
1023                            expiration, uint64_t uid)
1024 {
1025   struct MigrationReadyBlock *mb;
1026   
1027   if (key == NULL)
1028     {
1029       mig_qe = NULL;
1030       if (mig_size < MAX_MIGRATION_QUEUE)  
1031         consider_migration_gathering ();
1032       return;
1033     }
1034   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1035     {
1036       if (GNUNET_OK !=
1037           GNUNET_FS_handle_on_demand_block (key, size, data,
1038                                             type, priority, anonymity,
1039                                             expiration, uid, 
1040                                             &process_migration_content,
1041                                             NULL))
1042         {
1043           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1044         }
1045       return;
1046     }
1047 #if DEBUG_FS
1048   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1049               "Retrieved block `%s' of type %u for migration\n",
1050               GNUNET_h2s (key),
1051               type);
1052 #endif
1053   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1054   mb->query = *key;
1055   mb->expiration = expiration;
1056   mb->size = size;
1057   mb->type = type;
1058   memcpy (&mb[1], data, size);
1059   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1060                                      mig_tail,
1061                                      mig_tail,
1062                                      mb);
1063   mig_size++;
1064   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1065                                          &consider_migration,
1066                                          mb);
1067   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1068 }
1069
1070
1071 /**
1072  * Task that is run periodically to obtain blocks for content
1073  * migration
1074  * 
1075  * @param cls unused
1076  * @param tc scheduler context (also unused)
1077  */
1078 static void
1079 gather_migration_blocks (void *cls,
1080                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1081 {
1082   mig_task = GNUNET_SCHEDULER_NO_TASK;
1083   if (dsh != NULL)
1084     {
1085       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
1086                                             GNUNET_TIME_UNIT_FOREVER_REL,
1087                                             &process_migration_content, NULL);
1088       GNUNET_assert (mig_qe != NULL);
1089     }
1090 }
1091
1092
1093 /**
1094  * We're done with a particular message list entry.
1095  * Free all associated resources.
1096  * 
1097  * @param pml entry to destroy
1098  */
1099 static void
1100 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1101 {
1102   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1103                                pml->req->pending_tail,
1104                                pml);
1105   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1106                                pml->target->pending_messages_tail,
1107                                pml->pm);
1108   pml->target->pending_requests--;
1109   GNUNET_free (pml->pm);
1110   GNUNET_free (pml);
1111 }
1112
1113
1114 /**
1115  * Destroy the given pending message (and call the respective
1116  * continuation).
1117  *
1118  * @param pm message to destroy
1119  * @param tpid id of peer that the message was delivered to, or 0 for none
1120  */
1121 static void
1122 destroy_pending_message (struct PendingMessage *pm,
1123                          GNUNET_PEER_Id tpid)
1124 {
1125   struct PendingMessageList *pml = pm->pml;
1126   TransmissionContinuation cont;
1127   void *cont_cls;
1128
1129   GNUNET_assert (pml->pm == pm);
1130   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1131   cont = pm->cont;
1132   cont_cls = pm->cont_cls;
1133   destroy_pending_message_list_entry (pml);
1134   cont (cont_cls, tpid);  
1135 }
1136
1137
1138 /**
1139  * We're done processing a particular request.
1140  * Free all associated resources.
1141  *
1142  * @param pr request to destroy
1143  */
1144 static void
1145 destroy_pending_request (struct PendingRequest *pr)
1146 {
1147   struct GNUNET_PeerIdentity pid;
1148
1149   if (pr->hnode != NULL)
1150     {
1151       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1152                                          pr->hnode);
1153       pr->hnode = NULL;
1154     }
1155   if (NULL == pr->client_request_list)
1156     {
1157       GNUNET_STATISTICS_update (stats,
1158                                 gettext_noop ("# P2P searches active"),
1159                                 -1,
1160                                 GNUNET_NO);
1161     }
1162   else
1163     {
1164       GNUNET_STATISTICS_update (stats,
1165                                 gettext_noop ("# client searches active"),
1166                                 -1,
1167                                 GNUNET_NO);
1168     }
1169   /* might have already been removed from map in 'process_reply' (if
1170      there was a unique reply) or never inserted if it was a
1171      duplicate; hence ignore the return value here */
1172   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1173                                                &pr->query,
1174                                                pr);
1175   if (pr->qe != NULL)
1176      {
1177       GNUNET_DATASTORE_cancel (pr->qe);
1178       pr->qe = NULL;
1179     }
1180   if (pr->client_request_list != NULL)
1181     {
1182       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1183                                    pr->client_request_list->client_list->rl_tail,
1184                                    pr->client_request_list);
1185       GNUNET_free (pr->client_request_list);
1186       pr->client_request_list = NULL;
1187     }
1188   if (pr->cp != NULL)
1189     {
1190       GNUNET_PEER_resolve (pr->cp->pid,
1191                            &pid);
1192       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1193                                                    &pid.hashPubKey,
1194                                                    pr);
1195       pr->cp = NULL;
1196     }
1197   if (pr->bf != NULL)
1198     {
1199       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1200       pr->bf = NULL;
1201     }
1202   if (pr->irc != NULL)
1203     {
1204       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1205       pr->irc = NULL;
1206     }
1207   if (pr->replies_seen != NULL)
1208     {
1209       GNUNET_free (pr->replies_seen);
1210       pr->replies_seen = NULL;
1211     }
1212   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1213     {
1214       GNUNET_SCHEDULER_cancel (sched,
1215                                pr->task);
1216       pr->task = GNUNET_SCHEDULER_NO_TASK;
1217     }
1218   while (NULL != pr->pending_head)    
1219     destroy_pending_message_list_entry (pr->pending_head);
1220   GNUNET_PEER_change_rc (pr->target_pid, -1);
1221   if (pr->used_pids != NULL)
1222     {
1223       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1224       GNUNET_free (pr->used_pids);
1225       pr->used_pids_off = 0;
1226       pr->used_pids_size = 0;
1227       pr->used_pids = NULL;
1228     }
1229   GNUNET_free (pr);
1230 }
1231
1232
1233 /**
1234  * Method called whenever a given peer connects.
1235  *
1236  * @param cls closure, not used
1237  * @param peer peer identity this notification is about
1238  * @param latency reported latency of the connection with 'other'
1239  * @param distance reported distance (DV) to 'other' 
1240  */
1241 static void 
1242 peer_connect_handler (void *cls,
1243                       const struct
1244                       GNUNET_PeerIdentity * peer,
1245                       struct GNUNET_TIME_Relative latency,
1246                       uint32_t distance)
1247 {
1248   struct ConnectedPeer *cp;
1249   struct MigrationReadyBlock *pos;
1250   char *fn;
1251   uint32_t trust;
1252   
1253   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1254   cp->pid = GNUNET_PEER_intern (peer);
1255
1256   fn = get_trust_filename (peer);
1257   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1258       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1259     cp->disk_trust = cp->trust = ntohl (trust);
1260   GNUNET_free (fn);
1261
1262   GNUNET_break (GNUNET_OK ==
1263                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1264                                                    &peer->hashPubKey,
1265                                                    cp,
1266                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1267
1268   pos = mig_head;
1269   while (NULL != pos)
1270     {
1271       (void) consider_migration (pos, &peer->hashPubKey, cp);
1272       pos = pos->next;
1273     }
1274 }
1275
1276
1277 /**
1278  * Increase the host credit by a value.
1279  *
1280  * @param host which peer to change the trust value on
1281  * @param value is the int value by which the
1282  *  host credit is to be increased or decreased
1283  * @returns the actual change in trust (positive or negative)
1284  */
1285 static int
1286 change_host_trust (struct ConnectedPeer *host, int value)
1287 {
1288   unsigned int old_trust;
1289
1290   if (value == 0)
1291     return 0;
1292   GNUNET_assert (host != NULL);
1293   old_trust = host->trust;
1294   if (value > 0)
1295     {
1296       if (host->trust + value < host->trust)
1297         {
1298           value = UINT32_MAX - host->trust;
1299           host->trust = UINT32_MAX;
1300         }
1301       else
1302         host->trust += value;
1303     }
1304   else
1305     {
1306       if (host->trust < -value)
1307         {
1308           value = -host->trust;
1309           host->trust = 0;
1310         }
1311       else
1312         host->trust += value;
1313     }
1314   return value;
1315 }
1316
1317
1318 /**
1319  * Write host-trust information to a file - flush the buffer entry!
1320  */
1321 static int
1322 flush_trust (void *cls,
1323              const GNUNET_HashCode *key,
1324              void *value)
1325 {
1326   struct ConnectedPeer *host = value;
1327   char *fn;
1328   uint32_t trust;
1329   struct GNUNET_PeerIdentity pid;
1330
1331   if (host->trust == host->disk_trust)
1332     return GNUNET_OK;                     /* unchanged */
1333   GNUNET_PEER_resolve (host->pid,
1334                        &pid);
1335   fn = get_trust_filename (&pid);
1336   if (host->trust == 0)
1337     {
1338       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1339         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1340                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1341     }
1342   else
1343     {
1344       trust = htonl (host->trust);
1345       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1346                                                     sizeof(uint32_t),
1347                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1348                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1349         host->disk_trust = host->trust;
1350     }
1351   GNUNET_free (fn);
1352   return GNUNET_OK;
1353 }
1354
1355 /**
1356  * Call this method periodically to scan data/hosts for new hosts.
1357  */
1358 static void
1359 cron_flush_trust (void *cls,
1360                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1361 {
1362
1363   if (NULL == connected_peers)
1364     return;
1365   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1366                                          &flush_trust,
1367                                          NULL);
1368   if (NULL == tc)
1369     return;
1370   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1371     return;
1372   GNUNET_SCHEDULER_add_delayed (tc->sched,
1373                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1374 }
1375
1376
1377 /**
1378  * Free (each) request made by the peer.
1379  *
1380  * @param cls closure, points to peer that the request belongs to
1381  * @param key current key code
1382  * @param value value in the hash map
1383  * @return GNUNET_YES (we should continue to iterate)
1384  */
1385 static int
1386 destroy_request (void *cls,
1387                  const GNUNET_HashCode * key,
1388                  void *value)
1389 {
1390   const struct GNUNET_PeerIdentity * peer = cls;
1391   struct PendingRequest *pr = value;
1392   
1393   GNUNET_break (GNUNET_YES ==
1394                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1395                                                       &peer->hashPubKey,
1396                                                       pr));
1397   destroy_pending_request (pr);
1398   return GNUNET_YES;
1399 }
1400
1401
1402 /**
1403  * Method called whenever a peer disconnects.
1404  *
1405  * @param cls closure, not used
1406  * @param peer peer identity this notification is about
1407  */
1408 static void
1409 peer_disconnect_handler (void *cls,
1410                          const struct
1411                          GNUNET_PeerIdentity * peer)
1412 {
1413   struct ConnectedPeer *cp;
1414   struct PendingMessage *pm;
1415   unsigned int i;
1416   struct MigrationReadyBlock *pos;
1417   struct MigrationReadyBlock *next;
1418
1419   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1420                                               &peer->hashPubKey,
1421                                               &destroy_request,
1422                                               (void*) peer);
1423   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1424                                           &peer->hashPubKey);
1425   if (cp == NULL)
1426     return;
1427   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1428     {
1429       if (NULL != cp->last_client_replies[i])
1430         {
1431           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1432           cp->last_client_replies[i] = NULL;
1433         }
1434     }
1435   GNUNET_break (GNUNET_YES ==
1436                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1437                                                       &peer->hashPubKey,
1438                                                       cp));
1439   /* remove this peer from migration considerations; schedule
1440      alternatives */
1441   next = mig_head;
1442   while (NULL != (pos = next))
1443     {
1444       next = pos->next;
1445       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1446         {
1447           if (pos->target_list[i] == cp->pid)
1448             {
1449               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1450               pos->target_list[i] = 0;
1451             }
1452          }
1453       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1454         {
1455           delete_migration_block (pos);
1456           consider_migration_gathering ();
1457           continue;
1458         }
1459       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1460                                              &consider_migration,
1461                                              pos);
1462     }
1463   GNUNET_PEER_change_rc (cp->pid, -1);
1464   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1465   if (NULL != cp->cth)
1466     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1467   while (NULL != (pm = cp->pending_messages_head))
1468     destroy_pending_message (pm, 0 /* delivery failed */);
1469   GNUNET_break (0 == cp->pending_requests);
1470   GNUNET_free (cp);
1471 }
1472
1473
1474 /**
1475  * Iterator over hash map entries that removes all occurences
1476  * of the given 'client' from the 'last_client_replies' of the
1477  * given connected peer.
1478  *
1479  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1480  * @param key current key code (unused)
1481  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1482  * @return GNUNET_YES (we should continue to iterate)
1483  */
1484 static int
1485 remove_client_from_last_client_replies (void *cls,
1486                                         const GNUNET_HashCode * key,
1487                                         void *value)
1488 {
1489   struct GNUNET_SERVER_Client *client = cls;
1490   struct ConnectedPeer *cp = value;
1491   unsigned int i;
1492
1493   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1494     {
1495       if (cp->last_client_replies[i] == client)
1496         {
1497           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1498           cp->last_client_replies[i] = NULL;
1499         }
1500     }  
1501   return GNUNET_YES;
1502 }
1503
1504
1505 /**
1506  * A client disconnected.  Remove all of its pending queries.
1507  *
1508  * @param cls closure, NULL
1509  * @param client identification of the client
1510  */
1511 static void
1512 handle_client_disconnect (void *cls,
1513                           struct GNUNET_SERVER_Client
1514                           * client)
1515 {
1516   struct ClientList *pos;
1517   struct ClientList *prev;
1518   struct ClientRequestList *rcl;
1519   struct ClientResponseMessage *creply;
1520
1521   if (client == NULL)
1522     return;
1523   prev = NULL;
1524   pos = client_list;
1525   while ( (NULL != pos) &&
1526           (pos->client != client) )
1527     {
1528       prev = pos;
1529       pos = pos->next;
1530     }
1531   if (pos == NULL)
1532     return; /* no requests pending for this client */
1533   while (NULL != (rcl = pos->rl_head))
1534     {
1535       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1536                   "Destroying pending request `%s' on disconnect\n",
1537                   GNUNET_h2s (&rcl->req->query));
1538       destroy_pending_request (rcl->req);
1539     }
1540   if (prev == NULL)
1541     client_list = pos->next;
1542   else
1543     prev->next = pos->next;
1544   if (pos->th != NULL)
1545     {
1546       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1547       pos->th = NULL;
1548     }
1549   while (NULL != (creply = pos->res_head))
1550     {
1551       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1552                                    pos->res_tail,
1553                                    creply);
1554       GNUNET_free (creply);
1555     }    
1556   GNUNET_SERVER_client_drop (pos->client);
1557   GNUNET_free (pos);
1558   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1559                                          &remove_client_from_last_client_replies,
1560                                          client);
1561 }
1562
1563
1564 /**
1565  * Iterator to free peer entries.
1566  *
1567  * @param cls closure, unused
1568  * @param key current key code
1569  * @param value value in the hash map (peer entry)
1570  * @return GNUNET_YES (we should continue to iterate)
1571  */
1572 static int 
1573 clean_peer (void *cls,
1574             const GNUNET_HashCode * key,
1575             void *value)
1576 {
1577   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1578   return GNUNET_YES;
1579 }
1580
1581
1582 /**
1583  * Task run during shutdown.
1584  *
1585  * @param cls unused
1586  * @param tc unused
1587  */
1588 static void
1589 shutdown_task (void *cls,
1590                const struct GNUNET_SCHEDULER_TaskContext *tc)
1591 {
1592   if (mig_qe != NULL)
1593     {
1594       GNUNET_DATASTORE_cancel (mig_qe);
1595       mig_qe = NULL;
1596     }
1597   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1598     {
1599       GNUNET_SCHEDULER_cancel (sched, mig_task);
1600       mig_task = GNUNET_SCHEDULER_NO_TASK;
1601     }
1602   while (client_list != NULL)
1603     handle_client_disconnect (NULL,
1604                               client_list->client);
1605   cron_flush_trust (NULL, NULL);
1606   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1607                                          &clean_peer,
1608                                          NULL);
1609   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1610   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1611   requests_by_expiration_heap = 0;
1612   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1613   connected_peers = NULL;
1614   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1615   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1616   query_request_map = NULL;
1617   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1618   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1619   peer_request_map = NULL;
1620   GNUNET_assert (NULL != core);
1621   GNUNET_CORE_disconnect (core);
1622   core = NULL;
1623   if (stats != NULL)
1624     {
1625       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1626       stats = NULL;
1627     }
1628   if (dsh != NULL)
1629     {
1630       GNUNET_DATASTORE_disconnect (dsh,
1631                                    GNUNET_NO);
1632       dsh = NULL;
1633     }
1634   while (mig_head != NULL)
1635     delete_migration_block (mig_head);
1636   GNUNET_assert (0 == mig_size);
1637   GNUNET_DHT_disconnect (dht_handle);
1638   dht_handle = NULL;
1639   GNUNET_LOAD_value_free (datastore_load);
1640   datastore_load = NULL;
1641   GNUNET_BLOCK_context_destroy (block_ctx);
1642   block_ctx = NULL;
1643   GNUNET_CONFIGURATION_destroy (block_cfg);
1644   block_cfg = NULL;
1645   sched = NULL;
1646   cfg = NULL;  
1647   GNUNET_free_non_null (trustDirectory);
1648   trustDirectory = NULL;
1649 }
1650
1651
1652 /* ******************* Utility functions  ******************** */
1653
1654
1655 /**
1656  * Transmit messages by copying it to the target buffer
1657  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1658  * for writing in the meantime.  In that case, do nothing
1659  * (the disconnect or shutdown handler will take care of the rest).
1660  * If we were able to transmit messages and there are still more
1661  * pending, ask core again for further calls to this function.
1662  *
1663  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1664  * @param size number of bytes available in buf
1665  * @param buf where the callee should write the message
1666  * @return number of bytes written to buf
1667  */
1668 static size_t
1669 transmit_to_peer (void *cls,
1670                   size_t size, void *buf)
1671 {
1672   struct ConnectedPeer *cp = cls;
1673   char *cbuf = buf;
1674   struct GNUNET_PeerIdentity pid;
1675   struct PendingMessage *pm;
1676   struct MigrationReadyBlock *mb;
1677   struct MigrationReadyBlock *next;
1678   struct PutMessage migm;
1679   size_t msize;
1680   unsigned int i;
1681  
1682   cp->cth = NULL;
1683   if (NULL == buf)
1684     {
1685 #if DEBUG_FS
1686       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1687                   "Dropping message, core too busy.\n");
1688 #endif
1689       return 0;
1690     }
1691   msize = 0;
1692   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1693           (pm->msize <= size) )
1694     {
1695       memcpy (&cbuf[msize], &pm[1], pm->msize);
1696       msize += pm->msize;
1697       size -= pm->msize;
1698       destroy_pending_message (pm, cp->pid);
1699     }
1700   if (NULL != pm)
1701     {
1702       GNUNET_PEER_resolve (cp->pid,
1703                            &pid);
1704       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1705                                                    pm->priority,
1706                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1707                                                    &pid,
1708                                                    pm->msize,
1709                                                    &transmit_to_peer,
1710                                                    cp);
1711     }
1712   else
1713     {      
1714       next = mig_head;
1715       while (NULL != (mb = next))
1716         {
1717           next = mb->next;
1718           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1719             {
1720               if ( (cp->pid == mb->target_list[i]) &&
1721                    (mb->size + sizeof (migm) <= size) )
1722                 {
1723                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1724                   mb->target_list[i] = 0;
1725                   mb->used_targets++;
1726                   memset (&migm, 0, sizeof (migm));
1727                   migm.header.size = htons (sizeof (migm) + mb->size);
1728                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1729                   migm.type = htonl (mb->type);
1730                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1731                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1732                   msize += sizeof (migm);
1733                   size -= sizeof (migm);
1734                   memcpy (&cbuf[msize], &mb[1], mb->size);
1735                   msize += mb->size;
1736                   size -= mb->size;
1737 #if DEBUG_FS
1738                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1739                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1740                               GNUNET_h2s (&mb->query),
1741                               mb->size,
1742                               GNUNET_i2s (&pid));
1743 #endif    
1744                   break;
1745                 }
1746               else
1747                 {
1748 #if DEBUG_FS
1749                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1750                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1751                               GNUNET_h2s (&mb->query),
1752                               mb->size,
1753                               GNUNET_i2s (&pid));
1754 #endif    
1755                 }
1756             }
1757           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1758                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1759             {
1760               delete_migration_block (mb);
1761               consider_migration_gathering ();
1762             }
1763         }
1764       consider_migration (NULL, 
1765                           &pid.hashPubKey,
1766                           cp);
1767     }
1768 #if DEBUG_FS
1769   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1770               "Transmitting %u bytes to peer %u\n",
1771               msize,
1772               cp->pid);
1773 #endif
1774   return msize;
1775 }
1776
1777
1778 /**
1779  * Add a message to the set of pending messages for the given peer.
1780  *
1781  * @param cp peer to send message to
1782  * @param pm message to queue
1783  * @param pr request on which behalf this message is being queued
1784  */
1785 static void
1786 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1787                                   struct PendingMessage *pm,
1788                                   struct PendingRequest *pr)
1789 {
1790   struct PendingMessage *pos;
1791   struct PendingMessageList *pml;
1792   struct GNUNET_PeerIdentity pid;
1793
1794   GNUNET_assert (pm->next == NULL);
1795   GNUNET_assert (pm->pml == NULL);    
1796   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1797   pml->req = pr;
1798   pml->target = cp;
1799   pml->pm = pm;
1800   pm->pml = pml;  
1801   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1802                                pr->pending_tail,
1803                                pml);
1804   pos = cp->pending_messages_head;
1805   while ( (pos != NULL) &&
1806           (pm->priority < pos->priority) )
1807     pos = pos->next;    
1808   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1809                                      cp->pending_messages_tail,
1810                                      pos,
1811                                      pm);
1812   cp->pending_requests++;
1813   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1814     destroy_pending_message (cp->pending_messages_tail, 0);  
1815   GNUNET_PEER_resolve (cp->pid, &pid);
1816   if (NULL != cp->cth)
1817     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1818   /* need to schedule transmission */
1819   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1820                                                cp->pending_messages_head->priority,
1821                                                MAX_TRANSMIT_DELAY,
1822                                                &pid,
1823                                                cp->pending_messages_head->msize,
1824                                                &transmit_to_peer,
1825                                                cp);
1826   if (cp->cth == NULL)
1827     {
1828 #if DEBUG_FS
1829       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1830                   "Failed to schedule transmission with core!\n");
1831 #endif
1832       GNUNET_STATISTICS_update (stats,
1833                                 gettext_noop ("# CORE transmission failures"),
1834                                 1,
1835                                 GNUNET_NO);
1836     }
1837 }
1838
1839
1840 /**
1841  * Test if the load on this peer is too high
1842  * to even consider processing the query at
1843  * all.
1844  * 
1845  * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
1846  */
1847 static int
1848 test_load_too_high ()
1849 {
1850   return GNUNET_SYSERR; // FIXME
1851 }
1852
1853
1854 /* ******************* Pending Request Refresh Task ******************** */
1855
1856
1857
1858 /**
1859  * We use a random delay to make the timing of requests less
1860  * predictable.  This function returns such a random delay.  We add a base
1861  * delay of MAX_CORK_DELAY (1s).
1862  *
1863  * FIXME: make schedule dependent on the specifics of the request?
1864  * Or bandwidth and number of connected peers and load?
1865  *
1866  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1867  */
1868 static struct GNUNET_TIME_Relative
1869 get_processing_delay ()
1870 {
1871   return 
1872     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1873                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1874                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1875                                                                                        TTL_DECREMENT)));
1876 }
1877
1878
1879 /**
1880  * We're processing a GET request from another peer and have decided
1881  * to forward it to other peers.  This function is called periodically
1882  * and should forward the request to other peers until we have all
1883  * possible replies.  If we have transmitted the *only* reply to
1884  * the initiator we should destroy the pending request.  If we have
1885  * many replies in the queue to the initiator, we should delay sending
1886  * out more queries until the reply queue has shrunk some.
1887  *
1888  * @param cls our "struct ProcessGetContext *"
1889  * @param tc unused
1890  */
1891 static void
1892 forward_request_task (void *cls,
1893                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1894
1895
1896 /**
1897  * Function called after we either failed or succeeded
1898  * at transmitting a query to a peer.  
1899  *
1900  * @param cls the requests "struct PendingRequest*"
1901  * @param tpid ID of receiving peer, 0 on transmission error
1902  */
1903 static void
1904 transmit_query_continuation (void *cls,
1905                              GNUNET_PEER_Id tpid)
1906 {
1907   struct PendingRequest *pr = cls;
1908
1909   GNUNET_STATISTICS_update (stats,
1910                             gettext_noop ("# queries scheduled for forwarding"),
1911                             -1,
1912                             GNUNET_NO);
1913   if (tpid == 0)   
1914     {
1915 #if DEBUG_FS
1916       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917                   "Transmission of request failed, will try again later.\n");
1918 #endif
1919       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1920         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1921                                                  get_processing_delay (),
1922                                                  &forward_request_task,
1923                                                  pr); 
1924       return;    
1925     }
1926   GNUNET_STATISTICS_update (stats,
1927                             gettext_noop ("# queries forwarded"),
1928                             1,
1929                             GNUNET_NO);
1930   GNUNET_PEER_change_rc (tpid, 1);
1931   if (pr->used_pids_off == pr->used_pids_size)
1932     GNUNET_array_grow (pr->used_pids,
1933                        pr->used_pids_size,
1934                        pr->used_pids_size * 2 + 2);
1935   pr->used_pids[pr->used_pids_off++] = tpid;
1936   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1937     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1938                                              get_processing_delay (),
1939                                              &forward_request_task,
1940                                              pr);
1941 }
1942
1943
1944 /**
1945  * How many bytes should a bloomfilter be if we have already seen
1946  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1947  * of bits set per entry.  Furthermore, we should not re-size the
1948  * filter too often (to keep it cheap).
1949  *
1950  * Since other peers will also add entries but not resize the filter,
1951  * we should generally pick a slightly larger size than what the
1952  * strict math would suggest.
1953  *
1954  * @return must be a power of two and smaller or equal to 2^15.
1955  */
1956 static size_t
1957 compute_bloomfilter_size (unsigned int entry_count)
1958 {
1959   size_t size;
1960   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1961   uint16_t max = 1 << 15;
1962
1963   if (entry_count > max)
1964     return max;
1965   size = 8;
1966   while ((size < max) && (size < ideal))
1967     size *= 2;
1968   if (size > max)
1969     return max;
1970   return size;
1971 }
1972
1973
1974 /**
1975  * Recalculate our bloom filter for filtering replies.  This function
1976  * will create a new bloom filter from scratch, so it should only be
1977  * called if we have no bloomfilter at all (and hence can create a
1978  * fresh one of minimal size without problems) OR if our peer is the
1979  * initiator (in which case we may resize to larger than mimimum size).
1980  *
1981  * @param pr request for which the BF is to be recomputed
1982  */
1983 static void
1984 refresh_bloomfilter (struct PendingRequest *pr)
1985 {
1986   unsigned int i;
1987   size_t nsize;
1988   GNUNET_HashCode mhash;
1989
1990   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1991   if (nsize == pr->bf_size)
1992     return; /* size not changed */
1993   if (pr->bf != NULL)
1994     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1995   pr->bf_size = nsize;
1996   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1997   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1998                                               pr->bf_size,
1999                                               BLOOMFILTER_K);
2000   for (i=0;i<pr->replies_seen_off;i++)
2001     {
2002       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2003                                 pr->mingle,
2004                                 &mhash);
2005       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2006     }
2007 }
2008
2009
2010 /**
2011  * Function called after we've tried to reserve a certain amount of
2012  * bandwidth for a reply.  Check if we succeeded and if so send our
2013  * query.
2014  *
2015  * @param cls the requests "struct PendingRequest*"
2016  * @param peer identifies the peer
2017  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2018  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2019  * @param amount set to the amount that was actually reserved or unreserved
2020  * @param preference current traffic preference for the given peer
2021  */
2022 static void
2023 target_reservation_cb (void *cls,
2024                        const struct
2025                        GNUNET_PeerIdentity * peer,
2026                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2027                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2028                        int amount,
2029                        uint64_t preference)
2030 {
2031   struct PendingRequest *pr = cls;
2032   struct ConnectedPeer *cp;
2033   struct PendingMessage *pm;
2034   struct GetMessage *gm;
2035   GNUNET_HashCode *ext;
2036   char *bfdata;
2037   size_t msize;
2038   unsigned int k;
2039   int no_route;
2040   uint32_t bm;
2041
2042   pr->irc = NULL;
2043   if (peer == NULL)
2044     {
2045       /* error in communication with core, try again later */
2046       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2047         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2048                                                  get_processing_delay (),
2049                                                  &forward_request_task,
2050                                                  pr);
2051       return;
2052     }
2053   // (3) transmit, update ttl/priority
2054   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2055                                           &peer->hashPubKey);
2056   if (cp == NULL)
2057     {
2058       /* Peer must have just left */
2059 #if DEBUG_FS
2060       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2061                   "Selected peer disconnected!\n");
2062 #endif
2063       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2064         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2065                                                  get_processing_delay (),
2066                                                  &forward_request_task,
2067                                                  pr);
2068       return;
2069     }
2070   no_route = GNUNET_NO;
2071   if (amount == 0)
2072     {
2073       if (pr->cp == NULL)
2074         {
2075 #if DEBUG_FS > 1
2076           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2077                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2078                       amount,
2079                       DBLOCK_SIZE);
2080 #endif
2081           GNUNET_STATISTICS_update (stats,
2082                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2083                                     1,
2084                                     GNUNET_NO);
2085           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2086             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2087                                                      get_processing_delay (),
2088                                                      &forward_request_task,
2089                                                      pr);
2090           return;  /* this target round failed */
2091         }
2092       /* FIXME: if we are "quite" busy, we may still want to skip
2093          this round; need more load detection code! */
2094       no_route = GNUNET_YES;
2095     }
2096   
2097   GNUNET_STATISTICS_update (stats,
2098                             gettext_noop ("# queries scheduled for forwarding"),
2099                             1,
2100                             GNUNET_NO);
2101   /* build message and insert message into priority queue */
2102 #if DEBUG_FS
2103   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2104               "Forwarding request `%s' to `%4s'!\n",
2105               GNUNET_h2s (&pr->query),
2106               GNUNET_i2s (peer));
2107 #endif
2108   k = 0;
2109   bm = 0;
2110   if (GNUNET_YES == no_route)
2111     {
2112       bm |= GET_MESSAGE_BIT_RETURN_TO;
2113       k++;      
2114     }
2115   if (pr->namespace != NULL)
2116     {
2117       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2118       k++;
2119     }
2120   if (pr->target_pid != 0)
2121     {
2122       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2123       k++;
2124     }
2125   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2126   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2127   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2128   pm->msize = msize;
2129   gm = (struct GetMessage*) &pm[1];
2130   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2131   gm->header.size = htons (msize);
2132   gm->type = htonl (pr->type);
2133   pr->remaining_priority /= 2;
2134   gm->priority = htonl (pr->remaining_priority);
2135   gm->ttl = htonl (pr->ttl);
2136   gm->filter_mutator = htonl(pr->mingle); 
2137   gm->hash_bitmap = htonl (bm);
2138   gm->query = pr->query;
2139   ext = (GNUNET_HashCode*) &gm[1];
2140   k = 0;
2141   if (GNUNET_YES == no_route)
2142     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2143   if (pr->namespace != NULL)
2144     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2145   if (pr->target_pid != 0)
2146     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2147   bfdata = (char *) &ext[k];
2148   if (pr->bf != NULL)
2149     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2150                                                bfdata,
2151                                                pr->bf_size);
2152   pm->cont = &transmit_query_continuation;
2153   pm->cont_cls = pr;
2154   add_to_pending_messages_for_peer (cp, pm, pr);
2155 }
2156
2157
2158 /**
2159  * Closure used for "target_peer_select_cb".
2160  */
2161 struct PeerSelectionContext 
2162 {
2163   /**
2164    * The request for which we are selecting
2165    * peers.
2166    */
2167   struct PendingRequest *pr;
2168
2169   /**
2170    * Current "prime" target.
2171    */
2172   struct GNUNET_PeerIdentity target;
2173
2174   /**
2175    * How much do we like this target?
2176    */
2177   double target_score;
2178
2179 };
2180
2181
2182 /**
2183  * Function called for each connected peer to determine
2184  * which one(s) would make good targets for forwarding.
2185  *
2186  * @param cls closure (struct PeerSelectionContext)
2187  * @param key current key code (peer identity)
2188  * @param value value in the hash map (struct ConnectedPeer)
2189  * @return GNUNET_YES if we should continue to
2190  *         iterate,
2191  *         GNUNET_NO if not.
2192  */
2193 static int
2194 target_peer_select_cb (void *cls,
2195                        const GNUNET_HashCode * key,
2196                        void *value)
2197 {
2198   struct PeerSelectionContext *psc = cls;
2199   struct ConnectedPeer *cp = value;
2200   struct PendingRequest *pr = psc->pr;
2201   double score;
2202   unsigned int i;
2203   unsigned int pc;
2204
2205   /* 1) check that this peer is not the initiator */
2206   if (cp == pr->cp)
2207     {
2208 #if DEBUG_FS
2209       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2210                   "Skipping initiator in forwarding selection\n");
2211 #endif
2212       return GNUNET_YES; /* skip */        
2213     }
2214
2215   /* 2) check if we have already (recently) forwarded to this peer */
2216   pc = 0;
2217   for (i=0;i<pr->used_pids_off;i++)
2218     if (pr->used_pids[i] == cp->pid) 
2219       {
2220         pc++;
2221         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2222                                            RETRY_PROBABILITY_INV))
2223           {
2224 #if DEBUG_FS
2225             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2226                         "NOT re-trying query that was previously transmitted %u times\n",
2227                         (unsigned int) pr->used_pids_off);
2228 #endif
2229             return GNUNET_YES; /* skip */
2230           }
2231       }
2232 #if DEBUG_FS
2233   if (0 < pc)
2234     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2235                 "Re-trying query that was previously transmitted %u times to this peer\n",
2236                 (unsigned int) pc);
2237 #endif
2238   /* 3) calculate how much we'd like to forward to this peer,
2239      starting with a random value that is strong enough
2240      to at least give any peer a chance sometimes 
2241      (compared to the other factors that come later) */
2242   /* 3a) count successful (recent) routes from cp for same source */
2243   if (pr->cp != NULL)
2244     {
2245       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2246                                         P2P_SUCCESS_LIST_SIZE);
2247       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2248         if (cp->last_p2p_replies[i] == pr->cp->pid)
2249           score += 1; /* likely successful based on hot path */
2250     }
2251   else
2252     {
2253       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2254                                         CS2P_SUCCESS_LIST_SIZE);
2255       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2256         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2257           score += 1; /* likely successful based on hot path */
2258     }
2259   /* 3b) include latency */
2260   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2261     score += 1; /* likely fast based on latency */
2262   /* 3c) include priorities */
2263   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2264     score += 1; /* likely successful based on priorities */
2265   /* 3d) penalize for queue size */  
2266   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2267   /* 3e) include peer proximity */
2268   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2269                                                     &pr->query)) / (double) UINT32_MAX);
2270   /* 4) super-bonus for being the known target */
2271   if (pr->target_pid == cp->pid)
2272     score += 100.0;
2273   /* store best-fit in closure */
2274 #if DEBUG_FS
2275   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2276               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2277               GNUNET_h2s (key),
2278               score,
2279               psc->target_score);
2280 #endif  
2281   score++; /* avoid zero */
2282   if (score > psc->target_score)
2283     {
2284       psc->target_score = score;
2285       psc->target.hashPubKey = *key; 
2286     }
2287   return GNUNET_YES;
2288 }
2289   
2290
2291 /**
2292  * The priority level imposes a bound on the maximum
2293  * value for the ttl that can be requested.
2294  *
2295  * @param ttl_in requested ttl
2296  * @param prio given priority
2297  * @return ttl_in if ttl_in is below the limit,
2298  *         otherwise the ttl-limit for the given priority
2299  */
2300 static int32_t
2301 bound_ttl (int32_t ttl_in, uint32_t prio)
2302 {
2303   unsigned long long allowed;
2304
2305   if (ttl_in <= 0)
2306     return ttl_in;
2307   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2308   if (ttl_in > allowed)      
2309     {
2310       if (allowed >= (1 << 30))
2311         return 1 << 30;
2312       return allowed;
2313     }
2314   return ttl_in;
2315 }
2316
2317
2318 /**
2319  * We're processing a GET request and have decided
2320  * to forward it to other peers.  This function is called periodically
2321  * and should forward the request to other peers until we have all
2322  * possible replies.  If we have transmitted the *only* reply to
2323  * the initiator we should destroy the pending request.  If we have
2324  * many replies in the queue to the initiator, we should delay sending
2325  * out more queries until the reply queue has shrunk some.
2326  *
2327  * @param cls our "struct ProcessGetContext *"
2328  * @param tc unused
2329  */
2330 static void
2331 forward_request_task (void *cls,
2332                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2333 {
2334   struct PendingRequest *pr = cls;
2335   struct PeerSelectionContext psc;
2336   struct ConnectedPeer *cp; 
2337   struct GNUNET_TIME_Relative delay;
2338
2339   pr->task = GNUNET_SCHEDULER_NO_TASK;
2340   if (pr->irc != NULL)
2341     {
2342 #if DEBUG_FS
2343       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2344                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2345                   GNUNET_h2s (&pr->query));
2346 #endif
2347       return; /* already pending */
2348     }
2349   if (GNUNET_YES == pr->local_only)
2350     return; /* configured to not do P2P search */
2351   /* (0) try DHT */
2352   if (0 == pr->anonymity_level)
2353     {
2354 #if 0      
2355       /* DHT API needs fixing... */
2356       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2357                                           GNUNET_TIME_UNIT_FOREVER_REL,
2358                                           pr->type,
2359                                           &pr->query,
2360                                           &process_dht_reply,
2361                                           pr,
2362                                           FIXME,
2363                                           FIXME);
2364 #endif                                    
2365     }
2366   /* (1) select target */
2367   psc.pr = pr;
2368   psc.target_score = -DBL_MAX;
2369   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2370                                          &target_peer_select_cb,
2371                                          &psc);  
2372   if (psc.target_score == -DBL_MAX)
2373     {
2374       delay = get_processing_delay ();
2375 #if DEBUG_FS 
2376       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2377                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2378                   GNUNET_h2s (&pr->query),
2379                   delay.value);
2380 #endif
2381       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2382                                                delay,
2383                                                &forward_request_task,
2384                                                pr);
2385       return; /* nobody selected */
2386     }
2387   /* (3) update TTL/priority */
2388   if (pr->client_request_list != NULL)
2389     {
2390       /* FIXME: use better algorithm!? */
2391       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2392                                          4))
2393         pr->priority++;
2394       /* bound priority we use by priorities we see from other peers
2395          rounded up (must round up so that we can see non-zero
2396          priorities, but round up as little as possible to make it
2397          plausible that we forwarded another peers request) */
2398       if (pr->priority > current_priorities + 1.0)
2399         pr->priority = (uint32_t) current_priorities + 1.0;
2400       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2401                            pr->priority);
2402 #if DEBUG_FS
2403       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2404                   "Trying query `%s' with priority %u and TTL %d.\n",
2405                   GNUNET_h2s (&pr->query),
2406                   pr->priority,
2407                   pr->ttl);
2408 #endif
2409     }
2410
2411   /* (3) reserve reply bandwidth */
2412   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2413                                           &psc.target.hashPubKey);
2414   GNUNET_assert (NULL != cp);
2415   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2416                                                 &psc.target,
2417                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2418                                                 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2419                                                 DBLOCK_SIZE * 2, 
2420                                                 cp->inc_preference,
2421                                                 &target_reservation_cb,
2422                                                 pr);
2423   cp->inc_preference = 0;
2424 }
2425
2426
2427 /* **************************** P2P PUT Handling ************************ */
2428
2429
2430 /**
2431  * Function called after we either failed or succeeded
2432  * at transmitting a reply to a peer.  
2433  *
2434  * @param cls the requests "struct PendingRequest*"
2435  * @param tpid ID of receiving peer, 0 on transmission error
2436  */
2437 static void
2438 transmit_reply_continuation (void *cls,
2439                              GNUNET_PEER_Id tpid)
2440 {
2441   struct PendingRequest *pr = cls;
2442   
2443   switch (pr->type)
2444     {
2445     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2446     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2447       /* only one reply expected, done with the request! */
2448       destroy_pending_request (pr);
2449       break;
2450     case GNUNET_BLOCK_TYPE_ANY:
2451     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2452     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2453       break;
2454     default:
2455       GNUNET_break (0);
2456       break;
2457     }
2458 }
2459
2460
2461 /**
2462  * Transmit the given message by copying it to the target buffer
2463  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2464  * for writing in the meantime.  In that case, do nothing
2465  * (the disconnect or shutdown handler will take care of the rest).
2466  * If we were able to transmit messages and there are still more
2467  * pending, ask core again for further calls to this function.
2468  *
2469  * @param cls closure, pointer to the 'struct ClientList*'
2470  * @param size number of bytes available in buf
2471  * @param buf where the callee should write the message
2472  * @return number of bytes written to buf
2473  */
2474 static size_t
2475 transmit_to_client (void *cls,
2476                   size_t size, void *buf)
2477 {
2478   struct ClientList *cl = cls;
2479   char *cbuf = buf;
2480   struct ClientResponseMessage *creply;
2481   size_t msize;
2482   
2483   cl->th = NULL;
2484   if (NULL == buf)
2485     {
2486 #if DEBUG_FS
2487       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2488                   "Not sending reply, client communication problem.\n");
2489 #endif
2490       return 0;
2491     }
2492   msize = 0;
2493   while ( (NULL != (creply = cl->res_head) ) &&
2494           (creply->msize <= size) )
2495     {
2496       memcpy (&cbuf[msize], &creply[1], creply->msize);
2497       msize += creply->msize;
2498       size -= creply->msize;
2499       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2500                                    cl->res_tail,
2501                                    creply);
2502       GNUNET_free (creply);
2503     }
2504   if (NULL != creply)
2505     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2506                                                   creply->msize,
2507                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2508                                                   &transmit_to_client,
2509                                                   cl);
2510 #if DEBUG_FS
2511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2512               "Transmitted %u bytes to client\n",
2513               (unsigned int) msize);
2514 #endif
2515   return msize;
2516 }
2517
2518
2519 /**
2520  * Closure for "process_reply" function.
2521  */
2522 struct ProcessReplyClosure
2523 {
2524   /**
2525    * The data for the reply.
2526    */
2527   const void *data;
2528
2529   /**
2530    * Who gave us this reply? NULL for local host.
2531    */
2532   struct ConnectedPeer *sender;
2533
2534   /**
2535    * When the reply expires.
2536    */
2537   struct GNUNET_TIME_Absolute expiration;
2538
2539   /**
2540    * Size of data.
2541    */
2542   size_t size;
2543
2544   /**
2545    * Type of the block.
2546    */
2547   enum GNUNET_BLOCK_Type type;
2548
2549   /**
2550    * How much was this reply worth to us?
2551    */
2552   uint32_t priority;
2553
2554   /**
2555    * Evaluation result (returned).
2556    */
2557   enum GNUNET_BLOCK_EvaluationResult eval;
2558
2559   /**
2560    * Did we finish processing the associated request?
2561    */ 
2562   int finished;
2563 };
2564
2565
2566 /**
2567  * We have received a reply; handle it!
2568  *
2569  * @param cls response (struct ProcessReplyClosure)
2570  * @param key our query
2571  * @param value value in the hash map (info about the query)
2572  * @return GNUNET_YES (we should continue to iterate)
2573  */
2574 static int
2575 process_reply (void *cls,
2576                const GNUNET_HashCode * key,
2577                void *value)
2578 {
2579   struct ProcessReplyClosure *prq = cls;
2580   struct PendingRequest *pr = value;
2581   struct PendingMessage *reply;
2582   struct ClientResponseMessage *creply;
2583   struct ClientList *cl;
2584   struct PutMessage *pm;
2585   struct ConnectedPeer *cp;
2586   struct GNUNET_TIME_Relative cur_delay;
2587   size_t msize;
2588
2589 #if DEBUG_FS
2590   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2591               "Matched result (type %u) for query `%s' with pending request\n",
2592               (unsigned int) prq->type,
2593               GNUNET_h2s (key));
2594 #endif  
2595   GNUNET_STATISTICS_update (stats,
2596                             gettext_noop ("# replies received and matched"),
2597                             1,
2598                             GNUNET_NO);
2599   if (prq->sender != NULL)
2600     {
2601       /* FIXME: should we be more precise here and not use
2602          "start_time" but a peer-specific time stamp? */
2603       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2604       prq->sender->avg_delay.value
2605         = (prq->sender->avg_delay.value * 
2606            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2607       prq->sender->avg_priority
2608         = (prq->sender->avg_priority * 
2609            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2610       if (pr->cp != NULL)
2611         {
2612           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2613                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2614                                  -1);
2615           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2616           prq->sender->last_p2p_replies
2617             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2618             = pr->cp->pid;
2619         }
2620       else
2621         {
2622           if (NULL != prq->sender->last_client_replies
2623               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2624             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2625                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2626           prq->sender->last_client_replies
2627             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2628             = pr->client_request_list->client_list->client;
2629           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2630         }
2631     }
2632   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
2633                                      prq->type,
2634                                      key,
2635                                      &pr->bf,
2636                                      pr->mingle,
2637                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2638                                      prq->data,
2639                                      prq->size);
2640   switch (prq->eval)
2641     {
2642     case GNUNET_BLOCK_EVALUATION_OK_MORE:
2643       break;
2644     case GNUNET_BLOCK_EVALUATION_OK_LAST:
2645       while (NULL != pr->pending_head)
2646         destroy_pending_message_list_entry (pr->pending_head);
2647       if (pr->qe != NULL)
2648         {
2649           if (pr->client_request_list != NULL)
2650             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2651                                         GNUNET_YES);
2652           GNUNET_DATASTORE_cancel (pr->qe);
2653           pr->qe = NULL;
2654         }
2655       pr->do_remove = GNUNET_YES;
2656       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2657         {
2658           GNUNET_SCHEDULER_cancel (sched,
2659                                    pr->task);
2660           pr->task = GNUNET_SCHEDULER_NO_TASK;
2661         }
2662       GNUNET_break (GNUNET_YES ==
2663                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2664                                                           key,
2665                                                           pr));
2666       break;
2667     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2668       GNUNET_STATISTICS_update (stats,
2669                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2670                                 1,
2671                                 GNUNET_NO);
2672 #if DEBUG_FS
2673       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2674                   "Duplicate response `%s', discarding.\n",
2675                   GNUNET_h2s (&mhash));
2676 #endif
2677       return GNUNET_YES; /* duplicate */
2678     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2679       return GNUNET_YES; /* wrong namespace */  
2680     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2681       GNUNET_break (0);
2682       return GNUNET_YES;
2683     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2684       GNUNET_break (0);
2685       return GNUNET_YES;
2686     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2687       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2688                   _("Unsupported block type %u\n"),
2689                   prq->type);
2690       return GNUNET_NO;
2691     }
2692   if (pr->client_request_list != NULL)
2693     {
2694       if (pr->replies_seen_size == pr->replies_seen_off)
2695         GNUNET_array_grow (pr->replies_seen,
2696                            pr->replies_seen_size,
2697                            pr->replies_seen_size * 2 + 4);      
2698       GNUNET_CRYPTO_hash (prq->data,
2699                           prq->size,
2700                           &pr->replies_seen[pr->replies_seen_off++]);         
2701       refresh_bloomfilter (pr);
2702     }
2703   if (NULL == prq->sender)
2704     {
2705 #if DEBUG_FS
2706       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2707                   "Found result for query `%s' in local datastore\n",
2708                   GNUNET_h2s (key));
2709 #endif
2710       GNUNET_STATISTICS_update (stats,
2711                                 gettext_noop ("# results found locally"),
2712                                 1,
2713                                 GNUNET_NO);      
2714     }
2715   prq->priority += pr->remaining_priority;
2716   pr->remaining_priority = 0;
2717   pr->results_found++;
2718   if (NULL != pr->client_request_list)
2719     {
2720       GNUNET_STATISTICS_update (stats,
2721                                 gettext_noop ("# replies received for local clients"),
2722                                 1,
2723                                 GNUNET_NO);
2724       cl = pr->client_request_list->client_list;
2725       msize = sizeof (struct PutMessage) + prq->size;
2726       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2727       creply->msize = msize;
2728       creply->client_list = cl;
2729       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2730                                          cl->res_tail,
2731                                          cl->res_tail,
2732                                          creply);      
2733       pm = (struct PutMessage*) &creply[1];
2734       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2735       pm->header.size = htons (msize);
2736       pm->type = htonl (prq->type);
2737       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2738       memcpy (&pm[1], prq->data, prq->size);      
2739       if (NULL == cl->th)
2740         {
2741 #if DEBUG_FS
2742           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2743                       "Transmitting result for query `%s' to client\n",
2744                       GNUNET_h2s (key));
2745 #endif  
2746           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2747                                                         msize,
2748                                                         GNUNET_TIME_UNIT_FOREVER_REL,
2749                                                         &transmit_to_client,
2750                                                         cl);
2751         }
2752       GNUNET_break (cl->th != NULL);
2753       if (pr->do_remove)                
2754         {
2755           prq->finished = GNUNET_YES;
2756           destroy_pending_request (pr);         
2757         }
2758     }
2759   else
2760     {
2761       cp = pr->cp;
2762 #if DEBUG_FS
2763       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2764                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
2765                   GNUNET_h2s (key),
2766                   (unsigned int) cp->pid);
2767 #endif  
2768       GNUNET_STATISTICS_update (stats,
2769                                 gettext_noop ("# replies received for other peers"),
2770                                 1,
2771                                 GNUNET_NO);
2772       msize = sizeof (struct PutMessage) + prq->size;
2773       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2774       reply->cont = &transmit_reply_continuation;
2775       reply->cont_cls = pr;
2776       reply->msize = msize;
2777       reply->priority = UINT32_MAX; /* send replies first! */
2778       pm = (struct PutMessage*) &reply[1];
2779       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2780       pm->header.size = htons (msize);
2781       pm->type = htonl (prq->type);
2782       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2783       memcpy (&pm[1], prq->data, prq->size);
2784       add_to_pending_messages_for_peer (cp, reply, pr);
2785     }
2786   return GNUNET_YES;
2787 }
2788
2789
2790 /**
2791  * Continuation called to notify client about result of the
2792  * operation.
2793  *
2794  * @param cls closure
2795  * @param success GNUNET_SYSERR on failure
2796  * @param msg NULL on success, otherwise an error message
2797  */
2798 static void 
2799 put_migration_continuation (void *cls,
2800                             int success,
2801                             const char *msg)
2802 {
2803   /* FIXME */
2804 }
2805
2806
2807 /**
2808  * Handle P2P "PUT" message.
2809  *
2810  * @param cls closure, always NULL
2811  * @param other the other peer involved (sender or receiver, NULL
2812  *        for loopback messages where we are both sender and receiver)
2813  * @param message the actual message
2814  * @param latency reported latency of the connection with 'other'
2815  * @param distance reported distance (DV) to 'other' 
2816  * @return GNUNET_OK to keep the connection open,
2817  *         GNUNET_SYSERR to close it (signal serious error)
2818  */
2819 static int
2820 handle_p2p_put (void *cls,
2821                 const struct GNUNET_PeerIdentity *other,
2822                 const struct GNUNET_MessageHeader *message,
2823                 struct GNUNET_TIME_Relative latency,
2824                 uint32_t distance)
2825 {
2826   const struct PutMessage *put;
2827   uint16_t msize;
2828   size_t dsize;
2829   enum GNUNET_BLOCK_Type type;
2830   struct GNUNET_TIME_Absolute expiration;
2831   GNUNET_HashCode query;
2832   struct ProcessReplyClosure prq;
2833
2834   msize = ntohs (message->size);
2835   if (msize < sizeof (struct PutMessage))
2836     {
2837       GNUNET_break_op(0);
2838       return GNUNET_SYSERR;
2839     }
2840   put = (const struct PutMessage*) message;
2841   dsize = msize - sizeof (struct PutMessage);
2842   type = ntohl (put->type);
2843   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2844
2845   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
2846     return GNUNET_SYSERR;
2847   if (GNUNET_OK !=
2848       GNUNET_BLOCK_get_key (block_ctx,
2849                             type,
2850                             &put[1],
2851                             dsize,
2852                             &query))
2853     {
2854       GNUNET_break_op (0);
2855       return GNUNET_SYSERR;
2856     }
2857 #if DEBUG_FS
2858   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2859               "Received result for query `%s' from peer `%4s'\n",
2860               GNUNET_h2s (&query),
2861               GNUNET_i2s (other));
2862 #endif
2863   GNUNET_STATISTICS_update (stats,
2864                             gettext_noop ("# replies received (overall)"),
2865                             1,
2866                             GNUNET_NO);
2867   /* now, lookup 'query' */
2868   prq.data = (const void*) &put[1];
2869   if (other != NULL)
2870     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2871                                                     &other->hashPubKey);
2872   else
2873     prq.sender = NULL;
2874   prq.size = dsize;
2875   prq.type = type;
2876   prq.expiration = expiration;
2877   prq.priority = 0;
2878   prq.finished = GNUNET_NO;
2879   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2880                                               &query,
2881                                               &process_reply,
2882                                               &prq);
2883   if (prq.sender != NULL)
2884     {
2885       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
2886       prq.sender->trust += prq.priority;
2887     }
2888   if (GNUNET_YES == active_migration)
2889     {
2890 #if DEBUG_FS
2891       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2892                   "Replicating result for query `%s' with priority %u\n",
2893                   GNUNET_h2s (&query),
2894                   prq.priority);
2895 #endif
2896       GNUNET_DATASTORE_put (dsh,
2897                             0, &query, dsize, &put[1],
2898                             type, prq.priority, 1 /* anonymity */, 
2899                             expiration, 
2900                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2901                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2902                             &put_migration_continuation, 
2903                             NULL);
2904     }
2905   return GNUNET_OK;
2906 }
2907
2908
2909 /**
2910  * Handle P2P "MIGRATION_STOP" message.
2911  *
2912  * @param cls closure, always NULL
2913  * @param other the other peer involved (sender or receiver, NULL
2914  *        for loopback messages where we are both sender and receiver)
2915  * @param message the actual message
2916  * @param latency reported latency of the connection with 'other'
2917  * @param distance reported distance (DV) to 'other' 
2918  * @return GNUNET_OK to keep the connection open,
2919  *         GNUNET_SYSERR to close it (signal serious error)
2920  */
2921 static int
2922 handle_p2p_migration_stop (void *cls,
2923                            const struct GNUNET_PeerIdentity *other,
2924                            const struct GNUNET_MessageHeader *message,
2925                            struct GNUNET_TIME_Relative latency,
2926                            uint32_t distance)
2927 {
2928   // FIXME!
2929   return GNUNET_OK;
2930 }
2931
2932
2933
2934 /* **************************** P2P GET Handling ************************ */
2935
2936
2937 /**
2938  * Closure for 'check_duplicate_request_{peer,client}'.
2939  */
2940 struct CheckDuplicateRequestClosure
2941 {
2942   /**
2943    * The new request we should check if it already exists.
2944    */
2945   const struct PendingRequest *pr;
2946
2947   /**
2948    * Existing request found by the checker, NULL if none.
2949    */
2950   struct PendingRequest *have;
2951 };
2952
2953
2954 /**
2955  * Iterator over entries in the 'query_request_map' that
2956  * tries to see if we have the same request pending from
2957  * the same client already.
2958  *
2959  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2960  * @param key current key code (query, ignored, must match)
2961  * @param value value in the hash map (a 'struct PendingRequest' 
2962  *              that already exists)
2963  * @return GNUNET_YES if we should continue to
2964  *         iterate (no match yet)
2965  *         GNUNET_NO if not (match found).
2966  */
2967 static int
2968 check_duplicate_request_client (void *cls,
2969                                 const GNUNET_HashCode * key,
2970                                 void *value)
2971 {
2972   struct CheckDuplicateRequestClosure *cdc = cls;
2973   struct PendingRequest *have = value;
2974
2975   if (have->client_request_list == NULL)
2976     return GNUNET_YES;
2977   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2978        (cdc->pr != have) )
2979     {
2980       cdc->have = have;
2981       return GNUNET_NO;
2982     }
2983   return GNUNET_YES;
2984 }
2985
2986
2987 /**
2988  * We're processing (local) results for a search request
2989  * from another peer.  Pass applicable results to the
2990  * peer and if we are done either clean up (operation
2991  * complete) or forward to other peers (more results possible).
2992  *
2993  * @param cls our closure (struct LocalGetContext)
2994  * @param key key for the content
2995  * @param size number of bytes in data
2996  * @param data content stored
2997  * @param type type of the content
2998  * @param priority priority of the content
2999  * @param anonymity anonymity-level for the content
3000  * @param expiration expiration time for the content
3001  * @param uid unique identifier for the datum;
3002  *        maybe 0 if no unique identifier is available
3003  */
3004 static void
3005 process_local_reply (void *cls,
3006                      const GNUNET_HashCode * key,
3007                      uint32_t size,
3008                      const void *data,
3009                      enum GNUNET_BLOCK_Type type,
3010                      uint32_t priority,
3011                      uint32_t anonymity,
3012                      struct GNUNET_TIME_Absolute
3013                      expiration, 
3014                      uint64_t uid)
3015 {
3016   struct PendingRequest *pr = cls;
3017   struct ProcessReplyClosure prq;
3018   struct CheckDuplicateRequestClosure cdrc;
3019   GNUNET_HashCode query;
3020   unsigned int old_rf;
3021   
3022   if (NULL == key)
3023     {
3024 #if DEBUG_FS > 1
3025       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3026                   "Done processing local replies, forwarding request to other peers.\n");
3027 #endif
3028       pr->qe = NULL;
3029       if (pr->client_request_list != NULL)
3030         {
3031           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3032                                       GNUNET_YES);
3033           /* Figure out if this is a duplicate request and possibly
3034              merge 'struct PendingRequest' entries */
3035           cdrc.have = NULL;
3036           cdrc.pr = pr;
3037           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3038                                                       &pr->query,
3039                                                       &check_duplicate_request_client,
3040                                                       &cdrc);
3041           if (cdrc.have != NULL)
3042             {
3043 #if DEBUG_FS
3044               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3045                           "Received request for block `%s' twice from client, will only request once.\n",
3046                           GNUNET_h2s (&pr->query));
3047 #endif
3048               
3049               destroy_pending_request (pr);
3050               return;
3051             }
3052         }
3053
3054       /* no more results */
3055       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3056         pr->task = GNUNET_SCHEDULER_add_now (sched,
3057                                              &forward_request_task,
3058                                              pr);      
3059       return;
3060     }
3061 #if DEBUG_FS
3062   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3063               "New local response to `%s' of type %u.\n",
3064               GNUNET_h2s (key),
3065               type);
3066 #endif
3067   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3068     {
3069 #if DEBUG_FS
3070       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3071                   "Found ONDEMAND block, performing on-demand encoding\n");
3072 #endif
3073       GNUNET_STATISTICS_update (stats,
3074                                 gettext_noop ("# on-demand blocks matched requests"),
3075                                 1,
3076                                 GNUNET_NO);
3077       if (GNUNET_OK != 
3078           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3079                                             anonymity, expiration, uid, 
3080                                             &process_local_reply,
3081                                             pr))
3082       if (pr->qe != NULL)
3083         {
3084           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3085         }
3086       return;
3087     }
3088   old_rf = pr->results_found;
3089   memset (&prq, 0, sizeof (prq));
3090   prq.data = data;
3091   prq.expiration = expiration;
3092   prq.size = size;  
3093   if (GNUNET_OK != 
3094       GNUNET_BLOCK_get_key (block_ctx,
3095                             type,
3096                             data,
3097                             size,
3098                             &query))
3099     {
3100       GNUNET_break (0);
3101       GNUNET_DATASTORE_remove (dsh,
3102                                key,
3103                                size, data,
3104                                -1, -1, 
3105                                GNUNET_TIME_UNIT_FOREVER_REL,
3106                                NULL, NULL);
3107       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3108       return;
3109     }
3110   prq.type = type;
3111   prq.priority = priority;  
3112   prq.finished = GNUNET_NO;
3113   process_reply (&prq, key, pr);
3114   if ( (old_rf == 0) &&
3115        (pr->results_found == 1) )
3116     update_datastore_delays (pr->start_time);
3117   if (prq.finished == GNUNET_YES)
3118     return;
3119   if (pr->qe == NULL)
3120     return; /* done here */
3121   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3122     {
3123       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3124       return;
3125     }
3126   if ( (pr->client_request_list == NULL) &&
3127        ( (GNUNET_YES == test_load_too_high()) ||
3128          (pr->results_found > 5 + 2 * pr->priority) ) )
3129     {
3130 #if DEBUG_FS > 2
3131       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3132                   "Load too high, done with request\n");
3133 #endif
3134       GNUNET_STATISTICS_update (stats,
3135                                 gettext_noop ("# processing result set cut short due to load"),
3136                                 1,
3137                                 GNUNET_NO);
3138       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3139       return;
3140     }
3141   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3142 }
3143
3144
3145 /**
3146  * We've received a request with the specified priority.  Bound it
3147  * according to how much we trust the given peer.
3148  * 
3149  * @param prio_in requested priority
3150  * @param cp the peer making the request
3151  * @return effective priority
3152  */
3153 static uint32_t
3154 bound_priority (uint32_t prio_in,
3155                 struct ConnectedPeer *cp)
3156 {
3157 #define N ((double)128.0)
3158   uint32_t ret;
3159   double rret;
3160   int ld;
3161
3162   ld = test_load_too_high ();
3163   if (ld == GNUNET_SYSERR)
3164     return 0; /* excess resources */
3165   ret = change_host_trust (cp, prio_in);
3166   if (ret > 0)
3167     {
3168       if (ret > current_priorities + N)
3169         rret = current_priorities + N;
3170       else
3171         rret = ret;
3172       current_priorities 
3173         = (current_priorities * (N-1) + rret)/N;
3174     }
3175 #undef N
3176   return ret;
3177 }
3178
3179
3180 /**
3181  * Iterator over entries in the 'query_request_map' that
3182  * tries to see if we have the same request pending from
3183  * the same peer already.
3184  *
3185  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3186  * @param key current key code (query, ignored, must match)
3187  * @param value value in the hash map (a 'struct PendingRequest' 
3188  *              that already exists)
3189  * @return GNUNET_YES if we should continue to
3190  *         iterate (no match yet)
3191  *         GNUNET_NO if not (match found).
3192  */
3193 static int
3194 check_duplicate_request_peer (void *cls,
3195                               const GNUNET_HashCode * key,
3196                               void *value)
3197 {
3198   struct CheckDuplicateRequestClosure *cdc = cls;
3199   struct PendingRequest *have = value;
3200
3201   if (cdc->pr->target_pid == have->target_pid)
3202     {
3203       cdc->have = have;
3204       return GNUNET_NO;
3205     }
3206   return GNUNET_YES;
3207 }
3208
3209
3210 /**
3211  * Handle P2P "GET" request.
3212  *
3213  * @param cls closure, always NULL
3214  * @param other the other peer involved (sender or receiver, NULL
3215  *        for loopback messages where we are both sender and receiver)
3216  * @param message the actual message
3217  * @param latency reported latency of the connection with 'other'
3218  * @param distance reported distance (DV) to 'other' 
3219  * @return GNUNET_OK to keep the connection open,
3220  *         GNUNET_SYSERR to close it (signal serious error)
3221  */
3222 static int
3223 handle_p2p_get (void *cls,
3224                 const struct GNUNET_PeerIdentity *other,
3225                 const struct GNUNET_MessageHeader *message,
3226                 struct GNUNET_TIME_Relative latency,
3227                 uint32_t distance)
3228 {
3229   struct PendingRequest *pr;
3230   struct ConnectedPeer *cp;
3231   struct ConnectedPeer *cps;
3232   struct CheckDuplicateRequestClosure cdc;
3233   struct GNUNET_TIME_Relative timeout;
3234   uint16_t msize;
3235   const struct GetMessage *gm;
3236   unsigned int bits;
3237   const GNUNET_HashCode *opt;
3238   uint32_t bm;
3239   size_t bfsize;
3240   uint32_t ttl_decrement;
3241   enum GNUNET_BLOCK_Type type;
3242   int have_ns;
3243   int ld;
3244
3245   msize = ntohs(message->size);
3246   if (msize < sizeof (struct GetMessage))
3247     {
3248       GNUNET_break_op (0);
3249       return GNUNET_SYSERR;
3250     }
3251   gm = (const struct GetMessage*) message;
3252   type = ntohl (gm->type);
3253   bm = ntohl (gm->hash_bitmap);
3254   bits = 0;
3255   while (bm > 0)
3256     {
3257       if (1 == (bm & 1))
3258         bits++;
3259       bm >>= 1;
3260     }
3261   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3262     {
3263       GNUNET_break_op (0);
3264       return GNUNET_SYSERR;
3265     }  
3266   opt = (const GNUNET_HashCode*) &gm[1];
3267   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3268   bm = ntohl (gm->hash_bitmap);
3269   bits = 0;
3270   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3271                                            &other->hashPubKey);
3272   if (NULL == cps)
3273     {
3274       /* peer must have just disconnected */
3275       GNUNET_STATISTICS_update (stats,
3276                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3277                                 1,
3278                                 GNUNET_NO);
3279       return GNUNET_SYSERR;
3280     }
3281   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3282     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3283                                             &opt[bits++]);
3284   else
3285     cp = cps;
3286   if (cp == NULL)
3287     {
3288 #if DEBUG_FS
3289       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3290         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3291                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3292                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3293       
3294       else
3295         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3296                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3297                     GNUNET_i2s (other));
3298 #endif
3299       GNUNET_STATISTICS_update (stats,
3300                                 gettext_noop ("# requests dropped due to missing reverse route"),
3301                                 1,
3302                                 GNUNET_NO);
3303      /* FIXME: try connect? */
3304       return GNUNET_OK;
3305     }
3306   /* note that we can really only check load here since otherwise
3307      peers could find out that we are overloaded by not being
3308      disconnected after sending us a malformed query... */
3309
3310   /* FIXME: query priority should play
3311      a major role here! */
3312   ld = test_load_too_high ();
3313   if (GNUNET_YES == ld)
3314     {
3315 #if DEBUG_FS
3316       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3317                   "Dropping query from `%s', this peer is too busy.\n",
3318                   GNUNET_i2s (other));
3319 #endif
3320       GNUNET_STATISTICS_update (stats,
3321                                 gettext_noop ("# requests dropped due to high load"),
3322                                 1,
3323                                 GNUNET_NO);
3324       return GNUNET_OK;
3325     }
3326   /* FIXME: if ld == GNUNET_NO, forward
3327      instead of indirecting! */
3328
3329 #if DEBUG_FS 
3330   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3331               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3332               GNUNET_h2s (&gm->query),
3333               (unsigned int) type,
3334               GNUNET_i2s (other),
3335               (unsigned int) bm);
3336 #endif
3337   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3338   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3339                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3340   if (have_ns)
3341     {
3342       pr->namespace = (GNUNET_HashCode*) &pr[1];
3343       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3344     }
3345   pr->type = type;
3346   pr->mingle = ntohl (gm->filter_mutator);
3347   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3348     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3349   pr->anonymity_level = 1;
3350   pr->priority = bound_priority (ntohl (gm->priority), cps);
3351   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3352   pr->query = gm->query;
3353   /* decrement ttl (always) */
3354   ttl_decrement = 2 * TTL_DECREMENT +
3355     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3356                               TTL_DECREMENT);
3357   if ( (pr->ttl < 0) &&
3358        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3359     {
3360 #if DEBUG_FS
3361       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3362                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3363                   GNUNET_i2s (other),
3364                   pr->ttl,
3365                   ttl_decrement);
3366 #endif
3367       GNUNET_STATISTICS_update (stats,
3368                                 gettext_noop ("# requests dropped due TTL underflow"),
3369                                 1,
3370                                 GNUNET_NO);
3371       /* integer underflow => drop (should be very rare)! */      
3372       GNUNET_free (pr);
3373       return GNUNET_OK;
3374     } 
3375   pr->ttl -= ttl_decrement;
3376   pr->start_time = GNUNET_TIME_absolute_get ();
3377
3378   /* get bloom filter */
3379   if (bfsize > 0)
3380     {
3381       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3382                                                   bfsize,
3383                                                   BLOOMFILTER_K);
3384       pr->bf_size = bfsize;
3385     }
3386
3387   cdc.have = NULL;
3388   cdc.pr = pr;
3389   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3390                                               &gm->query,
3391                                               &check_duplicate_request_peer,
3392                                               &cdc);
3393   if (cdc.have != NULL)
3394     {
3395       if (cdc.have->start_time.value + cdc.have->ttl >=
3396           pr->start_time.value + pr->ttl)
3397         {
3398           /* existing request has higher TTL, drop new one! */
3399           cdc.have->priority += pr->priority;
3400           destroy_pending_request (pr);
3401 #if DEBUG_FS
3402           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3403                       "Have existing request with higher TTL, dropping new request.\n",
3404                       GNUNET_i2s (other));
3405 #endif
3406           GNUNET_STATISTICS_update (stats,
3407                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3408                                     1,
3409                                     GNUNET_NO);
3410           return GNUNET_OK;
3411         }
3412       else
3413         {
3414           /* existing request has lower TTL, drop old one! */
3415           pr->priority += cdc.have->priority;
3416           /* Possible optimization: if we have applicable pending
3417              replies in 'cdc.have', we might want to move those over
3418              (this is a really rare special-case, so it is not clear
3419              that this would be worth it) */
3420           destroy_pending_request (cdc.have);
3421           /* keep processing 'pr'! */
3422         }
3423     }
3424
3425   pr->cp = cp;
3426   GNUNET_break (GNUNET_OK ==
3427                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3428                                                    &gm->query,
3429                                                    pr,
3430                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3431   GNUNET_break (GNUNET_OK ==
3432                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3433                                                    &other->hashPubKey,
3434                                                    pr,
3435                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3436   
3437   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3438                                             pr,
3439                                             pr->start_time.value + pr->ttl);
3440
3441   GNUNET_STATISTICS_update (stats,
3442                             gettext_noop ("# P2P searches received"),
3443                             1,
3444                             GNUNET_NO);
3445   GNUNET_STATISTICS_update (stats,
3446                             gettext_noop ("# P2P searches active"),
3447                             1,
3448                             GNUNET_NO);
3449
3450   /* calculate change in traffic preference */
3451   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3452   /* process locally */
3453   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3454     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3455   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3456                                            (pr->priority + 1)); 
3457   pr->qe = GNUNET_DATASTORE_get (dsh,
3458                                  &gm->query,
3459                                  type,                         
3460                                  pr->priority + 1,
3461                                  MAX_DATASTORE_QUEUE,                            
3462                                  timeout,
3463                                  &process_local_reply,
3464                                  pr);
3465
3466   /* Are multiple results possible?  If so, start processing remotely now! */
3467   switch (pr->type)
3468     {
3469     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3470     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3471       /* only one result, wait for datastore */
3472       break;
3473     default:
3474       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3475         pr->task = GNUNET_SCHEDULER_add_now (sched,
3476                                              &forward_request_task,
3477                                              pr);
3478     }
3479
3480   /* make sure we don't track too many requests */
3481   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3482     {
3483       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3484       GNUNET_assert (pr != NULL);
3485       destroy_pending_request (pr);
3486     }
3487   return GNUNET_OK;
3488 }
3489
3490
3491 /* **************************** CS GET Handling ************************ */
3492
3493
3494 /**
3495  * Handle START_SEARCH-message (search request from client).
3496  *
3497  * @param cls closure
3498  * @param client identification of the client
3499  * @param message the actual message
3500  */
3501 static void
3502 handle_start_search (void *cls,
3503                      struct GNUNET_SERVER_Client *client,
3504                      const struct GNUNET_MessageHeader *message)
3505 {
3506   static GNUNET_HashCode all_zeros;
3507   const struct SearchMessage *sm;
3508   struct ClientList *cl;
3509   struct ClientRequestList *crl;
3510   struct PendingRequest *pr;
3511   uint16_t msize;
3512   unsigned int sc;
3513   enum GNUNET_BLOCK_Type type;
3514
3515   msize = ntohs (message->size);
3516   if ( (msize < sizeof (struct SearchMessage)) ||
3517        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3518     {
3519       GNUNET_break (0);
3520       GNUNET_SERVER_receive_done (client,
3521                                   GNUNET_SYSERR);
3522       return;
3523     }
3524   GNUNET_STATISTICS_update (stats,
3525                             gettext_noop ("# client searches received"),
3526                             1,
3527                             GNUNET_NO);
3528   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3529   sm = (const struct SearchMessage*) message;
3530   type = ntohl (sm->type);
3531 #if DEBUG_FS
3532   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3533               "Received request for `%s' of type %u from local client\n",
3534               GNUNET_h2s (&sm->query),
3535               (unsigned int) type);
3536 #endif
3537   cl = client_list;
3538   while ( (cl != NULL) &&
3539           (cl->client != client) )
3540     cl = cl->next;
3541   if (cl == NULL)
3542     {
3543       cl = GNUNET_malloc (sizeof (struct ClientList));
3544       cl->client = client;
3545       GNUNET_SERVER_client_keep (client);
3546       cl->next = client_list;
3547       client_list = cl;
3548     }
3549   /* detect duplicate KBLOCK requests */
3550   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
3551        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
3552        (type == GNUNET_BLOCK_TYPE_ANY) )
3553     {
3554       crl = cl->rl_head;
3555       while ( (crl != NULL) &&
3556               ( (0 != memcmp (&crl->req->query,
3557                               &sm->query,
3558                               sizeof (GNUNET_HashCode))) ||
3559                 (crl->req->type != type) ) )
3560         crl = crl->next;
3561       if (crl != NULL)  
3562         { 
3563 #if DEBUG_FS
3564           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3565                       "Have existing request, merging content-seen lists.\n");
3566 #endif
3567           pr = crl->req;
3568           /* Duplicate request (used to send long list of
3569              known/blocked results); merge 'pr->replies_seen'
3570              and update bloom filter */
3571           GNUNET_array_grow (pr->replies_seen,
3572                              pr->replies_seen_size,
3573                              pr->replies_seen_off + sc);
3574           memcpy (&pr->replies_seen[pr->replies_seen_off],
3575                   &sm[1],
3576                   sc * sizeof (GNUNET_HashCode));
3577           pr->replies_seen_off += sc;
3578           refresh_bloomfilter (pr);
3579           GNUNET_STATISTICS_update (stats,
3580                                     gettext_noop ("# client searches updated (merged content seen list)"),
3581                                     1,
3582                                     GNUNET_NO);
3583           GNUNET_SERVER_receive_done (client,
3584                                       GNUNET_OK);
3585           return;
3586         }
3587     }
3588   GNUNET_STATISTICS_update (stats,
3589                             gettext_noop ("# client searches active"),
3590                             1,
3591                             GNUNET_NO);
3592   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3593                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3594   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3595   memset (crl, 0, sizeof (struct ClientRequestList));
3596   crl->client_list = cl;
3597   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3598                                cl->rl_tail,
3599                                crl);  
3600   crl->req = pr;
3601   pr->type = type;
3602   pr->client_request_list = crl;
3603   GNUNET_array_grow (pr->replies_seen,
3604                      pr->replies_seen_size,
3605                      sc);
3606   memcpy (pr->replies_seen,
3607           &sm[1],
3608           sc * sizeof (GNUNET_HashCode));
3609   pr->replies_seen_off = sc;
3610   pr->anonymity_level = ntohl (sm->anonymity_level); 
3611   pr->start_time = GNUNET_TIME_absolute_get ();
3612   refresh_bloomfilter (pr);
3613   pr->query = sm->query;
3614   if (0 == (1 & ntohl (sm->options)))
3615     pr->local_only = GNUNET_NO;
3616   else
3617     pr->local_only = GNUNET_YES;
3618   switch (type)
3619     {
3620     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3621     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3622       if (0 != memcmp (&sm->target,
3623                        &all_zeros,
3624                        sizeof (GNUNET_HashCode)))
3625         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3626       break;
3627     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3628       pr->namespace = (GNUNET_HashCode*) &pr[1];
3629       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3630       break;
3631     default:
3632       break;
3633     }
3634   GNUNET_break (GNUNET_OK ==
3635                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3636                                                    &sm->query,
3637                                                    pr,
3638                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3639   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3640     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3641   pr->qe = GNUNET_DATASTORE_get (dsh,
3642                                  &sm->query,
3643                                  type,
3644                                  -3, -1,
3645                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
3646                                  &process_local_reply,
3647                                  pr);
3648 }
3649
3650
3651 /* **************************** Startup ************************ */
3652
3653 /**
3654  * Process fs requests.
3655  *
3656  * @param s scheduler to use
3657  * @param server the initialized server
3658  * @param c configuration to use
3659  */
3660 static int
3661 main_init (struct GNUNET_SCHEDULER_Handle *s,
3662            struct GNUNET_SERVER_Handle *server,
3663            const struct GNUNET_CONFIGURATION_Handle *c)
3664 {
3665   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3666     {
3667       { &handle_p2p_get, 
3668         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3669       { &handle_p2p_put, 
3670         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3671       { &handle_p2p_migration_stop, 
3672         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
3673         sizeof (struct MigrationStopMessage) },
3674       { NULL, 0, 0 }
3675     };
3676   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
3677     {&GNUNET_FS_handle_index_start, NULL, 
3678      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
3679     {&GNUNET_FS_handle_index_list_get, NULL, 
3680      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
3681     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
3682      sizeof (struct UnindexMessage) },
3683     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
3684      0 },
3685     {NULL, NULL, 0, 0}
3686   };
3687   unsigned long long enc = 128;
3688
3689   sched = s;
3690   cfg = c;
3691   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
3692   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
3693   if ( (GNUNET_OK !=
3694         GNUNET_CONFIGURATION_get_value_number (cfg,
3695                                                "fs",
3696                                                "MAX_PENDING_REQUESTS",
3697                                                &max_pending_requests)) ||
3698        (GNUNET_OK !=
3699         GNUNET_CONFIGURATION_get_value_number (cfg,
3700                                                "fs",
3701                                                "EXPECTED_NEIGHBOUR_COUNT",
3702                                                &enc)) ||
3703        (GNUNET_OK != 
3704         GNUNET_CONFIGURATION_get_value_time (cfg,
3705                                              "fs",
3706                                              "MIN_MIGRATION_DELAY",
3707                                              &min_migration_delay)) )
3708     {
3709       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3710                   _("Configuration fails to specify certain parameters, assuming default values."));
3711     }
3712   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
3713   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
3714   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
3715   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3716   core = GNUNET_CORE_connect (sched,
3717                               cfg,
3718                               GNUNET_TIME_UNIT_FOREVER_REL,
3719                               NULL,
3720                               NULL,
3721                               &peer_connect_handler,
3722                               &peer_disconnect_handler,
3723                               NULL,
3724                               NULL, GNUNET_NO,
3725                               NULL, GNUNET_NO,
3726                               p2p_handlers);
3727   if (NULL == core)
3728     {
3729       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3730                   _("Failed to connect to `%s' service.\n"),
3731                   "core");
3732       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
3733       connected_peers = NULL;
3734       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
3735       query_request_map = NULL;
3736       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
3737       requests_by_expiration_heap = NULL;
3738       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
3739       peer_request_map = NULL;
3740       if (dsh != NULL)
3741         {
3742           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3743           dsh = NULL;
3744         }
3745       return GNUNET_SYSERR;
3746     }
3747   /* FIXME: distinguish between sending and storing in options? */
3748   if (active_migration) 
3749     {
3750       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3751                   _("Content migration is enabled, will start to gather data\n"));
3752       consider_migration_gathering ();
3753     }
3754   GNUNET_SERVER_disconnect_notify (server, 
3755                                    &handle_client_disconnect,
3756                                    NULL);
3757   GNUNET_assert (GNUNET_OK ==
3758                  GNUNET_CONFIGURATION_get_value_filename (cfg,
3759                                                           "fs",
3760                                                           "TRUST",
3761                                                           &trustDirectory));
3762   GNUNET_DISK_directory_create (trustDirectory);
3763   GNUNET_SCHEDULER_add_with_priority (sched,
3764                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
3765                                       &cron_flush_trust, NULL);
3766
3767
3768   GNUNET_SERVER_add_handlers (server, handlers);
3769   GNUNET_SCHEDULER_add_delayed (sched,
3770                                 GNUNET_TIME_UNIT_FOREVER_REL,
3771                                 &shutdown_task,
3772                                 NULL);
3773   return GNUNET_OK;
3774 }
3775
3776
3777 /**
3778  * Process fs requests.
3779  *
3780  * @param cls closure
3781  * @param sched scheduler to use
3782  * @param server the initialized server
3783  * @param cfg configuration to use
3784  */
3785 static void
3786 run (void *cls,
3787      struct GNUNET_SCHEDULER_Handle *sched,
3788      struct GNUNET_SERVER_Handle *server,
3789      const struct GNUNET_CONFIGURATION_Handle *cfg)
3790 {
3791   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
3792                                                            "FS",
3793                                                            "ACTIVEMIGRATION");
3794   dsh = GNUNET_DATASTORE_connect (cfg,
3795                                   sched);
3796   if (dsh == NULL)
3797     {
3798       GNUNET_SCHEDULER_shutdown (sched);
3799       return;
3800     }
3801   datastore_load = GNUNET_LOAD_value_init ();
3802   block_cfg = GNUNET_CONFIGURATION_create ();
3803   GNUNET_CONFIGURATION_set_value_string (block_cfg,
3804                                          "block",
3805                                          "PLUGINS",
3806                                          "fs");
3807   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
3808   GNUNET_assert (NULL != block_ctx);
3809   dht_handle = GNUNET_DHT_connect (sched,
3810                                    cfg,
3811                                    FS_DHT_HT_SIZE);
3812   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3813        (GNUNET_OK != main_init (sched, server, cfg)) )
3814     {    
3815       GNUNET_SCHEDULER_shutdown (sched);
3816       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3817       dsh = NULL;
3818       GNUNET_DHT_disconnect (dht_handle);
3819       dht_handle = NULL;
3820       GNUNET_BLOCK_context_destroy (block_ctx);
3821       block_ctx = NULL;
3822       GNUNET_CONFIGURATION_destroy (block_cfg);
3823       block_cfg = NULL;
3824       GNUNET_LOAD_value_free (datastore_load);
3825       datastore_load = NULL;
3826       return;   
3827     }
3828 }
3829
3830
3831 /**
3832  * The main function for the fs service.
3833  *
3834  * @param argc number of arguments from the command line
3835  * @param argv command line arguments
3836  * @return 0 ok, 1 on error
3837  */
3838 int
3839 main (int argc, char *const *argv)
3840 {
3841   return (GNUNET_OK ==
3842           GNUNET_SERVICE_run (argc,
3843                               argv,
3844                               "fs",
3845                               GNUNET_SERVICE_OPTION_NONE,
3846                               &run, NULL)) ? 0 : 1;
3847 }
3848
3849 /* end of gnunet-service-fs.c */