track delay between transmission request and satisfaction:
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - track per-peer request latency (using new load API)
28  * - consider more precise latency estimation (per-peer & request) -- again load API?
29  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
30  * - introduce random latency in processing
31  * - more statistics
32  */
33 #include "platform.h"
34 #include <float.h>
35 #include "gnunet_constants.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_load_lib.h"
40 #include "gnunet_peer_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_signatures.h"
43 #include "gnunet_statistics_service.h"
44 #include "gnunet_util_lib.h"
45 #include "gnunet-service-fs_indexing.h"
46 #include "fs.h"
47
48 #define DEBUG_FS GNUNET_NO
49
50 /**
51  * Maximum number of outgoing messages we queue per peer.
52  */
53 #define MAX_QUEUE_PER_PEER 16
54
55 /**
56  * Size for the hash map for DHT requests from the FS
57  * service.  Should be about the number of concurrent
58  * DHT requests we plan to make.
59  */
60 #define FS_DHT_HT_SIZE 1024
61
62 /**
63  * How often do we flush trust values to disk?
64  */
65 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
66
67 /**
68  * How often do we at most PUT content into the DHT?
69  */
70 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
71
72 /**
73  * Inverse of the probability that we will submit the same query
74  * to the same peer again.  If the same peer already got the query
75  * repeatedly recently, the probability is multiplied by the inverse
76  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
77  * plus MAX_CORK_DELAY (so roughly every 3.5s).
78  */
79 #define RETRY_PROBABILITY_INV 3
80
81 /**
82  * What is the maximum delay for a P2P FS message (in our interaction
83  * with core)?  FS-internal delays are another story.  The value is
84  * chosen based on the 32k block size.  Given that peers typcially
85  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
86  * transmit one message even to the lowest-bandwidth peers.
87  */
88 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
89
90 /**
91  * Maximum number of requests (from other peers) that we're
92  * willing to have pending at any given point in time.
93  */
94 static unsigned long long max_pending_requests = (32 * 1024);
95
96
97 /**
98  * Information we keep for each pending reply.  The
99  * actual message follows at the end of this struct.
100  */
101 struct PendingMessage;
102
103 /**
104  * Function called upon completion of a transmission.
105  *
106  * @param cls closure
107  * @param pid ID of receiving peer, 0 on transmission error
108  */
109 typedef void (*TransmissionContinuation)(void * cls, 
110                                          GNUNET_PEER_Id tpid);
111
112
113 /**
114  * Information we keep for each pending message (GET/PUT).  The
115  * actual message follows at the end of this struct.
116  */
117 struct PendingMessage
118 {
119   /**
120    * This is a doubly-linked list of messages to the same peer.
121    */
122   struct PendingMessage *next;
123
124   /**
125    * This is a doubly-linked list of messages to the same peer.
126    */
127   struct PendingMessage *prev;
128
129   /**
130    * Entry in pending message list for this pending message.
131    */ 
132   struct PendingMessageList *pml;  
133
134   /**
135    * Function to call immediately once we have transmitted this
136    * message.
137    */
138   TransmissionContinuation cont;
139
140   /**
141    * Closure for cont.
142    */
143   void *cont_cls;
144
145   /**
146    * Size of the reply; actual reply message follows
147    * at the end of this struct.
148    */
149   size_t msize;
150   
151   /**
152    * How important is this message for us?
153    */
154   uint32_t priority;
155  
156 };
157
158
159 /**
160  * Information about a peer that we are connected to.
161  * We track data that is useful for determining which
162  * peers should receive our requests.  We also keep
163  * a list of messages to transmit to this peer.
164  */
165 struct ConnectedPeer
166 {
167
168   /**
169    * List of the last clients for which this peer successfully
170    * answered a query.
171    */
172   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
173
174   /**
175    * List of the last PIDs for which
176    * this peer successfully answered a query;
177    * We use 0 to indicate no successful reply.
178    */
179   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
180
181   /**
182    * Average delay between sending the peer a request and
183    * getting a reply (only calculated over the requests for
184    * which we actually got a reply).   Calculated
185    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
186    */ 
187   struct GNUNET_TIME_Relative avg_delay;
188
189   /**
190    * Point in time until which this peer does not want us to migrate content
191    * to it.
192    */
193   struct GNUNET_TIME_Absolute migration_blocked;
194
195   /**
196    * Time until when we blocked this peer from migrating
197    * data to us.
198    */
199   struct GNUNET_TIME_Absolute last_migration_block;
200
201   /**
202    * Handle for an active request for transmission to this
203    * peer, or NULL.
204    */
205   struct GNUNET_CORE_TransmitHandle *cth;
206
207   /**
208    * Messages (replies, queries, content migration) we would like to
209    * send to this peer in the near future.  Sorted by priority, head.
210    */
211   struct PendingMessage *pending_messages_head;
212
213   /**
214    * Messages (replies, queries, content migration) we would like to
215    * send to this peer in the near future.  Sorted by priority, tail.
216    */
217   struct PendingMessage *pending_messages_tail;
218
219   /**
220    * How long does it typically take for us to transmit a message
221    * to this peer?  (delay between the request being issued and
222    * the callback being invoked).
223    */
224   struct GNUNET_LOAD_Value *transmission_delay;
225
226   /**
227    * Time when the last transmission request was issued.
228    */
229   struct GNUNET_TIME_Absolute last_transmission_request_start;
230
231   /**
232    * Average priority of successful replies.  Calculated
233    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
234    */
235   double avg_priority;
236
237   /**
238    * Increase in traffic preference still to be submitted
239    * to the core service for this peer.
240    */
241   uint64_t inc_preference;
242
243   /**
244    * Trust rating for this peer
245    */
246   uint32_t trust;
247
248   /**
249    * Trust rating for this peer on disk.
250    */
251   uint32_t disk_trust;
252
253   /**
254    * The peer's identity.
255    */
256   GNUNET_PEER_Id pid;  
257
258   /**
259    * Size of the linked list of 'pending_messages'.
260    */
261   unsigned int pending_requests;
262
263   /**
264    * Which offset in "last_p2p_replies" will be updated next?
265    * (we go round-robin).
266    */
267   unsigned int last_p2p_replies_woff;
268
269   /**
270    * Which offset in "last_client_replies" will be updated next?
271    * (we go round-robin).
272    */
273   unsigned int last_client_replies_woff;
274
275 };
276
277
278 /**
279  * Information we keep for each pending request.  We should try to
280  * keep this struct as small as possible since its memory consumption
281  * is key to how many requests we can have pending at once.
282  */
283 struct PendingRequest;
284
285
286 /**
287  * Doubly-linked list of requests we are performing
288  * on behalf of the same client.
289  */
290 struct ClientRequestList
291 {
292
293   /**
294    * This is a doubly-linked list.
295    */
296   struct ClientRequestList *next;
297
298   /**
299    * This is a doubly-linked list.
300    */
301   struct ClientRequestList *prev;
302
303   /**
304    * Request this entry represents.
305    */
306   struct PendingRequest *req;
307
308   /**
309    * Client list this request belongs to.
310    */
311   struct ClientList *client_list;
312
313 };
314
315
316 /**
317  * Replies to be transmitted to the client.  The actual
318  * response message is allocated after this struct.
319  */
320 struct ClientResponseMessage
321 {
322   /**
323    * This is a doubly-linked list.
324    */
325   struct ClientResponseMessage *next;
326
327   /**
328    * This is a doubly-linked list.
329    */
330   struct ClientResponseMessage *prev;
331
332   /**
333    * Client list entry this response belongs to.
334    */
335   struct ClientList *client_list;
336
337   /**
338    * Number of bytes in the response.
339    */
340   size_t msize;
341 };
342
343
344 /**
345  * Linked list of clients we are performing requests
346  * for right now.
347  */
348 struct ClientList
349 {
350   /**
351    * This is a linked list.
352    */
353   struct ClientList *next;
354
355   /**
356    * ID of a client making a request, NULL if this entry is for a
357    * peer.
358    */
359   struct GNUNET_SERVER_Client *client;
360
361   /**
362    * Head of list of requests performed on behalf
363    * of this client right now.
364    */
365   struct ClientRequestList *rl_head;
366
367   /**
368    * Tail of list of requests performed on behalf
369    * of this client right now.
370    */
371   struct ClientRequestList *rl_tail;
372
373   /**
374    * Head of linked list of responses.
375    */
376   struct ClientResponseMessage *res_head;
377
378   /**
379    * Tail of linked list of responses.
380    */
381   struct ClientResponseMessage *res_tail;
382
383   /**
384    * Context for sending replies.
385    */
386   struct GNUNET_CONNECTION_TransmitHandle *th;
387
388 };
389
390
391 /**
392  * Doubly-linked list of messages we are performing
393  * due to a pending request.
394  */
395 struct PendingMessageList
396 {
397
398   /**
399    * This is a doubly-linked list of messages on behalf of the same request.
400    */
401   struct PendingMessageList *next;
402
403   /**
404    * This is a doubly-linked list of messages on behalf of the same request.
405    */
406   struct PendingMessageList *prev;
407
408   /**
409    * Message this entry represents.
410    */
411   struct PendingMessage *pm;
412
413   /**
414    * Request this entry belongs to.
415    */
416   struct PendingRequest *req;
417
418   /**
419    * Peer this message is targeted for.
420    */
421   struct ConnectedPeer *target;
422
423 };
424
425
426 /**
427  * Information we keep for each pending request.  We should try to
428  * keep this struct as small as possible since its memory consumption
429  * is key to how many requests we can have pending at once.
430  */
431 struct PendingRequest
432 {
433
434   /**
435    * If this request was made by a client, this is our entry in the
436    * client request list; otherwise NULL.
437    */
438   struct ClientRequestList *client_request_list;
439
440   /**
441    * Entry of peer responsible for this entry (if this request
442    * was made by a peer).
443    */
444   struct ConnectedPeer *cp;
445
446   /**
447    * If this is a namespace query, pointer to the hash of the public
448    * key of the namespace; otherwise NULL.  Pointer will be to the 
449    * end of this struct (so no need to free it).
450    */
451   const GNUNET_HashCode *namespace;
452
453   /**
454    * Bloomfilter we use to filter out replies that we don't care about
455    * (anymore).  NULL as long as we are interested in all replies.
456    */
457   struct GNUNET_CONTAINER_BloomFilter *bf;
458
459   /**
460    * Context of our GNUNET_CORE_peer_change_preference call.
461    */
462   struct GNUNET_CORE_InformationRequestContext *irc;
463
464   /**
465    * Reference to DHT get operation for this request (or NULL).
466    */
467   struct GNUNET_DHT_GetHandle *dht_get;
468
469   /**
470    * Hash code of all replies that we have seen so far (only valid
471    * if client is not NULL since we only track replies like this for
472    * our own clients).
473    */
474   GNUNET_HashCode *replies_seen;
475
476   /**
477    * Node in the heap representing this entry; NULL
478    * if we have no heap node.
479    */
480   struct GNUNET_CONTAINER_HeapNode *hnode;
481
482   /**
483    * Head of list of messages being performed on behalf of this
484    * request.
485    */
486   struct PendingMessageList *pending_head;
487
488   /**
489    * Tail of list of messages being performed on behalf of this
490    * request.
491    */
492   struct PendingMessageList *pending_tail;
493
494   /**
495    * When did we first see this request (form this peer), or, if our
496    * client is initiating, when did we last initiate a search?
497    */
498   struct GNUNET_TIME_Absolute start_time;
499
500   /**
501    * The query that this request is for.
502    */
503   GNUNET_HashCode query;
504
505   /**
506    * The task responsible for transmitting queries
507    * for this request.
508    */
509   GNUNET_SCHEDULER_TaskIdentifier task;
510
511   /**
512    * (Interned) Peer identifier that identifies a preferred target
513    * for requests.
514    */
515   GNUNET_PEER_Id target_pid;
516
517   /**
518    * (Interned) Peer identifiers of peers that have already
519    * received our query for this content.
520    */
521   GNUNET_PEER_Id *used_pids;
522   
523   /**
524    * Our entry in the queue (non-NULL while we wait for our
525    * turn to interact with the local database).
526    */
527   struct GNUNET_DATASTORE_QueueEntry *qe;
528
529   /**
530    * Size of the 'bf' (in bytes).
531    */
532   size_t bf_size;
533
534   /**
535    * Desired anonymity level; only valid for requests from a local client.
536    */
537   uint32_t anonymity_level;
538
539   /**
540    * How many entries in "used_pids" are actually valid?
541    */
542   unsigned int used_pids_off;
543
544   /**
545    * How long is the "used_pids" array?
546    */
547   unsigned int used_pids_size;
548
549   /**
550    * Number of results found for this request.
551    */
552   unsigned int results_found;
553
554   /**
555    * How many entries in "replies_seen" are actually valid?
556    */
557   unsigned int replies_seen_off;
558
559   /**
560    * How long is the "replies_seen" array?
561    */
562   unsigned int replies_seen_size;
563   
564   /**
565    * Priority with which this request was made.  If one of our clients
566    * made the request, then this is the current priority that we are
567    * using when initiating the request.  This value is used when
568    * we decide to reward other peers with trust for providing a reply.
569    */
570   uint32_t priority;
571
572   /**
573    * Priority points left for us to spend when forwarding this request
574    * to other peers.
575    */
576   uint32_t remaining_priority;
577
578   /**
579    * Number to mingle hashes for bloom-filter tests with.
580    */
581   int32_t mingle;
582
583   /**
584    * TTL with which we saw this request (or, if we initiated, TTL that
585    * we used for the request).
586    */
587   int32_t ttl;
588   
589   /**
590    * Type of the content that this request is for.
591    */
592   enum GNUNET_BLOCK_Type type;
593
594   /**
595    * Remove this request after transmission of the current response.
596    */
597   int16_t do_remove;
598
599   /**
600    * GNUNET_YES if we should not forward this request to other peers.
601    */
602   int16_t local_only;
603
604 };
605
606
607 /**
608  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
609  */
610 struct MigrationReadyBlock
611 {
612
613   /**
614    * This is a doubly-linked list.
615    */
616   struct MigrationReadyBlock *next;
617
618   /**
619    * This is a doubly-linked list.
620    */
621   struct MigrationReadyBlock *prev;
622
623   /**
624    * Query for the block.
625    */
626   GNUNET_HashCode query;
627
628   /**
629    * When does this block expire? 
630    */
631   struct GNUNET_TIME_Absolute expiration;
632
633   /**
634    * Peers we would consider forwarding this
635    * block to.  Zero for empty entries.
636    */
637   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
638
639   /**
640    * Size of the block.
641    */
642   size_t size;
643
644   /**
645    *  Number of targets already used.
646    */
647   unsigned int used_targets;
648
649   /**
650    * Type of the block.
651    */
652   enum GNUNET_BLOCK_Type type;
653 };
654
655
656 /**
657  * Our connection to the datastore.
658  */
659 static struct GNUNET_DATASTORE_Handle *dsh;
660
661 /**
662  * Our block context.
663  */
664 static struct GNUNET_BLOCK_Context *block_ctx;
665
666 /**
667  * Our block configuration.
668  */
669 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
670
671 /**
672  * Our scheduler.
673  */
674 static struct GNUNET_SCHEDULER_Handle *sched;
675
676 /**
677  * Our configuration.
678  */
679 static const struct GNUNET_CONFIGURATION_Handle *cfg;
680
681 /**
682  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
683  */
684 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
685
686 /**
687  * Map of peer identifiers to "struct PendingRequest" (for that peer).
688  */
689 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
690
691 /**
692  * Map of query identifiers to "struct PendingRequest" (for that query).
693  */
694 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
695
696 /**
697  * Heap with the request that will expire next at the top.  Contains
698  * pointers of type "struct PendingRequest*"; these will *also* be
699  * aliased from the "requests_by_peer" data structures and the
700  * "requests_by_query" table.  Note that requests from our clients
701  * don't expire and are thus NOT in the "requests_by_expiration"
702  * (or the "requests_by_peer" tables).
703  */
704 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
705
706 /**
707  * Handle for reporting statistics.
708  */
709 static struct GNUNET_STATISTICS_Handle *stats;
710
711 /**
712  * Linked list of clients we are currently processing requests for.
713  */
714 static struct ClientList *client_list;
715
716 /**
717  * Pointer to handle to the core service (points to NULL until we've
718  * connected to it).
719  */
720 static struct GNUNET_CORE_Handle *core;
721
722 /**
723  * Head of linked list of blocks that can be migrated.
724  */
725 static struct MigrationReadyBlock *mig_head;
726
727 /**
728  * Tail of linked list of blocks that can be migrated.
729  */
730 static struct MigrationReadyBlock *mig_tail;
731
732 /**
733  * Request to datastore for migration (or NULL).
734  */
735 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
736
737 /**
738  * Request to datastore for DHT PUTs (or NULL).
739  */
740 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
741
742 /**
743  * Type we will request for the next DHT PUT round from the datastore.
744  */
745 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
746
747 /**
748  * Where do we store trust information?
749  */
750 static char *trustDirectory;
751
752 /**
753  * ID of task that collects blocks for migration.
754  */
755 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
756
757 /**
758  * ID of task that collects blocks for DHT PUTs.
759  */
760 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
761
762 /**
763  * What is the maximum frequency at which we are allowed to
764  * poll the datastore for migration content?
765  */
766 static struct GNUNET_TIME_Relative min_migration_delay;
767
768 /**
769  * Handle for DHT operations.
770  */
771 static struct GNUNET_DHT_Handle *dht_handle;
772
773 /**
774  * Size of the doubly-linked list of migration blocks.
775  */
776 static unsigned int mig_size;
777
778 /**
779  * Are we allowed to migrate content to this peer.
780  */
781 static int active_migration;
782
783 /**
784  * How many entires with zero anonymity do we currently estimate
785  * to have in the database?
786  */
787 static unsigned int zero_anonymity_count_estimate;
788
789 /**
790  * Typical priorities we're seeing from other peers right now.  Since
791  * most priorities will be zero, this value is the weighted average of
792  * non-zero priorities seen "recently".  In order to ensure that new
793  * values do not dramatically change the ratio, values are first
794  * "capped" to a reasonable range (+N of the current value) and then
795  * averaged into the existing value by a ratio of 1:N.  Hence
796  * receiving the largest possible priority can still only raise our
797  * "current_priorities" by at most 1.
798  */
799 static double current_priorities;
800
801 /**
802  * Datastore 'GET' load tracking.
803  */
804 static struct GNUNET_LOAD_Value *datastore_get_load;
805
806 /**
807  * Datastore 'PUT' load tracking.
808  */
809 static struct GNUNET_LOAD_Value *datastore_put_load;
810
811
812 /**
813  * We've just now completed a datastore request.  Update our
814  * datastore load calculations.
815  *
816  * @param start time when the datastore request was issued
817  */
818 static void
819 update_datastore_delays (struct GNUNET_TIME_Absolute start)
820 {
821   struct GNUNET_TIME_Relative delay;
822
823   delay = GNUNET_TIME_absolute_get_duration (start);
824   GNUNET_LOAD_update (datastore_get_load,
825                       delay.value);
826 }
827
828
829 /**
830  * Get the filename under which we would store the GNUNET_HELLO_Message
831  * for the given host and protocol.
832  * @return filename of the form DIRECTORY/HOSTID
833  */
834 static char *
835 get_trust_filename (const struct GNUNET_PeerIdentity *id)
836 {
837   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
838   char *fn;
839
840   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
841   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
842   return fn;
843 }
844
845
846
847 /**
848  * Transmit messages by copying it to the target buffer
849  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
850  * for writing in the meantime.  In that case, do nothing
851  * (the disconnect or shutdown handler will take care of the rest).
852  * If we were able to transmit messages and there are still more
853  * pending, ask core again for further calls to this function.
854  *
855  * @param cls closure, pointer to the 'struct ConnectedPeer*'
856  * @param size number of bytes available in buf
857  * @param buf where the callee should write the message
858  * @return number of bytes written to buf
859  */
860 static size_t
861 transmit_to_peer (void *cls,
862                   size_t size, void *buf);
863
864
865 /* ******************* clean up functions ************************ */
866
867 /**
868  * Delete the given migration block.
869  *
870  * @param mb block to delete
871  */
872 static void
873 delete_migration_block (struct MigrationReadyBlock *mb)
874 {
875   GNUNET_CONTAINER_DLL_remove (mig_head,
876                                mig_tail,
877                                mb);
878   GNUNET_PEER_decrement_rcs (mb->target_list,
879                              MIGRATION_LIST_SIZE);
880   mig_size--;
881   GNUNET_free (mb);
882 }
883
884
885 /**
886  * Compare the distance of two peers to a key.
887  *
888  * @param key key
889  * @param p1 first peer
890  * @param p2 second peer
891  * @return GNUNET_YES if P1 is closer to key than P2
892  */
893 static int
894 is_closer (const GNUNET_HashCode *key,
895            const struct GNUNET_PeerIdentity *p1,
896            const struct GNUNET_PeerIdentity *p2)
897 {
898   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
899                                     &p2->hashPubKey,
900                                     key);
901 }
902
903
904 /**
905  * Consider migrating content to a given peer.
906  *
907  * @param cls 'struct MigrationReadyBlock*' to select
908  *            targets for (or NULL for none)
909  * @param key ID of the peer 
910  * @param value 'struct ConnectedPeer' of the peer
911  * @return GNUNET_YES (always continue iteration)
912  */
913 static int
914 consider_migration (void *cls,
915                     const GNUNET_HashCode *key,
916                     void *value)
917 {
918   struct MigrationReadyBlock *mb = cls;
919   struct ConnectedPeer *cp = value;
920   struct MigrationReadyBlock *pos;
921   struct GNUNET_PeerIdentity cppid;
922   struct GNUNET_PeerIdentity otherpid;
923   struct GNUNET_PeerIdentity worstpid;
924   size_t msize;
925   unsigned int i;
926   unsigned int repl;
927   
928   /* consider 'cp' as a migration target for mb */
929   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
930     return GNUNET_YES; /* peer has requested no migration! */
931   if (mb != NULL)
932     {
933       GNUNET_PEER_resolve (cp->pid,
934                            &cppid);
935       repl = MIGRATION_LIST_SIZE;
936       for (i=0;i<MIGRATION_LIST_SIZE;i++)
937         {
938           if (mb->target_list[i] == 0)
939             {
940               mb->target_list[i] = cp->pid;
941               GNUNET_PEER_change_rc (mb->target_list[i], 1);
942               repl = MIGRATION_LIST_SIZE;
943               break;
944             }
945           GNUNET_PEER_resolve (mb->target_list[i],
946                                &otherpid);
947           if ( (repl == MIGRATION_LIST_SIZE) &&
948                is_closer (&mb->query,
949                           &cppid,
950                           &otherpid)) 
951             {
952               repl = i;
953               worstpid = otherpid;
954             }
955           else if ( (repl != MIGRATION_LIST_SIZE) &&
956                     (is_closer (&mb->query,
957                                 &worstpid,
958                                 &otherpid) ) )
959             {
960               repl = i;
961               worstpid = otherpid;
962             }       
963         }
964       if (repl != MIGRATION_LIST_SIZE) 
965         {
966           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
967           mb->target_list[repl] = cp->pid;
968           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
969         }
970     }
971
972   /* consider scheduling transmission to cp for content migration */
973   if (cp->cth != NULL)
974     return GNUNET_YES; 
975   msize = 0;
976   pos = mig_head;
977   while (pos != NULL)
978     {
979       for (i=0;i<MIGRATION_LIST_SIZE;i++)
980         {
981           if (cp->pid == pos->target_list[i])
982             {
983               if (msize == 0)
984                 msize = pos->size;
985               else
986                 msize = GNUNET_MIN (msize,
987                                     pos->size);
988               break;
989             }
990         }
991       pos = pos->next;
992     }
993   if (msize == 0)
994     return GNUNET_YES; /* no content available */
995 #if DEBUG_FS
996   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
997               "Trying to migrate at least %u bytes to peer `%s'\n",
998               msize,
999               GNUNET_h2s (key));
1000 #endif
1001   cp->cth 
1002     = GNUNET_CORE_notify_transmit_ready (core,
1003                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1004                                          (const struct GNUNET_PeerIdentity*) key,
1005                                          msize + sizeof (struct PutMessage),
1006                                          &transmit_to_peer,
1007                                          cp);
1008   return GNUNET_YES;
1009 }
1010
1011
1012 /**
1013  * Task that is run periodically to obtain blocks for content
1014  * migration
1015  * 
1016  * @param cls unused
1017  * @param tc scheduler context (also unused)
1018  */
1019 static void
1020 gather_migration_blocks (void *cls,
1021                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1022
1023
1024
1025
1026 /**
1027  * Task that is run periodically to obtain blocks for DHT PUTs.
1028  * 
1029  * @param cls type of blocks to gather
1030  * @param tc scheduler context (unused)
1031  */
1032 static void
1033 gather_dht_put_blocks (void *cls,
1034                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1035
1036
1037 /**
1038  * If the migration task is not currently running, consider
1039  * (re)scheduling it with the appropriate delay.
1040  */
1041 static void
1042 consider_migration_gathering ()
1043 {
1044   struct GNUNET_TIME_Relative delay;
1045
1046   if (dsh == NULL)
1047     return;
1048   if (mig_qe != NULL)
1049     return;
1050   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1051     return;
1052   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1053                                          mig_size);
1054   delay = GNUNET_TIME_relative_divide (delay,
1055                                        MAX_MIGRATION_QUEUE);
1056   delay = GNUNET_TIME_relative_max (delay,
1057                                     min_migration_delay);
1058   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1059                                            delay,
1060                                            &gather_migration_blocks,
1061                                            NULL);
1062 }
1063
1064
1065 /**
1066  * If the DHT PUT gathering task is not currently running, consider
1067  * (re)scheduling it with the appropriate delay.
1068  */
1069 static void
1070 consider_dht_put_gathering (void *cls)
1071 {
1072   struct GNUNET_TIME_Relative delay;
1073
1074   if (dsh == NULL)
1075     return;
1076   if (dht_qe != NULL)
1077     return;
1078   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1079     return;
1080   if (zero_anonymity_count_estimate > 0)
1081     {
1082       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1083                                            zero_anonymity_count_estimate);
1084       delay = GNUNET_TIME_relative_min (delay,
1085                                         MAX_DHT_PUT_FREQ);
1086     }
1087   else
1088     {
1089       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1090          (hopefully) appear */
1091       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1092     }
1093   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1094                                            delay,
1095                                            &gather_dht_put_blocks,
1096                                            cls);
1097 }
1098
1099
1100 /**
1101  * Process content offered for migration.
1102  *
1103  * @param cls closure
1104  * @param key key for the content
1105  * @param size number of bytes in data
1106  * @param data content stored
1107  * @param type type of the content
1108  * @param priority priority of the content
1109  * @param anonymity anonymity-level for the content
1110  * @param expiration expiration time for the content
1111  * @param uid unique identifier for the datum;
1112  *        maybe 0 if no unique identifier is available
1113  */
1114 static void
1115 process_migration_content (void *cls,
1116                            const GNUNET_HashCode * key,
1117                            size_t size,
1118                            const void *data,
1119                            enum GNUNET_BLOCK_Type type,
1120                            uint32_t priority,
1121                            uint32_t anonymity,
1122                            struct GNUNET_TIME_Absolute
1123                            expiration, uint64_t uid)
1124 {
1125   struct MigrationReadyBlock *mb;
1126   
1127   if (key == NULL)
1128     {
1129       mig_qe = NULL;
1130       if (mig_size < MAX_MIGRATION_QUEUE)  
1131         consider_migration_gathering ();
1132       return;
1133     }
1134   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1135     {
1136       if (GNUNET_OK !=
1137           GNUNET_FS_handle_on_demand_block (key, size, data,
1138                                             type, priority, anonymity,
1139                                             expiration, uid, 
1140                                             &process_migration_content,
1141                                             NULL))
1142         {
1143           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1144         }
1145       return;
1146     }
1147 #if DEBUG_FS
1148   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1149               "Retrieved block `%s' of type %u for migration\n",
1150               GNUNET_h2s (key),
1151               type);
1152 #endif
1153   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1154   mb->query = *key;
1155   mb->expiration = expiration;
1156   mb->size = size;
1157   mb->type = type;
1158   memcpy (&mb[1], data, size);
1159   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1160                                      mig_tail,
1161                                      mig_tail,
1162                                      mb);
1163   mig_size++;
1164   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1165                                          &consider_migration,
1166                                          mb);
1167   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1168 }
1169
1170
1171 /**
1172  * Function called upon completion of the DHT PUT operation.
1173  */
1174 static void
1175 dht_put_continuation (void *cls,
1176                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1177 {
1178   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1179 }
1180
1181
1182 /**
1183  * Store content in DHT.
1184  *
1185  * @param cls closure
1186  * @param key key for the content
1187  * @param size number of bytes in data
1188  * @param data content stored
1189  * @param type type of the content
1190  * @param priority priority of the content
1191  * @param anonymity anonymity-level for the content
1192  * @param expiration expiration time for the content
1193  * @param uid unique identifier for the datum;
1194  *        maybe 0 if no unique identifier is available
1195  */
1196 static void
1197 process_dht_put_content (void *cls,
1198                          const GNUNET_HashCode * key,
1199                          size_t size,
1200                          const void *data,
1201                          enum GNUNET_BLOCK_Type type,
1202                          uint32_t priority,
1203                          uint32_t anonymity,
1204                          struct GNUNET_TIME_Absolute
1205                          expiration, uint64_t uid)
1206
1207   static unsigned int counter;
1208   static GNUNET_HashCode last_vhash;
1209   static GNUNET_HashCode vhash;
1210
1211   if (key == NULL)
1212     {
1213       dht_qe = NULL;
1214       consider_dht_put_gathering (cls);
1215       return;
1216     }
1217   /* slightly funky code to estimate the total number of values with zero
1218      anonymity from the maximum observed length of a monotonically increasing 
1219      sequence of hashes over the contents */
1220   GNUNET_CRYPTO_hash (data, size, &vhash);
1221   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1222     {
1223       if (zero_anonymity_count_estimate > 0)
1224         zero_anonymity_count_estimate /= 2;
1225       counter = 0;
1226     }
1227   last_vhash = vhash;
1228   if (counter < 31)
1229     counter++;
1230   if (zero_anonymity_count_estimate < (1 << counter))
1231     zero_anonymity_count_estimate = (1 << counter);
1232 #if DEBUG_FS
1233   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1234               "Retrieved block `%s' of type %u for DHT PUT\n",
1235               GNUNET_h2s (key),
1236               type);
1237 #endif
1238   GNUNET_DHT_put (dht_handle,
1239                   key,
1240                   GNUNET_DHT_RO_NONE,
1241                   type,
1242                   size,
1243                   data,
1244                   expiration,
1245                   GNUNET_TIME_UNIT_FOREVER_REL,
1246                   &dht_put_continuation,
1247                   cls);
1248 }
1249
1250
1251 /**
1252  * Task that is run periodically to obtain blocks for content
1253  * migration
1254  * 
1255  * @param cls unused
1256  * @param tc scheduler context (also unused)
1257  */
1258 static void
1259 gather_migration_blocks (void *cls,
1260                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1261 {
1262   mig_task = GNUNET_SCHEDULER_NO_TASK;
1263   if (dsh != NULL)
1264     {
1265       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1266                                             GNUNET_TIME_UNIT_FOREVER_REL,
1267                                             &process_migration_content, NULL);
1268       GNUNET_assert (mig_qe != NULL);
1269     }
1270 }
1271
1272
1273 /**
1274  * Task that is run periodically to obtain blocks for DHT PUTs.
1275  * 
1276  * @param cls type of blocks to gather
1277  * @param tc scheduler context (unused)
1278  */
1279 static void
1280 gather_dht_put_blocks (void *cls,
1281                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1282 {
1283   dht_task = GNUNET_SCHEDULER_NO_TASK;
1284   if (dsh != NULL)
1285     {
1286       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1287         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1288       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1289                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1290                                                     dht_put_type++,
1291                                                     &process_dht_put_content, NULL);
1292       GNUNET_assert (dht_qe != NULL);
1293     }
1294 }
1295
1296
1297 /**
1298  * We're done with a particular message list entry.
1299  * Free all associated resources.
1300  * 
1301  * @param pml entry to destroy
1302  */
1303 static void
1304 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1305 {
1306   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1307                                pml->req->pending_tail,
1308                                pml);
1309   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1310                                pml->target->pending_messages_tail,
1311                                pml->pm);
1312   pml->target->pending_requests--;
1313   GNUNET_free (pml->pm);
1314   GNUNET_free (pml);
1315 }
1316
1317
1318 /**
1319  * Destroy the given pending message (and call the respective
1320  * continuation).
1321  *
1322  * @param pm message to destroy
1323  * @param tpid id of peer that the message was delivered to, or 0 for none
1324  */
1325 static void
1326 destroy_pending_message (struct PendingMessage *pm,
1327                          GNUNET_PEER_Id tpid)
1328 {
1329   struct PendingMessageList *pml = pm->pml;
1330   TransmissionContinuation cont;
1331   void *cont_cls;
1332
1333   if (pml != NULL)
1334     {
1335       GNUNET_assert (pml->pm == pm);
1336       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1337       cont = pm->cont;
1338       cont_cls = pm->cont_cls;
1339       destroy_pending_message_list_entry (pml);
1340     }
1341   else
1342     {
1343       GNUNET_free (pm);
1344     }
1345   if (cont != NULL)
1346     cont (cont_cls, tpid);  
1347 }
1348
1349
1350 /**
1351  * We're done processing a particular request.
1352  * Free all associated resources.
1353  *
1354  * @param pr request to destroy
1355  */
1356 static void
1357 destroy_pending_request (struct PendingRequest *pr)
1358 {
1359   struct GNUNET_PeerIdentity pid;
1360
1361   if (pr->hnode != NULL)
1362     {
1363       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1364                                          pr->hnode);
1365       pr->hnode = NULL;
1366     }
1367   if (NULL == pr->client_request_list)
1368     {
1369       GNUNET_STATISTICS_update (stats,
1370                                 gettext_noop ("# P2P searches active"),
1371                                 -1,
1372                                 GNUNET_NO);
1373     }
1374   else
1375     {
1376       GNUNET_STATISTICS_update (stats,
1377                                 gettext_noop ("# client searches active"),
1378                                 -1,
1379                                 GNUNET_NO);
1380     }
1381   /* might have already been removed from map in 'process_reply' (if
1382      there was a unique reply) or never inserted if it was a
1383      duplicate; hence ignore the return value here */
1384   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1385                                                &pr->query,
1386                                                pr);
1387   if (pr->qe != NULL)
1388      {
1389       GNUNET_DATASTORE_cancel (pr->qe);
1390       pr->qe = NULL;
1391     }
1392   if (pr->dht_get != NULL)
1393     {
1394       GNUNET_DHT_get_stop (pr->dht_get);
1395       pr->dht_get = NULL;
1396     }
1397   if (pr->client_request_list != NULL)
1398     {
1399       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1400                                    pr->client_request_list->client_list->rl_tail,
1401                                    pr->client_request_list);
1402       GNUNET_free (pr->client_request_list);
1403       pr->client_request_list = NULL;
1404     }
1405   if (pr->cp != NULL)
1406     {
1407       GNUNET_PEER_resolve (pr->cp->pid,
1408                            &pid);
1409       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1410                                                    &pid.hashPubKey,
1411                                                    pr);
1412       pr->cp = NULL;
1413     }
1414   if (pr->bf != NULL)
1415     {
1416       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1417       pr->bf = NULL;
1418     }
1419   if (pr->irc != NULL)
1420     {
1421       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1422       pr->irc = NULL;
1423     }
1424   if (pr->replies_seen != NULL)
1425     {
1426       GNUNET_free (pr->replies_seen);
1427       pr->replies_seen = NULL;
1428     }
1429   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1430     {
1431       GNUNET_SCHEDULER_cancel (sched,
1432                                pr->task);
1433       pr->task = GNUNET_SCHEDULER_NO_TASK;
1434     }
1435   while (NULL != pr->pending_head)    
1436     destroy_pending_message_list_entry (pr->pending_head);
1437   GNUNET_PEER_change_rc (pr->target_pid, -1);
1438   if (pr->used_pids != NULL)
1439     {
1440       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1441       GNUNET_free (pr->used_pids);
1442       pr->used_pids_off = 0;
1443       pr->used_pids_size = 0;
1444       pr->used_pids = NULL;
1445     }
1446   GNUNET_free (pr);
1447 }
1448
1449
1450 /**
1451  * Method called whenever a given peer connects.
1452  *
1453  * @param cls closure, not used
1454  * @param peer peer identity this notification is about
1455  * @param latency reported latency of the connection with 'other'
1456  * @param distance reported distance (DV) to 'other' 
1457  */
1458 static void 
1459 peer_connect_handler (void *cls,
1460                       const struct
1461                       GNUNET_PeerIdentity * peer,
1462                       struct GNUNET_TIME_Relative latency,
1463                       uint32_t distance)
1464 {
1465   struct ConnectedPeer *cp;
1466   struct MigrationReadyBlock *pos;
1467   char *fn;
1468   uint32_t trust;
1469   
1470   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1471   cp->transmission_delay = GNUNET_LOAD_value_init ();
1472   cp->pid = GNUNET_PEER_intern (peer);
1473
1474   fn = get_trust_filename (peer);
1475   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1476       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1477     cp->disk_trust = cp->trust = ntohl (trust);
1478   GNUNET_free (fn);
1479
1480   GNUNET_break (GNUNET_OK ==
1481                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1482                                                    &peer->hashPubKey,
1483                                                    cp,
1484                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1485
1486   pos = mig_head;
1487   while (NULL != pos)
1488     {
1489       (void) consider_migration (pos, &peer->hashPubKey, cp);
1490       pos = pos->next;
1491     }
1492 }
1493
1494
1495 /**
1496  * Increase the host credit by a value.
1497  *
1498  * @param host which peer to change the trust value on
1499  * @param value is the int value by which the
1500  *  host credit is to be increased or decreased
1501  * @returns the actual change in trust (positive or negative)
1502  */
1503 static int
1504 change_host_trust (struct ConnectedPeer *host, int value)
1505 {
1506   unsigned int old_trust;
1507
1508   if (value == 0)
1509     return 0;
1510   GNUNET_assert (host != NULL);
1511   old_trust = host->trust;
1512   if (value > 0)
1513     {
1514       if (host->trust + value < host->trust)
1515         {
1516           value = UINT32_MAX - host->trust;
1517           host->trust = UINT32_MAX;
1518         }
1519       else
1520         host->trust += value;
1521     }
1522   else
1523     {
1524       if (host->trust < -value)
1525         {
1526           value = -host->trust;
1527           host->trust = 0;
1528         }
1529       else
1530         host->trust += value;
1531     }
1532   return value;
1533 }
1534
1535
1536 /**
1537  * Write host-trust information to a file - flush the buffer entry!
1538  */
1539 static int
1540 flush_trust (void *cls,
1541              const GNUNET_HashCode *key,
1542              void *value)
1543 {
1544   struct ConnectedPeer *host = value;
1545   char *fn;
1546   uint32_t trust;
1547   struct GNUNET_PeerIdentity pid;
1548
1549   if (host->trust == host->disk_trust)
1550     return GNUNET_OK;                     /* unchanged */
1551   GNUNET_PEER_resolve (host->pid,
1552                        &pid);
1553   fn = get_trust_filename (&pid);
1554   if (host->trust == 0)
1555     {
1556       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1557         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1558                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1559     }
1560   else
1561     {
1562       trust = htonl (host->trust);
1563       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1564                                                     sizeof(uint32_t),
1565                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1566                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1567         host->disk_trust = host->trust;
1568     }
1569   GNUNET_free (fn);
1570   return GNUNET_OK;
1571 }
1572
1573 /**
1574  * Call this method periodically to scan data/hosts for new hosts.
1575  */
1576 static void
1577 cron_flush_trust (void *cls,
1578                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1579 {
1580
1581   if (NULL == connected_peers)
1582     return;
1583   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1584                                          &flush_trust,
1585                                          NULL);
1586   if (NULL == tc)
1587     return;
1588   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1589     return;
1590   GNUNET_SCHEDULER_add_delayed (tc->sched,
1591                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1592 }
1593
1594
1595 /**
1596  * Free (each) request made by the peer.
1597  *
1598  * @param cls closure, points to peer that the request belongs to
1599  * @param key current key code
1600  * @param value value in the hash map
1601  * @return GNUNET_YES (we should continue to iterate)
1602  */
1603 static int
1604 destroy_request (void *cls,
1605                  const GNUNET_HashCode * key,
1606                  void *value)
1607 {
1608   const struct GNUNET_PeerIdentity * peer = cls;
1609   struct PendingRequest *pr = value;
1610   
1611   GNUNET_break (GNUNET_YES ==
1612                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1613                                                       &peer->hashPubKey,
1614                                                       pr));
1615   destroy_pending_request (pr);
1616   return GNUNET_YES;
1617 }
1618
1619
1620 /**
1621  * Method called whenever a peer disconnects.
1622  *
1623  * @param cls closure, not used
1624  * @param peer peer identity this notification is about
1625  */
1626 static void
1627 peer_disconnect_handler (void *cls,
1628                          const struct
1629                          GNUNET_PeerIdentity * peer)
1630 {
1631   struct ConnectedPeer *cp;
1632   struct PendingMessage *pm;
1633   unsigned int i;
1634   struct MigrationReadyBlock *pos;
1635   struct MigrationReadyBlock *next;
1636
1637   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1638                                               &peer->hashPubKey,
1639                                               &destroy_request,
1640                                               (void*) peer);
1641   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1642                                           &peer->hashPubKey);
1643   if (cp == NULL)
1644     return;
1645   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1646     {
1647       if (NULL != cp->last_client_replies[i])
1648         {
1649           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1650           cp->last_client_replies[i] = NULL;
1651         }
1652     }
1653   GNUNET_break (GNUNET_YES ==
1654                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1655                                                       &peer->hashPubKey,
1656                                                       cp));
1657   /* remove this peer from migration considerations; schedule
1658      alternatives */
1659   next = mig_head;
1660   while (NULL != (pos = next))
1661     {
1662       next = pos->next;
1663       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1664         {
1665           if (pos->target_list[i] == cp->pid)
1666             {
1667               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1668               pos->target_list[i] = 0;
1669             }
1670          }
1671       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1672         {
1673           delete_migration_block (pos);
1674           consider_migration_gathering ();
1675           continue;
1676         }
1677       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1678                                              &consider_migration,
1679                                              pos);
1680     }
1681   GNUNET_PEER_change_rc (cp->pid, -1);
1682   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1683   if (NULL != cp->cth)
1684     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1685   while (NULL != (pm = cp->pending_messages_head))
1686     destroy_pending_message (pm, 0 /* delivery failed */);
1687   GNUNET_LOAD_value_free (cp->transmission_delay);
1688   GNUNET_break (0 == cp->pending_requests);
1689   GNUNET_free (cp);
1690 }
1691
1692
1693 /**
1694  * Iterator over hash map entries that removes all occurences
1695  * of the given 'client' from the 'last_client_replies' of the
1696  * given connected peer.
1697  *
1698  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1699  * @param key current key code (unused)
1700  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1701  * @return GNUNET_YES (we should continue to iterate)
1702  */
1703 static int
1704 remove_client_from_last_client_replies (void *cls,
1705                                         const GNUNET_HashCode * key,
1706                                         void *value)
1707 {
1708   struct GNUNET_SERVER_Client *client = cls;
1709   struct ConnectedPeer *cp = value;
1710   unsigned int i;
1711
1712   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1713     {
1714       if (cp->last_client_replies[i] == client)
1715         {
1716           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1717           cp->last_client_replies[i] = NULL;
1718         }
1719     }  
1720   return GNUNET_YES;
1721 }
1722
1723
1724 /**
1725  * A client disconnected.  Remove all of its pending queries.
1726  *
1727  * @param cls closure, NULL
1728  * @param client identification of the client
1729  */
1730 static void
1731 handle_client_disconnect (void *cls,
1732                           struct GNUNET_SERVER_Client
1733                           * client)
1734 {
1735   struct ClientList *pos;
1736   struct ClientList *prev;
1737   struct ClientRequestList *rcl;
1738   struct ClientResponseMessage *creply;
1739
1740   if (client == NULL)
1741     return;
1742   prev = NULL;
1743   pos = client_list;
1744   while ( (NULL != pos) &&
1745           (pos->client != client) )
1746     {
1747       prev = pos;
1748       pos = pos->next;
1749     }
1750   if (pos == NULL)
1751     return; /* no requests pending for this client */
1752   while (NULL != (rcl = pos->rl_head))
1753     {
1754       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1755                   "Destroying pending request `%s' on disconnect\n",
1756                   GNUNET_h2s (&rcl->req->query));
1757       destroy_pending_request (rcl->req);
1758     }
1759   if (prev == NULL)
1760     client_list = pos->next;
1761   else
1762     prev->next = pos->next;
1763   if (pos->th != NULL)
1764     {
1765       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1766       pos->th = NULL;
1767     }
1768   while (NULL != (creply = pos->res_head))
1769     {
1770       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1771                                    pos->res_tail,
1772                                    creply);
1773       GNUNET_free (creply);
1774     }    
1775   GNUNET_SERVER_client_drop (pos->client);
1776   GNUNET_free (pos);
1777   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1778                                          &remove_client_from_last_client_replies,
1779                                          client);
1780 }
1781
1782
1783 /**
1784  * Iterator to free peer entries.
1785  *
1786  * @param cls closure, unused
1787  * @param key current key code
1788  * @param value value in the hash map (peer entry)
1789  * @return GNUNET_YES (we should continue to iterate)
1790  */
1791 static int 
1792 clean_peer (void *cls,
1793             const GNUNET_HashCode * key,
1794             void *value)
1795 {
1796   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1797   return GNUNET_YES;
1798 }
1799
1800
1801 /**
1802  * Task run during shutdown.
1803  *
1804  * @param cls unused
1805  * @param tc unused
1806  */
1807 static void
1808 shutdown_task (void *cls,
1809                const struct GNUNET_SCHEDULER_TaskContext *tc)
1810 {
1811   if (mig_qe != NULL)
1812     {
1813       GNUNET_DATASTORE_cancel (mig_qe);
1814       mig_qe = NULL;
1815     }
1816   if (dht_qe != NULL)
1817     {
1818       GNUNET_DATASTORE_cancel (dht_qe);
1819       dht_qe = NULL;
1820     }
1821   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1822     {
1823       GNUNET_SCHEDULER_cancel (sched, mig_task);
1824       mig_task = GNUNET_SCHEDULER_NO_TASK;
1825     }
1826   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1827     {
1828       GNUNET_SCHEDULER_cancel (sched, dht_task);
1829       dht_task = GNUNET_SCHEDULER_NO_TASK;
1830     }
1831   while (client_list != NULL)
1832     handle_client_disconnect (NULL,
1833                               client_list->client);
1834   cron_flush_trust (NULL, NULL);
1835   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1836                                          &clean_peer,
1837                                          NULL);
1838   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1839   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1840   requests_by_expiration_heap = 0;
1841   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1842   connected_peers = NULL;
1843   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1844   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1845   query_request_map = NULL;
1846   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1847   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1848   peer_request_map = NULL;
1849   GNUNET_assert (NULL != core);
1850   GNUNET_CORE_disconnect (core);
1851   core = NULL;
1852   if (stats != NULL)
1853     {
1854       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1855       stats = NULL;
1856     }
1857   if (dsh != NULL)
1858     {
1859       GNUNET_DATASTORE_disconnect (dsh,
1860                                    GNUNET_NO);
1861       dsh = NULL;
1862     }
1863   while (mig_head != NULL)
1864     delete_migration_block (mig_head);
1865   GNUNET_assert (0 == mig_size);
1866   GNUNET_DHT_disconnect (dht_handle);
1867   dht_handle = NULL;
1868   GNUNET_LOAD_value_free (datastore_get_load);
1869   datastore_get_load = NULL;
1870   GNUNET_LOAD_value_free (datastore_put_load);
1871   datastore_put_load = NULL;
1872   GNUNET_BLOCK_context_destroy (block_ctx);
1873   block_ctx = NULL;
1874   GNUNET_CONFIGURATION_destroy (block_cfg);
1875   block_cfg = NULL;
1876   sched = NULL;
1877   cfg = NULL;  
1878   GNUNET_free_non_null (trustDirectory);
1879   trustDirectory = NULL;
1880 }
1881
1882
1883 /* ******************* Utility functions  ******************** */
1884
1885
1886 /**
1887  * Transmit messages by copying it to the target buffer
1888  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1889  * for writing in the meantime.  In that case, do nothing
1890  * (the disconnect or shutdown handler will take care of the rest).
1891  * If we were able to transmit messages and there are still more
1892  * pending, ask core again for further calls to this function.
1893  *
1894  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1895  * @param size number of bytes available in buf
1896  * @param buf where the callee should write the message
1897  * @return number of bytes written to buf
1898  */
1899 static size_t
1900 transmit_to_peer (void *cls,
1901                   size_t size, void *buf)
1902 {
1903   struct ConnectedPeer *cp = cls;
1904   char *cbuf = buf;
1905   struct GNUNET_PeerIdentity pid;
1906   struct PendingMessage *pm;
1907   struct MigrationReadyBlock *mb;
1908   struct MigrationReadyBlock *next;
1909   struct PutMessage migm;
1910   size_t msize;
1911   unsigned int i;
1912  
1913   cp->cth = NULL;
1914   if (NULL == buf)
1915     {
1916 #if DEBUG_FS
1917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1918                   "Dropping message, core too busy.\n");
1919 #endif
1920       return 0;
1921     }
1922   GNUNET_LOAD_update (cp->transmission_delay,
1923                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
1924   msize = 0;
1925   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1926           (pm->msize <= size) )
1927     {
1928       memcpy (&cbuf[msize], &pm[1], pm->msize);
1929       msize += pm->msize;
1930       size -= pm->msize;
1931       destroy_pending_message (pm, cp->pid);
1932     }
1933   if (NULL != pm)
1934     {
1935       GNUNET_PEER_resolve (cp->pid,
1936                            &pid);
1937       cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
1938       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1939                                                    pm->priority,
1940                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1941                                                    &pid,
1942                                                    pm->msize,
1943                                                    &transmit_to_peer,
1944                                                    cp);
1945     }
1946   else
1947     {      
1948       next = mig_head;
1949       while (NULL != (mb = next))
1950         {
1951           next = mb->next;
1952           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1953             {
1954               if ( (cp->pid == mb->target_list[i]) &&
1955                    (mb->size + sizeof (migm) <= size) )
1956                 {
1957                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1958                   mb->target_list[i] = 0;
1959                   mb->used_targets++;
1960                   memset (&migm, 0, sizeof (migm));
1961                   migm.header.size = htons (sizeof (migm) + mb->size);
1962                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1963                   migm.type = htonl (mb->type);
1964                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1965                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1966                   msize += sizeof (migm);
1967                   size -= sizeof (migm);
1968                   memcpy (&cbuf[msize], &mb[1], mb->size);
1969                   msize += mb->size;
1970                   size -= mb->size;
1971 #if DEBUG_FS
1972                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1973                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1974                               GNUNET_h2s (&mb->query),
1975                               mb->size,
1976                               GNUNET_i2s (&pid));
1977 #endif    
1978                   break;
1979                 }
1980               else
1981                 {
1982 #if DEBUG_FS
1983                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1985                               GNUNET_h2s (&mb->query),
1986                               mb->size,
1987                               GNUNET_i2s (&pid));
1988 #endif    
1989                 }
1990             }
1991           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1992                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1993             {
1994               delete_migration_block (mb);
1995               consider_migration_gathering ();
1996             }
1997         }
1998       consider_migration (NULL, 
1999                           &pid.hashPubKey,
2000                           cp);
2001     }
2002 #if DEBUG_FS
2003   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2004               "Transmitting %u bytes to peer %u\n",
2005               msize,
2006               cp->pid);
2007 #endif
2008   return msize;
2009 }
2010
2011
2012 /**
2013  * Add a message to the set of pending messages for the given peer.
2014  *
2015  * @param cp peer to send message to
2016  * @param pm message to queue
2017  * @param pr request on which behalf this message is being queued
2018  */
2019 static void
2020 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2021                                   struct PendingMessage *pm,
2022                                   struct PendingRequest *pr)
2023 {
2024   struct PendingMessage *pos;
2025   struct PendingMessageList *pml;
2026   struct GNUNET_PeerIdentity pid;
2027
2028   GNUNET_assert (pm->next == NULL);
2029   GNUNET_assert (pm->pml == NULL);    
2030   if (pr != NULL)
2031     {
2032       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2033       pml->req = pr;
2034       pml->target = cp;
2035       pml->pm = pm;
2036       pm->pml = pml;  
2037       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2038                                    pr->pending_tail,
2039                                    pml);
2040     }
2041   pos = cp->pending_messages_head;
2042   while ( (pos != NULL) &&
2043           (pm->priority < pos->priority) )
2044     pos = pos->next;    
2045   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2046                                      cp->pending_messages_tail,
2047                                      pos,
2048                                      pm);
2049   cp->pending_requests++;
2050   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2051     destroy_pending_message (cp->pending_messages_tail, 0);  
2052   GNUNET_PEER_resolve (cp->pid, &pid);
2053   if (NULL != cp->cth)
2054     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2055   /* need to schedule transmission */
2056   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2057   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2058                                                cp->pending_messages_head->priority,
2059                                                MAX_TRANSMIT_DELAY,
2060                                                &pid,
2061                                                cp->pending_messages_head->msize,
2062                                                &transmit_to_peer,
2063                                                cp);
2064   if (cp->cth == NULL)
2065     {
2066 #if DEBUG_FS
2067       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2068                   "Failed to schedule transmission with core!\n");
2069 #endif
2070       GNUNET_STATISTICS_update (stats,
2071                                 gettext_noop ("# CORE transmission failures"),
2072                                 1,
2073                                 GNUNET_NO);
2074     }
2075 }
2076
2077
2078 /**
2079  * Test if the load on this peer is too high
2080  * to even consider processing the query at
2081  * all.
2082  * 
2083  * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
2084  */
2085 static int
2086 test_load_too_high ()
2087 {
2088   return GNUNET_SYSERR; // FIXME
2089 }
2090
2091
2092 /* ******************* Pending Request Refresh Task ******************** */
2093
2094
2095
2096 /**
2097  * We use a random delay to make the timing of requests less
2098  * predictable.  This function returns such a random delay.  We add a base
2099  * delay of MAX_CORK_DELAY (1s).
2100  *
2101  * FIXME: make schedule dependent on the specifics of the request?
2102  * Or bandwidth and number of connected peers and load?
2103  *
2104  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2105  */
2106 static struct GNUNET_TIME_Relative
2107 get_processing_delay ()
2108 {
2109   return 
2110     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2111                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2112                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2113                                                                                        TTL_DECREMENT)));
2114 }
2115
2116
2117 /**
2118  * We're processing a GET request from another peer and have decided
2119  * to forward it to other peers.  This function is called periodically
2120  * and should forward the request to other peers until we have all
2121  * possible replies.  If we have transmitted the *only* reply to
2122  * the initiator we should destroy the pending request.  If we have
2123  * many replies in the queue to the initiator, we should delay sending
2124  * out more queries until the reply queue has shrunk some.
2125  *
2126  * @param cls our "struct ProcessGetContext *"
2127  * @param tc unused
2128  */
2129 static void
2130 forward_request_task (void *cls,
2131                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2132
2133
2134 /**
2135  * Function called after we either failed or succeeded
2136  * at transmitting a query to a peer.  
2137  *
2138  * @param cls the requests "struct PendingRequest*"
2139  * @param tpid ID of receiving peer, 0 on transmission error
2140  */
2141 static void
2142 transmit_query_continuation (void *cls,
2143                              GNUNET_PEER_Id tpid)
2144 {
2145   struct PendingRequest *pr = cls;
2146
2147   GNUNET_STATISTICS_update (stats,
2148                             gettext_noop ("# queries scheduled for forwarding"),
2149                             -1,
2150                             GNUNET_NO);
2151   if (tpid == 0)   
2152     {
2153 #if DEBUG_FS
2154       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2155                   "Transmission of request failed, will try again later.\n");
2156 #endif
2157       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2158         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2159                                                  get_processing_delay (),
2160                                                  &forward_request_task,
2161                                                  pr); 
2162       return;    
2163     }
2164   GNUNET_STATISTICS_update (stats,
2165                             gettext_noop ("# queries forwarded"),
2166                             1,
2167                             GNUNET_NO);
2168   GNUNET_PEER_change_rc (tpid, 1);
2169   if (pr->used_pids_off == pr->used_pids_size)
2170     GNUNET_array_grow (pr->used_pids,
2171                        pr->used_pids_size,
2172                        pr->used_pids_size * 2 + 2);
2173   pr->used_pids[pr->used_pids_off++] = tpid;
2174   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2175     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2176                                              get_processing_delay (),
2177                                              &forward_request_task,
2178                                              pr);
2179 }
2180
2181
2182 /**
2183  * How many bytes should a bloomfilter be if we have already seen
2184  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2185  * of bits set per entry.  Furthermore, we should not re-size the
2186  * filter too often (to keep it cheap).
2187  *
2188  * Since other peers will also add entries but not resize the filter,
2189  * we should generally pick a slightly larger size than what the
2190  * strict math would suggest.
2191  *
2192  * @return must be a power of two and smaller or equal to 2^15.
2193  */
2194 static size_t
2195 compute_bloomfilter_size (unsigned int entry_count)
2196 {
2197   size_t size;
2198   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2199   uint16_t max = 1 << 15;
2200
2201   if (entry_count > max)
2202     return max;
2203   size = 8;
2204   while ((size < max) && (size < ideal))
2205     size *= 2;
2206   if (size > max)
2207     return max;
2208   return size;
2209 }
2210
2211
2212 /**
2213  * Recalculate our bloom filter for filtering replies.  This function
2214  * will create a new bloom filter from scratch, so it should only be
2215  * called if we have no bloomfilter at all (and hence can create a
2216  * fresh one of minimal size without problems) OR if our peer is the
2217  * initiator (in which case we may resize to larger than mimimum size).
2218  *
2219  * @param pr request for which the BF is to be recomputed
2220  */
2221 static void
2222 refresh_bloomfilter (struct PendingRequest *pr)
2223 {
2224   unsigned int i;
2225   size_t nsize;
2226   GNUNET_HashCode mhash;
2227
2228   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2229   if (nsize == pr->bf_size)
2230     return; /* size not changed */
2231   if (pr->bf != NULL)
2232     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2233   pr->bf_size = nsize;
2234   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2235   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2236                                               pr->bf_size,
2237                                               BLOOMFILTER_K);
2238   for (i=0;i<pr->replies_seen_off;i++)
2239     {
2240       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2241                                 pr->mingle,
2242                                 &mhash);
2243       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2244     }
2245 }
2246
2247
2248 /**
2249  * Function called after we've tried to reserve a certain amount of
2250  * bandwidth for a reply.  Check if we succeeded and if so send our
2251  * query.
2252  *
2253  * @param cls the requests "struct PendingRequest*"
2254  * @param peer identifies the peer
2255  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2256  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2257  * @param amount set to the amount that was actually reserved or unreserved
2258  * @param preference current traffic preference for the given peer
2259  */
2260 static void
2261 target_reservation_cb (void *cls,
2262                        const struct
2263                        GNUNET_PeerIdentity * peer,
2264                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2265                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2266                        int amount,
2267                        uint64_t preference)
2268 {
2269   struct PendingRequest *pr = cls;
2270   struct ConnectedPeer *cp;
2271   struct PendingMessage *pm;
2272   struct GetMessage *gm;
2273   GNUNET_HashCode *ext;
2274   char *bfdata;
2275   size_t msize;
2276   unsigned int k;
2277   int no_route;
2278   uint32_t bm;
2279
2280   pr->irc = NULL;
2281   if (peer == NULL)
2282     {
2283       /* error in communication with core, try again later */
2284       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2285         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2286                                                  get_processing_delay (),
2287                                                  &forward_request_task,
2288                                                  pr);
2289       return;
2290     }
2291   // (3) transmit, update ttl/priority
2292   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2293                                           &peer->hashPubKey);
2294   if (cp == NULL)
2295     {
2296       /* Peer must have just left */
2297 #if DEBUG_FS
2298       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2299                   "Selected peer disconnected!\n");
2300 #endif
2301       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2302         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2303                                                  get_processing_delay (),
2304                                                  &forward_request_task,
2305                                                  pr);
2306       return;
2307     }
2308   no_route = GNUNET_NO;
2309   if (amount == 0)
2310     {
2311       if (pr->cp == NULL)
2312         {
2313 #if DEBUG_FS > 1
2314           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2315                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2316                       amount,
2317                       DBLOCK_SIZE);
2318 #endif
2319           GNUNET_STATISTICS_update (stats,
2320                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2321                                     1,
2322                                     GNUNET_NO);
2323           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2324             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2325                                                      get_processing_delay (),
2326                                                      &forward_request_task,
2327                                                      pr);
2328           return;  /* this target round failed */
2329         }
2330       /* FIXME: if we are "quite" busy, we may still want to skip
2331          this round; need more load detection code! */
2332       no_route = GNUNET_YES;
2333     }
2334   
2335   GNUNET_STATISTICS_update (stats,
2336                             gettext_noop ("# queries scheduled for forwarding"),
2337                             1,
2338                             GNUNET_NO);
2339   /* build message and insert message into priority queue */
2340 #if DEBUG_FS
2341   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2342               "Forwarding request `%s' to `%4s'!\n",
2343               GNUNET_h2s (&pr->query),
2344               GNUNET_i2s (peer));
2345 #endif
2346   k = 0;
2347   bm = 0;
2348   if (GNUNET_YES == no_route)
2349     {
2350       bm |= GET_MESSAGE_BIT_RETURN_TO;
2351       k++;      
2352     }
2353   if (pr->namespace != NULL)
2354     {
2355       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2356       k++;
2357     }
2358   if (pr->target_pid != 0)
2359     {
2360       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2361       k++;
2362     }
2363   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2364   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2365   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2366   pm->msize = msize;
2367   gm = (struct GetMessage*) &pm[1];
2368   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2369   gm->header.size = htons (msize);
2370   gm->type = htonl (pr->type);
2371   pr->remaining_priority /= 2;
2372   gm->priority = htonl (pr->remaining_priority);
2373   gm->ttl = htonl (pr->ttl);
2374   gm->filter_mutator = htonl(pr->mingle); 
2375   gm->hash_bitmap = htonl (bm);
2376   gm->query = pr->query;
2377   ext = (GNUNET_HashCode*) &gm[1];
2378   k = 0;
2379   if (GNUNET_YES == no_route)
2380     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2381   if (pr->namespace != NULL)
2382     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2383   if (pr->target_pid != 0)
2384     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2385   bfdata = (char *) &ext[k];
2386   if (pr->bf != NULL)
2387     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2388                                                bfdata,
2389                                                pr->bf_size);
2390   pm->cont = &transmit_query_continuation;
2391   pm->cont_cls = pr;
2392   add_to_pending_messages_for_peer (cp, pm, pr);
2393 }
2394
2395
2396 /**
2397  * Closure used for "target_peer_select_cb".
2398  */
2399 struct PeerSelectionContext 
2400 {
2401   /**
2402    * The request for which we are selecting
2403    * peers.
2404    */
2405   struct PendingRequest *pr;
2406
2407   /**
2408    * Current "prime" target.
2409    */
2410   struct GNUNET_PeerIdentity target;
2411
2412   /**
2413    * How much do we like this target?
2414    */
2415   double target_score;
2416
2417 };
2418
2419
2420 /**
2421  * Function called for each connected peer to determine
2422  * which one(s) would make good targets for forwarding.
2423  *
2424  * @param cls closure (struct PeerSelectionContext)
2425  * @param key current key code (peer identity)
2426  * @param value value in the hash map (struct ConnectedPeer)
2427  * @return GNUNET_YES if we should continue to
2428  *         iterate,
2429  *         GNUNET_NO if not.
2430  */
2431 static int
2432 target_peer_select_cb (void *cls,
2433                        const GNUNET_HashCode * key,
2434                        void *value)
2435 {
2436   struct PeerSelectionContext *psc = cls;
2437   struct ConnectedPeer *cp = value;
2438   struct PendingRequest *pr = psc->pr;
2439   double score;
2440   unsigned int i;
2441   unsigned int pc;
2442
2443   /* 1) check that this peer is not the initiator */
2444   if (cp == pr->cp)
2445     {
2446 #if DEBUG_FS
2447       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2448                   "Skipping initiator in forwarding selection\n");
2449 #endif
2450       return GNUNET_YES; /* skip */        
2451     }
2452
2453   /* 2) check if we have already (recently) forwarded to this peer */
2454   pc = 0;
2455   for (i=0;i<pr->used_pids_off;i++)
2456     if (pr->used_pids[i] == cp->pid) 
2457       {
2458         pc++;
2459         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2460                                            RETRY_PROBABILITY_INV))
2461           {
2462 #if DEBUG_FS
2463             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2464                         "NOT re-trying query that was previously transmitted %u times\n",
2465                         (unsigned int) pr->used_pids_off);
2466 #endif
2467             return GNUNET_YES; /* skip */
2468           }
2469       }
2470 #if DEBUG_FS
2471   if (0 < pc)
2472     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2473                 "Re-trying query that was previously transmitted %u times to this peer\n",
2474                 (unsigned int) pc);
2475 #endif
2476   /* 3) calculate how much we'd like to forward to this peer,
2477      starting with a random value that is strong enough
2478      to at least give any peer a chance sometimes 
2479      (compared to the other factors that come later) */
2480   /* 3a) count successful (recent) routes from cp for same source */
2481   if (pr->cp != NULL)
2482     {
2483       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2484                                         P2P_SUCCESS_LIST_SIZE);
2485       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2486         if (cp->last_p2p_replies[i] == pr->cp->pid)
2487           score += 1; /* likely successful based on hot path */
2488     }
2489   else
2490     {
2491       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2492                                         CS2P_SUCCESS_LIST_SIZE);
2493       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2494         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2495           score += 1; /* likely successful based on hot path */
2496     }
2497   /* 3b) include latency */
2498   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2499     score += 1; /* likely fast based on latency */
2500   /* 3c) include priorities */
2501   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2502     score += 1; /* likely successful based on priorities */
2503   /* 3d) penalize for queue size */  
2504   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2505   /* 3e) include peer proximity */
2506   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2507                                                     &pr->query)) / (double) UINT32_MAX);
2508   /* 4) super-bonus for being the known target */
2509   if (pr->target_pid == cp->pid)
2510     score += 100.0;
2511   /* store best-fit in closure */
2512 #if DEBUG_FS
2513   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2514               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2515               GNUNET_h2s (key),
2516               score,
2517               psc->target_score);
2518 #endif  
2519   score++; /* avoid zero */
2520   if (score > psc->target_score)
2521     {
2522       psc->target_score = score;
2523       psc->target.hashPubKey = *key; 
2524     }
2525   return GNUNET_YES;
2526 }
2527   
2528
2529 /**
2530  * The priority level imposes a bound on the maximum
2531  * value for the ttl that can be requested.
2532  *
2533  * @param ttl_in requested ttl
2534  * @param prio given priority
2535  * @return ttl_in if ttl_in is below the limit,
2536  *         otherwise the ttl-limit for the given priority
2537  */
2538 static int32_t
2539 bound_ttl (int32_t ttl_in, uint32_t prio)
2540 {
2541   unsigned long long allowed;
2542
2543   if (ttl_in <= 0)
2544     return ttl_in;
2545   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2546   if (ttl_in > allowed)      
2547     {
2548       if (allowed >= (1 << 30))
2549         return 1 << 30;
2550       return allowed;
2551     }
2552   return ttl_in;
2553 }
2554
2555
2556 /**
2557  * Iterator called on each result obtained for a DHT
2558  * operation that expects a reply
2559  *
2560  * @param cls closure
2561  * @param exp when will this value expire
2562  * @param key key of the result
2563  * @param get_path NULL-terminated array of pointers
2564  *                 to the peers on reverse GET path (or NULL if not recorded)
2565  * @param put_path NULL-terminated array of pointers
2566  *                 to the peers on the PUT path (or NULL if not recorded)
2567  * @param type type of the result
2568  * @param size number of bytes in data
2569  * @param data pointer to the result data
2570  */
2571 static void
2572 process_dht_reply (void *cls,
2573                    struct GNUNET_TIME_Absolute exp,
2574                    const GNUNET_HashCode * key,
2575                    const struct GNUNET_PeerIdentity * const *get_path,
2576                    const struct GNUNET_PeerIdentity * const *put_path,
2577                    enum GNUNET_BLOCK_Type type,
2578                    size_t size,
2579                    const void *data);
2580
2581
2582 /**
2583  * We're processing a GET request and have decided
2584  * to forward it to other peers.  This function is called periodically
2585  * and should forward the request to other peers until we have all
2586  * possible replies.  If we have transmitted the *only* reply to
2587  * the initiator we should destroy the pending request.  If we have
2588  * many replies in the queue to the initiator, we should delay sending
2589  * out more queries until the reply queue has shrunk some.
2590  *
2591  * @param cls our "struct ProcessGetContext *"
2592  * @param tc unused
2593  */
2594 static void
2595 forward_request_task (void *cls,
2596                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2597 {
2598   struct PendingRequest *pr = cls;
2599   struct PeerSelectionContext psc;
2600   struct ConnectedPeer *cp; 
2601   struct GNUNET_TIME_Relative delay;
2602
2603   pr->task = GNUNET_SCHEDULER_NO_TASK;
2604   if (pr->irc != NULL)
2605     {
2606 #if DEBUG_FS
2607       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2608                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2609                   GNUNET_h2s (&pr->query));
2610 #endif
2611       return; /* already pending */
2612     }
2613   if (GNUNET_YES == pr->local_only)
2614     return; /* configured to not do P2P search */
2615   /* (0) try DHT */
2616   if ( (0 == pr->anonymity_level) &&
2617        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2618        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2619     {
2620       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2621                                           GNUNET_TIME_UNIT_FOREVER_REL,
2622                                           pr->type,
2623                                           &pr->query,
2624                                           GNUNET_DHT_RO_NONE,
2625                                           pr->bf,
2626                                           pr->mingle,
2627                                           pr->namespace,
2628                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2629                                           &process_dht_reply,
2630                                           pr);
2631     }
2632   /* (1) select target */
2633   psc.pr = pr;
2634   psc.target_score = -DBL_MAX;
2635   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2636                                          &target_peer_select_cb,
2637                                          &psc);  
2638   if (psc.target_score == -DBL_MAX)
2639     {
2640       delay = get_processing_delay ();
2641 #if DEBUG_FS 
2642       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2643                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2644                   GNUNET_h2s (&pr->query),
2645                   delay.value);
2646 #endif
2647       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2648                                                delay,
2649                                                &forward_request_task,
2650                                                pr);
2651       return; /* nobody selected */
2652     }
2653   /* (3) update TTL/priority */
2654   if (pr->client_request_list != NULL)
2655     {
2656       /* FIXME: use better algorithm!? */
2657       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2658                                          4))
2659         pr->priority++;
2660       /* bound priority we use by priorities we see from other peers
2661          rounded up (must round up so that we can see non-zero
2662          priorities, but round up as little as possible to make it
2663          plausible that we forwarded another peers request) */
2664       if (pr->priority > current_priorities + 1.0)
2665         pr->priority = (uint32_t) current_priorities + 1.0;
2666       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2667                            pr->priority);
2668 #if DEBUG_FS
2669       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2670                   "Trying query `%s' with priority %u and TTL %d.\n",
2671                   GNUNET_h2s (&pr->query),
2672                   pr->priority,
2673                   pr->ttl);
2674 #endif
2675     }
2676
2677   /* (3) reserve reply bandwidth */
2678   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2679                                           &psc.target.hashPubKey);
2680   GNUNET_assert (NULL != cp);
2681   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2682                                                 &psc.target,
2683                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2684                                                 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2685                                                 DBLOCK_SIZE * 2, 
2686                                                 cp->inc_preference,
2687                                                 &target_reservation_cb,
2688                                                 pr);
2689   cp->inc_preference = 0;
2690 }
2691
2692
2693 /* **************************** P2P PUT Handling ************************ */
2694
2695
2696 /**
2697  * Function called after we either failed or succeeded
2698  * at transmitting a reply to a peer.  
2699  *
2700  * @param cls the requests "struct PendingRequest*"
2701  * @param tpid ID of receiving peer, 0 on transmission error
2702  */
2703 static void
2704 transmit_reply_continuation (void *cls,
2705                              GNUNET_PEER_Id tpid)
2706 {
2707   struct PendingRequest *pr = cls;
2708   
2709   switch (pr->type)
2710     {
2711     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2712     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2713       /* only one reply expected, done with the request! */
2714       destroy_pending_request (pr);
2715       break;
2716     case GNUNET_BLOCK_TYPE_ANY:
2717     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2718     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2719       break;
2720     default:
2721       GNUNET_break (0);
2722       break;
2723     }
2724 }
2725
2726
2727 /**
2728  * Transmit the given message by copying it to the target buffer
2729  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2730  * for writing in the meantime.  In that case, do nothing
2731  * (the disconnect or shutdown handler will take care of the rest).
2732  * If we were able to transmit messages and there are still more
2733  * pending, ask core again for further calls to this function.
2734  *
2735  * @param cls closure, pointer to the 'struct ClientList*'
2736  * @param size number of bytes available in buf
2737  * @param buf where the callee should write the message
2738  * @return number of bytes written to buf
2739  */
2740 static size_t
2741 transmit_to_client (void *cls,
2742                   size_t size, void *buf)
2743 {
2744   struct ClientList *cl = cls;
2745   char *cbuf = buf;
2746   struct ClientResponseMessage *creply;
2747   size_t msize;
2748   
2749   cl->th = NULL;
2750   if (NULL == buf)
2751     {
2752 #if DEBUG_FS
2753       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2754                   "Not sending reply, client communication problem.\n");
2755 #endif
2756       return 0;
2757     }
2758   msize = 0;
2759   while ( (NULL != (creply = cl->res_head) ) &&
2760           (creply->msize <= size) )
2761     {
2762       memcpy (&cbuf[msize], &creply[1], creply->msize);
2763       msize += creply->msize;
2764       size -= creply->msize;
2765       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2766                                    cl->res_tail,
2767                                    creply);
2768       GNUNET_free (creply);
2769     }
2770   if (NULL != creply)
2771     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2772                                                   creply->msize,
2773                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2774                                                   &transmit_to_client,
2775                                                   cl);
2776 #if DEBUG_FS
2777   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2778               "Transmitted %u bytes to client\n",
2779               (unsigned int) msize);
2780 #endif
2781   return msize;
2782 }
2783
2784
2785 /**
2786  * Closure for "process_reply" function.
2787  */
2788 struct ProcessReplyClosure
2789 {
2790   /**
2791    * The data for the reply.
2792    */
2793   const void *data;
2794
2795   /**
2796    * Who gave us this reply? NULL for local host (or DHT)
2797    */
2798   struct ConnectedPeer *sender;
2799
2800   /**
2801    * When the reply expires.
2802    */
2803   struct GNUNET_TIME_Absolute expiration;
2804
2805   /**
2806    * Size of data.
2807    */
2808   size_t size;
2809
2810   /**
2811    * Type of the block.
2812    */
2813   enum GNUNET_BLOCK_Type type;
2814
2815   /**
2816    * How much was this reply worth to us?
2817    */
2818   uint32_t priority;
2819
2820   /**
2821    * Evaluation result (returned).
2822    */
2823   enum GNUNET_BLOCK_EvaluationResult eval;
2824
2825   /**
2826    * Did we finish processing the associated request?
2827    */ 
2828   int finished;
2829
2830   /**
2831    * Did we find a matching request?
2832    */
2833   int request_found;
2834 };
2835
2836
2837 /**
2838  * We have received a reply; handle it!
2839  *
2840  * @param cls response (struct ProcessReplyClosure)
2841  * @param key our query
2842  * @param value value in the hash map (info about the query)
2843  * @return GNUNET_YES (we should continue to iterate)
2844  */
2845 static int
2846 process_reply (void *cls,
2847                const GNUNET_HashCode * key,
2848                void *value)
2849 {
2850   struct ProcessReplyClosure *prq = cls;
2851   struct PendingRequest *pr = value;
2852   struct PendingMessage *reply;
2853   struct ClientResponseMessage *creply;
2854   struct ClientList *cl;
2855   struct PutMessage *pm;
2856   struct ConnectedPeer *cp;
2857   struct GNUNET_TIME_Relative cur_delay;
2858   size_t msize;
2859
2860 #if DEBUG_FS
2861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2862               "Matched result (type %u) for query `%s' with pending request\n",
2863               (unsigned int) prq->type,
2864               GNUNET_h2s (key));
2865 #endif  
2866   GNUNET_STATISTICS_update (stats,
2867                             gettext_noop ("# replies received and matched"),
2868                             1,
2869                             GNUNET_NO);
2870   if (prq->sender != NULL)
2871     {
2872       /* FIXME: should we be more precise here and not use
2873          "start_time" but a peer-specific time stamp? */
2874       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2875       prq->sender->avg_delay.value
2876         = (prq->sender->avg_delay.value * 
2877            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2878       prq->sender->avg_priority
2879         = (prq->sender->avg_priority * 
2880            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2881       if (pr->cp != NULL)
2882         {
2883           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2884                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2885                                  -1);
2886           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2887           prq->sender->last_p2p_replies
2888             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2889             = pr->cp->pid;
2890         }
2891       else
2892         {
2893           if (NULL != prq->sender->last_client_replies
2894               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2895             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2896                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2897           prq->sender->last_client_replies
2898             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2899             = pr->client_request_list->client_list->client;
2900           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2901         }
2902     }
2903   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
2904                                      prq->type,
2905                                      key,
2906                                      &pr->bf,
2907                                      pr->mingle,
2908                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2909                                      prq->data,
2910                                      prq->size);
2911   switch (prq->eval)
2912     {
2913     case GNUNET_BLOCK_EVALUATION_OK_MORE:
2914       break;
2915     case GNUNET_BLOCK_EVALUATION_OK_LAST:
2916       while (NULL != pr->pending_head)
2917         destroy_pending_message_list_entry (pr->pending_head);
2918       if (pr->qe != NULL)
2919         {
2920           if (pr->client_request_list != NULL)
2921             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2922                                         GNUNET_YES);
2923           GNUNET_DATASTORE_cancel (pr->qe);
2924           pr->qe = NULL;
2925         }
2926       pr->do_remove = GNUNET_YES;
2927       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2928         {
2929           GNUNET_SCHEDULER_cancel (sched,
2930                                    pr->task);
2931           pr->task = GNUNET_SCHEDULER_NO_TASK;
2932         }
2933       GNUNET_break (GNUNET_YES ==
2934                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2935                                                           key,
2936                                                           pr));
2937       break;
2938     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2939       GNUNET_STATISTICS_update (stats,
2940                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2941                                 1,
2942                                 GNUNET_NO);
2943 #if DEBUG_FS
2944       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2945                   "Duplicate response `%s', discarding.\n",
2946                   GNUNET_h2s (&mhash));
2947 #endif
2948       return GNUNET_YES; /* duplicate */
2949     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2950       return GNUNET_YES; /* wrong namespace */  
2951     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2952       GNUNET_break (0);
2953       return GNUNET_YES;
2954     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2955       GNUNET_break (0);
2956       return GNUNET_YES;
2957     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2958       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2959                   _("Unsupported block type %u\n"),
2960                   prq->type);
2961       return GNUNET_NO;
2962     }
2963   if (pr->client_request_list != NULL)
2964     {
2965       if (pr->replies_seen_size == pr->replies_seen_off)
2966         GNUNET_array_grow (pr->replies_seen,
2967                            pr->replies_seen_size,
2968                            pr->replies_seen_size * 2 + 4);      
2969       GNUNET_CRYPTO_hash (prq->data,
2970                           prq->size,
2971                           &pr->replies_seen[pr->replies_seen_off++]);         
2972       refresh_bloomfilter (pr);
2973     }
2974   if (NULL == prq->sender)
2975     {
2976 #if DEBUG_FS
2977       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2978                   "Found result for query `%s' in local datastore\n",
2979                   GNUNET_h2s (key));
2980 #endif
2981       GNUNET_STATISTICS_update (stats,
2982                                 gettext_noop ("# results found locally"),
2983                                 1,
2984                                 GNUNET_NO);      
2985     }
2986   prq->priority += pr->remaining_priority;
2987   pr->remaining_priority = 0;
2988   pr->results_found++;
2989   prq->request_found = GNUNET_YES;
2990   if (NULL != pr->client_request_list)
2991     {
2992       GNUNET_STATISTICS_update (stats,
2993                                 gettext_noop ("# replies received for local clients"),
2994                                 1,
2995                                 GNUNET_NO);
2996       cl = pr->client_request_list->client_list;
2997       msize = sizeof (struct PutMessage) + prq->size;
2998       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2999       creply->msize = msize;
3000       creply->client_list = cl;
3001       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3002                                          cl->res_tail,
3003                                          cl->res_tail,
3004                                          creply);      
3005       pm = (struct PutMessage*) &creply[1];
3006       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3007       pm->header.size = htons (msize);
3008       pm->type = htonl (prq->type);
3009       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3010       memcpy (&pm[1], prq->data, prq->size);      
3011       if (NULL == cl->th)
3012         {
3013 #if DEBUG_FS
3014           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3015                       "Transmitting result for query `%s' to client\n",
3016                       GNUNET_h2s (key));
3017 #endif  
3018           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3019                                                         msize,
3020                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3021                                                         &transmit_to_client,
3022                                                         cl);
3023         }
3024       GNUNET_break (cl->th != NULL);
3025       if (pr->do_remove)                
3026         {
3027           prq->finished = GNUNET_YES;
3028           destroy_pending_request (pr);         
3029         }
3030     }
3031   else
3032     {
3033       cp = pr->cp;
3034 #if DEBUG_FS
3035       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3036                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3037                   GNUNET_h2s (key),
3038                   (unsigned int) cp->pid);
3039 #endif  
3040       GNUNET_STATISTICS_update (stats,
3041                                 gettext_noop ("# replies received for other peers"),
3042                                 1,
3043                                 GNUNET_NO);
3044       msize = sizeof (struct PutMessage) + prq->size;
3045       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3046       reply->cont = &transmit_reply_continuation;
3047       reply->cont_cls = pr;
3048       reply->msize = msize;
3049       reply->priority = UINT32_MAX; /* send replies first! */
3050       pm = (struct PutMessage*) &reply[1];
3051       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3052       pm->header.size = htons (msize);
3053       pm->type = htonl (prq->type);
3054       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3055       memcpy (&pm[1], prq->data, prq->size);
3056       add_to_pending_messages_for_peer (cp, reply, pr);
3057     }
3058   return GNUNET_YES;
3059 }
3060
3061
3062 /**
3063  * Iterator called on each result obtained for a DHT
3064  * operation that expects a reply
3065  *
3066  * @param cls closure
3067  * @param exp when will this value expire
3068  * @param key key of the result
3069  * @param get_path NULL-terminated array of pointers
3070  *                 to the peers on reverse GET path (or NULL if not recorded)
3071  * @param put_path NULL-terminated array of pointers
3072  *                 to the peers on the PUT path (or NULL if not recorded)
3073  * @param type type of the result
3074  * @param size number of bytes in data
3075  * @param data pointer to the result data
3076  */
3077 static void
3078 process_dht_reply (void *cls,
3079                    struct GNUNET_TIME_Absolute exp,
3080                    const GNUNET_HashCode * key,
3081                    const struct GNUNET_PeerIdentity * const *get_path,
3082                    const struct GNUNET_PeerIdentity * const *put_path,
3083                    enum GNUNET_BLOCK_Type type,
3084                    size_t size,
3085                    const void *data)
3086 {
3087   struct PendingRequest *pr = cls;
3088   struct ProcessReplyClosure prq;
3089
3090   memset (&prq, 0, sizeof (prq));
3091   prq.data = data;
3092   prq.expiration = exp;
3093   prq.size = size;  
3094   prq.type = type;
3095   process_reply (&prq, key, pr);
3096 }
3097
3098
3099
3100 /**
3101  * Continuation called to notify client about result of the
3102  * operation.
3103  *
3104  * @param cls closure
3105  * @param success GNUNET_SYSERR on failure
3106  * @param msg NULL on success, otherwise an error message
3107  */
3108 static void 
3109 put_migration_continuation (void *cls,
3110                             int success,
3111                             const char *msg)
3112 {
3113   struct GNUNET_TIME_Absolute *start = cls;
3114   struct GNUNET_TIME_Relative delay;
3115   
3116   delay = GNUNET_TIME_absolute_get_duration (*start);
3117   GNUNET_free (start);
3118   GNUNET_LOAD_update (datastore_put_load,
3119                       delay.value);
3120   if (GNUNET_OK == success)
3121     return;
3122   GNUNET_STATISTICS_update (stats,
3123                             gettext_noop ("# datastore 'put' failures"),
3124                             1,
3125                             GNUNET_NO);
3126 }
3127
3128
3129 /**
3130  * Handle P2P "PUT" message.
3131  *
3132  * @param cls closure, always NULL
3133  * @param other the other peer involved (sender or receiver, NULL
3134  *        for loopback messages where we are both sender and receiver)
3135  * @param message the actual message
3136  * @param latency reported latency of the connection with 'other'
3137  * @param distance reported distance (DV) to 'other' 
3138  * @return GNUNET_OK to keep the connection open,
3139  *         GNUNET_SYSERR to close it (signal serious error)
3140  */
3141 static int
3142 handle_p2p_put (void *cls,
3143                 const struct GNUNET_PeerIdentity *other,
3144                 const struct GNUNET_MessageHeader *message,
3145                 struct GNUNET_TIME_Relative latency,
3146                 uint32_t distance)
3147 {
3148   const struct PutMessage *put;
3149   uint16_t msize;
3150   size_t dsize;
3151   enum GNUNET_BLOCK_Type type;
3152   struct GNUNET_TIME_Absolute expiration;
3153   GNUNET_HashCode query;
3154   struct ProcessReplyClosure prq;
3155   struct GNUNET_TIME_Absolute *start;
3156   struct GNUNET_TIME_Relative block_time;  
3157   double putl;
3158   struct ConnectedPeer *cp; 
3159   struct PendingMessage *pm;
3160   struct MigrationStopMessage *msm;
3161
3162   msize = ntohs (message->size);
3163   if (msize < sizeof (struct PutMessage))
3164     {
3165       GNUNET_break_op(0);
3166       return GNUNET_SYSERR;
3167     }
3168   put = (const struct PutMessage*) message;
3169   dsize = msize - sizeof (struct PutMessage);
3170   type = ntohl (put->type);
3171   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3172
3173   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3174     return GNUNET_SYSERR;
3175   if (GNUNET_OK !=
3176       GNUNET_BLOCK_get_key (block_ctx,
3177                             type,
3178                             &put[1],
3179                             dsize,
3180                             &query))
3181     {
3182       GNUNET_break_op (0);
3183       return GNUNET_SYSERR;
3184     }
3185 #if DEBUG_FS
3186   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3187               "Received result for query `%s' from peer `%4s'\n",
3188               GNUNET_h2s (&query),
3189               GNUNET_i2s (other));
3190 #endif
3191   GNUNET_STATISTICS_update (stats,
3192                             gettext_noop ("# replies received (overall)"),
3193                             1,
3194                             GNUNET_NO);
3195   /* now, lookup 'query' */
3196   prq.data = (const void*) &put[1];
3197   if (other != NULL)
3198     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3199                                                     &other->hashPubKey);
3200   else
3201     prq.sender = NULL;
3202   prq.size = dsize;
3203   prq.type = type;
3204   prq.expiration = expiration;
3205   prq.priority = 0;
3206   prq.finished = GNUNET_NO;
3207   prq.request_found = GNUNET_NO;
3208   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3209                                               &query,
3210                                               &process_reply,
3211                                               &prq);
3212   if (prq.sender != NULL)
3213     {
3214       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3215       prq.sender->trust += prq.priority;
3216     }
3217   if (GNUNET_YES == active_migration)
3218     {
3219 #if DEBUG_FS
3220       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3221                   "Replicating result for query `%s' with priority %u\n",
3222                   GNUNET_h2s (&query),
3223                   prq.priority);
3224 #endif
3225       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3226       *start = GNUNET_TIME_absolute_get ();
3227       GNUNET_DATASTORE_put (dsh,
3228                             0, &query, dsize, &put[1],
3229                             type, prq.priority, 1 /* anonymity */, 
3230                             expiration, 
3231                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3232                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3233                             &put_migration_continuation, 
3234                             start);
3235     }
3236   putl = GNUNET_LOAD_get_load (datastore_put_load);
3237   if ( (GNUNET_NO == prq.request_found) &&
3238        ( (GNUNET_YES != active_migration) ||
3239          (putl > 2.0) ) )
3240     {
3241       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3242                                               &other->hashPubKey);
3243       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3244         return GNUNET_OK; /* already blocked */
3245       /* We're too busy; send MigrationStop message! */
3246       if (GNUNET_YES != active_migration) 
3247         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3248       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3249                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3250                                                                                    (unsigned int) (60000 * putl * putl)));
3251       
3252       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3253       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3254                           sizeof (struct MigrationStopMessage));
3255       pm->msize = sizeof (struct MigrationStopMessage);
3256       pm->priority = UINT32_MAX;
3257       msm = (struct MigrationStopMessage*) &pm[1];
3258       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3259       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3260       msm->duration = GNUNET_TIME_relative_hton (block_time);
3261       add_to_pending_messages_for_peer (cp,
3262                                         pm,
3263                                         NULL);
3264     }
3265   return GNUNET_OK;
3266 }
3267
3268
3269 /**
3270  * Handle P2P "MIGRATION_STOP" message.
3271  *
3272  * @param cls closure, always NULL
3273  * @param other the other peer involved (sender or receiver, NULL
3274  *        for loopback messages where we are both sender and receiver)
3275  * @param message the actual message
3276  * @param latency reported latency of the connection with 'other'
3277  * @param distance reported distance (DV) to 'other' 
3278  * @return GNUNET_OK to keep the connection open,
3279  *         GNUNET_SYSERR to close it (signal serious error)
3280  */
3281 static int
3282 handle_p2p_migration_stop (void *cls,
3283                            const struct GNUNET_PeerIdentity *other,
3284                            const struct GNUNET_MessageHeader *message,
3285                            struct GNUNET_TIME_Relative latency,
3286                            uint32_t distance)
3287 {
3288   struct ConnectedPeer *cp; 
3289   const struct MigrationStopMessage *msm;
3290
3291   msm = (const struct MigrationStopMessage*) message;
3292   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3293                                           &other->hashPubKey);
3294   if (cp == NULL)
3295     {
3296       GNUNET_break (0);
3297       return GNUNET_OK;
3298     }
3299   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3300   return GNUNET_OK;
3301 }
3302
3303
3304
3305 /* **************************** P2P GET Handling ************************ */
3306
3307
3308 /**
3309  * Closure for 'check_duplicate_request_{peer,client}'.
3310  */
3311 struct CheckDuplicateRequestClosure
3312 {
3313   /**
3314    * The new request we should check if it already exists.
3315    */
3316   const struct PendingRequest *pr;
3317
3318   /**
3319    * Existing request found by the checker, NULL if none.
3320    */
3321   struct PendingRequest *have;
3322 };
3323
3324
3325 /**
3326  * Iterator over entries in the 'query_request_map' that
3327  * tries to see if we have the same request pending from
3328  * the same client already.
3329  *
3330  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3331  * @param key current key code (query, ignored, must match)
3332  * @param value value in the hash map (a 'struct PendingRequest' 
3333  *              that already exists)
3334  * @return GNUNET_YES if we should continue to
3335  *         iterate (no match yet)
3336  *         GNUNET_NO if not (match found).
3337  */
3338 static int
3339 check_duplicate_request_client (void *cls,
3340                                 const GNUNET_HashCode * key,
3341                                 void *value)
3342 {
3343   struct CheckDuplicateRequestClosure *cdc = cls;
3344   struct PendingRequest *have = value;
3345
3346   if (have->client_request_list == NULL)
3347     return GNUNET_YES;
3348   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3349        (cdc->pr != have) )
3350     {
3351       cdc->have = have;
3352       return GNUNET_NO;
3353     }
3354   return GNUNET_YES;
3355 }
3356
3357
3358 /**
3359  * We're processing (local) results for a search request
3360  * from another peer.  Pass applicable results to the
3361  * peer and if we are done either clean up (operation
3362  * complete) or forward to other peers (more results possible).
3363  *
3364  * @param cls our closure (struct LocalGetContext)
3365  * @param key key for the content
3366  * @param size number of bytes in data
3367  * @param data content stored
3368  * @param type type of the content
3369  * @param priority priority of the content
3370  * @param anonymity anonymity-level for the content
3371  * @param expiration expiration time for the content
3372  * @param uid unique identifier for the datum;
3373  *        maybe 0 if no unique identifier is available
3374  */
3375 static void
3376 process_local_reply (void *cls,
3377                      const GNUNET_HashCode * key,
3378                      size_t size,
3379                      const void *data,
3380                      enum GNUNET_BLOCK_Type type,
3381                      uint32_t priority,
3382                      uint32_t anonymity,
3383                      struct GNUNET_TIME_Absolute
3384                      expiration, 
3385                      uint64_t uid)
3386 {
3387   struct PendingRequest *pr = cls;
3388   struct ProcessReplyClosure prq;
3389   struct CheckDuplicateRequestClosure cdrc;
3390   GNUNET_HashCode query;
3391   unsigned int old_rf;
3392   
3393   if (NULL == key)
3394     {
3395 #if DEBUG_FS > 1
3396       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3397                   "Done processing local replies, forwarding request to other peers.\n");
3398 #endif
3399       pr->qe = NULL;
3400       if (pr->client_request_list != NULL)
3401         {
3402           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3403                                       GNUNET_YES);
3404           /* Figure out if this is a duplicate request and possibly
3405              merge 'struct PendingRequest' entries */
3406           cdrc.have = NULL;
3407           cdrc.pr = pr;
3408           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3409                                                       &pr->query,
3410                                                       &check_duplicate_request_client,
3411                                                       &cdrc);
3412           if (cdrc.have != NULL)
3413             {
3414 #if DEBUG_FS
3415               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3416                           "Received request for block `%s' twice from client, will only request once.\n",
3417                           GNUNET_h2s (&pr->query));
3418 #endif
3419               
3420               destroy_pending_request (pr);
3421               return;
3422             }
3423         }
3424
3425       /* no more results */
3426       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3427         pr->task = GNUNET_SCHEDULER_add_now (sched,
3428                                              &forward_request_task,
3429                                              pr);      
3430       return;
3431     }
3432 #if DEBUG_FS
3433   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3434               "New local response to `%s' of type %u.\n",
3435               GNUNET_h2s (key),
3436               type);
3437 #endif
3438   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3439     {
3440 #if DEBUG_FS
3441       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3442                   "Found ONDEMAND block, performing on-demand encoding\n");
3443 #endif
3444       GNUNET_STATISTICS_update (stats,
3445                                 gettext_noop ("# on-demand blocks matched requests"),
3446                                 1,
3447                                 GNUNET_NO);
3448       if (GNUNET_OK != 
3449           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3450                                             anonymity, expiration, uid, 
3451                                             &process_local_reply,
3452                                             pr))
3453       if (pr->qe != NULL)
3454         {
3455           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3456         }
3457       return;
3458     }
3459   old_rf = pr->results_found;
3460   memset (&prq, 0, sizeof (prq));
3461   prq.data = data;
3462   prq.expiration = expiration;
3463   prq.size = size;  
3464   if (GNUNET_OK != 
3465       GNUNET_BLOCK_get_key (block_ctx,
3466                             type,
3467                             data,
3468                             size,
3469                             &query))
3470     {
3471       GNUNET_break (0);
3472       GNUNET_DATASTORE_remove (dsh,
3473                                key,
3474                                size, data,
3475                                -1, -1, 
3476                                GNUNET_TIME_UNIT_FOREVER_REL,
3477                                NULL, NULL);
3478       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3479       return;
3480     }
3481   prq.type = type;
3482   prq.priority = priority;  
3483   prq.finished = GNUNET_NO;
3484   prq.request_found = GNUNET_NO;
3485   process_reply (&prq, key, pr);
3486   if ( (old_rf == 0) &&
3487        (pr->results_found == 1) )
3488     update_datastore_delays (pr->start_time);
3489   if (prq.finished == GNUNET_YES)
3490     return;
3491   if (pr->qe == NULL)
3492     return; /* done here */
3493   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3494     {
3495       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3496       return;
3497     }
3498   if ( (pr->client_request_list == NULL) &&
3499        ( (GNUNET_YES == test_load_too_high()) ||
3500          (pr->results_found > 5 + 2 * pr->priority) ) )
3501     {
3502 #if DEBUG_FS > 2
3503       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3504                   "Load too high, done with request\n");
3505 #endif
3506       GNUNET_STATISTICS_update (stats,
3507                                 gettext_noop ("# processing result set cut short due to load"),
3508                                 1,
3509                                 GNUNET_NO);
3510       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3511       return;
3512     }
3513   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3514 }
3515
3516
3517 /**
3518  * We've received a request with the specified priority.  Bound it
3519  * according to how much we trust the given peer.
3520  * 
3521  * @param prio_in requested priority
3522  * @param cp the peer making the request
3523  * @return effective priority
3524  */
3525 static uint32_t
3526 bound_priority (uint32_t prio_in,
3527                 struct ConnectedPeer *cp)
3528 {
3529 #define N ((double)128.0)
3530   uint32_t ret;
3531   double rret;
3532   int ld;
3533
3534   ld = test_load_too_high ();
3535   if (ld == GNUNET_SYSERR)
3536     return 0; /* excess resources */
3537   ret = change_host_trust (cp, prio_in);
3538   if (ret > 0)
3539     {
3540       if (ret > current_priorities + N)
3541         rret = current_priorities + N;
3542       else
3543         rret = ret;
3544       current_priorities 
3545         = (current_priorities * (N-1) + rret)/N;
3546     }
3547 #undef N
3548   return ret;
3549 }
3550
3551
3552 /**
3553  * Iterator over entries in the 'query_request_map' that
3554  * tries to see if we have the same request pending from
3555  * the same peer already.
3556  *
3557  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3558  * @param key current key code (query, ignored, must match)
3559  * @param value value in the hash map (a 'struct PendingRequest' 
3560  *              that already exists)
3561  * @return GNUNET_YES if we should continue to
3562  *         iterate (no match yet)
3563  *         GNUNET_NO if not (match found).
3564  */
3565 static int
3566 check_duplicate_request_peer (void *cls,
3567                               const GNUNET_HashCode * key,
3568                               void *value)
3569 {
3570   struct CheckDuplicateRequestClosure *cdc = cls;
3571   struct PendingRequest *have = value;
3572
3573   if (cdc->pr->target_pid == have->target_pid)
3574     {
3575       cdc->have = have;
3576       return GNUNET_NO;
3577     }
3578   return GNUNET_YES;
3579 }
3580
3581
3582 /**
3583  * Handle P2P "GET" request.
3584  *
3585  * @param cls closure, always NULL
3586  * @param other the other peer involved (sender or receiver, NULL
3587  *        for loopback messages where we are both sender and receiver)
3588  * @param message the actual message
3589  * @param latency reported latency of the connection with 'other'
3590  * @param distance reported distance (DV) to 'other' 
3591  * @return GNUNET_OK to keep the connection open,
3592  *         GNUNET_SYSERR to close it (signal serious error)
3593  */
3594 static int
3595 handle_p2p_get (void *cls,
3596                 const struct GNUNET_PeerIdentity *other,
3597                 const struct GNUNET_MessageHeader *message,
3598                 struct GNUNET_TIME_Relative latency,
3599                 uint32_t distance)
3600 {
3601   struct PendingRequest *pr;
3602   struct ConnectedPeer *cp;
3603   struct ConnectedPeer *cps;
3604   struct CheckDuplicateRequestClosure cdc;
3605   struct GNUNET_TIME_Relative timeout;
3606   uint16_t msize;
3607   const struct GetMessage *gm;
3608   unsigned int bits;
3609   const GNUNET_HashCode *opt;
3610   uint32_t bm;
3611   size_t bfsize;
3612   uint32_t ttl_decrement;
3613   enum GNUNET_BLOCK_Type type;
3614   int have_ns;
3615   int ld;
3616
3617   msize = ntohs(message->size);
3618   if (msize < sizeof (struct GetMessage))
3619     {
3620       GNUNET_break_op (0);
3621       return GNUNET_SYSERR;
3622     }
3623   gm = (const struct GetMessage*) message;
3624   type = ntohl (gm->type);
3625   bm = ntohl (gm->hash_bitmap);
3626   bits = 0;
3627   while (bm > 0)
3628     {
3629       if (1 == (bm & 1))
3630         bits++;
3631       bm >>= 1;
3632     }
3633   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3634     {
3635       GNUNET_break_op (0);
3636       return GNUNET_SYSERR;
3637     }  
3638   opt = (const GNUNET_HashCode*) &gm[1];
3639   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3640   bm = ntohl (gm->hash_bitmap);
3641   bits = 0;
3642   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3643                                            &other->hashPubKey);
3644   if (NULL == cps)
3645     {
3646       /* peer must have just disconnected */
3647       GNUNET_STATISTICS_update (stats,
3648                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3649                                 1,
3650                                 GNUNET_NO);
3651       return GNUNET_SYSERR;
3652     }
3653   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3654     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3655                                             &opt[bits++]);
3656   else
3657     cp = cps;
3658   if (cp == NULL)
3659     {
3660 #if DEBUG_FS
3661       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3662         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3663                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3664                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3665       
3666       else
3667         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3668                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3669                     GNUNET_i2s (other));
3670 #endif
3671       GNUNET_STATISTICS_update (stats,
3672                                 gettext_noop ("# requests dropped due to missing reverse route"),
3673                                 1,
3674                                 GNUNET_NO);
3675      /* FIXME: try connect? */
3676       return GNUNET_OK;
3677     }
3678   /* note that we can really only check load here since otherwise
3679      peers could find out that we are overloaded by not being
3680      disconnected after sending us a malformed query... */
3681
3682   /* FIXME: query priority should play
3683      a major role here! */
3684   ld = test_load_too_high ();
3685   if (GNUNET_YES == ld)
3686     {
3687 #if DEBUG_FS
3688       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3689                   "Dropping query from `%s', this peer is too busy.\n",
3690                   GNUNET_i2s (other));
3691 #endif
3692       GNUNET_STATISTICS_update (stats,
3693                                 gettext_noop ("# requests dropped due to high load"),
3694                                 1,
3695                                 GNUNET_NO);
3696       return GNUNET_OK;
3697     }
3698   /* FIXME: if ld == GNUNET_NO, forward
3699      instead of indirecting! */
3700
3701 #if DEBUG_FS 
3702   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3703               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3704               GNUNET_h2s (&gm->query),
3705               (unsigned int) type,
3706               GNUNET_i2s (other),
3707               (unsigned int) bm);
3708 #endif
3709   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3710   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3711                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3712   if (have_ns)
3713     {
3714       pr->namespace = (GNUNET_HashCode*) &pr[1];
3715       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3716     }
3717   pr->type = type;
3718   pr->mingle = ntohl (gm->filter_mutator);
3719   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3720     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3721   pr->anonymity_level = 1;
3722   pr->priority = bound_priority (ntohl (gm->priority), cps);
3723   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3724   pr->query = gm->query;
3725   /* decrement ttl (always) */
3726   ttl_decrement = 2 * TTL_DECREMENT +
3727     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3728                               TTL_DECREMENT);
3729   if ( (pr->ttl < 0) &&
3730        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3731     {
3732 #if DEBUG_FS
3733       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3734                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3735                   GNUNET_i2s (other),
3736                   pr->ttl,
3737                   ttl_decrement);
3738 #endif
3739       GNUNET_STATISTICS_update (stats,
3740                                 gettext_noop ("# requests dropped due TTL underflow"),
3741                                 1,
3742                                 GNUNET_NO);
3743       /* integer underflow => drop (should be very rare)! */      
3744       GNUNET_free (pr);
3745       return GNUNET_OK;
3746     } 
3747   pr->ttl -= ttl_decrement;
3748   pr->start_time = GNUNET_TIME_absolute_get ();
3749
3750   /* get bloom filter */
3751   if (bfsize > 0)
3752     {
3753       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3754                                                   bfsize,
3755                                                   BLOOMFILTER_K);
3756       pr->bf_size = bfsize;
3757     }
3758
3759   cdc.have = NULL;
3760   cdc.pr = pr;
3761   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3762                                               &gm->query,
3763                                               &check_duplicate_request_peer,
3764                                               &cdc);
3765   if (cdc.have != NULL)
3766     {
3767       if (cdc.have->start_time.value + cdc.have->ttl >=
3768           pr->start_time.value + pr->ttl)
3769         {
3770           /* existing request has higher TTL, drop new one! */
3771           cdc.have->priority += pr->priority;
3772           destroy_pending_request (pr);
3773 #if DEBUG_FS
3774           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3775                       "Have existing request with higher TTL, dropping new request.\n",
3776                       GNUNET_i2s (other));
3777 #endif
3778           GNUNET_STATISTICS_update (stats,
3779                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3780                                     1,
3781                                     GNUNET_NO);
3782           return GNUNET_OK;
3783         }
3784       else
3785         {
3786           /* existing request has lower TTL, drop old one! */
3787           pr->priority += cdc.have->priority;
3788           /* Possible optimization: if we have applicable pending
3789              replies in 'cdc.have', we might want to move those over
3790              (this is a really rare special-case, so it is not clear
3791              that this would be worth it) */
3792           destroy_pending_request (cdc.have);
3793           /* keep processing 'pr'! */
3794         }
3795     }
3796
3797   pr->cp = cp;
3798   GNUNET_break (GNUNET_OK ==
3799                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3800                                                    &gm->query,
3801                                                    pr,
3802                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3803   GNUNET_break (GNUNET_OK ==
3804                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3805                                                    &other->hashPubKey,
3806                                                    pr,
3807                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3808   
3809   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3810                                             pr,
3811                                             pr->start_time.value + pr->ttl);
3812
3813   GNUNET_STATISTICS_update (stats,
3814                             gettext_noop ("# P2P searches received"),
3815                             1,
3816                             GNUNET_NO);
3817   GNUNET_STATISTICS_update (stats,
3818                             gettext_noop ("# P2P searches active"),
3819                             1,
3820                             GNUNET_NO);
3821
3822   /* calculate change in traffic preference */
3823   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3824   /* process locally */
3825   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3826     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3827   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3828                                            (pr->priority + 1)); 
3829   pr->qe = GNUNET_DATASTORE_get (dsh,
3830                                  &gm->query,
3831                                  type,                         
3832                                  pr->priority + 1,
3833                                  MAX_DATASTORE_QUEUE,                            
3834                                  timeout,
3835                                  &process_local_reply,
3836                                  pr);
3837
3838   /* Are multiple results possible?  If so, start processing remotely now! */
3839   switch (pr->type)
3840     {
3841     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3842     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3843       /* only one result, wait for datastore */
3844       break;
3845     default:
3846       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3847         pr->task = GNUNET_SCHEDULER_add_now (sched,
3848                                              &forward_request_task,
3849                                              pr);
3850     }
3851
3852   /* make sure we don't track too many requests */
3853   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3854     {
3855       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3856       GNUNET_assert (pr != NULL);
3857       destroy_pending_request (pr);
3858     }
3859   return GNUNET_OK;
3860 }
3861
3862
3863 /* **************************** CS GET Handling ************************ */
3864
3865
3866 /**
3867  * Handle START_SEARCH-message (search request from client).
3868  *
3869  * @param cls closure
3870  * @param client identification of the client
3871  * @param message the actual message
3872  */
3873 static void
3874 handle_start_search (void *cls,
3875                      struct GNUNET_SERVER_Client *client,
3876                      const struct GNUNET_MessageHeader *message)
3877 {
3878   static GNUNET_HashCode all_zeros;
3879   const struct SearchMessage *sm;
3880   struct ClientList *cl;
3881   struct ClientRequestList *crl;
3882   struct PendingRequest *pr;
3883   uint16_t msize;
3884   unsigned int sc;
3885   enum GNUNET_BLOCK_Type type;
3886
3887   msize = ntohs (message->size);
3888   if ( (msize < sizeof (struct SearchMessage)) ||
3889        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3890     {
3891       GNUNET_break (0);
3892       GNUNET_SERVER_receive_done (client,
3893                                   GNUNET_SYSERR);
3894       return;
3895     }
3896   GNUNET_STATISTICS_update (stats,
3897                             gettext_noop ("# client searches received"),
3898                             1,
3899                             GNUNET_NO);
3900   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3901   sm = (const struct SearchMessage*) message;
3902   type = ntohl (sm->type);
3903 #if DEBUG_FS
3904   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3905               "Received request for `%s' of type %u from local client\n",
3906               GNUNET_h2s (&sm->query),
3907               (unsigned int) type);
3908 #endif
3909   cl = client_list;
3910   while ( (cl != NULL) &&
3911           (cl->client != client) )
3912     cl = cl->next;
3913   if (cl == NULL)
3914     {
3915       cl = GNUNET_malloc (sizeof (struct ClientList));
3916       cl->client = client;
3917       GNUNET_SERVER_client_keep (client);
3918       cl->next = client_list;
3919       client_list = cl;
3920     }
3921   /* detect duplicate KBLOCK requests */
3922   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
3923        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
3924        (type == GNUNET_BLOCK_TYPE_ANY) )
3925     {
3926       crl = cl->rl_head;
3927       while ( (crl != NULL) &&
3928               ( (0 != memcmp (&crl->req->query,
3929                               &sm->query,
3930                               sizeof (GNUNET_HashCode))) ||
3931                 (crl->req->type != type) ) )
3932         crl = crl->next;
3933       if (crl != NULL)  
3934         { 
3935 #if DEBUG_FS
3936           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3937                       "Have existing request, merging content-seen lists.\n");
3938 #endif
3939           pr = crl->req;
3940           /* Duplicate request (used to send long list of
3941              known/blocked results); merge 'pr->replies_seen'
3942              and update bloom filter */
3943           GNUNET_array_grow (pr->replies_seen,
3944                              pr->replies_seen_size,
3945                              pr->replies_seen_off + sc);
3946           memcpy (&pr->replies_seen[pr->replies_seen_off],
3947                   &sm[1],
3948                   sc * sizeof (GNUNET_HashCode));
3949           pr->replies_seen_off += sc;
3950           refresh_bloomfilter (pr);
3951           GNUNET_STATISTICS_update (stats,
3952                                     gettext_noop ("# client searches updated (merged content seen list)"),
3953                                     1,
3954                                     GNUNET_NO);
3955           GNUNET_SERVER_receive_done (client,
3956                                       GNUNET_OK);
3957           return;
3958         }
3959     }
3960   GNUNET_STATISTICS_update (stats,
3961                             gettext_noop ("# client searches active"),
3962                             1,
3963                             GNUNET_NO);
3964   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3965                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3966   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3967   memset (crl, 0, sizeof (struct ClientRequestList));
3968   crl->client_list = cl;
3969   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3970                                cl->rl_tail,
3971                                crl);  
3972   crl->req = pr;
3973   pr->type = type;
3974   pr->client_request_list = crl;
3975   GNUNET_array_grow (pr->replies_seen,
3976                      pr->replies_seen_size,
3977                      sc);
3978   memcpy (pr->replies_seen,
3979           &sm[1],
3980           sc * sizeof (GNUNET_HashCode));
3981   pr->replies_seen_off = sc;
3982   pr->anonymity_level = ntohl (sm->anonymity_level); 
3983   pr->start_time = GNUNET_TIME_absolute_get ();
3984   refresh_bloomfilter (pr);
3985   pr->query = sm->query;
3986   if (0 == (1 & ntohl (sm->options)))
3987     pr->local_only = GNUNET_NO;
3988   else
3989     pr->local_only = GNUNET_YES;
3990   switch (type)
3991     {
3992     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3993     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3994       if (0 != memcmp (&sm->target,
3995                        &all_zeros,
3996                        sizeof (GNUNET_HashCode)))
3997         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3998       break;
3999     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4000       pr->namespace = (GNUNET_HashCode*) &pr[1];
4001       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4002       break;
4003     default:
4004       break;
4005     }
4006   GNUNET_break (GNUNET_OK ==
4007                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4008                                                    &sm->query,
4009                                                    pr,
4010                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4011   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4012     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4013   pr->qe = GNUNET_DATASTORE_get (dsh,
4014                                  &sm->query,
4015                                  type,
4016                                  -3, -1,
4017                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4018                                  &process_local_reply,
4019                                  pr);
4020 }
4021
4022
4023 /* **************************** Startup ************************ */
4024
4025 /**
4026  * Process fs requests.
4027  *
4028  * @param s scheduler to use
4029  * @param server the initialized server
4030  * @param c configuration to use
4031  */
4032 static int
4033 main_init (struct GNUNET_SCHEDULER_Handle *s,
4034            struct GNUNET_SERVER_Handle *server,
4035            const struct GNUNET_CONFIGURATION_Handle *c)
4036 {
4037   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4038     {
4039       { &handle_p2p_get, 
4040         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4041       { &handle_p2p_put, 
4042         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4043       { &handle_p2p_migration_stop, 
4044         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4045         sizeof (struct MigrationStopMessage) },
4046       { NULL, 0, 0 }
4047     };
4048   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4049     {&GNUNET_FS_handle_index_start, NULL, 
4050      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4051     {&GNUNET_FS_handle_index_list_get, NULL, 
4052      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4053     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4054      sizeof (struct UnindexMessage) },
4055     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4056      0 },
4057     {NULL, NULL, 0, 0}
4058   };
4059   unsigned long long enc = 128;
4060
4061   sched = s;
4062   cfg = c;
4063   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4064   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4065   if ( (GNUNET_OK !=
4066         GNUNET_CONFIGURATION_get_value_number (cfg,
4067                                                "fs",
4068                                                "MAX_PENDING_REQUESTS",
4069                                                &max_pending_requests)) ||
4070        (GNUNET_OK !=
4071         GNUNET_CONFIGURATION_get_value_number (cfg,
4072                                                "fs",
4073                                                "EXPECTED_NEIGHBOUR_COUNT",
4074                                                &enc)) ||
4075        (GNUNET_OK != 
4076         GNUNET_CONFIGURATION_get_value_time (cfg,
4077                                              "fs",
4078                                              "MIN_MIGRATION_DELAY",
4079                                              &min_migration_delay)) )
4080     {
4081       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4082                   _("Configuration fails to specify certain parameters, assuming default values."));
4083     }
4084   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4085   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4086   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4087   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4088   core = GNUNET_CORE_connect (sched,
4089                               cfg,
4090                               GNUNET_TIME_UNIT_FOREVER_REL,
4091                               NULL,
4092                               NULL,
4093                               &peer_connect_handler,
4094                               &peer_disconnect_handler,
4095                               NULL,
4096                               NULL, GNUNET_NO,
4097                               NULL, GNUNET_NO,
4098                               p2p_handlers);
4099   if (NULL == core)
4100     {
4101       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4102                   _("Failed to connect to `%s' service.\n"),
4103                   "core");
4104       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4105       connected_peers = NULL;
4106       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4107       query_request_map = NULL;
4108       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4109       requests_by_expiration_heap = NULL;
4110       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4111       peer_request_map = NULL;
4112       if (dsh != NULL)
4113         {
4114           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4115           dsh = NULL;
4116         }
4117       return GNUNET_SYSERR;
4118     }
4119   /* FIXME: distinguish between sending and storing in options? */
4120   if (active_migration) 
4121     {
4122       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4123                   _("Content migration is enabled, will start to gather data\n"));
4124       consider_migration_gathering ();
4125     }
4126   consider_dht_put_gathering (NULL);
4127   GNUNET_SERVER_disconnect_notify (server, 
4128                                    &handle_client_disconnect,
4129                                    NULL);
4130   GNUNET_assert (GNUNET_OK ==
4131                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4132                                                           "fs",
4133                                                           "TRUST",
4134                                                           &trustDirectory));
4135   GNUNET_DISK_directory_create (trustDirectory);
4136   GNUNET_SCHEDULER_add_with_priority (sched,
4137                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4138                                       &cron_flush_trust, NULL);
4139
4140
4141   GNUNET_SERVER_add_handlers (server, handlers);
4142   GNUNET_SCHEDULER_add_delayed (sched,
4143                                 GNUNET_TIME_UNIT_FOREVER_REL,
4144                                 &shutdown_task,
4145                                 NULL);
4146   return GNUNET_OK;
4147 }
4148
4149
4150 /**
4151  * Process fs requests.
4152  *
4153  * @param cls closure
4154  * @param sched scheduler to use
4155  * @param server the initialized server
4156  * @param cfg configuration to use
4157  */
4158 static void
4159 run (void *cls,
4160      struct GNUNET_SCHEDULER_Handle *sched,
4161      struct GNUNET_SERVER_Handle *server,
4162      const struct GNUNET_CONFIGURATION_Handle *cfg)
4163 {
4164   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4165                                                            "FS",
4166                                                            "ACTIVEMIGRATION");
4167   dsh = GNUNET_DATASTORE_connect (cfg,
4168                                   sched);
4169   if (dsh == NULL)
4170     {
4171       GNUNET_SCHEDULER_shutdown (sched);
4172       return;
4173     }
4174   datastore_get_load = GNUNET_LOAD_value_init ();
4175   datastore_put_load = GNUNET_LOAD_value_init ();
4176   block_cfg = GNUNET_CONFIGURATION_create ();
4177   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4178                                          "block",
4179                                          "PLUGINS",
4180                                          "fs");
4181   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4182   GNUNET_assert (NULL != block_ctx);
4183   dht_handle = GNUNET_DHT_connect (sched,
4184                                    cfg,
4185                                    FS_DHT_HT_SIZE);
4186   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4187        (GNUNET_OK != main_init (sched, server, cfg)) )
4188     {    
4189       GNUNET_SCHEDULER_shutdown (sched);
4190       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4191       dsh = NULL;
4192       GNUNET_DHT_disconnect (dht_handle);
4193       dht_handle = NULL;
4194       GNUNET_BLOCK_context_destroy (block_ctx);
4195       block_ctx = NULL;
4196       GNUNET_CONFIGURATION_destroy (block_cfg);
4197       block_cfg = NULL;
4198       GNUNET_LOAD_value_free (datastore_get_load);
4199       datastore_get_load = NULL;
4200       GNUNET_LOAD_value_free (datastore_put_load);
4201       datastore_put_load = NULL;
4202       return;   
4203     }
4204 }
4205
4206
4207 /**
4208  * The main function for the fs service.
4209  *
4210  * @param argc number of arguments from the command line
4211  * @param argv command line arguments
4212  * @return 0 ok, 1 on error
4213  */
4214 int
4215 main (int argc, char *const *argv)
4216 {
4217   return (GNUNET_OK ==
4218           GNUNET_SERVICE_run (argc,
4219                               argv,
4220                               "fs",
4221                               GNUNET_SERVICE_OPTION_NONE,
4222                               &run, NULL)) ? 0 : 1;
4223 }
4224
4225 /* end of gnunet-service-fs.c */