DHT PUT integration into FS
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - track per-peer request latency (using new load API)
28  * - consider more precise latency estimation (per-peer & request) -- again load API?
29  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
30  * - introduce random latency in processing
31  * - more statistics
32  */
33 #include "platform.h"
34 #include <float.h>
35 #include "gnunet_constants.h"
36 #include "gnunet_core_service.h"
37 #include "gnunet_dht_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_load_lib.h"
40 #include "gnunet_peer_lib.h"
41 #include "gnunet_protocols.h"
42 #include "gnunet_signatures.h"
43 #include "gnunet_statistics_service.h"
44 #include "gnunet_util_lib.h"
45 #include "gnunet-service-fs_indexing.h"
46 #include "fs.h"
47
48 #define DEBUG_FS GNUNET_NO
49
50 /**
51  * Maximum number of outgoing messages we queue per peer.
52  */
53 #define MAX_QUEUE_PER_PEER 16
54
55 /**
56  * Size for the hash map for DHT requests from the FS
57  * service.  Should be about the number of concurrent
58  * DHT requests we plan to make.
59  */
60 #define FS_DHT_HT_SIZE 1024
61
62 /**
63  * How often do we flush trust values to disk?
64  */
65 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
66
67 /**
68  * How often do we at most PUT content into the DHT?
69  */
70 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
71
72 /**
73  * Inverse of the probability that we will submit the same query
74  * to the same peer again.  If the same peer already got the query
75  * repeatedly recently, the probability is multiplied by the inverse
76  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
77  * plus MAX_CORK_DELAY (so roughly every 3.5s).
78  */
79 #define RETRY_PROBABILITY_INV 3
80
81 /**
82  * What is the maximum delay for a P2P FS message (in our interaction
83  * with core)?  FS-internal delays are another story.  The value is
84  * chosen based on the 32k block size.  Given that peers typcially
85  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
86  * transmit one message even to the lowest-bandwidth peers.
87  */
88 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
89
90 /**
91  * Maximum number of requests (from other peers) that we're
92  * willing to have pending at any given point in time.
93  */
94 static unsigned long long max_pending_requests = (32 * 1024);
95
96
97 /**
98  * Information we keep for each pending reply.  The
99  * actual message follows at the end of this struct.
100  */
101 struct PendingMessage;
102
103 /**
104  * Function called upon completion of a transmission.
105  *
106  * @param cls closure
107  * @param pid ID of receiving peer, 0 on transmission error
108  */
109 typedef void (*TransmissionContinuation)(void * cls, 
110                                          GNUNET_PEER_Id tpid);
111
112
113 /**
114  * Information we keep for each pending message (GET/PUT).  The
115  * actual message follows at the end of this struct.
116  */
117 struct PendingMessage
118 {
119   /**
120    * This is a doubly-linked list of messages to the same peer.
121    */
122   struct PendingMessage *next;
123
124   /**
125    * This is a doubly-linked list of messages to the same peer.
126    */
127   struct PendingMessage *prev;
128
129   /**
130    * Entry in pending message list for this pending message.
131    */ 
132   struct PendingMessageList *pml;  
133
134   /**
135    * Function to call immediately once we have transmitted this
136    * message.
137    */
138   TransmissionContinuation cont;
139
140   /**
141    * Closure for cont.
142    */
143   void *cont_cls;
144
145   /**
146    * Size of the reply; actual reply message follows
147    * at the end of this struct.
148    */
149   size_t msize;
150   
151   /**
152    * How important is this message for us?
153    */
154   uint32_t priority;
155  
156 };
157
158
159 /**
160  * Information about a peer that we are connected to.
161  * We track data that is useful for determining which
162  * peers should receive our requests.  We also keep
163  * a list of messages to transmit to this peer.
164  */
165 struct ConnectedPeer
166 {
167
168   /**
169    * List of the last clients for which this peer successfully
170    * answered a query.
171    */
172   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
173
174   /**
175    * List of the last PIDs for which
176    * this peer successfully answered a query;
177    * We use 0 to indicate no successful reply.
178    */
179   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
180
181   /**
182    * Average delay between sending the peer a request and
183    * getting a reply (only calculated over the requests for
184    * which we actually got a reply).   Calculated
185    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
186    */ 
187   struct GNUNET_TIME_Relative avg_delay;
188
189   /**
190    * Point in time until which this peer does not want us to migrate content
191    * to it.
192    */
193   struct GNUNET_TIME_Absolute migration_blocked;
194
195   /**
196    * Time until when we blocked this peer from migrating
197    * data to us.
198    */
199   struct GNUNET_TIME_Absolute last_migration_block;
200
201   /**
202    * Handle for an active request for transmission to this
203    * peer, or NULL.
204    */
205   struct GNUNET_CORE_TransmitHandle *cth;
206
207   /**
208    * Messages (replies, queries, content migration) we would like to
209    * send to this peer in the near future.  Sorted by priority, head.
210    */
211   struct PendingMessage *pending_messages_head;
212
213   /**
214    * Messages (replies, queries, content migration) we would like to
215    * send to this peer in the near future.  Sorted by priority, tail.
216    */
217   struct PendingMessage *pending_messages_tail;
218
219   /**
220    * Average priority of successful replies.  Calculated
221    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
222    */
223   double avg_priority;
224
225   /**
226    * Increase in traffic preference still to be submitted
227    * to the core service for this peer.
228    */
229   uint64_t inc_preference;
230
231   /**
232    * Trust rating for this peer
233    */
234   uint32_t trust;
235
236   /**
237    * Trust rating for this peer on disk.
238    */
239   uint32_t disk_trust;
240
241   /**
242    * The peer's identity.
243    */
244   GNUNET_PEER_Id pid;  
245
246   /**
247    * Size of the linked list of 'pending_messages'.
248    */
249   unsigned int pending_requests;
250
251   /**
252    * Which offset in "last_p2p_replies" will be updated next?
253    * (we go round-robin).
254    */
255   unsigned int last_p2p_replies_woff;
256
257   /**
258    * Which offset in "last_client_replies" will be updated next?
259    * (we go round-robin).
260    */
261   unsigned int last_client_replies_woff;
262
263 };
264
265
266 /**
267  * Information we keep for each pending request.  We should try to
268  * keep this struct as small as possible since its memory consumption
269  * is key to how many requests we can have pending at once.
270  */
271 struct PendingRequest;
272
273
274 /**
275  * Doubly-linked list of requests we are performing
276  * on behalf of the same client.
277  */
278 struct ClientRequestList
279 {
280
281   /**
282    * This is a doubly-linked list.
283    */
284   struct ClientRequestList *next;
285
286   /**
287    * This is a doubly-linked list.
288    */
289   struct ClientRequestList *prev;
290
291   /**
292    * Request this entry represents.
293    */
294   struct PendingRequest *req;
295
296   /**
297    * Client list this request belongs to.
298    */
299   struct ClientList *client_list;
300
301 };
302
303
304 /**
305  * Replies to be transmitted to the client.  The actual
306  * response message is allocated after this struct.
307  */
308 struct ClientResponseMessage
309 {
310   /**
311    * This is a doubly-linked list.
312    */
313   struct ClientResponseMessage *next;
314
315   /**
316    * This is a doubly-linked list.
317    */
318   struct ClientResponseMessage *prev;
319
320   /**
321    * Client list entry this response belongs to.
322    */
323   struct ClientList *client_list;
324
325   /**
326    * Number of bytes in the response.
327    */
328   size_t msize;
329 };
330
331
332 /**
333  * Linked list of clients we are performing requests
334  * for right now.
335  */
336 struct ClientList
337 {
338   /**
339    * This is a linked list.
340    */
341   struct ClientList *next;
342
343   /**
344    * ID of a client making a request, NULL if this entry is for a
345    * peer.
346    */
347   struct GNUNET_SERVER_Client *client;
348
349   /**
350    * Head of list of requests performed on behalf
351    * of this client right now.
352    */
353   struct ClientRequestList *rl_head;
354
355   /**
356    * Tail of list of requests performed on behalf
357    * of this client right now.
358    */
359   struct ClientRequestList *rl_tail;
360
361   /**
362    * Head of linked list of responses.
363    */
364   struct ClientResponseMessage *res_head;
365
366   /**
367    * Tail of linked list of responses.
368    */
369   struct ClientResponseMessage *res_tail;
370
371   /**
372    * Context for sending replies.
373    */
374   struct GNUNET_CONNECTION_TransmitHandle *th;
375
376 };
377
378
379 /**
380  * Doubly-linked list of messages we are performing
381  * due to a pending request.
382  */
383 struct PendingMessageList
384 {
385
386   /**
387    * This is a doubly-linked list of messages on behalf of the same request.
388    */
389   struct PendingMessageList *next;
390
391   /**
392    * This is a doubly-linked list of messages on behalf of the same request.
393    */
394   struct PendingMessageList *prev;
395
396   /**
397    * Message this entry represents.
398    */
399   struct PendingMessage *pm;
400
401   /**
402    * Request this entry belongs to.
403    */
404   struct PendingRequest *req;
405
406   /**
407    * Peer this message is targeted for.
408    */
409   struct ConnectedPeer *target;
410
411 };
412
413
414 /**
415  * Information we keep for each pending request.  We should try to
416  * keep this struct as small as possible since its memory consumption
417  * is key to how many requests we can have pending at once.
418  */
419 struct PendingRequest
420 {
421
422   /**
423    * If this request was made by a client, this is our entry in the
424    * client request list; otherwise NULL.
425    */
426   struct ClientRequestList *client_request_list;
427
428   /**
429    * Entry of peer responsible for this entry (if this request
430    * was made by a peer).
431    */
432   struct ConnectedPeer *cp;
433
434   /**
435    * If this is a namespace query, pointer to the hash of the public
436    * key of the namespace; otherwise NULL.  Pointer will be to the 
437    * end of this struct (so no need to free it).
438    */
439   const GNUNET_HashCode *namespace;
440
441   /**
442    * Bloomfilter we use to filter out replies that we don't care about
443    * (anymore).  NULL as long as we are interested in all replies.
444    */
445   struct GNUNET_CONTAINER_BloomFilter *bf;
446
447   /**
448    * Context of our GNUNET_CORE_peer_change_preference call.
449    */
450   struct GNUNET_CORE_InformationRequestContext *irc;
451
452   /**
453    * Reference to DHT get operation for this request (or NULL).
454    */
455   struct GNUNET_DHT_GetHandle *dht_get;
456
457   /**
458    * Hash code of all replies that we have seen so far (only valid
459    * if client is not NULL since we only track replies like this for
460    * our own clients).
461    */
462   GNUNET_HashCode *replies_seen;
463
464   /**
465    * Node in the heap representing this entry; NULL
466    * if we have no heap node.
467    */
468   struct GNUNET_CONTAINER_HeapNode *hnode;
469
470   /**
471    * Head of list of messages being performed on behalf of this
472    * request.
473    */
474   struct PendingMessageList *pending_head;
475
476   /**
477    * Tail of list of messages being performed on behalf of this
478    * request.
479    */
480   struct PendingMessageList *pending_tail;
481
482   /**
483    * When did we first see this request (form this peer), or, if our
484    * client is initiating, when did we last initiate a search?
485    */
486   struct GNUNET_TIME_Absolute start_time;
487
488   /**
489    * The query that this request is for.
490    */
491   GNUNET_HashCode query;
492
493   /**
494    * The task responsible for transmitting queries
495    * for this request.
496    */
497   GNUNET_SCHEDULER_TaskIdentifier task;
498
499   /**
500    * (Interned) Peer identifier that identifies a preferred target
501    * for requests.
502    */
503   GNUNET_PEER_Id target_pid;
504
505   /**
506    * (Interned) Peer identifiers of peers that have already
507    * received our query for this content.
508    */
509   GNUNET_PEER_Id *used_pids;
510   
511   /**
512    * Our entry in the queue (non-NULL while we wait for our
513    * turn to interact with the local database).
514    */
515   struct GNUNET_DATASTORE_QueueEntry *qe;
516
517   /**
518    * Size of the 'bf' (in bytes).
519    */
520   size_t bf_size;
521
522   /**
523    * Desired anonymity level; only valid for requests from a local client.
524    */
525   uint32_t anonymity_level;
526
527   /**
528    * How many entries in "used_pids" are actually valid?
529    */
530   unsigned int used_pids_off;
531
532   /**
533    * How long is the "used_pids" array?
534    */
535   unsigned int used_pids_size;
536
537   /**
538    * Number of results found for this request.
539    */
540   unsigned int results_found;
541
542   /**
543    * How many entries in "replies_seen" are actually valid?
544    */
545   unsigned int replies_seen_off;
546
547   /**
548    * How long is the "replies_seen" array?
549    */
550   unsigned int replies_seen_size;
551   
552   /**
553    * Priority with which this request was made.  If one of our clients
554    * made the request, then this is the current priority that we are
555    * using when initiating the request.  This value is used when
556    * we decide to reward other peers with trust for providing a reply.
557    */
558   uint32_t priority;
559
560   /**
561    * Priority points left for us to spend when forwarding this request
562    * to other peers.
563    */
564   uint32_t remaining_priority;
565
566   /**
567    * Number to mingle hashes for bloom-filter tests with.
568    */
569   int32_t mingle;
570
571   /**
572    * TTL with which we saw this request (or, if we initiated, TTL that
573    * we used for the request).
574    */
575   int32_t ttl;
576   
577   /**
578    * Type of the content that this request is for.
579    */
580   enum GNUNET_BLOCK_Type type;
581
582   /**
583    * Remove this request after transmission of the current response.
584    */
585   int16_t do_remove;
586
587   /**
588    * GNUNET_YES if we should not forward this request to other peers.
589    */
590   int16_t local_only;
591
592 };
593
594
595 /**
596  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
597  */
598 struct MigrationReadyBlock
599 {
600
601   /**
602    * This is a doubly-linked list.
603    */
604   struct MigrationReadyBlock *next;
605
606   /**
607    * This is a doubly-linked list.
608    */
609   struct MigrationReadyBlock *prev;
610
611   /**
612    * Query for the block.
613    */
614   GNUNET_HashCode query;
615
616   /**
617    * When does this block expire? 
618    */
619   struct GNUNET_TIME_Absolute expiration;
620
621   /**
622    * Peers we would consider forwarding this
623    * block to.  Zero for empty entries.
624    */
625   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
626
627   /**
628    * Size of the block.
629    */
630   size_t size;
631
632   /**
633    *  Number of targets already used.
634    */
635   unsigned int used_targets;
636
637   /**
638    * Type of the block.
639    */
640   enum GNUNET_BLOCK_Type type;
641 };
642
643
644 /**
645  * Our connection to the datastore.
646  */
647 static struct GNUNET_DATASTORE_Handle *dsh;
648
649 /**
650  * Our block context.
651  */
652 static struct GNUNET_BLOCK_Context *block_ctx;
653
654 /**
655  * Our block configuration.
656  */
657 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
658
659 /**
660  * Our scheduler.
661  */
662 static struct GNUNET_SCHEDULER_Handle *sched;
663
664 /**
665  * Our configuration.
666  */
667 static const struct GNUNET_CONFIGURATION_Handle *cfg;
668
669 /**
670  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
671  */
672 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
673
674 /**
675  * Map of peer identifiers to "struct PendingRequest" (for that peer).
676  */
677 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
678
679 /**
680  * Map of query identifiers to "struct PendingRequest" (for that query).
681  */
682 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
683
684 /**
685  * Heap with the request that will expire next at the top.  Contains
686  * pointers of type "struct PendingRequest*"; these will *also* be
687  * aliased from the "requests_by_peer" data structures and the
688  * "requests_by_query" table.  Note that requests from our clients
689  * don't expire and are thus NOT in the "requests_by_expiration"
690  * (or the "requests_by_peer" tables).
691  */
692 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
693
694 /**
695  * Handle for reporting statistics.
696  */
697 static struct GNUNET_STATISTICS_Handle *stats;
698
699 /**
700  * Linked list of clients we are currently processing requests for.
701  */
702 static struct ClientList *client_list;
703
704 /**
705  * Pointer to handle to the core service (points to NULL until we've
706  * connected to it).
707  */
708 static struct GNUNET_CORE_Handle *core;
709
710 /**
711  * Head of linked list of blocks that can be migrated.
712  */
713 static struct MigrationReadyBlock *mig_head;
714
715 /**
716  * Tail of linked list of blocks that can be migrated.
717  */
718 static struct MigrationReadyBlock *mig_tail;
719
720 /**
721  * Request to datastore for migration (or NULL).
722  */
723 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
724
725 /**
726  * Request to datastore for DHT PUTs (or NULL).
727  */
728 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
729
730 /**
731  * Type we will request for the next DHT PUT round from the datastore.
732  */
733 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
734
735 /**
736  * Where do we store trust information?
737  */
738 static char *trustDirectory;
739
740 /**
741  * ID of task that collects blocks for migration.
742  */
743 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
744
745 /**
746  * ID of task that collects blocks for DHT PUTs.
747  */
748 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
749
750 /**
751  * What is the maximum frequency at which we are allowed to
752  * poll the datastore for migration content?
753  */
754 static struct GNUNET_TIME_Relative min_migration_delay;
755
756 /**
757  * Handle for DHT operations.
758  */
759 static struct GNUNET_DHT_Handle *dht_handle;
760
761 /**
762  * Size of the doubly-linked list of migration blocks.
763  */
764 static unsigned int mig_size;
765
766 /**
767  * Are we allowed to migrate content to this peer.
768  */
769 static int active_migration;
770
771 /**
772  * How many entires with zero anonymity do we currently estimate
773  * to have in the database?
774  */
775 static unsigned int zero_anonymity_count_estimate;
776
777 /**
778  * Typical priorities we're seeing from other peers right now.  Since
779  * most priorities will be zero, this value is the weighted average of
780  * non-zero priorities seen "recently".  In order to ensure that new
781  * values do not dramatically change the ratio, values are first
782  * "capped" to a reasonable range (+N of the current value) and then
783  * averaged into the existing value by a ratio of 1:N.  Hence
784  * receiving the largest possible priority can still only raise our
785  * "current_priorities" by at most 1.
786  */
787 static double current_priorities;
788
789 /**
790  * Datastore 'GET' load tracking.
791  */
792 static struct GNUNET_LOAD_Value *datastore_get_load;
793
794 /**
795  * Datastore 'PUT' load tracking.
796  */
797 static struct GNUNET_LOAD_Value *datastore_put_load;
798
799
800 /**
801  * We've just now completed a datastore request.  Update our
802  * datastore load calculations.
803  *
804  * @param start time when the datastore request was issued
805  */
806 static void
807 update_datastore_delays (struct GNUNET_TIME_Absolute start)
808 {
809   struct GNUNET_TIME_Relative delay;
810
811   delay = GNUNET_TIME_absolute_get_duration (start);
812   GNUNET_LOAD_update (datastore_get_load,
813                       delay.value);
814 }
815
816
817 /**
818  * Get the filename under which we would store the GNUNET_HELLO_Message
819  * for the given host and protocol.
820  * @return filename of the form DIRECTORY/HOSTID
821  */
822 static char *
823 get_trust_filename (const struct GNUNET_PeerIdentity *id)
824 {
825   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
826   char *fn;
827
828   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
829   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
830   return fn;
831 }
832
833
834
835 /**
836  * Transmit messages by copying it to the target buffer
837  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
838  * for writing in the meantime.  In that case, do nothing
839  * (the disconnect or shutdown handler will take care of the rest).
840  * If we were able to transmit messages and there are still more
841  * pending, ask core again for further calls to this function.
842  *
843  * @param cls closure, pointer to the 'struct ConnectedPeer*'
844  * @param size number of bytes available in buf
845  * @param buf where the callee should write the message
846  * @return number of bytes written to buf
847  */
848 static size_t
849 transmit_to_peer (void *cls,
850                   size_t size, void *buf);
851
852
853 /* ******************* clean up functions ************************ */
854
855 /**
856  * Delete the given migration block.
857  *
858  * @param mb block to delete
859  */
860 static void
861 delete_migration_block (struct MigrationReadyBlock *mb)
862 {
863   GNUNET_CONTAINER_DLL_remove (mig_head,
864                                mig_tail,
865                                mb);
866   GNUNET_PEER_decrement_rcs (mb->target_list,
867                              MIGRATION_LIST_SIZE);
868   mig_size--;
869   GNUNET_free (mb);
870 }
871
872
873 /**
874  * Compare the distance of two peers to a key.
875  *
876  * @param key key
877  * @param p1 first peer
878  * @param p2 second peer
879  * @return GNUNET_YES if P1 is closer to key than P2
880  */
881 static int
882 is_closer (const GNUNET_HashCode *key,
883            const struct GNUNET_PeerIdentity *p1,
884            const struct GNUNET_PeerIdentity *p2)
885 {
886   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
887                                     &p2->hashPubKey,
888                                     key);
889 }
890
891
892 /**
893  * Consider migrating content to a given peer.
894  *
895  * @param cls 'struct MigrationReadyBlock*' to select
896  *            targets for (or NULL for none)
897  * @param key ID of the peer 
898  * @param value 'struct ConnectedPeer' of the peer
899  * @return GNUNET_YES (always continue iteration)
900  */
901 static int
902 consider_migration (void *cls,
903                     const GNUNET_HashCode *key,
904                     void *value)
905 {
906   struct MigrationReadyBlock *mb = cls;
907   struct ConnectedPeer *cp = value;
908   struct MigrationReadyBlock *pos;
909   struct GNUNET_PeerIdentity cppid;
910   struct GNUNET_PeerIdentity otherpid;
911   struct GNUNET_PeerIdentity worstpid;
912   size_t msize;
913   unsigned int i;
914   unsigned int repl;
915   
916   /* consider 'cp' as a migration target for mb */
917   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
918     return GNUNET_YES; /* peer has requested no migration! */
919   if (mb != NULL)
920     {
921       GNUNET_PEER_resolve (cp->pid,
922                            &cppid);
923       repl = MIGRATION_LIST_SIZE;
924       for (i=0;i<MIGRATION_LIST_SIZE;i++)
925         {
926           if (mb->target_list[i] == 0)
927             {
928               mb->target_list[i] = cp->pid;
929               GNUNET_PEER_change_rc (mb->target_list[i], 1);
930               repl = MIGRATION_LIST_SIZE;
931               break;
932             }
933           GNUNET_PEER_resolve (mb->target_list[i],
934                                &otherpid);
935           if ( (repl == MIGRATION_LIST_SIZE) &&
936                is_closer (&mb->query,
937                           &cppid,
938                           &otherpid)) 
939             {
940               repl = i;
941               worstpid = otherpid;
942             }
943           else if ( (repl != MIGRATION_LIST_SIZE) &&
944                     (is_closer (&mb->query,
945                                 &worstpid,
946                                 &otherpid) ) )
947             {
948               repl = i;
949               worstpid = otherpid;
950             }       
951         }
952       if (repl != MIGRATION_LIST_SIZE) 
953         {
954           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
955           mb->target_list[repl] = cp->pid;
956           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
957         }
958     }
959
960   /* consider scheduling transmission to cp for content migration */
961   if (cp->cth != NULL)
962     return GNUNET_YES; 
963   msize = 0;
964   pos = mig_head;
965   while (pos != NULL)
966     {
967       for (i=0;i<MIGRATION_LIST_SIZE;i++)
968         {
969           if (cp->pid == pos->target_list[i])
970             {
971               if (msize == 0)
972                 msize = pos->size;
973               else
974                 msize = GNUNET_MIN (msize,
975                                     pos->size);
976               break;
977             }
978         }
979       pos = pos->next;
980     }
981   if (msize == 0)
982     return GNUNET_YES; /* no content available */
983 #if DEBUG_FS
984   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
985               "Trying to migrate at least %u bytes to peer `%s'\n",
986               msize,
987               GNUNET_h2s (key));
988 #endif
989   cp->cth 
990     = GNUNET_CORE_notify_transmit_ready (core,
991                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
992                                          (const struct GNUNET_PeerIdentity*) key,
993                                          msize + sizeof (struct PutMessage),
994                                          &transmit_to_peer,
995                                          cp);
996   return GNUNET_YES;
997 }
998
999
1000 /**
1001  * Task that is run periodically to obtain blocks for content
1002  * migration
1003  * 
1004  * @param cls unused
1005  * @param tc scheduler context (also unused)
1006  */
1007 static void
1008 gather_migration_blocks (void *cls,
1009                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1010
1011
1012
1013
1014 /**
1015  * Task that is run periodically to obtain blocks for DHT PUTs.
1016  * 
1017  * @param cls type of blocks to gather
1018  * @param tc scheduler context (unused)
1019  */
1020 static void
1021 gather_dht_put_blocks (void *cls,
1022                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1023
1024
1025 /**
1026  * If the migration task is not currently running, consider
1027  * (re)scheduling it with the appropriate delay.
1028  */
1029 static void
1030 consider_migration_gathering ()
1031 {
1032   struct GNUNET_TIME_Relative delay;
1033
1034   if (dsh == NULL)
1035     return;
1036   if (mig_qe != NULL)
1037     return;
1038   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1039     return;
1040   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1041                                          mig_size);
1042   delay = GNUNET_TIME_relative_divide (delay,
1043                                        MAX_MIGRATION_QUEUE);
1044   delay = GNUNET_TIME_relative_max (delay,
1045                                     min_migration_delay);
1046   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1047                                            delay,
1048                                            &gather_migration_blocks,
1049                                            NULL);
1050 }
1051
1052
1053 /**
1054  * If the DHT PUT gathering task is not currently running, consider
1055  * (re)scheduling it with the appropriate delay.
1056  */
1057 static void
1058 consider_dht_put_gathering (void *cls)
1059 {
1060   struct GNUNET_TIME_Relative delay;
1061
1062   if (dsh == NULL)
1063     return;
1064   if (dht_qe != NULL)
1065     return;
1066   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1067     return;
1068   if (zero_anonymity_count_estimate > 0)
1069     {
1070       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1071                                            zero_anonymity_count_estimate);
1072       delay = GNUNET_TIME_relative_min (delay,
1073                                         MAX_DHT_PUT_FREQ);
1074     }
1075   else
1076     {
1077       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1078          (hopefully) appear */
1079       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1080     }
1081   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1082                                            delay,
1083                                            &gather_dht_put_blocks,
1084                                            cls);
1085 }
1086
1087
1088 /**
1089  * Process content offered for migration.
1090  *
1091  * @param cls closure
1092  * @param key key for the content
1093  * @param size number of bytes in data
1094  * @param data content stored
1095  * @param type type of the content
1096  * @param priority priority of the content
1097  * @param anonymity anonymity-level for the content
1098  * @param expiration expiration time for the content
1099  * @param uid unique identifier for the datum;
1100  *        maybe 0 if no unique identifier is available
1101  */
1102 static void
1103 process_migration_content (void *cls,
1104                            const GNUNET_HashCode * key,
1105                            size_t size,
1106                            const void *data,
1107                            enum GNUNET_BLOCK_Type type,
1108                            uint32_t priority,
1109                            uint32_t anonymity,
1110                            struct GNUNET_TIME_Absolute
1111                            expiration, uint64_t uid)
1112 {
1113   struct MigrationReadyBlock *mb;
1114   
1115   if (key == NULL)
1116     {
1117       mig_qe = NULL;
1118       if (mig_size < MAX_MIGRATION_QUEUE)  
1119         consider_migration_gathering ();
1120       return;
1121     }
1122   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1123     {
1124       if (GNUNET_OK !=
1125           GNUNET_FS_handle_on_demand_block (key, size, data,
1126                                             type, priority, anonymity,
1127                                             expiration, uid, 
1128                                             &process_migration_content,
1129                                             NULL))
1130         {
1131           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1132         }
1133       return;
1134     }
1135 #if DEBUG_FS
1136   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1137               "Retrieved block `%s' of type %u for migration\n",
1138               GNUNET_h2s (key),
1139               type);
1140 #endif
1141   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1142   mb->query = *key;
1143   mb->expiration = expiration;
1144   mb->size = size;
1145   mb->type = type;
1146   memcpy (&mb[1], data, size);
1147   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1148                                      mig_tail,
1149                                      mig_tail,
1150                                      mb);
1151   mig_size++;
1152   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1153                                          &consider_migration,
1154                                          mb);
1155   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1156 }
1157
1158
1159 /**
1160  * Function called upon completion of the DHT PUT operation.
1161  */
1162 static void
1163 dht_put_continuation (void *cls,
1164                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1165 {
1166   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1167 }
1168
1169
1170 /**
1171  * Store content in DHT.
1172  *
1173  * @param cls closure
1174  * @param key key for the content
1175  * @param size number of bytes in data
1176  * @param data content stored
1177  * @param type type of the content
1178  * @param priority priority of the content
1179  * @param anonymity anonymity-level for the content
1180  * @param expiration expiration time for the content
1181  * @param uid unique identifier for the datum;
1182  *        maybe 0 if no unique identifier is available
1183  */
1184 static void
1185 process_dht_put_content (void *cls,
1186                          const GNUNET_HashCode * key,
1187                          size_t size,
1188                          const void *data,
1189                          enum GNUNET_BLOCK_Type type,
1190                          uint32_t priority,
1191                          uint32_t anonymity,
1192                          struct GNUNET_TIME_Absolute
1193                          expiration, uint64_t uid)
1194
1195   static unsigned int counter;
1196   static GNUNET_HashCode last_vhash;
1197   static GNUNET_HashCode vhash;
1198
1199   if (key == NULL)
1200     {
1201       dht_qe = NULL;
1202       consider_dht_put_gathering (cls);
1203       return;
1204     }
1205   /* slightly funky code to estimate the total number of values with zero
1206      anonymity from the maximum observed length of a monotonically increasing 
1207      sequence of hashes over the contents */
1208   GNUNET_CRYPTO_hash (data, size, &vhash);
1209   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1210     {
1211       if (zero_anonymity_count_estimate > 0)
1212         zero_anonymity_count_estimate /= 2;
1213       counter = 0;
1214     }
1215   last_vhash = vhash;
1216   if (counter < 31)
1217     counter++;
1218   if (zero_anonymity_count_estimate < (1 << counter))
1219     zero_anonymity_count_estimate = (1 << counter);
1220 #if DEBUG_FS
1221   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1222               "Retrieved block `%s' of type %u for DHT PUT\n",
1223               GNUNET_h2s (key),
1224               type);
1225 #endif
1226   GNUNET_DHT_put (dht_handle,
1227                   key,
1228                   GNUNET_DHT_RO_NONE,
1229                   type,
1230                   size,
1231                   data,
1232                   expiration,
1233                   GNUNET_TIME_UNIT_FOREVER_REL,
1234                   &dht_put_continuation,
1235                   cls);
1236 }
1237
1238
1239 /**
1240  * Task that is run periodically to obtain blocks for content
1241  * migration
1242  * 
1243  * @param cls unused
1244  * @param tc scheduler context (also unused)
1245  */
1246 static void
1247 gather_migration_blocks (void *cls,
1248                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1249 {
1250   mig_task = GNUNET_SCHEDULER_NO_TASK;
1251   if (dsh != NULL)
1252     {
1253       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1254                                             GNUNET_TIME_UNIT_FOREVER_REL,
1255                                             &process_migration_content, NULL);
1256       GNUNET_assert (mig_qe != NULL);
1257     }
1258 }
1259
1260
1261 /**
1262  * Task that is run periodically to obtain blocks for DHT PUTs.
1263  * 
1264  * @param cls type of blocks to gather
1265  * @param tc scheduler context (unused)
1266  */
1267 static void
1268 gather_dht_put_blocks (void *cls,
1269                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1270 {
1271   dht_task = GNUNET_SCHEDULER_NO_TASK;
1272   if (dsh != NULL)
1273     {
1274       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1275         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1276       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1277                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1278                                                     dht_put_type++,
1279                                                     &process_dht_put_content, NULL);
1280       GNUNET_assert (dht_qe != NULL);
1281     }
1282 }
1283
1284
1285 /**
1286  * We're done with a particular message list entry.
1287  * Free all associated resources.
1288  * 
1289  * @param pml entry to destroy
1290  */
1291 static void
1292 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1293 {
1294   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1295                                pml->req->pending_tail,
1296                                pml);
1297   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1298                                pml->target->pending_messages_tail,
1299                                pml->pm);
1300   pml->target->pending_requests--;
1301   GNUNET_free (pml->pm);
1302   GNUNET_free (pml);
1303 }
1304
1305
1306 /**
1307  * Destroy the given pending message (and call the respective
1308  * continuation).
1309  *
1310  * @param pm message to destroy
1311  * @param tpid id of peer that the message was delivered to, or 0 for none
1312  */
1313 static void
1314 destroy_pending_message (struct PendingMessage *pm,
1315                          GNUNET_PEER_Id tpid)
1316 {
1317   struct PendingMessageList *pml = pm->pml;
1318   TransmissionContinuation cont;
1319   void *cont_cls;
1320
1321   if (pml != NULL)
1322     {
1323       GNUNET_assert (pml->pm == pm);
1324       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1325       cont = pm->cont;
1326       cont_cls = pm->cont_cls;
1327       destroy_pending_message_list_entry (pml);
1328     }
1329   else
1330     {
1331       GNUNET_free (pm);
1332     }
1333   if (cont != NULL)
1334     cont (cont_cls, tpid);  
1335 }
1336
1337
1338 /**
1339  * We're done processing a particular request.
1340  * Free all associated resources.
1341  *
1342  * @param pr request to destroy
1343  */
1344 static void
1345 destroy_pending_request (struct PendingRequest *pr)
1346 {
1347   struct GNUNET_PeerIdentity pid;
1348
1349   if (pr->hnode != NULL)
1350     {
1351       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1352                                          pr->hnode);
1353       pr->hnode = NULL;
1354     }
1355   if (NULL == pr->client_request_list)
1356     {
1357       GNUNET_STATISTICS_update (stats,
1358                                 gettext_noop ("# P2P searches active"),
1359                                 -1,
1360                                 GNUNET_NO);
1361     }
1362   else
1363     {
1364       GNUNET_STATISTICS_update (stats,
1365                                 gettext_noop ("# client searches active"),
1366                                 -1,
1367                                 GNUNET_NO);
1368     }
1369   /* might have already been removed from map in 'process_reply' (if
1370      there was a unique reply) or never inserted if it was a
1371      duplicate; hence ignore the return value here */
1372   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1373                                                &pr->query,
1374                                                pr);
1375   if (pr->qe != NULL)
1376      {
1377       GNUNET_DATASTORE_cancel (pr->qe);
1378       pr->qe = NULL;
1379     }
1380   if (pr->dht_get != NULL)
1381     {
1382       GNUNET_DHT_get_stop (pr->dht_get);
1383       pr->dht_get = NULL;
1384     }
1385   if (pr->client_request_list != NULL)
1386     {
1387       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1388                                    pr->client_request_list->client_list->rl_tail,
1389                                    pr->client_request_list);
1390       GNUNET_free (pr->client_request_list);
1391       pr->client_request_list = NULL;
1392     }
1393   if (pr->cp != NULL)
1394     {
1395       GNUNET_PEER_resolve (pr->cp->pid,
1396                            &pid);
1397       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1398                                                    &pid.hashPubKey,
1399                                                    pr);
1400       pr->cp = NULL;
1401     }
1402   if (pr->bf != NULL)
1403     {
1404       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1405       pr->bf = NULL;
1406     }
1407   if (pr->irc != NULL)
1408     {
1409       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1410       pr->irc = NULL;
1411     }
1412   if (pr->replies_seen != NULL)
1413     {
1414       GNUNET_free (pr->replies_seen);
1415       pr->replies_seen = NULL;
1416     }
1417   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1418     {
1419       GNUNET_SCHEDULER_cancel (sched,
1420                                pr->task);
1421       pr->task = GNUNET_SCHEDULER_NO_TASK;
1422     }
1423   while (NULL != pr->pending_head)    
1424     destroy_pending_message_list_entry (pr->pending_head);
1425   GNUNET_PEER_change_rc (pr->target_pid, -1);
1426   if (pr->used_pids != NULL)
1427     {
1428       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1429       GNUNET_free (pr->used_pids);
1430       pr->used_pids_off = 0;
1431       pr->used_pids_size = 0;
1432       pr->used_pids = NULL;
1433     }
1434   GNUNET_free (pr);
1435 }
1436
1437
1438 /**
1439  * Method called whenever a given peer connects.
1440  *
1441  * @param cls closure, not used
1442  * @param peer peer identity this notification is about
1443  * @param latency reported latency of the connection with 'other'
1444  * @param distance reported distance (DV) to 'other' 
1445  */
1446 static void 
1447 peer_connect_handler (void *cls,
1448                       const struct
1449                       GNUNET_PeerIdentity * peer,
1450                       struct GNUNET_TIME_Relative latency,
1451                       uint32_t distance)
1452 {
1453   struct ConnectedPeer *cp;
1454   struct MigrationReadyBlock *pos;
1455   char *fn;
1456   uint32_t trust;
1457   
1458   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1459   cp->pid = GNUNET_PEER_intern (peer);
1460
1461   fn = get_trust_filename (peer);
1462   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1463       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1464     cp->disk_trust = cp->trust = ntohl (trust);
1465   GNUNET_free (fn);
1466
1467   GNUNET_break (GNUNET_OK ==
1468                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1469                                                    &peer->hashPubKey,
1470                                                    cp,
1471                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1472
1473   pos = mig_head;
1474   while (NULL != pos)
1475     {
1476       (void) consider_migration (pos, &peer->hashPubKey, cp);
1477       pos = pos->next;
1478     }
1479 }
1480
1481
1482 /**
1483  * Increase the host credit by a value.
1484  *
1485  * @param host which peer to change the trust value on
1486  * @param value is the int value by which the
1487  *  host credit is to be increased or decreased
1488  * @returns the actual change in trust (positive or negative)
1489  */
1490 static int
1491 change_host_trust (struct ConnectedPeer *host, int value)
1492 {
1493   unsigned int old_trust;
1494
1495   if (value == 0)
1496     return 0;
1497   GNUNET_assert (host != NULL);
1498   old_trust = host->trust;
1499   if (value > 0)
1500     {
1501       if (host->trust + value < host->trust)
1502         {
1503           value = UINT32_MAX - host->trust;
1504           host->trust = UINT32_MAX;
1505         }
1506       else
1507         host->trust += value;
1508     }
1509   else
1510     {
1511       if (host->trust < -value)
1512         {
1513           value = -host->trust;
1514           host->trust = 0;
1515         }
1516       else
1517         host->trust += value;
1518     }
1519   return value;
1520 }
1521
1522
1523 /**
1524  * Write host-trust information to a file - flush the buffer entry!
1525  */
1526 static int
1527 flush_trust (void *cls,
1528              const GNUNET_HashCode *key,
1529              void *value)
1530 {
1531   struct ConnectedPeer *host = value;
1532   char *fn;
1533   uint32_t trust;
1534   struct GNUNET_PeerIdentity pid;
1535
1536   if (host->trust == host->disk_trust)
1537     return GNUNET_OK;                     /* unchanged */
1538   GNUNET_PEER_resolve (host->pid,
1539                        &pid);
1540   fn = get_trust_filename (&pid);
1541   if (host->trust == 0)
1542     {
1543       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1544         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1545                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1546     }
1547   else
1548     {
1549       trust = htonl (host->trust);
1550       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1551                                                     sizeof(uint32_t),
1552                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1553                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1554         host->disk_trust = host->trust;
1555     }
1556   GNUNET_free (fn);
1557   return GNUNET_OK;
1558 }
1559
1560 /**
1561  * Call this method periodically to scan data/hosts for new hosts.
1562  */
1563 static void
1564 cron_flush_trust (void *cls,
1565                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1566 {
1567
1568   if (NULL == connected_peers)
1569     return;
1570   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1571                                          &flush_trust,
1572                                          NULL);
1573   if (NULL == tc)
1574     return;
1575   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1576     return;
1577   GNUNET_SCHEDULER_add_delayed (tc->sched,
1578                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1579 }
1580
1581
1582 /**
1583  * Free (each) request made by the peer.
1584  *
1585  * @param cls closure, points to peer that the request belongs to
1586  * @param key current key code
1587  * @param value value in the hash map
1588  * @return GNUNET_YES (we should continue to iterate)
1589  */
1590 static int
1591 destroy_request (void *cls,
1592                  const GNUNET_HashCode * key,
1593                  void *value)
1594 {
1595   const struct GNUNET_PeerIdentity * peer = cls;
1596   struct PendingRequest *pr = value;
1597   
1598   GNUNET_break (GNUNET_YES ==
1599                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1600                                                       &peer->hashPubKey,
1601                                                       pr));
1602   destroy_pending_request (pr);
1603   return GNUNET_YES;
1604 }
1605
1606
1607 /**
1608  * Method called whenever a peer disconnects.
1609  *
1610  * @param cls closure, not used
1611  * @param peer peer identity this notification is about
1612  */
1613 static void
1614 peer_disconnect_handler (void *cls,
1615                          const struct
1616                          GNUNET_PeerIdentity * peer)
1617 {
1618   struct ConnectedPeer *cp;
1619   struct PendingMessage *pm;
1620   unsigned int i;
1621   struct MigrationReadyBlock *pos;
1622   struct MigrationReadyBlock *next;
1623
1624   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1625                                               &peer->hashPubKey,
1626                                               &destroy_request,
1627                                               (void*) peer);
1628   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1629                                           &peer->hashPubKey);
1630   if (cp == NULL)
1631     return;
1632   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1633     {
1634       if (NULL != cp->last_client_replies[i])
1635         {
1636           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1637           cp->last_client_replies[i] = NULL;
1638         }
1639     }
1640   GNUNET_break (GNUNET_YES ==
1641                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1642                                                       &peer->hashPubKey,
1643                                                       cp));
1644   /* remove this peer from migration considerations; schedule
1645      alternatives */
1646   next = mig_head;
1647   while (NULL != (pos = next))
1648     {
1649       next = pos->next;
1650       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1651         {
1652           if (pos->target_list[i] == cp->pid)
1653             {
1654               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1655               pos->target_list[i] = 0;
1656             }
1657          }
1658       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1659         {
1660           delete_migration_block (pos);
1661           consider_migration_gathering ();
1662           continue;
1663         }
1664       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1665                                              &consider_migration,
1666                                              pos);
1667     }
1668   GNUNET_PEER_change_rc (cp->pid, -1);
1669   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1670   if (NULL != cp->cth)
1671     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1672   while (NULL != (pm = cp->pending_messages_head))
1673     destroy_pending_message (pm, 0 /* delivery failed */);
1674   GNUNET_break (0 == cp->pending_requests);
1675   GNUNET_free (cp);
1676 }
1677
1678
1679 /**
1680  * Iterator over hash map entries that removes all occurences
1681  * of the given 'client' from the 'last_client_replies' of the
1682  * given connected peer.
1683  *
1684  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1685  * @param key current key code (unused)
1686  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1687  * @return GNUNET_YES (we should continue to iterate)
1688  */
1689 static int
1690 remove_client_from_last_client_replies (void *cls,
1691                                         const GNUNET_HashCode * key,
1692                                         void *value)
1693 {
1694   struct GNUNET_SERVER_Client *client = cls;
1695   struct ConnectedPeer *cp = value;
1696   unsigned int i;
1697
1698   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1699     {
1700       if (cp->last_client_replies[i] == client)
1701         {
1702           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1703           cp->last_client_replies[i] = NULL;
1704         }
1705     }  
1706   return GNUNET_YES;
1707 }
1708
1709
1710 /**
1711  * A client disconnected.  Remove all of its pending queries.
1712  *
1713  * @param cls closure, NULL
1714  * @param client identification of the client
1715  */
1716 static void
1717 handle_client_disconnect (void *cls,
1718                           struct GNUNET_SERVER_Client
1719                           * client)
1720 {
1721   struct ClientList *pos;
1722   struct ClientList *prev;
1723   struct ClientRequestList *rcl;
1724   struct ClientResponseMessage *creply;
1725
1726   if (client == NULL)
1727     return;
1728   prev = NULL;
1729   pos = client_list;
1730   while ( (NULL != pos) &&
1731           (pos->client != client) )
1732     {
1733       prev = pos;
1734       pos = pos->next;
1735     }
1736   if (pos == NULL)
1737     return; /* no requests pending for this client */
1738   while (NULL != (rcl = pos->rl_head))
1739     {
1740       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1741                   "Destroying pending request `%s' on disconnect\n",
1742                   GNUNET_h2s (&rcl->req->query));
1743       destroy_pending_request (rcl->req);
1744     }
1745   if (prev == NULL)
1746     client_list = pos->next;
1747   else
1748     prev->next = pos->next;
1749   if (pos->th != NULL)
1750     {
1751       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1752       pos->th = NULL;
1753     }
1754   while (NULL != (creply = pos->res_head))
1755     {
1756       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1757                                    pos->res_tail,
1758                                    creply);
1759       GNUNET_free (creply);
1760     }    
1761   GNUNET_SERVER_client_drop (pos->client);
1762   GNUNET_free (pos);
1763   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1764                                          &remove_client_from_last_client_replies,
1765                                          client);
1766 }
1767
1768
1769 /**
1770  * Iterator to free peer entries.
1771  *
1772  * @param cls closure, unused
1773  * @param key current key code
1774  * @param value value in the hash map (peer entry)
1775  * @return GNUNET_YES (we should continue to iterate)
1776  */
1777 static int 
1778 clean_peer (void *cls,
1779             const GNUNET_HashCode * key,
1780             void *value)
1781 {
1782   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1783   return GNUNET_YES;
1784 }
1785
1786
1787 /**
1788  * Task run during shutdown.
1789  *
1790  * @param cls unused
1791  * @param tc unused
1792  */
1793 static void
1794 shutdown_task (void *cls,
1795                const struct GNUNET_SCHEDULER_TaskContext *tc)
1796 {
1797   if (mig_qe != NULL)
1798     {
1799       GNUNET_DATASTORE_cancel (mig_qe);
1800       mig_qe = NULL;
1801     }
1802   if (dht_qe != NULL)
1803     {
1804       GNUNET_DATASTORE_cancel (dht_qe);
1805       dht_qe = NULL;
1806     }
1807   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1808     {
1809       GNUNET_SCHEDULER_cancel (sched, mig_task);
1810       mig_task = GNUNET_SCHEDULER_NO_TASK;
1811     }
1812   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1813     {
1814       GNUNET_SCHEDULER_cancel (sched, dht_task);
1815       dht_task = GNUNET_SCHEDULER_NO_TASK;
1816     }
1817   while (client_list != NULL)
1818     handle_client_disconnect (NULL,
1819                               client_list->client);
1820   cron_flush_trust (NULL, NULL);
1821   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1822                                          &clean_peer,
1823                                          NULL);
1824   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1825   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1826   requests_by_expiration_heap = 0;
1827   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1828   connected_peers = NULL;
1829   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1830   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1831   query_request_map = NULL;
1832   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1833   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1834   peer_request_map = NULL;
1835   GNUNET_assert (NULL != core);
1836   GNUNET_CORE_disconnect (core);
1837   core = NULL;
1838   if (stats != NULL)
1839     {
1840       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1841       stats = NULL;
1842     }
1843   if (dsh != NULL)
1844     {
1845       GNUNET_DATASTORE_disconnect (dsh,
1846                                    GNUNET_NO);
1847       dsh = NULL;
1848     }
1849   while (mig_head != NULL)
1850     delete_migration_block (mig_head);
1851   GNUNET_assert (0 == mig_size);
1852   GNUNET_DHT_disconnect (dht_handle);
1853   dht_handle = NULL;
1854   GNUNET_LOAD_value_free (datastore_get_load);
1855   datastore_get_load = NULL;
1856   GNUNET_LOAD_value_free (datastore_put_load);
1857   datastore_put_load = NULL;
1858   GNUNET_BLOCK_context_destroy (block_ctx);
1859   block_ctx = NULL;
1860   GNUNET_CONFIGURATION_destroy (block_cfg);
1861   block_cfg = NULL;
1862   sched = NULL;
1863   cfg = NULL;  
1864   GNUNET_free_non_null (trustDirectory);
1865   trustDirectory = NULL;
1866 }
1867
1868
1869 /* ******************* Utility functions  ******************** */
1870
1871
1872 /**
1873  * Transmit messages by copying it to the target buffer
1874  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1875  * for writing in the meantime.  In that case, do nothing
1876  * (the disconnect or shutdown handler will take care of the rest).
1877  * If we were able to transmit messages and there are still more
1878  * pending, ask core again for further calls to this function.
1879  *
1880  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1881  * @param size number of bytes available in buf
1882  * @param buf where the callee should write the message
1883  * @return number of bytes written to buf
1884  */
1885 static size_t
1886 transmit_to_peer (void *cls,
1887                   size_t size, void *buf)
1888 {
1889   struct ConnectedPeer *cp = cls;
1890   char *cbuf = buf;
1891   struct GNUNET_PeerIdentity pid;
1892   struct PendingMessage *pm;
1893   struct MigrationReadyBlock *mb;
1894   struct MigrationReadyBlock *next;
1895   struct PutMessage migm;
1896   size_t msize;
1897   unsigned int i;
1898  
1899   cp->cth = NULL;
1900   if (NULL == buf)
1901     {
1902 #if DEBUG_FS
1903       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1904                   "Dropping message, core too busy.\n");
1905 #endif
1906       return 0;
1907     }
1908   msize = 0;
1909   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1910           (pm->msize <= size) )
1911     {
1912       memcpy (&cbuf[msize], &pm[1], pm->msize);
1913       msize += pm->msize;
1914       size -= pm->msize;
1915       destroy_pending_message (pm, cp->pid);
1916     }
1917   if (NULL != pm)
1918     {
1919       GNUNET_PEER_resolve (cp->pid,
1920                            &pid);
1921       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1922                                                    pm->priority,
1923                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1924                                                    &pid,
1925                                                    pm->msize,
1926                                                    &transmit_to_peer,
1927                                                    cp);
1928     }
1929   else
1930     {      
1931       next = mig_head;
1932       while (NULL != (mb = next))
1933         {
1934           next = mb->next;
1935           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1936             {
1937               if ( (cp->pid == mb->target_list[i]) &&
1938                    (mb->size + sizeof (migm) <= size) )
1939                 {
1940                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1941                   mb->target_list[i] = 0;
1942                   mb->used_targets++;
1943                   memset (&migm, 0, sizeof (migm));
1944                   migm.header.size = htons (sizeof (migm) + mb->size);
1945                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1946                   migm.type = htonl (mb->type);
1947                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1948                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1949                   msize += sizeof (migm);
1950                   size -= sizeof (migm);
1951                   memcpy (&cbuf[msize], &mb[1], mb->size);
1952                   msize += mb->size;
1953                   size -= mb->size;
1954 #if DEBUG_FS
1955                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1956                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1957                               GNUNET_h2s (&mb->query),
1958                               mb->size,
1959                               GNUNET_i2s (&pid));
1960 #endif    
1961                   break;
1962                 }
1963               else
1964                 {
1965 #if DEBUG_FS
1966                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1968                               GNUNET_h2s (&mb->query),
1969                               mb->size,
1970                               GNUNET_i2s (&pid));
1971 #endif    
1972                 }
1973             }
1974           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1975                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1976             {
1977               delete_migration_block (mb);
1978               consider_migration_gathering ();
1979             }
1980         }
1981       consider_migration (NULL, 
1982                           &pid.hashPubKey,
1983                           cp);
1984     }
1985 #if DEBUG_FS
1986   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1987               "Transmitting %u bytes to peer %u\n",
1988               msize,
1989               cp->pid);
1990 #endif
1991   return msize;
1992 }
1993
1994
1995 /**
1996  * Add a message to the set of pending messages for the given peer.
1997  *
1998  * @param cp peer to send message to
1999  * @param pm message to queue
2000  * @param pr request on which behalf this message is being queued
2001  */
2002 static void
2003 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2004                                   struct PendingMessage *pm,
2005                                   struct PendingRequest *pr)
2006 {
2007   struct PendingMessage *pos;
2008   struct PendingMessageList *pml;
2009   struct GNUNET_PeerIdentity pid;
2010
2011   GNUNET_assert (pm->next == NULL);
2012   GNUNET_assert (pm->pml == NULL);    
2013   if (pr != NULL)
2014     {
2015       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2016       pml->req = pr;
2017       pml->target = cp;
2018       pml->pm = pm;
2019       pm->pml = pml;  
2020       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2021                                    pr->pending_tail,
2022                                    pml);
2023     }
2024   pos = cp->pending_messages_head;
2025   while ( (pos != NULL) &&
2026           (pm->priority < pos->priority) )
2027     pos = pos->next;    
2028   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2029                                      cp->pending_messages_tail,
2030                                      pos,
2031                                      pm);
2032   cp->pending_requests++;
2033   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2034     destroy_pending_message (cp->pending_messages_tail, 0);  
2035   GNUNET_PEER_resolve (cp->pid, &pid);
2036   if (NULL != cp->cth)
2037     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2038   /* need to schedule transmission */
2039   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2040                                                cp->pending_messages_head->priority,
2041                                                MAX_TRANSMIT_DELAY,
2042                                                &pid,
2043                                                cp->pending_messages_head->msize,
2044                                                &transmit_to_peer,
2045                                                cp);
2046   if (cp->cth == NULL)
2047     {
2048 #if DEBUG_FS
2049       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2050                   "Failed to schedule transmission with core!\n");
2051 #endif
2052       GNUNET_STATISTICS_update (stats,
2053                                 gettext_noop ("# CORE transmission failures"),
2054                                 1,
2055                                 GNUNET_NO);
2056     }
2057 }
2058
2059
2060 /**
2061  * Test if the load on this peer is too high
2062  * to even consider processing the query at
2063  * all.
2064  * 
2065  * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
2066  */
2067 static int
2068 test_load_too_high ()
2069 {
2070   return GNUNET_SYSERR; // FIXME
2071 }
2072
2073
2074 /* ******************* Pending Request Refresh Task ******************** */
2075
2076
2077
2078 /**
2079  * We use a random delay to make the timing of requests less
2080  * predictable.  This function returns such a random delay.  We add a base
2081  * delay of MAX_CORK_DELAY (1s).
2082  *
2083  * FIXME: make schedule dependent on the specifics of the request?
2084  * Or bandwidth and number of connected peers and load?
2085  *
2086  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2087  */
2088 static struct GNUNET_TIME_Relative
2089 get_processing_delay ()
2090 {
2091   return 
2092     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2093                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2094                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2095                                                                                        TTL_DECREMENT)));
2096 }
2097
2098
2099 /**
2100  * We're processing a GET request from another peer and have decided
2101  * to forward it to other peers.  This function is called periodically
2102  * and should forward the request to other peers until we have all
2103  * possible replies.  If we have transmitted the *only* reply to
2104  * the initiator we should destroy the pending request.  If we have
2105  * many replies in the queue to the initiator, we should delay sending
2106  * out more queries until the reply queue has shrunk some.
2107  *
2108  * @param cls our "struct ProcessGetContext *"
2109  * @param tc unused
2110  */
2111 static void
2112 forward_request_task (void *cls,
2113                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2114
2115
2116 /**
2117  * Function called after we either failed or succeeded
2118  * at transmitting a query to a peer.  
2119  *
2120  * @param cls the requests "struct PendingRequest*"
2121  * @param tpid ID of receiving peer, 0 on transmission error
2122  */
2123 static void
2124 transmit_query_continuation (void *cls,
2125                              GNUNET_PEER_Id tpid)
2126 {
2127   struct PendingRequest *pr = cls;
2128
2129   GNUNET_STATISTICS_update (stats,
2130                             gettext_noop ("# queries scheduled for forwarding"),
2131                             -1,
2132                             GNUNET_NO);
2133   if (tpid == 0)   
2134     {
2135 #if DEBUG_FS
2136       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2137                   "Transmission of request failed, will try again later.\n");
2138 #endif
2139       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2140         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2141                                                  get_processing_delay (),
2142                                                  &forward_request_task,
2143                                                  pr); 
2144       return;    
2145     }
2146   GNUNET_STATISTICS_update (stats,
2147                             gettext_noop ("# queries forwarded"),
2148                             1,
2149                             GNUNET_NO);
2150   GNUNET_PEER_change_rc (tpid, 1);
2151   if (pr->used_pids_off == pr->used_pids_size)
2152     GNUNET_array_grow (pr->used_pids,
2153                        pr->used_pids_size,
2154                        pr->used_pids_size * 2 + 2);
2155   pr->used_pids[pr->used_pids_off++] = tpid;
2156   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2157     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2158                                              get_processing_delay (),
2159                                              &forward_request_task,
2160                                              pr);
2161 }
2162
2163
2164 /**
2165  * How many bytes should a bloomfilter be if we have already seen
2166  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2167  * of bits set per entry.  Furthermore, we should not re-size the
2168  * filter too often (to keep it cheap).
2169  *
2170  * Since other peers will also add entries but not resize the filter,
2171  * we should generally pick a slightly larger size than what the
2172  * strict math would suggest.
2173  *
2174  * @return must be a power of two and smaller or equal to 2^15.
2175  */
2176 static size_t
2177 compute_bloomfilter_size (unsigned int entry_count)
2178 {
2179   size_t size;
2180   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2181   uint16_t max = 1 << 15;
2182
2183   if (entry_count > max)
2184     return max;
2185   size = 8;
2186   while ((size < max) && (size < ideal))
2187     size *= 2;
2188   if (size > max)
2189     return max;
2190   return size;
2191 }
2192
2193
2194 /**
2195  * Recalculate our bloom filter for filtering replies.  This function
2196  * will create a new bloom filter from scratch, so it should only be
2197  * called if we have no bloomfilter at all (and hence can create a
2198  * fresh one of minimal size without problems) OR if our peer is the
2199  * initiator (in which case we may resize to larger than mimimum size).
2200  *
2201  * @param pr request for which the BF is to be recomputed
2202  */
2203 static void
2204 refresh_bloomfilter (struct PendingRequest *pr)
2205 {
2206   unsigned int i;
2207   size_t nsize;
2208   GNUNET_HashCode mhash;
2209
2210   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2211   if (nsize == pr->bf_size)
2212     return; /* size not changed */
2213   if (pr->bf != NULL)
2214     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2215   pr->bf_size = nsize;
2216   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2217   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2218                                               pr->bf_size,
2219                                               BLOOMFILTER_K);
2220   for (i=0;i<pr->replies_seen_off;i++)
2221     {
2222       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2223                                 pr->mingle,
2224                                 &mhash);
2225       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2226     }
2227 }
2228
2229
2230 /**
2231  * Function called after we've tried to reserve a certain amount of
2232  * bandwidth for a reply.  Check if we succeeded and if so send our
2233  * query.
2234  *
2235  * @param cls the requests "struct PendingRequest*"
2236  * @param peer identifies the peer
2237  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2238  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2239  * @param amount set to the amount that was actually reserved or unreserved
2240  * @param preference current traffic preference for the given peer
2241  */
2242 static void
2243 target_reservation_cb (void *cls,
2244                        const struct
2245                        GNUNET_PeerIdentity * peer,
2246                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2247                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2248                        int amount,
2249                        uint64_t preference)
2250 {
2251   struct PendingRequest *pr = cls;
2252   struct ConnectedPeer *cp;
2253   struct PendingMessage *pm;
2254   struct GetMessage *gm;
2255   GNUNET_HashCode *ext;
2256   char *bfdata;
2257   size_t msize;
2258   unsigned int k;
2259   int no_route;
2260   uint32_t bm;
2261
2262   pr->irc = NULL;
2263   if (peer == NULL)
2264     {
2265       /* error in communication with core, try again later */
2266       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2267         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2268                                                  get_processing_delay (),
2269                                                  &forward_request_task,
2270                                                  pr);
2271       return;
2272     }
2273   // (3) transmit, update ttl/priority
2274   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2275                                           &peer->hashPubKey);
2276   if (cp == NULL)
2277     {
2278       /* Peer must have just left */
2279 #if DEBUG_FS
2280       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2281                   "Selected peer disconnected!\n");
2282 #endif
2283       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2284         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2285                                                  get_processing_delay (),
2286                                                  &forward_request_task,
2287                                                  pr);
2288       return;
2289     }
2290   no_route = GNUNET_NO;
2291   if (amount == 0)
2292     {
2293       if (pr->cp == NULL)
2294         {
2295 #if DEBUG_FS > 1
2296           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2297                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2298                       amount,
2299                       DBLOCK_SIZE);
2300 #endif
2301           GNUNET_STATISTICS_update (stats,
2302                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2303                                     1,
2304                                     GNUNET_NO);
2305           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2306             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2307                                                      get_processing_delay (),
2308                                                      &forward_request_task,
2309                                                      pr);
2310           return;  /* this target round failed */
2311         }
2312       /* FIXME: if we are "quite" busy, we may still want to skip
2313          this round; need more load detection code! */
2314       no_route = GNUNET_YES;
2315     }
2316   
2317   GNUNET_STATISTICS_update (stats,
2318                             gettext_noop ("# queries scheduled for forwarding"),
2319                             1,
2320                             GNUNET_NO);
2321   /* build message and insert message into priority queue */
2322 #if DEBUG_FS
2323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2324               "Forwarding request `%s' to `%4s'!\n",
2325               GNUNET_h2s (&pr->query),
2326               GNUNET_i2s (peer));
2327 #endif
2328   k = 0;
2329   bm = 0;
2330   if (GNUNET_YES == no_route)
2331     {
2332       bm |= GET_MESSAGE_BIT_RETURN_TO;
2333       k++;      
2334     }
2335   if (pr->namespace != NULL)
2336     {
2337       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2338       k++;
2339     }
2340   if (pr->target_pid != 0)
2341     {
2342       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2343       k++;
2344     }
2345   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2346   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2347   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2348   pm->msize = msize;
2349   gm = (struct GetMessage*) &pm[1];
2350   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2351   gm->header.size = htons (msize);
2352   gm->type = htonl (pr->type);
2353   pr->remaining_priority /= 2;
2354   gm->priority = htonl (pr->remaining_priority);
2355   gm->ttl = htonl (pr->ttl);
2356   gm->filter_mutator = htonl(pr->mingle); 
2357   gm->hash_bitmap = htonl (bm);
2358   gm->query = pr->query;
2359   ext = (GNUNET_HashCode*) &gm[1];
2360   k = 0;
2361   if (GNUNET_YES == no_route)
2362     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2363   if (pr->namespace != NULL)
2364     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2365   if (pr->target_pid != 0)
2366     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2367   bfdata = (char *) &ext[k];
2368   if (pr->bf != NULL)
2369     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2370                                                bfdata,
2371                                                pr->bf_size);
2372   pm->cont = &transmit_query_continuation;
2373   pm->cont_cls = pr;
2374   add_to_pending_messages_for_peer (cp, pm, pr);
2375 }
2376
2377
2378 /**
2379  * Closure used for "target_peer_select_cb".
2380  */
2381 struct PeerSelectionContext 
2382 {
2383   /**
2384    * The request for which we are selecting
2385    * peers.
2386    */
2387   struct PendingRequest *pr;
2388
2389   /**
2390    * Current "prime" target.
2391    */
2392   struct GNUNET_PeerIdentity target;
2393
2394   /**
2395    * How much do we like this target?
2396    */
2397   double target_score;
2398
2399 };
2400
2401
2402 /**
2403  * Function called for each connected peer to determine
2404  * which one(s) would make good targets for forwarding.
2405  *
2406  * @param cls closure (struct PeerSelectionContext)
2407  * @param key current key code (peer identity)
2408  * @param value value in the hash map (struct ConnectedPeer)
2409  * @return GNUNET_YES if we should continue to
2410  *         iterate,
2411  *         GNUNET_NO if not.
2412  */
2413 static int
2414 target_peer_select_cb (void *cls,
2415                        const GNUNET_HashCode * key,
2416                        void *value)
2417 {
2418   struct PeerSelectionContext *psc = cls;
2419   struct ConnectedPeer *cp = value;
2420   struct PendingRequest *pr = psc->pr;
2421   double score;
2422   unsigned int i;
2423   unsigned int pc;
2424
2425   /* 1) check that this peer is not the initiator */
2426   if (cp == pr->cp)
2427     {
2428 #if DEBUG_FS
2429       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2430                   "Skipping initiator in forwarding selection\n");
2431 #endif
2432       return GNUNET_YES; /* skip */        
2433     }
2434
2435   /* 2) check if we have already (recently) forwarded to this peer */
2436   pc = 0;
2437   for (i=0;i<pr->used_pids_off;i++)
2438     if (pr->used_pids[i] == cp->pid) 
2439       {
2440         pc++;
2441         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2442                                            RETRY_PROBABILITY_INV))
2443           {
2444 #if DEBUG_FS
2445             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446                         "NOT re-trying query that was previously transmitted %u times\n",
2447                         (unsigned int) pr->used_pids_off);
2448 #endif
2449             return GNUNET_YES; /* skip */
2450           }
2451       }
2452 #if DEBUG_FS
2453   if (0 < pc)
2454     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2455                 "Re-trying query that was previously transmitted %u times to this peer\n",
2456                 (unsigned int) pc);
2457 #endif
2458   /* 3) calculate how much we'd like to forward to this peer,
2459      starting with a random value that is strong enough
2460      to at least give any peer a chance sometimes 
2461      (compared to the other factors that come later) */
2462   /* 3a) count successful (recent) routes from cp for same source */
2463   if (pr->cp != NULL)
2464     {
2465       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2466                                         P2P_SUCCESS_LIST_SIZE);
2467       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2468         if (cp->last_p2p_replies[i] == pr->cp->pid)
2469           score += 1; /* likely successful based on hot path */
2470     }
2471   else
2472     {
2473       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2474                                         CS2P_SUCCESS_LIST_SIZE);
2475       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2476         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2477           score += 1; /* likely successful based on hot path */
2478     }
2479   /* 3b) include latency */
2480   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2481     score += 1; /* likely fast based on latency */
2482   /* 3c) include priorities */
2483   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2484     score += 1; /* likely successful based on priorities */
2485   /* 3d) penalize for queue size */  
2486   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2487   /* 3e) include peer proximity */
2488   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2489                                                     &pr->query)) / (double) UINT32_MAX);
2490   /* 4) super-bonus for being the known target */
2491   if (pr->target_pid == cp->pid)
2492     score += 100.0;
2493   /* store best-fit in closure */
2494 #if DEBUG_FS
2495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2496               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2497               GNUNET_h2s (key),
2498               score,
2499               psc->target_score);
2500 #endif  
2501   score++; /* avoid zero */
2502   if (score > psc->target_score)
2503     {
2504       psc->target_score = score;
2505       psc->target.hashPubKey = *key; 
2506     }
2507   return GNUNET_YES;
2508 }
2509   
2510
2511 /**
2512  * The priority level imposes a bound on the maximum
2513  * value for the ttl that can be requested.
2514  *
2515  * @param ttl_in requested ttl
2516  * @param prio given priority
2517  * @return ttl_in if ttl_in is below the limit,
2518  *         otherwise the ttl-limit for the given priority
2519  */
2520 static int32_t
2521 bound_ttl (int32_t ttl_in, uint32_t prio)
2522 {
2523   unsigned long long allowed;
2524
2525   if (ttl_in <= 0)
2526     return ttl_in;
2527   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2528   if (ttl_in > allowed)      
2529     {
2530       if (allowed >= (1 << 30))
2531         return 1 << 30;
2532       return allowed;
2533     }
2534   return ttl_in;
2535 }
2536
2537
2538 /**
2539  * Iterator called on each result obtained for a DHT
2540  * operation that expects a reply
2541  *
2542  * @param cls closure
2543  * @param exp when will this value expire
2544  * @param key key of the result
2545  * @param get_path NULL-terminated array of pointers
2546  *                 to the peers on reverse GET path (or NULL if not recorded)
2547  * @param put_path NULL-terminated array of pointers
2548  *                 to the peers on the PUT path (or NULL if not recorded)
2549  * @param type type of the result
2550  * @param size number of bytes in data
2551  * @param data pointer to the result data
2552  */
2553 static void
2554 process_dht_reply (void *cls,
2555                    struct GNUNET_TIME_Absolute exp,
2556                    const GNUNET_HashCode * key,
2557                    const struct GNUNET_PeerIdentity * const *get_path,
2558                    const struct GNUNET_PeerIdentity * const *put_path,
2559                    enum GNUNET_BLOCK_Type type,
2560                    size_t size,
2561                    const void *data);
2562
2563
2564 /**
2565  * We're processing a GET request and have decided
2566  * to forward it to other peers.  This function is called periodically
2567  * and should forward the request to other peers until we have all
2568  * possible replies.  If we have transmitted the *only* reply to
2569  * the initiator we should destroy the pending request.  If we have
2570  * many replies in the queue to the initiator, we should delay sending
2571  * out more queries until the reply queue has shrunk some.
2572  *
2573  * @param cls our "struct ProcessGetContext *"
2574  * @param tc unused
2575  */
2576 static void
2577 forward_request_task (void *cls,
2578                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2579 {
2580   struct PendingRequest *pr = cls;
2581   struct PeerSelectionContext psc;
2582   struct ConnectedPeer *cp; 
2583   struct GNUNET_TIME_Relative delay;
2584
2585   pr->task = GNUNET_SCHEDULER_NO_TASK;
2586   if (pr->irc != NULL)
2587     {
2588 #if DEBUG_FS
2589       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2590                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2591                   GNUNET_h2s (&pr->query));
2592 #endif
2593       return; /* already pending */
2594     }
2595   if (GNUNET_YES == pr->local_only)
2596     return; /* configured to not do P2P search */
2597   /* (0) try DHT */
2598   if ( (0 == pr->anonymity_level) &&
2599        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2600        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2601     {
2602       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2603                                           GNUNET_TIME_UNIT_FOREVER_REL,
2604                                           pr->type,
2605                                           &pr->query,
2606                                           GNUNET_DHT_RO_NONE,
2607                                           pr->bf,
2608                                           pr->mingle,
2609                                           pr->namespace,
2610                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2611                                           &process_dht_reply,
2612                                           pr);
2613     }
2614   /* (1) select target */
2615   psc.pr = pr;
2616   psc.target_score = -DBL_MAX;
2617   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2618                                          &target_peer_select_cb,
2619                                          &psc);  
2620   if (psc.target_score == -DBL_MAX)
2621     {
2622       delay = get_processing_delay ();
2623 #if DEBUG_FS 
2624       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2625                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2626                   GNUNET_h2s (&pr->query),
2627                   delay.value);
2628 #endif
2629       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2630                                                delay,
2631                                                &forward_request_task,
2632                                                pr);
2633       return; /* nobody selected */
2634     }
2635   /* (3) update TTL/priority */
2636   if (pr->client_request_list != NULL)
2637     {
2638       /* FIXME: use better algorithm!? */
2639       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2640                                          4))
2641         pr->priority++;
2642       /* bound priority we use by priorities we see from other peers
2643          rounded up (must round up so that we can see non-zero
2644          priorities, but round up as little as possible to make it
2645          plausible that we forwarded another peers request) */
2646       if (pr->priority > current_priorities + 1.0)
2647         pr->priority = (uint32_t) current_priorities + 1.0;
2648       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2649                            pr->priority);
2650 #if DEBUG_FS
2651       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2652                   "Trying query `%s' with priority %u and TTL %d.\n",
2653                   GNUNET_h2s (&pr->query),
2654                   pr->priority,
2655                   pr->ttl);
2656 #endif
2657     }
2658
2659   /* (3) reserve reply bandwidth */
2660   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2661                                           &psc.target.hashPubKey);
2662   GNUNET_assert (NULL != cp);
2663   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2664                                                 &psc.target,
2665                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2666                                                 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2667                                                 DBLOCK_SIZE * 2, 
2668                                                 cp->inc_preference,
2669                                                 &target_reservation_cb,
2670                                                 pr);
2671   cp->inc_preference = 0;
2672 }
2673
2674
2675 /* **************************** P2P PUT Handling ************************ */
2676
2677
2678 /**
2679  * Function called after we either failed or succeeded
2680  * at transmitting a reply to a peer.  
2681  *
2682  * @param cls the requests "struct PendingRequest*"
2683  * @param tpid ID of receiving peer, 0 on transmission error
2684  */
2685 static void
2686 transmit_reply_continuation (void *cls,
2687                              GNUNET_PEER_Id tpid)
2688 {
2689   struct PendingRequest *pr = cls;
2690   
2691   switch (pr->type)
2692     {
2693     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2694     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2695       /* only one reply expected, done with the request! */
2696       destroy_pending_request (pr);
2697       break;
2698     case GNUNET_BLOCK_TYPE_ANY:
2699     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2700     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2701       break;
2702     default:
2703       GNUNET_break (0);
2704       break;
2705     }
2706 }
2707
2708
2709 /**
2710  * Transmit the given message by copying it to the target buffer
2711  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2712  * for writing in the meantime.  In that case, do nothing
2713  * (the disconnect or shutdown handler will take care of the rest).
2714  * If we were able to transmit messages and there are still more
2715  * pending, ask core again for further calls to this function.
2716  *
2717  * @param cls closure, pointer to the 'struct ClientList*'
2718  * @param size number of bytes available in buf
2719  * @param buf where the callee should write the message
2720  * @return number of bytes written to buf
2721  */
2722 static size_t
2723 transmit_to_client (void *cls,
2724                   size_t size, void *buf)
2725 {
2726   struct ClientList *cl = cls;
2727   char *cbuf = buf;
2728   struct ClientResponseMessage *creply;
2729   size_t msize;
2730   
2731   cl->th = NULL;
2732   if (NULL == buf)
2733     {
2734 #if DEBUG_FS
2735       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2736                   "Not sending reply, client communication problem.\n");
2737 #endif
2738       return 0;
2739     }
2740   msize = 0;
2741   while ( (NULL != (creply = cl->res_head) ) &&
2742           (creply->msize <= size) )
2743     {
2744       memcpy (&cbuf[msize], &creply[1], creply->msize);
2745       msize += creply->msize;
2746       size -= creply->msize;
2747       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2748                                    cl->res_tail,
2749                                    creply);
2750       GNUNET_free (creply);
2751     }
2752   if (NULL != creply)
2753     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2754                                                   creply->msize,
2755                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2756                                                   &transmit_to_client,
2757                                                   cl);
2758 #if DEBUG_FS
2759   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2760               "Transmitted %u bytes to client\n",
2761               (unsigned int) msize);
2762 #endif
2763   return msize;
2764 }
2765
2766
2767 /**
2768  * Closure for "process_reply" function.
2769  */
2770 struct ProcessReplyClosure
2771 {
2772   /**
2773    * The data for the reply.
2774    */
2775   const void *data;
2776
2777   /**
2778    * Who gave us this reply? NULL for local host (or DHT)
2779    */
2780   struct ConnectedPeer *sender;
2781
2782   /**
2783    * When the reply expires.
2784    */
2785   struct GNUNET_TIME_Absolute expiration;
2786
2787   /**
2788    * Size of data.
2789    */
2790   size_t size;
2791
2792   /**
2793    * Type of the block.
2794    */
2795   enum GNUNET_BLOCK_Type type;
2796
2797   /**
2798    * How much was this reply worth to us?
2799    */
2800   uint32_t priority;
2801
2802   /**
2803    * Evaluation result (returned).
2804    */
2805   enum GNUNET_BLOCK_EvaluationResult eval;
2806
2807   /**
2808    * Did we finish processing the associated request?
2809    */ 
2810   int finished;
2811
2812   /**
2813    * Did we find a matching request?
2814    */
2815   int request_found;
2816 };
2817
2818
2819 /**
2820  * We have received a reply; handle it!
2821  *
2822  * @param cls response (struct ProcessReplyClosure)
2823  * @param key our query
2824  * @param value value in the hash map (info about the query)
2825  * @return GNUNET_YES (we should continue to iterate)
2826  */
2827 static int
2828 process_reply (void *cls,
2829                const GNUNET_HashCode * key,
2830                void *value)
2831 {
2832   struct ProcessReplyClosure *prq = cls;
2833   struct PendingRequest *pr = value;
2834   struct PendingMessage *reply;
2835   struct ClientResponseMessage *creply;
2836   struct ClientList *cl;
2837   struct PutMessage *pm;
2838   struct ConnectedPeer *cp;
2839   struct GNUNET_TIME_Relative cur_delay;
2840   size_t msize;
2841
2842 #if DEBUG_FS
2843   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2844               "Matched result (type %u) for query `%s' with pending request\n",
2845               (unsigned int) prq->type,
2846               GNUNET_h2s (key));
2847 #endif  
2848   GNUNET_STATISTICS_update (stats,
2849                             gettext_noop ("# replies received and matched"),
2850                             1,
2851                             GNUNET_NO);
2852   if (prq->sender != NULL)
2853     {
2854       /* FIXME: should we be more precise here and not use
2855          "start_time" but a peer-specific time stamp? */
2856       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2857       prq->sender->avg_delay.value
2858         = (prq->sender->avg_delay.value * 
2859            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2860       prq->sender->avg_priority
2861         = (prq->sender->avg_priority * 
2862            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2863       if (pr->cp != NULL)
2864         {
2865           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2866                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2867                                  -1);
2868           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2869           prq->sender->last_p2p_replies
2870             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2871             = pr->cp->pid;
2872         }
2873       else
2874         {
2875           if (NULL != prq->sender->last_client_replies
2876               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2877             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2878                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2879           prq->sender->last_client_replies
2880             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2881             = pr->client_request_list->client_list->client;
2882           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2883         }
2884     }
2885   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
2886                                      prq->type,
2887                                      key,
2888                                      &pr->bf,
2889                                      pr->mingle,
2890                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2891                                      prq->data,
2892                                      prq->size);
2893   switch (prq->eval)
2894     {
2895     case GNUNET_BLOCK_EVALUATION_OK_MORE:
2896       break;
2897     case GNUNET_BLOCK_EVALUATION_OK_LAST:
2898       while (NULL != pr->pending_head)
2899         destroy_pending_message_list_entry (pr->pending_head);
2900       if (pr->qe != NULL)
2901         {
2902           if (pr->client_request_list != NULL)
2903             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2904                                         GNUNET_YES);
2905           GNUNET_DATASTORE_cancel (pr->qe);
2906           pr->qe = NULL;
2907         }
2908       pr->do_remove = GNUNET_YES;
2909       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2910         {
2911           GNUNET_SCHEDULER_cancel (sched,
2912                                    pr->task);
2913           pr->task = GNUNET_SCHEDULER_NO_TASK;
2914         }
2915       GNUNET_break (GNUNET_YES ==
2916                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2917                                                           key,
2918                                                           pr));
2919       break;
2920     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2921       GNUNET_STATISTICS_update (stats,
2922                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2923                                 1,
2924                                 GNUNET_NO);
2925 #if DEBUG_FS
2926       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2927                   "Duplicate response `%s', discarding.\n",
2928                   GNUNET_h2s (&mhash));
2929 #endif
2930       return GNUNET_YES; /* duplicate */
2931     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2932       return GNUNET_YES; /* wrong namespace */  
2933     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2934       GNUNET_break (0);
2935       return GNUNET_YES;
2936     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2937       GNUNET_break (0);
2938       return GNUNET_YES;
2939     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2940       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2941                   _("Unsupported block type %u\n"),
2942                   prq->type);
2943       return GNUNET_NO;
2944     }
2945   if (pr->client_request_list != NULL)
2946     {
2947       if (pr->replies_seen_size == pr->replies_seen_off)
2948         GNUNET_array_grow (pr->replies_seen,
2949                            pr->replies_seen_size,
2950                            pr->replies_seen_size * 2 + 4);      
2951       GNUNET_CRYPTO_hash (prq->data,
2952                           prq->size,
2953                           &pr->replies_seen[pr->replies_seen_off++]);         
2954       refresh_bloomfilter (pr);
2955     }
2956   if (NULL == prq->sender)
2957     {
2958 #if DEBUG_FS
2959       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2960                   "Found result for query `%s' in local datastore\n",
2961                   GNUNET_h2s (key));
2962 #endif
2963       GNUNET_STATISTICS_update (stats,
2964                                 gettext_noop ("# results found locally"),
2965                                 1,
2966                                 GNUNET_NO);      
2967     }
2968   prq->priority += pr->remaining_priority;
2969   pr->remaining_priority = 0;
2970   pr->results_found++;
2971   prq->request_found = GNUNET_YES;
2972   if (NULL != pr->client_request_list)
2973     {
2974       GNUNET_STATISTICS_update (stats,
2975                                 gettext_noop ("# replies received for local clients"),
2976                                 1,
2977                                 GNUNET_NO);
2978       cl = pr->client_request_list->client_list;
2979       msize = sizeof (struct PutMessage) + prq->size;
2980       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2981       creply->msize = msize;
2982       creply->client_list = cl;
2983       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2984                                          cl->res_tail,
2985                                          cl->res_tail,
2986                                          creply);      
2987       pm = (struct PutMessage*) &creply[1];
2988       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2989       pm->header.size = htons (msize);
2990       pm->type = htonl (prq->type);
2991       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2992       memcpy (&pm[1], prq->data, prq->size);      
2993       if (NULL == cl->th)
2994         {
2995 #if DEBUG_FS
2996           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2997                       "Transmitting result for query `%s' to client\n",
2998                       GNUNET_h2s (key));
2999 #endif  
3000           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3001                                                         msize,
3002                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3003                                                         &transmit_to_client,
3004                                                         cl);
3005         }
3006       GNUNET_break (cl->th != NULL);
3007       if (pr->do_remove)                
3008         {
3009           prq->finished = GNUNET_YES;
3010           destroy_pending_request (pr);         
3011         }
3012     }
3013   else
3014     {
3015       cp = pr->cp;
3016 #if DEBUG_FS
3017       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3018                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3019                   GNUNET_h2s (key),
3020                   (unsigned int) cp->pid);
3021 #endif  
3022       GNUNET_STATISTICS_update (stats,
3023                                 gettext_noop ("# replies received for other peers"),
3024                                 1,
3025                                 GNUNET_NO);
3026       msize = sizeof (struct PutMessage) + prq->size;
3027       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3028       reply->cont = &transmit_reply_continuation;
3029       reply->cont_cls = pr;
3030       reply->msize = msize;
3031       reply->priority = UINT32_MAX; /* send replies first! */
3032       pm = (struct PutMessage*) &reply[1];
3033       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3034       pm->header.size = htons (msize);
3035       pm->type = htonl (prq->type);
3036       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3037       memcpy (&pm[1], prq->data, prq->size);
3038       add_to_pending_messages_for_peer (cp, reply, pr);
3039     }
3040   return GNUNET_YES;
3041 }
3042
3043
3044 /**
3045  * Iterator called on each result obtained for a DHT
3046  * operation that expects a reply
3047  *
3048  * @param cls closure
3049  * @param exp when will this value expire
3050  * @param key key of the result
3051  * @param get_path NULL-terminated array of pointers
3052  *                 to the peers on reverse GET path (or NULL if not recorded)
3053  * @param put_path NULL-terminated array of pointers
3054  *                 to the peers on the PUT path (or NULL if not recorded)
3055  * @param type type of the result
3056  * @param size number of bytes in data
3057  * @param data pointer to the result data
3058  */
3059 static void
3060 process_dht_reply (void *cls,
3061                    struct GNUNET_TIME_Absolute exp,
3062                    const GNUNET_HashCode * key,
3063                    const struct GNUNET_PeerIdentity * const *get_path,
3064                    const struct GNUNET_PeerIdentity * const *put_path,
3065                    enum GNUNET_BLOCK_Type type,
3066                    size_t size,
3067                    const void *data)
3068 {
3069   struct PendingRequest *pr = cls;
3070   struct ProcessReplyClosure prq;
3071
3072   memset (&prq, 0, sizeof (prq));
3073   prq.data = data;
3074   prq.expiration = exp;
3075   prq.size = size;  
3076   prq.type = type;
3077   process_reply (&prq, key, pr);
3078 }
3079
3080
3081
3082 /**
3083  * Continuation called to notify client about result of the
3084  * operation.
3085  *
3086  * @param cls closure
3087  * @param success GNUNET_SYSERR on failure
3088  * @param msg NULL on success, otherwise an error message
3089  */
3090 static void 
3091 put_migration_continuation (void *cls,
3092                             int success,
3093                             const char *msg)
3094 {
3095   struct GNUNET_TIME_Absolute *start = cls;
3096   struct GNUNET_TIME_Relative delay;
3097   
3098   delay = GNUNET_TIME_absolute_get_duration (*start);
3099   GNUNET_free (start);
3100   GNUNET_LOAD_update (datastore_put_load,
3101                       delay.value);
3102   if (GNUNET_OK == success)
3103     return;
3104   GNUNET_STATISTICS_update (stats,
3105                             gettext_noop ("# datastore 'put' failures"),
3106                             1,
3107                             GNUNET_NO);
3108 }
3109
3110
3111 /**
3112  * Handle P2P "PUT" message.
3113  *
3114  * @param cls closure, always NULL
3115  * @param other the other peer involved (sender or receiver, NULL
3116  *        for loopback messages where we are both sender and receiver)
3117  * @param message the actual message
3118  * @param latency reported latency of the connection with 'other'
3119  * @param distance reported distance (DV) to 'other' 
3120  * @return GNUNET_OK to keep the connection open,
3121  *         GNUNET_SYSERR to close it (signal serious error)
3122  */
3123 static int
3124 handle_p2p_put (void *cls,
3125                 const struct GNUNET_PeerIdentity *other,
3126                 const struct GNUNET_MessageHeader *message,
3127                 struct GNUNET_TIME_Relative latency,
3128                 uint32_t distance)
3129 {
3130   const struct PutMessage *put;
3131   uint16_t msize;
3132   size_t dsize;
3133   enum GNUNET_BLOCK_Type type;
3134   struct GNUNET_TIME_Absolute expiration;
3135   GNUNET_HashCode query;
3136   struct ProcessReplyClosure prq;
3137   struct GNUNET_TIME_Absolute *start;
3138   struct GNUNET_TIME_Relative block_time;  
3139   double putl;
3140   struct ConnectedPeer *cp; 
3141   struct PendingMessage *pm;
3142   struct MigrationStopMessage *msm;
3143
3144   msize = ntohs (message->size);
3145   if (msize < sizeof (struct PutMessage))
3146     {
3147       GNUNET_break_op(0);
3148       return GNUNET_SYSERR;
3149     }
3150   put = (const struct PutMessage*) message;
3151   dsize = msize - sizeof (struct PutMessage);
3152   type = ntohl (put->type);
3153   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3154
3155   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3156     return GNUNET_SYSERR;
3157   if (GNUNET_OK !=
3158       GNUNET_BLOCK_get_key (block_ctx,
3159                             type,
3160                             &put[1],
3161                             dsize,
3162                             &query))
3163     {
3164       GNUNET_break_op (0);
3165       return GNUNET_SYSERR;
3166     }
3167 #if DEBUG_FS
3168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3169               "Received result for query `%s' from peer `%4s'\n",
3170               GNUNET_h2s (&query),
3171               GNUNET_i2s (other));
3172 #endif
3173   GNUNET_STATISTICS_update (stats,
3174                             gettext_noop ("# replies received (overall)"),
3175                             1,
3176                             GNUNET_NO);
3177   /* now, lookup 'query' */
3178   prq.data = (const void*) &put[1];
3179   if (other != NULL)
3180     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3181                                                     &other->hashPubKey);
3182   else
3183     prq.sender = NULL;
3184   prq.size = dsize;
3185   prq.type = type;
3186   prq.expiration = expiration;
3187   prq.priority = 0;
3188   prq.finished = GNUNET_NO;
3189   prq.request_found = GNUNET_NO;
3190   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3191                                               &query,
3192                                               &process_reply,
3193                                               &prq);
3194   if (prq.sender != NULL)
3195     {
3196       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3197       prq.sender->trust += prq.priority;
3198     }
3199   if (GNUNET_YES == active_migration)
3200     {
3201 #if DEBUG_FS
3202       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3203                   "Replicating result for query `%s' with priority %u\n",
3204                   GNUNET_h2s (&query),
3205                   prq.priority);
3206 #endif
3207       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3208       *start = GNUNET_TIME_absolute_get ();
3209       GNUNET_DATASTORE_put (dsh,
3210                             0, &query, dsize, &put[1],
3211                             type, prq.priority, 1 /* anonymity */, 
3212                             expiration, 
3213                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3214                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3215                             &put_migration_continuation, 
3216                             start);
3217     }
3218   putl = GNUNET_LOAD_get_load (datastore_put_load);
3219   if ( (GNUNET_NO == prq.request_found) &&
3220        ( (GNUNET_YES != active_migration) ||
3221          (putl > 2.0) ) )
3222     {
3223       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3224                                               &other->hashPubKey);
3225       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3226         return GNUNET_OK; /* already blocked */
3227       /* We're too busy; send MigrationStop message! */
3228       if (GNUNET_YES != active_migration) 
3229         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3230       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3231                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3232                                                                                    (unsigned int) (60000 * putl * putl)));
3233       
3234       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3235       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3236                           sizeof (struct MigrationStopMessage));
3237       pm->msize = sizeof (struct MigrationStopMessage);
3238       pm->priority = UINT32_MAX;
3239       msm = (struct MigrationStopMessage*) &pm[1];
3240       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3241       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3242       msm->duration = GNUNET_TIME_relative_hton (block_time);
3243       add_to_pending_messages_for_peer (cp,
3244                                         pm,
3245                                         NULL);
3246     }
3247   return GNUNET_OK;
3248 }
3249
3250
3251 /**
3252  * Handle P2P "MIGRATION_STOP" message.
3253  *
3254  * @param cls closure, always NULL
3255  * @param other the other peer involved (sender or receiver, NULL
3256  *        for loopback messages where we are both sender and receiver)
3257  * @param message the actual message
3258  * @param latency reported latency of the connection with 'other'
3259  * @param distance reported distance (DV) to 'other' 
3260  * @return GNUNET_OK to keep the connection open,
3261  *         GNUNET_SYSERR to close it (signal serious error)
3262  */
3263 static int
3264 handle_p2p_migration_stop (void *cls,
3265                            const struct GNUNET_PeerIdentity *other,
3266                            const struct GNUNET_MessageHeader *message,
3267                            struct GNUNET_TIME_Relative latency,
3268                            uint32_t distance)
3269 {
3270   struct ConnectedPeer *cp; 
3271   const struct MigrationStopMessage *msm;
3272
3273   msm = (const struct MigrationStopMessage*) message;
3274   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3275                                           &other->hashPubKey);
3276   if (cp == NULL)
3277     {
3278       GNUNET_break (0);
3279       return GNUNET_OK;
3280     }
3281   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3282   return GNUNET_OK;
3283 }
3284
3285
3286
3287 /* **************************** P2P GET Handling ************************ */
3288
3289
3290 /**
3291  * Closure for 'check_duplicate_request_{peer,client}'.
3292  */
3293 struct CheckDuplicateRequestClosure
3294 {
3295   /**
3296    * The new request we should check if it already exists.
3297    */
3298   const struct PendingRequest *pr;
3299
3300   /**
3301    * Existing request found by the checker, NULL if none.
3302    */
3303   struct PendingRequest *have;
3304 };
3305
3306
3307 /**
3308  * Iterator over entries in the 'query_request_map' that
3309  * tries to see if we have the same request pending from
3310  * the same client already.
3311  *
3312  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3313  * @param key current key code (query, ignored, must match)
3314  * @param value value in the hash map (a 'struct PendingRequest' 
3315  *              that already exists)
3316  * @return GNUNET_YES if we should continue to
3317  *         iterate (no match yet)
3318  *         GNUNET_NO if not (match found).
3319  */
3320 static int
3321 check_duplicate_request_client (void *cls,
3322                                 const GNUNET_HashCode * key,
3323                                 void *value)
3324 {
3325   struct CheckDuplicateRequestClosure *cdc = cls;
3326   struct PendingRequest *have = value;
3327
3328   if (have->client_request_list == NULL)
3329     return GNUNET_YES;
3330   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3331        (cdc->pr != have) )
3332     {
3333       cdc->have = have;
3334       return GNUNET_NO;
3335     }
3336   return GNUNET_YES;
3337 }
3338
3339
3340 /**
3341  * We're processing (local) results for a search request
3342  * from another peer.  Pass applicable results to the
3343  * peer and if we are done either clean up (operation
3344  * complete) or forward to other peers (more results possible).
3345  *
3346  * @param cls our closure (struct LocalGetContext)
3347  * @param key key for the content
3348  * @param size number of bytes in data
3349  * @param data content stored
3350  * @param type type of the content
3351  * @param priority priority of the content
3352  * @param anonymity anonymity-level for the content
3353  * @param expiration expiration time for the content
3354  * @param uid unique identifier for the datum;
3355  *        maybe 0 if no unique identifier is available
3356  */
3357 static void
3358 process_local_reply (void *cls,
3359                      const GNUNET_HashCode * key,
3360                      size_t size,
3361                      const void *data,
3362                      enum GNUNET_BLOCK_Type type,
3363                      uint32_t priority,
3364                      uint32_t anonymity,
3365                      struct GNUNET_TIME_Absolute
3366                      expiration, 
3367                      uint64_t uid)
3368 {
3369   struct PendingRequest *pr = cls;
3370   struct ProcessReplyClosure prq;
3371   struct CheckDuplicateRequestClosure cdrc;
3372   GNUNET_HashCode query;
3373   unsigned int old_rf;
3374   
3375   if (NULL == key)
3376     {
3377 #if DEBUG_FS > 1
3378       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3379                   "Done processing local replies, forwarding request to other peers.\n");
3380 #endif
3381       pr->qe = NULL;
3382       if (pr->client_request_list != NULL)
3383         {
3384           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3385                                       GNUNET_YES);
3386           /* Figure out if this is a duplicate request and possibly
3387              merge 'struct PendingRequest' entries */
3388           cdrc.have = NULL;
3389           cdrc.pr = pr;
3390           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3391                                                       &pr->query,
3392                                                       &check_duplicate_request_client,
3393                                                       &cdrc);
3394           if (cdrc.have != NULL)
3395             {
3396 #if DEBUG_FS
3397               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3398                           "Received request for block `%s' twice from client, will only request once.\n",
3399                           GNUNET_h2s (&pr->query));
3400 #endif
3401               
3402               destroy_pending_request (pr);
3403               return;
3404             }
3405         }
3406
3407       /* no more results */
3408       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3409         pr->task = GNUNET_SCHEDULER_add_now (sched,
3410                                              &forward_request_task,
3411                                              pr);      
3412       return;
3413     }
3414 #if DEBUG_FS
3415   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3416               "New local response to `%s' of type %u.\n",
3417               GNUNET_h2s (key),
3418               type);
3419 #endif
3420   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3421     {
3422 #if DEBUG_FS
3423       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3424                   "Found ONDEMAND block, performing on-demand encoding\n");
3425 #endif
3426       GNUNET_STATISTICS_update (stats,
3427                                 gettext_noop ("# on-demand blocks matched requests"),
3428                                 1,
3429                                 GNUNET_NO);
3430       if (GNUNET_OK != 
3431           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3432                                             anonymity, expiration, uid, 
3433                                             &process_local_reply,
3434                                             pr))
3435       if (pr->qe != NULL)
3436         {
3437           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3438         }
3439       return;
3440     }
3441   old_rf = pr->results_found;
3442   memset (&prq, 0, sizeof (prq));
3443   prq.data = data;
3444   prq.expiration = expiration;
3445   prq.size = size;  
3446   if (GNUNET_OK != 
3447       GNUNET_BLOCK_get_key (block_ctx,
3448                             type,
3449                             data,
3450                             size,
3451                             &query))
3452     {
3453       GNUNET_break (0);
3454       GNUNET_DATASTORE_remove (dsh,
3455                                key,
3456                                size, data,
3457                                -1, -1, 
3458                                GNUNET_TIME_UNIT_FOREVER_REL,
3459                                NULL, NULL);
3460       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3461       return;
3462     }
3463   prq.type = type;
3464   prq.priority = priority;  
3465   prq.finished = GNUNET_NO;
3466   prq.request_found = GNUNET_NO;
3467   process_reply (&prq, key, pr);
3468   if ( (old_rf == 0) &&
3469        (pr->results_found == 1) )
3470     update_datastore_delays (pr->start_time);
3471   if (prq.finished == GNUNET_YES)
3472     return;
3473   if (pr->qe == NULL)
3474     return; /* done here */
3475   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3476     {
3477       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3478       return;
3479     }
3480   if ( (pr->client_request_list == NULL) &&
3481        ( (GNUNET_YES == test_load_too_high()) ||
3482          (pr->results_found > 5 + 2 * pr->priority) ) )
3483     {
3484 #if DEBUG_FS > 2
3485       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3486                   "Load too high, done with request\n");
3487 #endif
3488       GNUNET_STATISTICS_update (stats,
3489                                 gettext_noop ("# processing result set cut short due to load"),
3490                                 1,
3491                                 GNUNET_NO);
3492       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3493       return;
3494     }
3495   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3496 }
3497
3498
3499 /**
3500  * We've received a request with the specified priority.  Bound it
3501  * according to how much we trust the given peer.
3502  * 
3503  * @param prio_in requested priority
3504  * @param cp the peer making the request
3505  * @return effective priority
3506  */
3507 static uint32_t
3508 bound_priority (uint32_t prio_in,
3509                 struct ConnectedPeer *cp)
3510 {
3511 #define N ((double)128.0)
3512   uint32_t ret;
3513   double rret;
3514   int ld;
3515
3516   ld = test_load_too_high ();
3517   if (ld == GNUNET_SYSERR)
3518     return 0; /* excess resources */
3519   ret = change_host_trust (cp, prio_in);
3520   if (ret > 0)
3521     {
3522       if (ret > current_priorities + N)
3523         rret = current_priorities + N;
3524       else
3525         rret = ret;
3526       current_priorities 
3527         = (current_priorities * (N-1) + rret)/N;
3528     }
3529 #undef N
3530   return ret;
3531 }
3532
3533
3534 /**
3535  * Iterator over entries in the 'query_request_map' that
3536  * tries to see if we have the same request pending from
3537  * the same peer already.
3538  *
3539  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3540  * @param key current key code (query, ignored, must match)
3541  * @param value value in the hash map (a 'struct PendingRequest' 
3542  *              that already exists)
3543  * @return GNUNET_YES if we should continue to
3544  *         iterate (no match yet)
3545  *         GNUNET_NO if not (match found).
3546  */
3547 static int
3548 check_duplicate_request_peer (void *cls,
3549                               const GNUNET_HashCode * key,
3550                               void *value)
3551 {
3552   struct CheckDuplicateRequestClosure *cdc = cls;
3553   struct PendingRequest *have = value;
3554
3555   if (cdc->pr->target_pid == have->target_pid)
3556     {
3557       cdc->have = have;
3558       return GNUNET_NO;
3559     }
3560   return GNUNET_YES;
3561 }
3562
3563
3564 /**
3565  * Handle P2P "GET" request.
3566  *
3567  * @param cls closure, always NULL
3568  * @param other the other peer involved (sender or receiver, NULL
3569  *        for loopback messages where we are both sender and receiver)
3570  * @param message the actual message
3571  * @param latency reported latency of the connection with 'other'
3572  * @param distance reported distance (DV) to 'other' 
3573  * @return GNUNET_OK to keep the connection open,
3574  *         GNUNET_SYSERR to close it (signal serious error)
3575  */
3576 static int
3577 handle_p2p_get (void *cls,
3578                 const struct GNUNET_PeerIdentity *other,
3579                 const struct GNUNET_MessageHeader *message,
3580                 struct GNUNET_TIME_Relative latency,
3581                 uint32_t distance)
3582 {
3583   struct PendingRequest *pr;
3584   struct ConnectedPeer *cp;
3585   struct ConnectedPeer *cps;
3586   struct CheckDuplicateRequestClosure cdc;
3587   struct GNUNET_TIME_Relative timeout;
3588   uint16_t msize;
3589   const struct GetMessage *gm;
3590   unsigned int bits;
3591   const GNUNET_HashCode *opt;
3592   uint32_t bm;
3593   size_t bfsize;
3594   uint32_t ttl_decrement;
3595   enum GNUNET_BLOCK_Type type;
3596   int have_ns;
3597   int ld;
3598
3599   msize = ntohs(message->size);
3600   if (msize < sizeof (struct GetMessage))
3601     {
3602       GNUNET_break_op (0);
3603       return GNUNET_SYSERR;
3604     }
3605   gm = (const struct GetMessage*) message;
3606   type = ntohl (gm->type);
3607   bm = ntohl (gm->hash_bitmap);
3608   bits = 0;
3609   while (bm > 0)
3610     {
3611       if (1 == (bm & 1))
3612         bits++;
3613       bm >>= 1;
3614     }
3615   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3616     {
3617       GNUNET_break_op (0);
3618       return GNUNET_SYSERR;
3619     }  
3620   opt = (const GNUNET_HashCode*) &gm[1];
3621   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3622   bm = ntohl (gm->hash_bitmap);
3623   bits = 0;
3624   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3625                                            &other->hashPubKey);
3626   if (NULL == cps)
3627     {
3628       /* peer must have just disconnected */
3629       GNUNET_STATISTICS_update (stats,
3630                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3631                                 1,
3632                                 GNUNET_NO);
3633       return GNUNET_SYSERR;
3634     }
3635   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3636     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3637                                             &opt[bits++]);
3638   else
3639     cp = cps;
3640   if (cp == NULL)
3641     {
3642 #if DEBUG_FS
3643       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3644         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3645                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3646                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3647       
3648       else
3649         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3650                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3651                     GNUNET_i2s (other));
3652 #endif
3653       GNUNET_STATISTICS_update (stats,
3654                                 gettext_noop ("# requests dropped due to missing reverse route"),
3655                                 1,
3656                                 GNUNET_NO);
3657      /* FIXME: try connect? */
3658       return GNUNET_OK;
3659     }
3660   /* note that we can really only check load here since otherwise
3661      peers could find out that we are overloaded by not being
3662      disconnected after sending us a malformed query... */
3663
3664   /* FIXME: query priority should play
3665      a major role here! */
3666   ld = test_load_too_high ();
3667   if (GNUNET_YES == ld)
3668     {
3669 #if DEBUG_FS
3670       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3671                   "Dropping query from `%s', this peer is too busy.\n",
3672                   GNUNET_i2s (other));
3673 #endif
3674       GNUNET_STATISTICS_update (stats,
3675                                 gettext_noop ("# requests dropped due to high load"),
3676                                 1,
3677                                 GNUNET_NO);
3678       return GNUNET_OK;
3679     }
3680   /* FIXME: if ld == GNUNET_NO, forward
3681      instead of indirecting! */
3682
3683 #if DEBUG_FS 
3684   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3685               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3686               GNUNET_h2s (&gm->query),
3687               (unsigned int) type,
3688               GNUNET_i2s (other),
3689               (unsigned int) bm);
3690 #endif
3691   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3692   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3693                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3694   if (have_ns)
3695     {
3696       pr->namespace = (GNUNET_HashCode*) &pr[1];
3697       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3698     }
3699   pr->type = type;
3700   pr->mingle = ntohl (gm->filter_mutator);
3701   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3702     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3703   pr->anonymity_level = 1;
3704   pr->priority = bound_priority (ntohl (gm->priority), cps);
3705   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3706   pr->query = gm->query;
3707   /* decrement ttl (always) */
3708   ttl_decrement = 2 * TTL_DECREMENT +
3709     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3710                               TTL_DECREMENT);
3711   if ( (pr->ttl < 0) &&
3712        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3713     {
3714 #if DEBUG_FS
3715       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3716                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3717                   GNUNET_i2s (other),
3718                   pr->ttl,
3719                   ttl_decrement);
3720 #endif
3721       GNUNET_STATISTICS_update (stats,
3722                                 gettext_noop ("# requests dropped due TTL underflow"),
3723                                 1,
3724                                 GNUNET_NO);
3725       /* integer underflow => drop (should be very rare)! */      
3726       GNUNET_free (pr);
3727       return GNUNET_OK;
3728     } 
3729   pr->ttl -= ttl_decrement;
3730   pr->start_time = GNUNET_TIME_absolute_get ();
3731
3732   /* get bloom filter */
3733   if (bfsize > 0)
3734     {
3735       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3736                                                   bfsize,
3737                                                   BLOOMFILTER_K);
3738       pr->bf_size = bfsize;
3739     }
3740
3741   cdc.have = NULL;
3742   cdc.pr = pr;
3743   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3744                                               &gm->query,
3745                                               &check_duplicate_request_peer,
3746                                               &cdc);
3747   if (cdc.have != NULL)
3748     {
3749       if (cdc.have->start_time.value + cdc.have->ttl >=
3750           pr->start_time.value + pr->ttl)
3751         {
3752           /* existing request has higher TTL, drop new one! */
3753           cdc.have->priority += pr->priority;
3754           destroy_pending_request (pr);
3755 #if DEBUG_FS
3756           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3757                       "Have existing request with higher TTL, dropping new request.\n",
3758                       GNUNET_i2s (other));
3759 #endif
3760           GNUNET_STATISTICS_update (stats,
3761                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3762                                     1,
3763                                     GNUNET_NO);
3764           return GNUNET_OK;
3765         }
3766       else
3767         {
3768           /* existing request has lower TTL, drop old one! */
3769           pr->priority += cdc.have->priority;
3770           /* Possible optimization: if we have applicable pending
3771              replies in 'cdc.have', we might want to move those over
3772              (this is a really rare special-case, so it is not clear
3773              that this would be worth it) */
3774           destroy_pending_request (cdc.have);
3775           /* keep processing 'pr'! */
3776         }
3777     }
3778
3779   pr->cp = cp;
3780   GNUNET_break (GNUNET_OK ==
3781                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3782                                                    &gm->query,
3783                                                    pr,
3784                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3785   GNUNET_break (GNUNET_OK ==
3786                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3787                                                    &other->hashPubKey,
3788                                                    pr,
3789                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3790   
3791   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3792                                             pr,
3793                                             pr->start_time.value + pr->ttl);
3794
3795   GNUNET_STATISTICS_update (stats,
3796                             gettext_noop ("# P2P searches received"),
3797                             1,
3798                             GNUNET_NO);
3799   GNUNET_STATISTICS_update (stats,
3800                             gettext_noop ("# P2P searches active"),
3801                             1,
3802                             GNUNET_NO);
3803
3804   /* calculate change in traffic preference */
3805   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3806   /* process locally */
3807   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3808     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3809   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3810                                            (pr->priority + 1)); 
3811   pr->qe = GNUNET_DATASTORE_get (dsh,
3812                                  &gm->query,
3813                                  type,                         
3814                                  pr->priority + 1,
3815                                  MAX_DATASTORE_QUEUE,                            
3816                                  timeout,
3817                                  &process_local_reply,
3818                                  pr);
3819
3820   /* Are multiple results possible?  If so, start processing remotely now! */
3821   switch (pr->type)
3822     {
3823     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3824     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3825       /* only one result, wait for datastore */
3826       break;
3827     default:
3828       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3829         pr->task = GNUNET_SCHEDULER_add_now (sched,
3830                                              &forward_request_task,
3831                                              pr);
3832     }
3833
3834   /* make sure we don't track too many requests */
3835   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3836     {
3837       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3838       GNUNET_assert (pr != NULL);
3839       destroy_pending_request (pr);
3840     }
3841   return GNUNET_OK;
3842 }
3843
3844
3845 /* **************************** CS GET Handling ************************ */
3846
3847
3848 /**
3849  * Handle START_SEARCH-message (search request from client).
3850  *
3851  * @param cls closure
3852  * @param client identification of the client
3853  * @param message the actual message
3854  */
3855 static void
3856 handle_start_search (void *cls,
3857                      struct GNUNET_SERVER_Client *client,
3858                      const struct GNUNET_MessageHeader *message)
3859 {
3860   static GNUNET_HashCode all_zeros;
3861   const struct SearchMessage *sm;
3862   struct ClientList *cl;
3863   struct ClientRequestList *crl;
3864   struct PendingRequest *pr;
3865   uint16_t msize;
3866   unsigned int sc;
3867   enum GNUNET_BLOCK_Type type;
3868
3869   msize = ntohs (message->size);
3870   if ( (msize < sizeof (struct SearchMessage)) ||
3871        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3872     {
3873       GNUNET_break (0);
3874       GNUNET_SERVER_receive_done (client,
3875                                   GNUNET_SYSERR);
3876       return;
3877     }
3878   GNUNET_STATISTICS_update (stats,
3879                             gettext_noop ("# client searches received"),
3880                             1,
3881                             GNUNET_NO);
3882   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3883   sm = (const struct SearchMessage*) message;
3884   type = ntohl (sm->type);
3885 #if DEBUG_FS
3886   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3887               "Received request for `%s' of type %u from local client\n",
3888               GNUNET_h2s (&sm->query),
3889               (unsigned int) type);
3890 #endif
3891   cl = client_list;
3892   while ( (cl != NULL) &&
3893           (cl->client != client) )
3894     cl = cl->next;
3895   if (cl == NULL)
3896     {
3897       cl = GNUNET_malloc (sizeof (struct ClientList));
3898       cl->client = client;
3899       GNUNET_SERVER_client_keep (client);
3900       cl->next = client_list;
3901       client_list = cl;
3902     }
3903   /* detect duplicate KBLOCK requests */
3904   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
3905        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
3906        (type == GNUNET_BLOCK_TYPE_ANY) )
3907     {
3908       crl = cl->rl_head;
3909       while ( (crl != NULL) &&
3910               ( (0 != memcmp (&crl->req->query,
3911                               &sm->query,
3912                               sizeof (GNUNET_HashCode))) ||
3913                 (crl->req->type != type) ) )
3914         crl = crl->next;
3915       if (crl != NULL)  
3916         { 
3917 #if DEBUG_FS
3918           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3919                       "Have existing request, merging content-seen lists.\n");
3920 #endif
3921           pr = crl->req;
3922           /* Duplicate request (used to send long list of
3923              known/blocked results); merge 'pr->replies_seen'
3924              and update bloom filter */
3925           GNUNET_array_grow (pr->replies_seen,
3926                              pr->replies_seen_size,
3927                              pr->replies_seen_off + sc);
3928           memcpy (&pr->replies_seen[pr->replies_seen_off],
3929                   &sm[1],
3930                   sc * sizeof (GNUNET_HashCode));
3931           pr->replies_seen_off += sc;
3932           refresh_bloomfilter (pr);
3933           GNUNET_STATISTICS_update (stats,
3934                                     gettext_noop ("# client searches updated (merged content seen list)"),
3935                                     1,
3936                                     GNUNET_NO);
3937           GNUNET_SERVER_receive_done (client,
3938                                       GNUNET_OK);
3939           return;
3940         }
3941     }
3942   GNUNET_STATISTICS_update (stats,
3943                             gettext_noop ("# client searches active"),
3944                             1,
3945                             GNUNET_NO);
3946   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3947                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3948   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3949   memset (crl, 0, sizeof (struct ClientRequestList));
3950   crl->client_list = cl;
3951   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3952                                cl->rl_tail,
3953                                crl);  
3954   crl->req = pr;
3955   pr->type = type;
3956   pr->client_request_list = crl;
3957   GNUNET_array_grow (pr->replies_seen,
3958                      pr->replies_seen_size,
3959                      sc);
3960   memcpy (pr->replies_seen,
3961           &sm[1],
3962           sc * sizeof (GNUNET_HashCode));
3963   pr->replies_seen_off = sc;
3964   pr->anonymity_level = ntohl (sm->anonymity_level); 
3965   pr->start_time = GNUNET_TIME_absolute_get ();
3966   refresh_bloomfilter (pr);
3967   pr->query = sm->query;
3968   if (0 == (1 & ntohl (sm->options)))
3969     pr->local_only = GNUNET_NO;
3970   else
3971     pr->local_only = GNUNET_YES;
3972   switch (type)
3973     {
3974     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3975     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3976       if (0 != memcmp (&sm->target,
3977                        &all_zeros,
3978                        sizeof (GNUNET_HashCode)))
3979         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3980       break;
3981     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
3982       pr->namespace = (GNUNET_HashCode*) &pr[1];
3983       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3984       break;
3985     default:
3986       break;
3987     }
3988   GNUNET_break (GNUNET_OK ==
3989                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3990                                                    &sm->query,
3991                                                    pr,
3992                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3993   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3994     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3995   pr->qe = GNUNET_DATASTORE_get (dsh,
3996                                  &sm->query,
3997                                  type,
3998                                  -3, -1,
3999                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4000                                  &process_local_reply,
4001                                  pr);
4002 }
4003
4004
4005 /* **************************** Startup ************************ */
4006
4007 /**
4008  * Process fs requests.
4009  *
4010  * @param s scheduler to use
4011  * @param server the initialized server
4012  * @param c configuration to use
4013  */
4014 static int
4015 main_init (struct GNUNET_SCHEDULER_Handle *s,
4016            struct GNUNET_SERVER_Handle *server,
4017            const struct GNUNET_CONFIGURATION_Handle *c)
4018 {
4019   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4020     {
4021       { &handle_p2p_get, 
4022         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4023       { &handle_p2p_put, 
4024         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4025       { &handle_p2p_migration_stop, 
4026         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4027         sizeof (struct MigrationStopMessage) },
4028       { NULL, 0, 0 }
4029     };
4030   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4031     {&GNUNET_FS_handle_index_start, NULL, 
4032      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4033     {&GNUNET_FS_handle_index_list_get, NULL, 
4034      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4035     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4036      sizeof (struct UnindexMessage) },
4037     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4038      0 },
4039     {NULL, NULL, 0, 0}
4040   };
4041   unsigned long long enc = 128;
4042
4043   sched = s;
4044   cfg = c;
4045   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4046   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4047   if ( (GNUNET_OK !=
4048         GNUNET_CONFIGURATION_get_value_number (cfg,
4049                                                "fs",
4050                                                "MAX_PENDING_REQUESTS",
4051                                                &max_pending_requests)) ||
4052        (GNUNET_OK !=
4053         GNUNET_CONFIGURATION_get_value_number (cfg,
4054                                                "fs",
4055                                                "EXPECTED_NEIGHBOUR_COUNT",
4056                                                &enc)) ||
4057        (GNUNET_OK != 
4058         GNUNET_CONFIGURATION_get_value_time (cfg,
4059                                              "fs",
4060                                              "MIN_MIGRATION_DELAY",
4061                                              &min_migration_delay)) )
4062     {
4063       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4064                   _("Configuration fails to specify certain parameters, assuming default values."));
4065     }
4066   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4067   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4068   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4069   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4070   core = GNUNET_CORE_connect (sched,
4071                               cfg,
4072                               GNUNET_TIME_UNIT_FOREVER_REL,
4073                               NULL,
4074                               NULL,
4075                               &peer_connect_handler,
4076                               &peer_disconnect_handler,
4077                               NULL,
4078                               NULL, GNUNET_NO,
4079                               NULL, GNUNET_NO,
4080                               p2p_handlers);
4081   if (NULL == core)
4082     {
4083       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4084                   _("Failed to connect to `%s' service.\n"),
4085                   "core");
4086       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4087       connected_peers = NULL;
4088       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4089       query_request_map = NULL;
4090       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4091       requests_by_expiration_heap = NULL;
4092       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4093       peer_request_map = NULL;
4094       if (dsh != NULL)
4095         {
4096           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4097           dsh = NULL;
4098         }
4099       return GNUNET_SYSERR;
4100     }
4101   /* FIXME: distinguish between sending and storing in options? */
4102   if (active_migration) 
4103     {
4104       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4105                   _("Content migration is enabled, will start to gather data\n"));
4106       consider_migration_gathering ();
4107     }
4108   consider_dht_put_gathering (NULL);
4109   GNUNET_SERVER_disconnect_notify (server, 
4110                                    &handle_client_disconnect,
4111                                    NULL);
4112   GNUNET_assert (GNUNET_OK ==
4113                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4114                                                           "fs",
4115                                                           "TRUST",
4116                                                           &trustDirectory));
4117   GNUNET_DISK_directory_create (trustDirectory);
4118   GNUNET_SCHEDULER_add_with_priority (sched,
4119                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4120                                       &cron_flush_trust, NULL);
4121
4122
4123   GNUNET_SERVER_add_handlers (server, handlers);
4124   GNUNET_SCHEDULER_add_delayed (sched,
4125                                 GNUNET_TIME_UNIT_FOREVER_REL,
4126                                 &shutdown_task,
4127                                 NULL);
4128   return GNUNET_OK;
4129 }
4130
4131
4132 /**
4133  * Process fs requests.
4134  *
4135  * @param cls closure
4136  * @param sched scheduler to use
4137  * @param server the initialized server
4138  * @param cfg configuration to use
4139  */
4140 static void
4141 run (void *cls,
4142      struct GNUNET_SCHEDULER_Handle *sched,
4143      struct GNUNET_SERVER_Handle *server,
4144      const struct GNUNET_CONFIGURATION_Handle *cfg)
4145 {
4146   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4147                                                            "FS",
4148                                                            "ACTIVEMIGRATION");
4149   dsh = GNUNET_DATASTORE_connect (cfg,
4150                                   sched);
4151   if (dsh == NULL)
4152     {
4153       GNUNET_SCHEDULER_shutdown (sched);
4154       return;
4155     }
4156   datastore_get_load = GNUNET_LOAD_value_init ();
4157   datastore_put_load = GNUNET_LOAD_value_init ();
4158   block_cfg = GNUNET_CONFIGURATION_create ();
4159   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4160                                          "block",
4161                                          "PLUGINS",
4162                                          "fs");
4163   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4164   GNUNET_assert (NULL != block_ctx);
4165   dht_handle = GNUNET_DHT_connect (sched,
4166                                    cfg,
4167                                    FS_DHT_HT_SIZE);
4168   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4169        (GNUNET_OK != main_init (sched, server, cfg)) )
4170     {    
4171       GNUNET_SCHEDULER_shutdown (sched);
4172       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4173       dsh = NULL;
4174       GNUNET_DHT_disconnect (dht_handle);
4175       dht_handle = NULL;
4176       GNUNET_BLOCK_context_destroy (block_ctx);
4177       block_ctx = NULL;
4178       GNUNET_CONFIGURATION_destroy (block_cfg);
4179       block_cfg = NULL;
4180       GNUNET_LOAD_value_free (datastore_get_load);
4181       datastore_get_load = NULL;
4182       GNUNET_LOAD_value_free (datastore_put_load);
4183       datastore_put_load = NULL;
4184       return;   
4185     }
4186 }
4187
4188
4189 /**
4190  * The main function for the fs service.
4191  *
4192  * @param argc number of arguments from the command line
4193  * @param argv command line arguments
4194  * @return 0 ok, 1 on error
4195  */
4196 int
4197 main (int argc, char *const *argv)
4198 {
4199   return (GNUNET_OK ==
4200           GNUNET_SERVICE_run (argc,
4201                               argv,
4202                               "fs",
4203                               GNUNET_SERVICE_OPTION_NONE,
4204                               &run, NULL)) ? 0 : 1;
4205 }
4206
4207 /* end of gnunet-service-fs.c */