c2baf1adafa50c338056ad5fbc11ddf2dd8c623f
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - introduce random latency in processing
28  * - more statistics
29  */
30 #include "platform.h"
31 #include <float.h>
32 #include "gnunet_constants.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_dht_service.h"
35 #include "gnunet_datastore_service.h"
36 #include "gnunet_load_lib.h"
37 #include "gnunet_peer_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_util_lib.h"
42 #include "gnunet-service-fs_indexing.h"
43 #include "fs.h"
44
45 #define DEBUG_FS GNUNET_NO
46
47 /**
48  * Maximum number of outgoing messages we queue per peer.
49  */
50 #define MAX_QUEUE_PER_PEER 16
51
52 /**
53  * Size for the hash map for DHT requests from the FS
54  * service.  Should be about the number of concurrent
55  * DHT requests we plan to make.
56  */
57 #define FS_DHT_HT_SIZE 1024
58
59 /**
60  * How often do we flush trust values to disk?
61  */
62 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
63
64 /**
65  * How often do we at most PUT content into the DHT?
66  */
67 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
68
69 /**
70  * Inverse of the probability that we will submit the same query
71  * to the same peer again.  If the same peer already got the query
72  * repeatedly recently, the probability is multiplied by the inverse
73  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
74  * plus MAX_CORK_DELAY (so roughly every 3.5s).
75  */
76 #define RETRY_PROBABILITY_INV 3
77
78 /**
79  * What is the maximum delay for a P2P FS message (in our interaction
80  * with core)?  FS-internal delays are another story.  The value is
81  * chosen based on the 32k block size.  Given that peers typcially
82  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
83  * transmit one message even to the lowest-bandwidth peers.
84  */
85 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
86
87 /**
88  * Maximum number of requests (from other peers) that we're
89  * willing to have pending at any given point in time.
90  */
91 static unsigned long long max_pending_requests = (32 * 1024);
92
93
94 /**
95  * Information we keep for each pending reply.  The
96  * actual message follows at the end of this struct.
97  */
98 struct PendingMessage;
99
100 /**
101  * Function called upon completion of a transmission.
102  *
103  * @param cls closure
104  * @param pid ID of receiving peer, 0 on transmission error
105  */
106 typedef void (*TransmissionContinuation)(void * cls, 
107                                          GNUNET_PEER_Id tpid);
108
109
110 /**
111  * Information we keep for each pending message (GET/PUT).  The
112  * actual message follows at the end of this struct.
113  */
114 struct PendingMessage
115 {
116   /**
117    * This is a doubly-linked list of messages to the same peer.
118    */
119   struct PendingMessage *next;
120
121   /**
122    * This is a doubly-linked list of messages to the same peer.
123    */
124   struct PendingMessage *prev;
125
126   /**
127    * Entry in pending message list for this pending message.
128    */ 
129   struct PendingMessageList *pml;  
130
131   /**
132    * Function to call immediately once we have transmitted this
133    * message.
134    */
135   TransmissionContinuation cont;
136
137   /**
138    * Closure for cont.
139    */
140   void *cont_cls;
141
142   /**
143    * Size of the reply; actual reply message follows
144    * at the end of this struct.
145    */
146   size_t msize;
147   
148   /**
149    * How important is this message for us?
150    */
151   uint32_t priority;
152  
153 };
154
155
156 /**
157  * Information about a peer that we are connected to.
158  * We track data that is useful for determining which
159  * peers should receive our requests.  We also keep
160  * a list of messages to transmit to this peer.
161  */
162 struct ConnectedPeer
163 {
164
165   /**
166    * List of the last clients for which this peer successfully
167    * answered a query.
168    */
169   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
170
171   /**
172    * List of the last PIDs for which
173    * this peer successfully answered a query;
174    * We use 0 to indicate no successful reply.
175    */
176   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
177
178   /**
179    * Average delay between sending the peer a request and
180    * getting a reply (only calculated over the requests for
181    * which we actually got a reply).   Calculated
182    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
183    */ 
184   struct GNUNET_TIME_Relative avg_delay;
185
186   /**
187    * Point in time until which this peer does not want us to migrate content
188    * to it.
189    */
190   struct GNUNET_TIME_Absolute migration_blocked;
191
192   /**
193    * Time until when we blocked this peer from migrating
194    * data to us.
195    */
196   struct GNUNET_TIME_Absolute last_migration_block;
197
198   /**
199    * Handle for an active request for transmission to this
200    * peer, or NULL.
201    */
202   struct GNUNET_CORE_TransmitHandle *cth;
203
204   /**
205    * Messages (replies, queries, content migration) we would like to
206    * send to this peer in the near future.  Sorted by priority, head.
207    */
208   struct PendingMessage *pending_messages_head;
209
210   /**
211    * Messages (replies, queries, content migration) we would like to
212    * send to this peer in the near future.  Sorted by priority, tail.
213    */
214   struct PendingMessage *pending_messages_tail;
215
216   /**
217    * How long does it typically take for us to transmit a message
218    * to this peer?  (delay between the request being issued and
219    * the callback being invoked).
220    */
221   struct GNUNET_LOAD_Value *transmission_delay;
222
223   /**
224    * Time when the last transmission request was issued.
225    */
226   struct GNUNET_TIME_Absolute last_transmission_request_start;
227
228   /**
229    * Average priority of successful replies.  Calculated
230    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
231    */
232   double avg_priority;
233
234   /**
235    * Increase in traffic preference still to be submitted
236    * to the core service for this peer.
237    */
238   uint64_t inc_preference;
239
240   /**
241    * Trust rating for this peer
242    */
243   uint32_t trust;
244
245   /**
246    * Trust rating for this peer on disk.
247    */
248   uint32_t disk_trust;
249
250   /**
251    * The peer's identity.
252    */
253   GNUNET_PEER_Id pid;  
254
255   /**
256    * Size of the linked list of 'pending_messages'.
257    */
258   unsigned int pending_requests;
259
260   /**
261    * Which offset in "last_p2p_replies" will be updated next?
262    * (we go round-robin).
263    */
264   unsigned int last_p2p_replies_woff;
265
266   /**
267    * Which offset in "last_client_replies" will be updated next?
268    * (we go round-robin).
269    */
270   unsigned int last_client_replies_woff;
271
272 };
273
274
275 /**
276  * Information we keep for each pending request.  We should try to
277  * keep this struct as small as possible since its memory consumption
278  * is key to how many requests we can have pending at once.
279  */
280 struct PendingRequest;
281
282
283 /**
284  * Doubly-linked list of requests we are performing
285  * on behalf of the same client.
286  */
287 struct ClientRequestList
288 {
289
290   /**
291    * This is a doubly-linked list.
292    */
293   struct ClientRequestList *next;
294
295   /**
296    * This is a doubly-linked list.
297    */
298   struct ClientRequestList *prev;
299
300   /**
301    * Request this entry represents.
302    */
303   struct PendingRequest *req;
304
305   /**
306    * Client list this request belongs to.
307    */
308   struct ClientList *client_list;
309
310 };
311
312
313 /**
314  * Replies to be transmitted to the client.  The actual
315  * response message is allocated after this struct.
316  */
317 struct ClientResponseMessage
318 {
319   /**
320    * This is a doubly-linked list.
321    */
322   struct ClientResponseMessage *next;
323
324   /**
325    * This is a doubly-linked list.
326    */
327   struct ClientResponseMessage *prev;
328
329   /**
330    * Client list entry this response belongs to.
331    */
332   struct ClientList *client_list;
333
334   /**
335    * Number of bytes in the response.
336    */
337   size_t msize;
338 };
339
340
341 /**
342  * Linked list of clients we are performing requests
343  * for right now.
344  */
345 struct ClientList
346 {
347   /**
348    * This is a linked list.
349    */
350   struct ClientList *next;
351
352   /**
353    * ID of a client making a request, NULL if this entry is for a
354    * peer.
355    */
356   struct GNUNET_SERVER_Client *client;
357
358   /**
359    * Head of list of requests performed on behalf
360    * of this client right now.
361    */
362   struct ClientRequestList *rl_head;
363
364   /**
365    * Tail of list of requests performed on behalf
366    * of this client right now.
367    */
368   struct ClientRequestList *rl_tail;
369
370   /**
371    * Head of linked list of responses.
372    */
373   struct ClientResponseMessage *res_head;
374
375   /**
376    * Tail of linked list of responses.
377    */
378   struct ClientResponseMessage *res_tail;
379
380   /**
381    * Context for sending replies.
382    */
383   struct GNUNET_CONNECTION_TransmitHandle *th;
384
385 };
386
387
388 /**
389  * Doubly-linked list of messages we are performing
390  * due to a pending request.
391  */
392 struct PendingMessageList
393 {
394
395   /**
396    * This is a doubly-linked list of messages on behalf of the same request.
397    */
398   struct PendingMessageList *next;
399
400   /**
401    * This is a doubly-linked list of messages on behalf of the same request.
402    */
403   struct PendingMessageList *prev;
404
405   /**
406    * Message this entry represents.
407    */
408   struct PendingMessage *pm;
409
410   /**
411    * Request this entry belongs to.
412    */
413   struct PendingRequest *req;
414
415   /**
416    * Peer this message is targeted for.
417    */
418   struct ConnectedPeer *target;
419
420 };
421
422
423 /**
424  * Information we keep for each pending request.  We should try to
425  * keep this struct as small as possible since its memory consumption
426  * is key to how many requests we can have pending at once.
427  */
428 struct PendingRequest
429 {
430
431   /**
432    * If this request was made by a client, this is our entry in the
433    * client request list; otherwise NULL.
434    */
435   struct ClientRequestList *client_request_list;
436
437   /**
438    * Entry of peer responsible for this entry (if this request
439    * was made by a peer).
440    */
441   struct ConnectedPeer *cp;
442
443   /**
444    * If this is a namespace query, pointer to the hash of the public
445    * key of the namespace; otherwise NULL.  Pointer will be to the 
446    * end of this struct (so no need to free it).
447    */
448   const GNUNET_HashCode *namespace;
449
450   /**
451    * Bloomfilter we use to filter out replies that we don't care about
452    * (anymore).  NULL as long as we are interested in all replies.
453    */
454   struct GNUNET_CONTAINER_BloomFilter *bf;
455
456   /**
457    * Context of our GNUNET_CORE_peer_change_preference call.
458    */
459   struct GNUNET_CORE_InformationRequestContext *irc;
460
461   /**
462    * Reference to DHT get operation for this request (or NULL).
463    */
464   struct GNUNET_DHT_GetHandle *dht_get;
465
466   /**
467    * Hash code of all replies that we have seen so far (only valid
468    * if client is not NULL since we only track replies like this for
469    * our own clients).
470    */
471   GNUNET_HashCode *replies_seen;
472
473   /**
474    * Node in the heap representing this entry; NULL
475    * if we have no heap node.
476    */
477   struct GNUNET_CONTAINER_HeapNode *hnode;
478
479   /**
480    * Head of list of messages being performed on behalf of this
481    * request.
482    */
483   struct PendingMessageList *pending_head;
484
485   /**
486    * Tail of list of messages being performed on behalf of this
487    * request.
488    */
489   struct PendingMessageList *pending_tail;
490
491   /**
492    * When did we first see this request (form this peer), or, if our
493    * client is initiating, when did we last initiate a search?
494    */
495   struct GNUNET_TIME_Absolute start_time;
496
497   /**
498    * The query that this request is for.
499    */
500   GNUNET_HashCode query;
501
502   /**
503    * The task responsible for transmitting queries
504    * for this request.
505    */
506   GNUNET_SCHEDULER_TaskIdentifier task;
507
508   /**
509    * (Interned) Peer identifier that identifies a preferred target
510    * for requests.
511    */
512   GNUNET_PEER_Id target_pid;
513
514   /**
515    * (Interned) Peer identifiers of peers that have already
516    * received our query for this content.
517    */
518   GNUNET_PEER_Id *used_pids;
519   
520   /**
521    * Our entry in the queue (non-NULL while we wait for our
522    * turn to interact with the local database).
523    */
524   struct GNUNET_DATASTORE_QueueEntry *qe;
525
526   /**
527    * Size of the 'bf' (in bytes).
528    */
529   size_t bf_size;
530
531   /**
532    * Desired anonymity level; only valid for requests from a local client.
533    */
534   uint32_t anonymity_level;
535
536   /**
537    * How many entries in "used_pids" are actually valid?
538    */
539   unsigned int used_pids_off;
540
541   /**
542    * How long is the "used_pids" array?
543    */
544   unsigned int used_pids_size;
545
546   /**
547    * Number of results found for this request.
548    */
549   unsigned int results_found;
550
551   /**
552    * How many entries in "replies_seen" are actually valid?
553    */
554   unsigned int replies_seen_off;
555
556   /**
557    * How long is the "replies_seen" array?
558    */
559   unsigned int replies_seen_size;
560   
561   /**
562    * Priority with which this request was made.  If one of our clients
563    * made the request, then this is the current priority that we are
564    * using when initiating the request.  This value is used when
565    * we decide to reward other peers with trust for providing a reply.
566    */
567   uint32_t priority;
568
569   /**
570    * Priority points left for us to spend when forwarding this request
571    * to other peers.
572    */
573   uint32_t remaining_priority;
574
575   /**
576    * Number to mingle hashes for bloom-filter tests with.
577    */
578   int32_t mingle;
579
580   /**
581    * TTL with which we saw this request (or, if we initiated, TTL that
582    * we used for the request).
583    */
584   int32_t ttl;
585   
586   /**
587    * Type of the content that this request is for.
588    */
589   enum GNUNET_BLOCK_Type type;
590
591   /**
592    * Remove this request after transmission of the current response.
593    */
594   int8_t do_remove;
595
596   /**
597    * GNUNET_YES if we should not forward this request to other peers.
598    */
599   int8_t local_only;
600
601   /**
602    * GNUNET_YES if we should not forward this request to other peers.
603    */
604   int8_t forward_only;
605
606 };
607
608
609 /**
610  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
611  */
612 struct MigrationReadyBlock
613 {
614
615   /**
616    * This is a doubly-linked list.
617    */
618   struct MigrationReadyBlock *next;
619
620   /**
621    * This is a doubly-linked list.
622    */
623   struct MigrationReadyBlock *prev;
624
625   /**
626    * Query for the block.
627    */
628   GNUNET_HashCode query;
629
630   /**
631    * When does this block expire? 
632    */
633   struct GNUNET_TIME_Absolute expiration;
634
635   /**
636    * Peers we would consider forwarding this
637    * block to.  Zero for empty entries.
638    */
639   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
640
641   /**
642    * Size of the block.
643    */
644   size_t size;
645
646   /**
647    *  Number of targets already used.
648    */
649   unsigned int used_targets;
650
651   /**
652    * Type of the block.
653    */
654   enum GNUNET_BLOCK_Type type;
655 };
656
657
658 /**
659  * Our connection to the datastore.
660  */
661 static struct GNUNET_DATASTORE_Handle *dsh;
662
663 /**
664  * Our block context.
665  */
666 static struct GNUNET_BLOCK_Context *block_ctx;
667
668 /**
669  * Our block configuration.
670  */
671 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
672
673 /**
674  * Our scheduler.
675  */
676 static struct GNUNET_SCHEDULER_Handle *sched;
677
678 /**
679  * Our configuration.
680  */
681 static const struct GNUNET_CONFIGURATION_Handle *cfg;
682
683 /**
684  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
685  */
686 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
687
688 /**
689  * Map of peer identifiers to "struct PendingRequest" (for that peer).
690  */
691 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
692
693 /**
694  * Map of query identifiers to "struct PendingRequest" (for that query).
695  */
696 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
697
698 /**
699  * Heap with the request that will expire next at the top.  Contains
700  * pointers of type "struct PendingRequest*"; these will *also* be
701  * aliased from the "requests_by_peer" data structures and the
702  * "requests_by_query" table.  Note that requests from our clients
703  * don't expire and are thus NOT in the "requests_by_expiration"
704  * (or the "requests_by_peer" tables).
705  */
706 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
707
708 /**
709  * Handle for reporting statistics.
710  */
711 static struct GNUNET_STATISTICS_Handle *stats;
712
713 /**
714  * Linked list of clients we are currently processing requests for.
715  */
716 static struct ClientList *client_list;
717
718 /**
719  * Pointer to handle to the core service (points to NULL until we've
720  * connected to it).
721  */
722 static struct GNUNET_CORE_Handle *core;
723
724 /**
725  * Head of linked list of blocks that can be migrated.
726  */
727 static struct MigrationReadyBlock *mig_head;
728
729 /**
730  * Tail of linked list of blocks that can be migrated.
731  */
732 static struct MigrationReadyBlock *mig_tail;
733
734 /**
735  * Request to datastore for migration (or NULL).
736  */
737 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
738
739 /**
740  * Request to datastore for DHT PUTs (or NULL).
741  */
742 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
743
744 /**
745  * Type we will request for the next DHT PUT round from the datastore.
746  */
747 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
748
749 /**
750  * Where do we store trust information?
751  */
752 static char *trustDirectory;
753
754 /**
755  * ID of task that collects blocks for migration.
756  */
757 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
758
759 /**
760  * ID of task that collects blocks for DHT PUTs.
761  */
762 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
763
764 /**
765  * What is the maximum frequency at which we are allowed to
766  * poll the datastore for migration content?
767  */
768 static struct GNUNET_TIME_Relative min_migration_delay;
769
770 /**
771  * Handle for DHT operations.
772  */
773 static struct GNUNET_DHT_Handle *dht_handle;
774
775 /**
776  * Size of the doubly-linked list of migration blocks.
777  */
778 static unsigned int mig_size;
779
780 /**
781  * Are we allowed to migrate content to this peer.
782  */
783 static int active_migration;
784
785 /**
786  * How many entires with zero anonymity do we currently estimate
787  * to have in the database?
788  */
789 static unsigned int zero_anonymity_count_estimate;
790
791 /**
792  * Typical priorities we're seeing from other peers right now.  Since
793  * most priorities will be zero, this value is the weighted average of
794  * non-zero priorities seen "recently".  In order to ensure that new
795  * values do not dramatically change the ratio, values are first
796  * "capped" to a reasonable range (+N of the current value) and then
797  * averaged into the existing value by a ratio of 1:N.  Hence
798  * receiving the largest possible priority can still only raise our
799  * "current_priorities" by at most 1.
800  */
801 static double current_priorities;
802
803 /**
804  * Datastore 'GET' load tracking.
805  */
806 static struct GNUNET_LOAD_Value *datastore_get_load;
807
808 /**
809  * Datastore 'PUT' load tracking.
810  */
811 static struct GNUNET_LOAD_Value *datastore_put_load;
812
813 /**
814  * How long do requests typically stay in the routing table?
815  */
816 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
817
818 /**
819  * We've just now completed a datastore request.  Update our
820  * datastore load calculations.
821  *
822  * @param start time when the datastore request was issued
823  */
824 static void
825 update_datastore_delays (struct GNUNET_TIME_Absolute start)
826 {
827   struct GNUNET_TIME_Relative delay;
828
829   delay = GNUNET_TIME_absolute_get_duration (start);
830   GNUNET_LOAD_update (datastore_get_load,
831                       delay.value);
832 }
833
834
835 /**
836  * Get the filename under which we would store the GNUNET_HELLO_Message
837  * for the given host and protocol.
838  * @return filename of the form DIRECTORY/HOSTID
839  */
840 static char *
841 get_trust_filename (const struct GNUNET_PeerIdentity *id)
842 {
843   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
844   char *fn;
845
846   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
847   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
848   return fn;
849 }
850
851
852
853 /**
854  * Transmit messages by copying it to the target buffer
855  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
856  * for writing in the meantime.  In that case, do nothing
857  * (the disconnect or shutdown handler will take care of the rest).
858  * If we were able to transmit messages and there are still more
859  * pending, ask core again for further calls to this function.
860  *
861  * @param cls closure, pointer to the 'struct ConnectedPeer*'
862  * @param size number of bytes available in buf
863  * @param buf where the callee should write the message
864  * @return number of bytes written to buf
865  */
866 static size_t
867 transmit_to_peer (void *cls,
868                   size_t size, void *buf);
869
870
871 /* ******************* clean up functions ************************ */
872
873 /**
874  * Delete the given migration block.
875  *
876  * @param mb block to delete
877  */
878 static void
879 delete_migration_block (struct MigrationReadyBlock *mb)
880 {
881   GNUNET_CONTAINER_DLL_remove (mig_head,
882                                mig_tail,
883                                mb);
884   GNUNET_PEER_decrement_rcs (mb->target_list,
885                              MIGRATION_LIST_SIZE);
886   mig_size--;
887   GNUNET_free (mb);
888 }
889
890
891 /**
892  * Compare the distance of two peers to a key.
893  *
894  * @param key key
895  * @param p1 first peer
896  * @param p2 second peer
897  * @return GNUNET_YES if P1 is closer to key than P2
898  */
899 static int
900 is_closer (const GNUNET_HashCode *key,
901            const struct GNUNET_PeerIdentity *p1,
902            const struct GNUNET_PeerIdentity *p2)
903 {
904   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
905                                     &p2->hashPubKey,
906                                     key);
907 }
908
909
910 /**
911  * Consider migrating content to a given peer.
912  *
913  * @param cls 'struct MigrationReadyBlock*' to select
914  *            targets for (or NULL for none)
915  * @param key ID of the peer 
916  * @param value 'struct ConnectedPeer' of the peer
917  * @return GNUNET_YES (always continue iteration)
918  */
919 static int
920 consider_migration (void *cls,
921                     const GNUNET_HashCode *key,
922                     void *value)
923 {
924   struct MigrationReadyBlock *mb = cls;
925   struct ConnectedPeer *cp = value;
926   struct MigrationReadyBlock *pos;
927   struct GNUNET_PeerIdentity cppid;
928   struct GNUNET_PeerIdentity otherpid;
929   struct GNUNET_PeerIdentity worstpid;
930   size_t msize;
931   unsigned int i;
932   unsigned int repl;
933   
934   /* consider 'cp' as a migration target for mb */
935   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
936     return GNUNET_YES; /* peer has requested no migration! */
937   if (mb != NULL)
938     {
939       GNUNET_PEER_resolve (cp->pid,
940                            &cppid);
941       repl = MIGRATION_LIST_SIZE;
942       for (i=0;i<MIGRATION_LIST_SIZE;i++)
943         {
944           if (mb->target_list[i] == 0)
945             {
946               mb->target_list[i] = cp->pid;
947               GNUNET_PEER_change_rc (mb->target_list[i], 1);
948               repl = MIGRATION_LIST_SIZE;
949               break;
950             }
951           GNUNET_PEER_resolve (mb->target_list[i],
952                                &otherpid);
953           if ( (repl == MIGRATION_LIST_SIZE) &&
954                is_closer (&mb->query,
955                           &cppid,
956                           &otherpid)) 
957             {
958               repl = i;
959               worstpid = otherpid;
960             }
961           else if ( (repl != MIGRATION_LIST_SIZE) &&
962                     (is_closer (&mb->query,
963                                 &worstpid,
964                                 &otherpid) ) )
965             {
966               repl = i;
967               worstpid = otherpid;
968             }       
969         }
970       if (repl != MIGRATION_LIST_SIZE) 
971         {
972           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
973           mb->target_list[repl] = cp->pid;
974           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
975         }
976     }
977
978   /* consider scheduling transmission to cp for content migration */
979   if (cp->cth != NULL)
980     return GNUNET_YES; 
981   msize = 0;
982   pos = mig_head;
983   while (pos != NULL)
984     {
985       for (i=0;i<MIGRATION_LIST_SIZE;i++)
986         {
987           if (cp->pid == pos->target_list[i])
988             {
989               if (msize == 0)
990                 msize = pos->size;
991               else
992                 msize = GNUNET_MIN (msize,
993                                     pos->size);
994               break;
995             }
996         }
997       pos = pos->next;
998     }
999   if (msize == 0)
1000     return GNUNET_YES; /* no content available */
1001 #if DEBUG_FS
1002   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1003               "Trying to migrate at least %u bytes to peer `%s'\n",
1004               msize,
1005               GNUNET_h2s (key));
1006 #endif
1007   cp->cth 
1008     = GNUNET_CORE_notify_transmit_ready (core,
1009                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1010                                          (const struct GNUNET_PeerIdentity*) key,
1011                                          msize + sizeof (struct PutMessage),
1012                                          &transmit_to_peer,
1013                                          cp);
1014   return GNUNET_YES;
1015 }
1016
1017
1018 /**
1019  * Task that is run periodically to obtain blocks for content
1020  * migration
1021  * 
1022  * @param cls unused
1023  * @param tc scheduler context (also unused)
1024  */
1025 static void
1026 gather_migration_blocks (void *cls,
1027                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1028
1029
1030
1031
1032 /**
1033  * Task that is run periodically to obtain blocks for DHT PUTs.
1034  * 
1035  * @param cls type of blocks to gather
1036  * @param tc scheduler context (unused)
1037  */
1038 static void
1039 gather_dht_put_blocks (void *cls,
1040                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1041
1042
1043 /**
1044  * If the migration task is not currently running, consider
1045  * (re)scheduling it with the appropriate delay.
1046  */
1047 static void
1048 consider_migration_gathering ()
1049 {
1050   struct GNUNET_TIME_Relative delay;
1051
1052   if (dsh == NULL)
1053     return;
1054   if (mig_qe != NULL)
1055     return;
1056   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1057     return;
1058   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1059                                          mig_size);
1060   delay = GNUNET_TIME_relative_divide (delay,
1061                                        MAX_MIGRATION_QUEUE);
1062   delay = GNUNET_TIME_relative_max (delay,
1063                                     min_migration_delay);
1064   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1065                                            delay,
1066                                            &gather_migration_blocks,
1067                                            NULL);
1068 }
1069
1070
1071 /**
1072  * If the DHT PUT gathering task is not currently running, consider
1073  * (re)scheduling it with the appropriate delay.
1074  */
1075 static void
1076 consider_dht_put_gathering (void *cls)
1077 {
1078   struct GNUNET_TIME_Relative delay;
1079
1080   if (dsh == NULL)
1081     return;
1082   if (dht_qe != NULL)
1083     return;
1084   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1085     return;
1086   if (zero_anonymity_count_estimate > 0)
1087     {
1088       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1089                                            zero_anonymity_count_estimate);
1090       delay = GNUNET_TIME_relative_min (delay,
1091                                         MAX_DHT_PUT_FREQ);
1092     }
1093   else
1094     {
1095       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1096          (hopefully) appear */
1097       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1098     }
1099   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1100                                            delay,
1101                                            &gather_dht_put_blocks,
1102                                            cls);
1103 }
1104
1105
1106 /**
1107  * Process content offered for migration.
1108  *
1109  * @param cls closure
1110  * @param key key for the content
1111  * @param size number of bytes in data
1112  * @param data content stored
1113  * @param type type of the content
1114  * @param priority priority of the content
1115  * @param anonymity anonymity-level for the content
1116  * @param expiration expiration time for the content
1117  * @param uid unique identifier for the datum;
1118  *        maybe 0 if no unique identifier is available
1119  */
1120 static void
1121 process_migration_content (void *cls,
1122                            const GNUNET_HashCode * key,
1123                            size_t size,
1124                            const void *data,
1125                            enum GNUNET_BLOCK_Type type,
1126                            uint32_t priority,
1127                            uint32_t anonymity,
1128                            struct GNUNET_TIME_Absolute
1129                            expiration, uint64_t uid)
1130 {
1131   struct MigrationReadyBlock *mb;
1132   
1133   if (key == NULL)
1134     {
1135       mig_qe = NULL;
1136       if (mig_size < MAX_MIGRATION_QUEUE)  
1137         consider_migration_gathering ();
1138       return;
1139     }
1140   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1141     {
1142       if (GNUNET_OK !=
1143           GNUNET_FS_handle_on_demand_block (key, size, data,
1144                                             type, priority, anonymity,
1145                                             expiration, uid, 
1146                                             &process_migration_content,
1147                                             NULL))
1148         {
1149           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1150         }
1151       return;
1152     }
1153 #if DEBUG_FS
1154   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1155               "Retrieved block `%s' of type %u for migration\n",
1156               GNUNET_h2s (key),
1157               type);
1158 #endif
1159   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1160   mb->query = *key;
1161   mb->expiration = expiration;
1162   mb->size = size;
1163   mb->type = type;
1164   memcpy (&mb[1], data, size);
1165   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1166                                      mig_tail,
1167                                      mig_tail,
1168                                      mb);
1169   mig_size++;
1170   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1171                                          &consider_migration,
1172                                          mb);
1173   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1174 }
1175
1176
1177 /**
1178  * Function called upon completion of the DHT PUT operation.
1179  */
1180 static void
1181 dht_put_continuation (void *cls,
1182                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1183 {
1184   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1185 }
1186
1187
1188 /**
1189  * Store content in DHT.
1190  *
1191  * @param cls closure
1192  * @param key key for the content
1193  * @param size number of bytes in data
1194  * @param data content stored
1195  * @param type type of the content
1196  * @param priority priority of the content
1197  * @param anonymity anonymity-level for the content
1198  * @param expiration expiration time for the content
1199  * @param uid unique identifier for the datum;
1200  *        maybe 0 if no unique identifier is available
1201  */
1202 static void
1203 process_dht_put_content (void *cls,
1204                          const GNUNET_HashCode * key,
1205                          size_t size,
1206                          const void *data,
1207                          enum GNUNET_BLOCK_Type type,
1208                          uint32_t priority,
1209                          uint32_t anonymity,
1210                          struct GNUNET_TIME_Absolute
1211                          expiration, uint64_t uid)
1212
1213   static unsigned int counter;
1214   static GNUNET_HashCode last_vhash;
1215   static GNUNET_HashCode vhash;
1216
1217   if (key == NULL)
1218     {
1219       dht_qe = NULL;
1220       consider_dht_put_gathering (cls);
1221       return;
1222     }
1223   /* slightly funky code to estimate the total number of values with zero
1224      anonymity from the maximum observed length of a monotonically increasing 
1225      sequence of hashes over the contents */
1226   GNUNET_CRYPTO_hash (data, size, &vhash);
1227   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1228     {
1229       if (zero_anonymity_count_estimate > 0)
1230         zero_anonymity_count_estimate /= 2;
1231       counter = 0;
1232     }
1233   last_vhash = vhash;
1234   if (counter < 31)
1235     counter++;
1236   if (zero_anonymity_count_estimate < (1 << counter))
1237     zero_anonymity_count_estimate = (1 << counter);
1238 #if DEBUG_FS
1239   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1240               "Retrieved block `%s' of type %u for DHT PUT\n",
1241               GNUNET_h2s (key),
1242               type);
1243 #endif
1244   GNUNET_DHT_put (dht_handle,
1245                   key,
1246                   GNUNET_DHT_RO_NONE,
1247                   type,
1248                   size,
1249                   data,
1250                   expiration,
1251                   GNUNET_TIME_UNIT_FOREVER_REL,
1252                   &dht_put_continuation,
1253                   cls);
1254 }
1255
1256
1257 /**
1258  * Task that is run periodically to obtain blocks for content
1259  * migration
1260  * 
1261  * @param cls unused
1262  * @param tc scheduler context (also unused)
1263  */
1264 static void
1265 gather_migration_blocks (void *cls,
1266                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1267 {
1268   mig_task = GNUNET_SCHEDULER_NO_TASK;
1269   if (dsh != NULL)
1270     {
1271       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1272                                             GNUNET_TIME_UNIT_FOREVER_REL,
1273                                             &process_migration_content, NULL);
1274       GNUNET_assert (mig_qe != NULL);
1275     }
1276 }
1277
1278
1279 /**
1280  * Task that is run periodically to obtain blocks for DHT PUTs.
1281  * 
1282  * @param cls type of blocks to gather
1283  * @param tc scheduler context (unused)
1284  */
1285 static void
1286 gather_dht_put_blocks (void *cls,
1287                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1288 {
1289   dht_task = GNUNET_SCHEDULER_NO_TASK;
1290   if (dsh != NULL)
1291     {
1292       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1293         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1294       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1295                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1296                                                     dht_put_type++,
1297                                                     &process_dht_put_content, NULL);
1298       GNUNET_assert (dht_qe != NULL);
1299     }
1300 }
1301
1302
1303 /**
1304  * We're done with a particular message list entry.
1305  * Free all associated resources.
1306  * 
1307  * @param pml entry to destroy
1308  */
1309 static void
1310 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1311 {
1312   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1313                                pml->req->pending_tail,
1314                                pml);
1315   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1316                                pml->target->pending_messages_tail,
1317                                pml->pm);
1318   pml->target->pending_requests--;
1319   GNUNET_free (pml->pm);
1320   GNUNET_free (pml);
1321 }
1322
1323
1324 /**
1325  * Destroy the given pending message (and call the respective
1326  * continuation).
1327  *
1328  * @param pm message to destroy
1329  * @param tpid id of peer that the message was delivered to, or 0 for none
1330  */
1331 static void
1332 destroy_pending_message (struct PendingMessage *pm,
1333                          GNUNET_PEER_Id tpid)
1334 {
1335   struct PendingMessageList *pml = pm->pml;
1336   TransmissionContinuation cont;
1337   void *cont_cls;
1338
1339   if (pml != NULL)
1340     {
1341       GNUNET_assert (pml->pm == pm);
1342       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1343       cont = pm->cont;
1344       cont_cls = pm->cont_cls;
1345       destroy_pending_message_list_entry (pml);
1346     }
1347   else
1348     {
1349       GNUNET_free (pm);
1350     }
1351   if (cont != NULL)
1352     cont (cont_cls, tpid);  
1353 }
1354
1355
1356 /**
1357  * We're done processing a particular request.
1358  * Free all associated resources.
1359  *
1360  * @param pr request to destroy
1361  */
1362 static void
1363 destroy_pending_request (struct PendingRequest *pr)
1364 {
1365   struct GNUNET_PeerIdentity pid;
1366
1367   if (pr->hnode != NULL)
1368     {
1369       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1370                                          pr->hnode);
1371       pr->hnode = NULL;
1372     }
1373   if (NULL == pr->client_request_list)
1374     {
1375       GNUNET_STATISTICS_update (stats,
1376                                 gettext_noop ("# P2P searches active"),
1377                                 -1,
1378                                 GNUNET_NO);
1379     }
1380   else
1381     {
1382       GNUNET_STATISTICS_update (stats,
1383                                 gettext_noop ("# client searches active"),
1384                                 -1,
1385                                 GNUNET_NO);
1386     }
1387   if (GNUNET_YES == 
1388       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1389                                             &pr->query,
1390                                             pr))
1391     {
1392       GNUNET_LOAD_update (rt_entry_lifetime,
1393                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1394     }
1395   if (pr->qe != NULL)
1396      {
1397       GNUNET_DATASTORE_cancel (pr->qe);
1398       pr->qe = NULL;
1399     }
1400   if (pr->dht_get != NULL)
1401     {
1402       GNUNET_DHT_get_stop (pr->dht_get);
1403       pr->dht_get = NULL;
1404     }
1405   if (pr->client_request_list != NULL)
1406     {
1407       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1408                                    pr->client_request_list->client_list->rl_tail,
1409                                    pr->client_request_list);
1410       GNUNET_free (pr->client_request_list);
1411       pr->client_request_list = NULL;
1412     }
1413   if (pr->cp != NULL)
1414     {
1415       GNUNET_PEER_resolve (pr->cp->pid,
1416                            &pid);
1417       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1418                                                    &pid.hashPubKey,
1419                                                    pr);
1420       pr->cp = NULL;
1421     }
1422   if (pr->bf != NULL)
1423     {
1424       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1425       pr->bf = NULL;
1426     }
1427   if (pr->irc != NULL)
1428     {
1429       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1430       pr->irc = NULL;
1431     }
1432   if (pr->replies_seen != NULL)
1433     {
1434       GNUNET_free (pr->replies_seen);
1435       pr->replies_seen = NULL;
1436     }
1437   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1438     {
1439       GNUNET_SCHEDULER_cancel (sched,
1440                                pr->task);
1441       pr->task = GNUNET_SCHEDULER_NO_TASK;
1442     }
1443   while (NULL != pr->pending_head)    
1444     destroy_pending_message_list_entry (pr->pending_head);
1445   GNUNET_PEER_change_rc (pr->target_pid, -1);
1446   if (pr->used_pids != NULL)
1447     {
1448       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1449       GNUNET_free (pr->used_pids);
1450       pr->used_pids_off = 0;
1451       pr->used_pids_size = 0;
1452       pr->used_pids = NULL;
1453     }
1454   GNUNET_free (pr);
1455 }
1456
1457
1458 /**
1459  * Method called whenever a given peer connects.
1460  *
1461  * @param cls closure, not used
1462  * @param peer peer identity this notification is about
1463  * @param latency reported latency of the connection with 'other'
1464  * @param distance reported distance (DV) to 'other' 
1465  */
1466 static void 
1467 peer_connect_handler (void *cls,
1468                       const struct
1469                       GNUNET_PeerIdentity * peer,
1470                       struct GNUNET_TIME_Relative latency,
1471                       uint32_t distance)
1472 {
1473   struct ConnectedPeer *cp;
1474   struct MigrationReadyBlock *pos;
1475   char *fn;
1476   uint32_t trust;
1477   
1478   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1479   cp->transmission_delay = GNUNET_LOAD_value_init ();
1480   cp->pid = GNUNET_PEER_intern (peer);
1481
1482   fn = get_trust_filename (peer);
1483   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1484       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1485     cp->disk_trust = cp->trust = ntohl (trust);
1486   GNUNET_free (fn);
1487
1488   GNUNET_break (GNUNET_OK ==
1489                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1490                                                    &peer->hashPubKey,
1491                                                    cp,
1492                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1493
1494   pos = mig_head;
1495   while (NULL != pos)
1496     {
1497       (void) consider_migration (pos, &peer->hashPubKey, cp);
1498       pos = pos->next;
1499     }
1500 }
1501
1502
1503 /**
1504  * Increase the host credit by a value.
1505  *
1506  * @param host which peer to change the trust value on
1507  * @param value is the int value by which the
1508  *  host credit is to be increased or decreased
1509  * @returns the actual change in trust (positive or negative)
1510  */
1511 static int
1512 change_host_trust (struct ConnectedPeer *host, int value)
1513 {
1514   unsigned int old_trust;
1515
1516   if (value == 0)
1517     return 0;
1518   GNUNET_assert (host != NULL);
1519   old_trust = host->trust;
1520   if (value > 0)
1521     {
1522       if (host->trust + value < host->trust)
1523         {
1524           value = UINT32_MAX - host->trust;
1525           host->trust = UINT32_MAX;
1526         }
1527       else
1528         host->trust += value;
1529     }
1530   else
1531     {
1532       if (host->trust < -value)
1533         {
1534           value = -host->trust;
1535           host->trust = 0;
1536         }
1537       else
1538         host->trust += value;
1539     }
1540   return value;
1541 }
1542
1543
1544 /**
1545  * Write host-trust information to a file - flush the buffer entry!
1546  */
1547 static int
1548 flush_trust (void *cls,
1549              const GNUNET_HashCode *key,
1550              void *value)
1551 {
1552   struct ConnectedPeer *host = value;
1553   char *fn;
1554   uint32_t trust;
1555   struct GNUNET_PeerIdentity pid;
1556
1557   if (host->trust == host->disk_trust)
1558     return GNUNET_OK;                     /* unchanged */
1559   GNUNET_PEER_resolve (host->pid,
1560                        &pid);
1561   fn = get_trust_filename (&pid);
1562   if (host->trust == 0)
1563     {
1564       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1565         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1566                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1567     }
1568   else
1569     {
1570       trust = htonl (host->trust);
1571       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1572                                                     sizeof(uint32_t),
1573                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1574                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1575         host->disk_trust = host->trust;
1576     }
1577   GNUNET_free (fn);
1578   return GNUNET_OK;
1579 }
1580
1581 /**
1582  * Call this method periodically to scan data/hosts for new hosts.
1583  */
1584 static void
1585 cron_flush_trust (void *cls,
1586                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1587 {
1588
1589   if (NULL == connected_peers)
1590     return;
1591   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1592                                          &flush_trust,
1593                                          NULL);
1594   if (NULL == tc)
1595     return;
1596   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1597     return;
1598   GNUNET_SCHEDULER_add_delayed (tc->sched,
1599                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1600 }
1601
1602
1603 /**
1604  * Free (each) request made by the peer.
1605  *
1606  * @param cls closure, points to peer that the request belongs to
1607  * @param key current key code
1608  * @param value value in the hash map
1609  * @return GNUNET_YES (we should continue to iterate)
1610  */
1611 static int
1612 destroy_request (void *cls,
1613                  const GNUNET_HashCode * key,
1614                  void *value)
1615 {
1616   const struct GNUNET_PeerIdentity * peer = cls;
1617   struct PendingRequest *pr = value;
1618   
1619   GNUNET_break (GNUNET_YES ==
1620                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1621                                                       &peer->hashPubKey,
1622                                                       pr));
1623   destroy_pending_request (pr);
1624   return GNUNET_YES;
1625 }
1626
1627
1628 /**
1629  * Method called whenever a peer disconnects.
1630  *
1631  * @param cls closure, not used
1632  * @param peer peer identity this notification is about
1633  */
1634 static void
1635 peer_disconnect_handler (void *cls,
1636                          const struct
1637                          GNUNET_PeerIdentity * peer)
1638 {
1639   struct ConnectedPeer *cp;
1640   struct PendingMessage *pm;
1641   unsigned int i;
1642   struct MigrationReadyBlock *pos;
1643   struct MigrationReadyBlock *next;
1644
1645   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1646                                               &peer->hashPubKey,
1647                                               &destroy_request,
1648                                               (void*) peer);
1649   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1650                                           &peer->hashPubKey);
1651   if (cp == NULL)
1652     return;
1653   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1654     {
1655       if (NULL != cp->last_client_replies[i])
1656         {
1657           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1658           cp->last_client_replies[i] = NULL;
1659         }
1660     }
1661   GNUNET_break (GNUNET_YES ==
1662                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1663                                                       &peer->hashPubKey,
1664                                                       cp));
1665   /* remove this peer from migration considerations; schedule
1666      alternatives */
1667   next = mig_head;
1668   while (NULL != (pos = next))
1669     {
1670       next = pos->next;
1671       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1672         {
1673           if (pos->target_list[i] == cp->pid)
1674             {
1675               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1676               pos->target_list[i] = 0;
1677             }
1678          }
1679       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1680         {
1681           delete_migration_block (pos);
1682           consider_migration_gathering ();
1683           continue;
1684         }
1685       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1686                                              &consider_migration,
1687                                              pos);
1688     }
1689   GNUNET_PEER_change_rc (cp->pid, -1);
1690   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1691   if (NULL != cp->cth)
1692     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1693   while (NULL != (pm = cp->pending_messages_head))
1694     destroy_pending_message (pm, 0 /* delivery failed */);
1695   GNUNET_LOAD_value_free (cp->transmission_delay);
1696   GNUNET_break (0 == cp->pending_requests);
1697   GNUNET_free (cp);
1698 }
1699
1700
1701 /**
1702  * Iterator over hash map entries that removes all occurences
1703  * of the given 'client' from the 'last_client_replies' of the
1704  * given connected peer.
1705  *
1706  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1707  * @param key current key code (unused)
1708  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1709  * @return GNUNET_YES (we should continue to iterate)
1710  */
1711 static int
1712 remove_client_from_last_client_replies (void *cls,
1713                                         const GNUNET_HashCode * key,
1714                                         void *value)
1715 {
1716   struct GNUNET_SERVER_Client *client = cls;
1717   struct ConnectedPeer *cp = value;
1718   unsigned int i;
1719
1720   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1721     {
1722       if (cp->last_client_replies[i] == client)
1723         {
1724           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1725           cp->last_client_replies[i] = NULL;
1726         }
1727     }  
1728   return GNUNET_YES;
1729 }
1730
1731
1732 /**
1733  * A client disconnected.  Remove all of its pending queries.
1734  *
1735  * @param cls closure, NULL
1736  * @param client identification of the client
1737  */
1738 static void
1739 handle_client_disconnect (void *cls,
1740                           struct GNUNET_SERVER_Client
1741                           * client)
1742 {
1743   struct ClientList *pos;
1744   struct ClientList *prev;
1745   struct ClientRequestList *rcl;
1746   struct ClientResponseMessage *creply;
1747
1748   if (client == NULL)
1749     return;
1750   prev = NULL;
1751   pos = client_list;
1752   while ( (NULL != pos) &&
1753           (pos->client != client) )
1754     {
1755       prev = pos;
1756       pos = pos->next;
1757     }
1758   if (pos == NULL)
1759     return; /* no requests pending for this client */
1760   while (NULL != (rcl = pos->rl_head))
1761     {
1762       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1763                   "Destroying pending request `%s' on disconnect\n",
1764                   GNUNET_h2s (&rcl->req->query));
1765       destroy_pending_request (rcl->req);
1766     }
1767   if (prev == NULL)
1768     client_list = pos->next;
1769   else
1770     prev->next = pos->next;
1771   if (pos->th != NULL)
1772     {
1773       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1774       pos->th = NULL;
1775     }
1776   while (NULL != (creply = pos->res_head))
1777     {
1778       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1779                                    pos->res_tail,
1780                                    creply);
1781       GNUNET_free (creply);
1782     }    
1783   GNUNET_SERVER_client_drop (pos->client);
1784   GNUNET_free (pos);
1785   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1786                                          &remove_client_from_last_client_replies,
1787                                          client);
1788 }
1789
1790
1791 /**
1792  * Iterator to free peer entries.
1793  *
1794  * @param cls closure, unused
1795  * @param key current key code
1796  * @param value value in the hash map (peer entry)
1797  * @return GNUNET_YES (we should continue to iterate)
1798  */
1799 static int 
1800 clean_peer (void *cls,
1801             const GNUNET_HashCode * key,
1802             void *value)
1803 {
1804   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1805   return GNUNET_YES;
1806 }
1807
1808
1809 /**
1810  * Task run during shutdown.
1811  *
1812  * @param cls unused
1813  * @param tc unused
1814  */
1815 static void
1816 shutdown_task (void *cls,
1817                const struct GNUNET_SCHEDULER_TaskContext *tc)
1818 {
1819   if (mig_qe != NULL)
1820     {
1821       GNUNET_DATASTORE_cancel (mig_qe);
1822       mig_qe = NULL;
1823     }
1824   if (dht_qe != NULL)
1825     {
1826       GNUNET_DATASTORE_cancel (dht_qe);
1827       dht_qe = NULL;
1828     }
1829   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1830     {
1831       GNUNET_SCHEDULER_cancel (sched, mig_task);
1832       mig_task = GNUNET_SCHEDULER_NO_TASK;
1833     }
1834   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1835     {
1836       GNUNET_SCHEDULER_cancel (sched, dht_task);
1837       dht_task = GNUNET_SCHEDULER_NO_TASK;
1838     }
1839   while (client_list != NULL)
1840     handle_client_disconnect (NULL,
1841                               client_list->client);
1842   cron_flush_trust (NULL, NULL);
1843   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1844                                          &clean_peer,
1845                                          NULL);
1846   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1847   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1848   requests_by_expiration_heap = 0;
1849   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1850   connected_peers = NULL;
1851   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1852   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1853   query_request_map = NULL;
1854   GNUNET_LOAD_value_free (rt_entry_lifetime);
1855   rt_entry_lifetime = NULL;
1856   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1857   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1858   peer_request_map = NULL;
1859   GNUNET_assert (NULL != core);
1860   GNUNET_CORE_disconnect (core);
1861   core = NULL;
1862   if (stats != NULL)
1863     {
1864       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1865       stats = NULL;
1866     }
1867   if (dsh != NULL)
1868     {
1869       GNUNET_DATASTORE_disconnect (dsh,
1870                                    GNUNET_NO);
1871       dsh = NULL;
1872     }
1873   while (mig_head != NULL)
1874     delete_migration_block (mig_head);
1875   GNUNET_assert (0 == mig_size);
1876   GNUNET_DHT_disconnect (dht_handle);
1877   dht_handle = NULL;
1878   GNUNET_LOAD_value_free (datastore_get_load);
1879   datastore_get_load = NULL;
1880   GNUNET_LOAD_value_free (datastore_put_load);
1881   datastore_put_load = NULL;
1882   GNUNET_BLOCK_context_destroy (block_ctx);
1883   block_ctx = NULL;
1884   GNUNET_CONFIGURATION_destroy (block_cfg);
1885   block_cfg = NULL;
1886   sched = NULL;
1887   cfg = NULL;  
1888   GNUNET_free_non_null (trustDirectory);
1889   trustDirectory = NULL;
1890 }
1891
1892
1893 /* ******************* Utility functions  ******************** */
1894
1895
1896 /**
1897  * Transmit messages by copying it to the target buffer
1898  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1899  * for writing in the meantime.  In that case, do nothing
1900  * (the disconnect or shutdown handler will take care of the rest).
1901  * If we were able to transmit messages and there are still more
1902  * pending, ask core again for further calls to this function.
1903  *
1904  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1905  * @param size number of bytes available in buf
1906  * @param buf where the callee should write the message
1907  * @return number of bytes written to buf
1908  */
1909 static size_t
1910 transmit_to_peer (void *cls,
1911                   size_t size, void *buf)
1912 {
1913   struct ConnectedPeer *cp = cls;
1914   char *cbuf = buf;
1915   struct GNUNET_PeerIdentity pid;
1916   struct PendingMessage *pm;
1917   struct MigrationReadyBlock *mb;
1918   struct MigrationReadyBlock *next;
1919   struct PutMessage migm;
1920   size_t msize;
1921   unsigned int i;
1922  
1923   cp->cth = NULL;
1924   if (NULL == buf)
1925     {
1926 #if DEBUG_FS
1927       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1928                   "Dropping message, core too busy.\n");
1929 #endif
1930       GNUNET_LOAD_update (cp->transmission_delay,
1931                           UINT64_MAX);
1932       return 0;
1933     }
1934   GNUNET_LOAD_update (cp->transmission_delay,
1935                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
1936   msize = 0;
1937   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1938           (pm->msize <= size) )
1939     {
1940       memcpy (&cbuf[msize], &pm[1], pm->msize);
1941       msize += pm->msize;
1942       size -= pm->msize;
1943       destroy_pending_message (pm, cp->pid);
1944     }
1945   if (NULL != pm)
1946     {
1947       GNUNET_PEER_resolve (cp->pid,
1948                            &pid);
1949       cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
1950       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1951                                                    pm->priority,
1952                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1953                                                    &pid,
1954                                                    pm->msize,
1955                                                    &transmit_to_peer,
1956                                                    cp);
1957     }
1958   else
1959     {      
1960       next = mig_head;
1961       while (NULL != (mb = next))
1962         {
1963           next = mb->next;
1964           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1965             {
1966               if ( (cp->pid == mb->target_list[i]) &&
1967                    (mb->size + sizeof (migm) <= size) )
1968                 {
1969                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1970                   mb->target_list[i] = 0;
1971                   mb->used_targets++;
1972                   memset (&migm, 0, sizeof (migm));
1973                   migm.header.size = htons (sizeof (migm) + mb->size);
1974                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1975                   migm.type = htonl (mb->type);
1976                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1977                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1978                   msize += sizeof (migm);
1979                   size -= sizeof (migm);
1980                   memcpy (&cbuf[msize], &mb[1], mb->size);
1981                   msize += mb->size;
1982                   size -= mb->size;
1983 #if DEBUG_FS
1984                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1985                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1986                               GNUNET_h2s (&mb->query),
1987                               mb->size,
1988                               GNUNET_i2s (&pid));
1989 #endif    
1990                   break;
1991                 }
1992               else
1993                 {
1994 #if DEBUG_FS
1995                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1996                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1997                               GNUNET_h2s (&mb->query),
1998                               mb->size,
1999                               GNUNET_i2s (&pid));
2000 #endif    
2001                 }
2002             }
2003           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2004                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2005             {
2006               delete_migration_block (mb);
2007               consider_migration_gathering ();
2008             }
2009         }
2010       consider_migration (NULL, 
2011                           &pid.hashPubKey,
2012                           cp);
2013     }
2014 #if DEBUG_FS
2015   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2016               "Transmitting %u bytes to peer %u\n",
2017               msize,
2018               cp->pid);
2019 #endif
2020   return msize;
2021 }
2022
2023
2024 /**
2025  * Add a message to the set of pending messages for the given peer.
2026  *
2027  * @param cp peer to send message to
2028  * @param pm message to queue
2029  * @param pr request on which behalf this message is being queued
2030  */
2031 static void
2032 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2033                                   struct PendingMessage *pm,
2034                                   struct PendingRequest *pr)
2035 {
2036   struct PendingMessage *pos;
2037   struct PendingMessageList *pml;
2038   struct GNUNET_PeerIdentity pid;
2039
2040   GNUNET_assert (pm->next == NULL);
2041   GNUNET_assert (pm->pml == NULL);    
2042   if (pr != NULL)
2043     {
2044       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2045       pml->req = pr;
2046       pml->target = cp;
2047       pml->pm = pm;
2048       pm->pml = pml;  
2049       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2050                                    pr->pending_tail,
2051                                    pml);
2052     }
2053   pos = cp->pending_messages_head;
2054   while ( (pos != NULL) &&
2055           (pm->priority < pos->priority) )
2056     pos = pos->next;    
2057   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2058                                      cp->pending_messages_tail,
2059                                      pos,
2060                                      pm);
2061   cp->pending_requests++;
2062   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2063     destroy_pending_message (cp->pending_messages_tail, 0);  
2064   GNUNET_PEER_resolve (cp->pid, &pid);
2065   if (NULL != cp->cth)
2066     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2067   /* need to schedule transmission */
2068   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2069   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2070                                                cp->pending_messages_head->priority,
2071                                                MAX_TRANSMIT_DELAY,
2072                                                &pid,
2073                                                cp->pending_messages_head->msize,
2074                                                &transmit_to_peer,
2075                                                cp);
2076   if (cp->cth == NULL)
2077     {
2078 #if DEBUG_FS
2079       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2080                   "Failed to schedule transmission with core!\n");
2081 #endif
2082       GNUNET_STATISTICS_update (stats,
2083                                 gettext_noop ("# CORE transmission failures"),
2084                                 1,
2085                                 GNUNET_NO);
2086     }
2087 }
2088
2089
2090 /**
2091  * Test if the DATABASE (GET) load on this peer is too high
2092  * to even consider processing the query at
2093  * all.  
2094  * 
2095  * @return GNUNET_YES if the load is too high to do anything (load high)
2096  *         GNUNET_NO to process normally (load normal)
2097  *         GNUNET_SYSERR to process for free (load low)
2098  */
2099 static int
2100 test_get_load_too_high (uint32_t priority)
2101 {
2102   double ld;
2103
2104   ld = GNUNET_LOAD_get_load (datastore_get_load);
2105   if (ld < 1)
2106     {
2107       GNUNET_STATISTICS_update (stats,
2108                                 gettext_noop ("# requests done for free (low load)"),
2109                                 1,
2110                                 GNUNET_NO);
2111       return GNUNET_SYSERR;
2112     }
2113   if (ld <= priority)
2114     {
2115       GNUNET_STATISTICS_update (stats,
2116                                 gettext_noop ("# requests done for a price (normal load)"),
2117                                 1,
2118                                 GNUNET_NO);
2119       return GNUNET_NO;
2120     }
2121   GNUNET_STATISTICS_update (stats,
2122                             gettext_noop ("# requests dropped due to high load"),
2123                             1,
2124                             GNUNET_NO);
2125   return GNUNET_YES;
2126 }
2127
2128
2129
2130
2131 /**
2132  * Test if the DATABASE (PUT) load on this peer is too high
2133  * to even consider processing the query at
2134  * all.  
2135  * 
2136  * @return GNUNET_YES if the load is too high to do anything (load high)
2137  *         GNUNET_NO to process normally (load normal or low)
2138  */
2139 static int
2140 test_put_load_too_high (uint32_t priority)
2141 {
2142   double ld;
2143
2144   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2145     return GNUNET_NO; /* very fast */
2146   ld = GNUNET_LOAD_get_load (datastore_put_load);
2147   if ( (ld < 1) || (ld < priority) )
2148     return GNUNET_NO;
2149   GNUNET_STATISTICS_update (stats,
2150                             gettext_noop ("# storage requests dropped due to high load"),
2151                             1,
2152                             GNUNET_NO);
2153   return GNUNET_YES;
2154 }
2155
2156
2157 /* ******************* Pending Request Refresh Task ******************** */
2158
2159
2160
2161 /**
2162  * We use a random delay to make the timing of requests less
2163  * predictable.  This function returns such a random delay.  We add a base
2164  * delay of MAX_CORK_DELAY (1s).
2165  *
2166  * FIXME: make schedule dependent on the specifics of the request?
2167  * Or bandwidth and number of connected peers and load?
2168  *
2169  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2170  */
2171 static struct GNUNET_TIME_Relative
2172 get_processing_delay ()
2173 {
2174   return 
2175     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2176                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2177                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2178                                                                                        TTL_DECREMENT)));
2179 }
2180
2181
2182 /**
2183  * We're processing a GET request from another peer and have decided
2184  * to forward it to other peers.  This function is called periodically
2185  * and should forward the request to other peers until we have all
2186  * possible replies.  If we have transmitted the *only* reply to
2187  * the initiator we should destroy the pending request.  If we have
2188  * many replies in the queue to the initiator, we should delay sending
2189  * out more queries until the reply queue has shrunk some.
2190  *
2191  * @param cls our "struct ProcessGetContext *"
2192  * @param tc unused
2193  */
2194 static void
2195 forward_request_task (void *cls,
2196                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2197
2198
2199 /**
2200  * Function called after we either failed or succeeded
2201  * at transmitting a query to a peer.  
2202  *
2203  * @param cls the requests "struct PendingRequest*"
2204  * @param tpid ID of receiving peer, 0 on transmission error
2205  */
2206 static void
2207 transmit_query_continuation (void *cls,
2208                              GNUNET_PEER_Id tpid)
2209 {
2210   struct PendingRequest *pr = cls;
2211
2212   GNUNET_STATISTICS_update (stats,
2213                             gettext_noop ("# queries scheduled for forwarding"),
2214                             -1,
2215                             GNUNET_NO);
2216   if (tpid == 0)   
2217     {
2218 #if DEBUG_FS
2219       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2220                   "Transmission of request failed, will try again later.\n");
2221 #endif
2222       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2223         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2224                                                  get_processing_delay (),
2225                                                  &forward_request_task,
2226                                                  pr); 
2227       return;    
2228     }
2229   GNUNET_STATISTICS_update (stats,
2230                             gettext_noop ("# queries forwarded"),
2231                             1,
2232                             GNUNET_NO);
2233   GNUNET_PEER_change_rc (tpid, 1);
2234   if (pr->used_pids_off == pr->used_pids_size)
2235     GNUNET_array_grow (pr->used_pids,
2236                        pr->used_pids_size,
2237                        pr->used_pids_size * 2 + 2);
2238   pr->used_pids[pr->used_pids_off++] = tpid;
2239   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2240     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2241                                              get_processing_delay (),
2242                                              &forward_request_task,
2243                                              pr);
2244 }
2245
2246
2247 /**
2248  * How many bytes should a bloomfilter be if we have already seen
2249  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2250  * of bits set per entry.  Furthermore, we should not re-size the
2251  * filter too often (to keep it cheap).
2252  *
2253  * Since other peers will also add entries but not resize the filter,
2254  * we should generally pick a slightly larger size than what the
2255  * strict math would suggest.
2256  *
2257  * @return must be a power of two and smaller or equal to 2^15.
2258  */
2259 static size_t
2260 compute_bloomfilter_size (unsigned int entry_count)
2261 {
2262   size_t size;
2263   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2264   uint16_t max = 1 << 15;
2265
2266   if (entry_count > max)
2267     return max;
2268   size = 8;
2269   while ((size < max) && (size < ideal))
2270     size *= 2;
2271   if (size > max)
2272     return max;
2273   return size;
2274 }
2275
2276
2277 /**
2278  * Recalculate our bloom filter for filtering replies.  This function
2279  * will create a new bloom filter from scratch, so it should only be
2280  * called if we have no bloomfilter at all (and hence can create a
2281  * fresh one of minimal size without problems) OR if our peer is the
2282  * initiator (in which case we may resize to larger than mimimum size).
2283  *
2284  * @param pr request for which the BF is to be recomputed
2285  */
2286 static void
2287 refresh_bloomfilter (struct PendingRequest *pr)
2288 {
2289   unsigned int i;
2290   size_t nsize;
2291   GNUNET_HashCode mhash;
2292
2293   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2294   if (nsize == pr->bf_size)
2295     return; /* size not changed */
2296   if (pr->bf != NULL)
2297     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2298   pr->bf_size = nsize;
2299   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2300   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2301                                               pr->bf_size,
2302                                               BLOOMFILTER_K);
2303   for (i=0;i<pr->replies_seen_off;i++)
2304     {
2305       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2306                                 pr->mingle,
2307                                 &mhash);
2308       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2309     }
2310 }
2311
2312
2313 /**
2314  * Function called after we've tried to reserve a certain amount of
2315  * bandwidth for a reply.  Check if we succeeded and if so send our
2316  * query.
2317  *
2318  * @param cls the requests "struct PendingRequest*"
2319  * @param peer identifies the peer
2320  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2321  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2322  * @param amount set to the amount that was actually reserved or unreserved
2323  * @param preference current traffic preference for the given peer
2324  */
2325 static void
2326 target_reservation_cb (void *cls,
2327                        const struct
2328                        GNUNET_PeerIdentity * peer,
2329                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2330                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2331                        int amount,
2332                        uint64_t preference)
2333 {
2334   struct PendingRequest *pr = cls;
2335   struct ConnectedPeer *cp;
2336   struct PendingMessage *pm;
2337   struct GetMessage *gm;
2338   GNUNET_HashCode *ext;
2339   char *bfdata;
2340   size_t msize;
2341   unsigned int k;
2342   int no_route;
2343   uint32_t bm;
2344
2345   pr->irc = NULL;
2346   if (peer == NULL)
2347     {
2348       /* error in communication with core, try again later */
2349       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2350         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2351                                                  get_processing_delay (),
2352                                                  &forward_request_task,
2353                                                  pr);
2354       return;
2355     }
2356   // (3) transmit, update ttl/priority
2357   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2358                                           &peer->hashPubKey);
2359   if (cp == NULL)
2360     {
2361       /* Peer must have just left */
2362 #if DEBUG_FS
2363       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2364                   "Selected peer disconnected!\n");
2365 #endif
2366       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2367         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2368                                                  get_processing_delay (),
2369                                                  &forward_request_task,
2370                                                  pr);
2371       return;
2372     }
2373   no_route = GNUNET_NO;
2374   if (amount == 0)
2375     {
2376       if (pr->cp == NULL)
2377         {
2378 #if DEBUG_FS > 1
2379           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2380                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2381                       amount,
2382                       DBLOCK_SIZE);
2383 #endif
2384           GNUNET_STATISTICS_update (stats,
2385                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2386                                     1,
2387                                     GNUNET_NO);
2388           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2389             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2390                                                      get_processing_delay (),
2391                                                      &forward_request_task,
2392                                                      pr);
2393           return;  /* this target round failed */
2394         }
2395       no_route = GNUNET_YES;
2396     }
2397   
2398   GNUNET_STATISTICS_update (stats,
2399                             gettext_noop ("# queries scheduled for forwarding"),
2400                             1,
2401                             GNUNET_NO);
2402   /* build message and insert message into priority queue */
2403 #if DEBUG_FS
2404   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2405               "Forwarding request `%s' to `%4s'!\n",
2406               GNUNET_h2s (&pr->query),
2407               GNUNET_i2s (peer));
2408 #endif
2409   k = 0;
2410   bm = 0;
2411   if (GNUNET_YES == no_route)
2412     {
2413       bm |= GET_MESSAGE_BIT_RETURN_TO;
2414       k++;      
2415     }
2416   if (pr->namespace != NULL)
2417     {
2418       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2419       k++;
2420     }
2421   if (pr->target_pid != 0)
2422     {
2423       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2424       k++;
2425     }
2426   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2427   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2428   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2429   pm->msize = msize;
2430   gm = (struct GetMessage*) &pm[1];
2431   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2432   gm->header.size = htons (msize);
2433   gm->type = htonl (pr->type);
2434   pr->remaining_priority /= 2;
2435   gm->priority = htonl (pr->remaining_priority);
2436   gm->ttl = htonl (pr->ttl);
2437   gm->filter_mutator = htonl(pr->mingle); 
2438   gm->hash_bitmap = htonl (bm);
2439   gm->query = pr->query;
2440   ext = (GNUNET_HashCode*) &gm[1];
2441   k = 0;
2442   if (GNUNET_YES == no_route)
2443     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2444   if (pr->namespace != NULL)
2445     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2446   if (pr->target_pid != 0)
2447     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2448   bfdata = (char *) &ext[k];
2449   if (pr->bf != NULL)
2450     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2451                                                bfdata,
2452                                                pr->bf_size);
2453   pm->cont = &transmit_query_continuation;
2454   pm->cont_cls = pr;
2455   add_to_pending_messages_for_peer (cp, pm, pr);
2456 }
2457
2458
2459 /**
2460  * Closure used for "target_peer_select_cb".
2461  */
2462 struct PeerSelectionContext 
2463 {
2464   /**
2465    * The request for which we are selecting
2466    * peers.
2467    */
2468   struct PendingRequest *pr;
2469
2470   /**
2471    * Current "prime" target.
2472    */
2473   struct GNUNET_PeerIdentity target;
2474
2475   /**
2476    * How much do we like this target?
2477    */
2478   double target_score;
2479
2480 };
2481
2482
2483 /**
2484  * Function called for each connected peer to determine
2485  * which one(s) would make good targets for forwarding.
2486  *
2487  * @param cls closure (struct PeerSelectionContext)
2488  * @param key current key code (peer identity)
2489  * @param value value in the hash map (struct ConnectedPeer)
2490  * @return GNUNET_YES if we should continue to
2491  *         iterate,
2492  *         GNUNET_NO if not.
2493  */
2494 static int
2495 target_peer_select_cb (void *cls,
2496                        const GNUNET_HashCode * key,
2497                        void *value)
2498 {
2499   struct PeerSelectionContext *psc = cls;
2500   struct ConnectedPeer *cp = value;
2501   struct PendingRequest *pr = psc->pr;
2502   double score;
2503   unsigned int i;
2504   unsigned int pc;
2505
2506   /* 1) check that this peer is not the initiator */
2507   if (cp == pr->cp)
2508     {
2509 #if DEBUG_FS
2510       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2511                   "Skipping initiator in forwarding selection\n");
2512 #endif
2513       return GNUNET_YES; /* skip */        
2514     }
2515
2516   /* 2) check if we have already (recently) forwarded to this peer */
2517   pc = 0;
2518   for (i=0;i<pr->used_pids_off;i++)
2519     if (pr->used_pids[i] == cp->pid) 
2520       {
2521         pc++;
2522         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2523                                            RETRY_PROBABILITY_INV))
2524           {
2525 #if DEBUG_FS
2526             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2527                         "NOT re-trying query that was previously transmitted %u times\n",
2528                         (unsigned int) pr->used_pids_off);
2529 #endif
2530             return GNUNET_YES; /* skip */
2531           }
2532       }
2533 #if DEBUG_FS
2534   if (0 < pc)
2535     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2536                 "Re-trying query that was previously transmitted %u times to this peer\n",
2537                 (unsigned int) pc);
2538 #endif
2539   /* 3) calculate how much we'd like to forward to this peer,
2540      starting with a random value that is strong enough
2541      to at least give any peer a chance sometimes 
2542      (compared to the other factors that come later) */
2543   /* 3a) count successful (recent) routes from cp for same source */
2544   if (pr->cp != NULL)
2545     {
2546       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2547                                         P2P_SUCCESS_LIST_SIZE);
2548       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2549         if (cp->last_p2p_replies[i] == pr->cp->pid)
2550           score += 1.0; /* likely successful based on hot path */
2551     }
2552   else
2553     {
2554       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2555                                         CS2P_SUCCESS_LIST_SIZE);
2556       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2557         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2558           score += 1.0; /* likely successful based on hot path */
2559     }
2560   /* 3b) include latency */
2561   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2562     score += 1.0; /* likely fast based on latency */
2563   /* 3c) include priorities */
2564   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2565     score += 1.0; /* likely successful based on priorities */
2566   /* 3d) penalize for queue size */  
2567   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2568   /* 3e) include peer proximity */
2569   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2570                                                     &pr->query)) / (double) UINT32_MAX);
2571   /* 4) super-bonus for being the known target */
2572   if (pr->target_pid == cp->pid)
2573     score += 100.0;
2574   /* store best-fit in closure */
2575 #if DEBUG_FS
2576   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2577               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2578               GNUNET_h2s (key),
2579               score,
2580               psc->target_score);
2581 #endif  
2582   score++; /* avoid zero */
2583   if (score > psc->target_score)
2584     {
2585       psc->target_score = score;
2586       psc->target.hashPubKey = *key; 
2587     }
2588   return GNUNET_YES;
2589 }
2590   
2591
2592 /**
2593  * The priority level imposes a bound on the maximum
2594  * value for the ttl that can be requested.
2595  *
2596  * @param ttl_in requested ttl
2597  * @param prio given priority
2598  * @return ttl_in if ttl_in is below the limit,
2599  *         otherwise the ttl-limit for the given priority
2600  */
2601 static int32_t
2602 bound_ttl (int32_t ttl_in, uint32_t prio)
2603 {
2604   unsigned long long allowed;
2605
2606   if (ttl_in <= 0)
2607     return ttl_in;
2608   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2609   if (ttl_in > allowed)      
2610     {
2611       if (allowed >= (1 << 30))
2612         return 1 << 30;
2613       return allowed;
2614     }
2615   return ttl_in;
2616 }
2617
2618
2619 /**
2620  * Iterator called on each result obtained for a DHT
2621  * operation that expects a reply
2622  *
2623  * @param cls closure
2624  * @param exp when will this value expire
2625  * @param key key of the result
2626  * @param get_path NULL-terminated array of pointers
2627  *                 to the peers on reverse GET path (or NULL if not recorded)
2628  * @param put_path NULL-terminated array of pointers
2629  *                 to the peers on the PUT path (or NULL if not recorded)
2630  * @param type type of the result
2631  * @param size number of bytes in data
2632  * @param data pointer to the result data
2633  */
2634 static void
2635 process_dht_reply (void *cls,
2636                    struct GNUNET_TIME_Absolute exp,
2637                    const GNUNET_HashCode * key,
2638                    const struct GNUNET_PeerIdentity * const *get_path,
2639                    const struct GNUNET_PeerIdentity * const *put_path,
2640                    enum GNUNET_BLOCK_Type type,
2641                    size_t size,
2642                    const void *data);
2643
2644
2645 /**
2646  * We're processing a GET request and have decided
2647  * to forward it to other peers.  This function is called periodically
2648  * and should forward the request to other peers until we have all
2649  * possible replies.  If we have transmitted the *only* reply to
2650  * the initiator we should destroy the pending request.  If we have
2651  * many replies in the queue to the initiator, we should delay sending
2652  * out more queries until the reply queue has shrunk some.
2653  *
2654  * @param cls our "struct ProcessGetContext *"
2655  * @param tc unused
2656  */
2657 static void
2658 forward_request_task (void *cls,
2659                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2660 {
2661   struct PendingRequest *pr = cls;
2662   struct PeerSelectionContext psc;
2663   struct ConnectedPeer *cp; 
2664   struct GNUNET_TIME_Relative delay;
2665
2666   pr->task = GNUNET_SCHEDULER_NO_TASK;
2667   if (pr->irc != NULL)
2668     {
2669 #if DEBUG_FS
2670       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2671                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2672                   GNUNET_h2s (&pr->query));
2673 #endif
2674       return; /* already pending */
2675     }
2676   if (GNUNET_YES == pr->local_only)
2677     return; /* configured to not do P2P search */
2678   /* (0) try DHT */
2679   if ( (0 == pr->anonymity_level) &&
2680        (GNUNET_YES != pr->forward_only) &&
2681        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2682        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2683     {
2684       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2685                                           GNUNET_TIME_UNIT_FOREVER_REL,
2686                                           pr->type,
2687                                           &pr->query,
2688                                           GNUNET_DHT_RO_NONE,
2689                                           pr->bf,
2690                                           pr->mingle,
2691                                           pr->namespace,
2692                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2693                                           &process_dht_reply,
2694                                           pr);
2695     }
2696   /* (1) select target */
2697   psc.pr = pr;
2698   psc.target_score = -DBL_MAX;
2699   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2700                                          &target_peer_select_cb,
2701                                          &psc);  
2702   if (psc.target_score == -DBL_MAX)
2703     {
2704       delay = get_processing_delay ();
2705 #if DEBUG_FS 
2706       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2707                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2708                   GNUNET_h2s (&pr->query),
2709                   delay.value);
2710 #endif
2711       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2712                                                delay,
2713                                                &forward_request_task,
2714                                                pr);
2715       return; /* nobody selected */
2716     }
2717   /* (3) update TTL/priority */
2718   if (pr->client_request_list != NULL)
2719     {
2720       /* FIXME: use better algorithm!? */
2721       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2722                                          4))
2723         pr->priority++;
2724       /* bound priority we use by priorities we see from other peers
2725          rounded up (must round up so that we can see non-zero
2726          priorities, but round up as little as possible to make it
2727          plausible that we forwarded another peers request) */
2728       if (pr->priority > current_priorities + 1.0)
2729         pr->priority = (uint32_t) current_priorities + 1.0;
2730       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2731                            pr->priority);
2732 #if DEBUG_FS
2733       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2734                   "Trying query `%s' with priority %u and TTL %d.\n",
2735                   GNUNET_h2s (&pr->query),
2736                   pr->priority,
2737                   pr->ttl);
2738 #endif
2739     }
2740
2741   /* (3) reserve reply bandwidth */
2742   if (GNUNET_NO == pr->forward_only)
2743     {
2744       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2745                                               &psc.target.hashPubKey);
2746       GNUNET_assert (NULL != cp);
2747       pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2748                                                     &psc.target,
2749                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2750                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2751                                                     DBLOCK_SIZE * 2, 
2752                                                     cp->inc_preference,
2753                                                     &target_reservation_cb,
2754                                                     pr);
2755       cp->inc_preference = 0;
2756     }
2757   else
2758     {
2759       /* force forwarding */
2760       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2761       target_reservation_cb (pr, &psc.target,
2762                              zerobw, zerobw, 0, 0.0);
2763     }
2764 }
2765
2766
2767 /* **************************** P2P PUT Handling ************************ */
2768
2769
2770 /**
2771  * Function called after we either failed or succeeded
2772  * at transmitting a reply to a peer.  
2773  *
2774  * @param cls the requests "struct PendingRequest*"
2775  * @param tpid ID of receiving peer, 0 on transmission error
2776  */
2777 static void
2778 transmit_reply_continuation (void *cls,
2779                              GNUNET_PEER_Id tpid)
2780 {
2781   struct PendingRequest *pr = cls;
2782   
2783   switch (pr->type)
2784     {
2785     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2786     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2787       /* only one reply expected, done with the request! */
2788       destroy_pending_request (pr);
2789       break;
2790     case GNUNET_BLOCK_TYPE_ANY:
2791     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2792     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2793       break;
2794     default:
2795       GNUNET_break (0);
2796       break;
2797     }
2798 }
2799
2800
2801 /**
2802  * Transmit the given message by copying it to the target buffer
2803  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2804  * for writing in the meantime.  In that case, do nothing
2805  * (the disconnect or shutdown handler will take care of the rest).
2806  * If we were able to transmit messages and there are still more
2807  * pending, ask core again for further calls to this function.
2808  *
2809  * @param cls closure, pointer to the 'struct ClientList*'
2810  * @param size number of bytes available in buf
2811  * @param buf where the callee should write the message
2812  * @return number of bytes written to buf
2813  */
2814 static size_t
2815 transmit_to_client (void *cls,
2816                   size_t size, void *buf)
2817 {
2818   struct ClientList *cl = cls;
2819   char *cbuf = buf;
2820   struct ClientResponseMessage *creply;
2821   size_t msize;
2822   
2823   cl->th = NULL;
2824   if (NULL == buf)
2825     {
2826 #if DEBUG_FS
2827       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2828                   "Not sending reply, client communication problem.\n");
2829 #endif
2830       return 0;
2831     }
2832   msize = 0;
2833   while ( (NULL != (creply = cl->res_head) ) &&
2834           (creply->msize <= size) )
2835     {
2836       memcpy (&cbuf[msize], &creply[1], creply->msize);
2837       msize += creply->msize;
2838       size -= creply->msize;
2839       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2840                                    cl->res_tail,
2841                                    creply);
2842       GNUNET_free (creply);
2843     }
2844   if (NULL != creply)
2845     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2846                                                   creply->msize,
2847                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2848                                                   &transmit_to_client,
2849                                                   cl);
2850 #if DEBUG_FS
2851   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2852               "Transmitted %u bytes to client\n",
2853               (unsigned int) msize);
2854 #endif
2855   return msize;
2856 }
2857
2858
2859 /**
2860  * Closure for "process_reply" function.
2861  */
2862 struct ProcessReplyClosure
2863 {
2864   /**
2865    * The data for the reply.
2866    */
2867   const void *data;
2868
2869   /**
2870    * Who gave us this reply? NULL for local host (or DHT)
2871    */
2872   struct ConnectedPeer *sender;
2873
2874   /**
2875    * When the reply expires.
2876    */
2877   struct GNUNET_TIME_Absolute expiration;
2878
2879   /**
2880    * Size of data.
2881    */
2882   size_t size;
2883
2884   /**
2885    * Type of the block.
2886    */
2887   enum GNUNET_BLOCK_Type type;
2888
2889   /**
2890    * How much was this reply worth to us?
2891    */
2892   uint32_t priority;
2893
2894   /**
2895    * Evaluation result (returned).
2896    */
2897   enum GNUNET_BLOCK_EvaluationResult eval;
2898
2899   /**
2900    * Did we finish processing the associated request?
2901    */ 
2902   int finished;
2903
2904   /**
2905    * Did we find a matching request?
2906    */
2907   int request_found;
2908 };
2909
2910
2911 /**
2912  * We have received a reply; handle it!
2913  *
2914  * @param cls response (struct ProcessReplyClosure)
2915  * @param key our query
2916  * @param value value in the hash map (info about the query)
2917  * @return GNUNET_YES (we should continue to iterate)
2918  */
2919 static int
2920 process_reply (void *cls,
2921                const GNUNET_HashCode * key,
2922                void *value)
2923 {
2924   struct ProcessReplyClosure *prq = cls;
2925   struct PendingRequest *pr = value;
2926   struct PendingMessage *reply;
2927   struct ClientResponseMessage *creply;
2928   struct ClientList *cl;
2929   struct PutMessage *pm;
2930   struct ConnectedPeer *cp;
2931   struct GNUNET_TIME_Relative cur_delay;
2932   size_t msize;
2933
2934 #if DEBUG_FS
2935   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2936               "Matched result (type %u) for query `%s' with pending request\n",
2937               (unsigned int) prq->type,
2938               GNUNET_h2s (key));
2939 #endif  
2940   GNUNET_STATISTICS_update (stats,
2941                             gettext_noop ("# replies received and matched"),
2942                             1,
2943                             GNUNET_NO);
2944   if (prq->sender != NULL)
2945     {
2946       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2947       prq->sender->avg_delay.value
2948         = (prq->sender->avg_delay.value * 
2949            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2950       prq->sender->avg_priority
2951         = (prq->sender->avg_priority * 
2952            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2953       if (pr->cp != NULL)
2954         {
2955           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2956                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2957                                  -1);
2958           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2959           prq->sender->last_p2p_replies
2960             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2961             = pr->cp->pid;
2962         }
2963       else
2964         {
2965           if (NULL != prq->sender->last_client_replies
2966               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2967             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2968                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2969           prq->sender->last_client_replies
2970             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2971             = pr->client_request_list->client_list->client;
2972           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2973         }
2974     }
2975   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
2976                                      prq->type,
2977                                      key,
2978                                      &pr->bf,
2979                                      pr->mingle,
2980                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2981                                      prq->data,
2982                                      prq->size);
2983   switch (prq->eval)
2984     {
2985     case GNUNET_BLOCK_EVALUATION_OK_MORE:
2986       break;
2987     case GNUNET_BLOCK_EVALUATION_OK_LAST:
2988       while (NULL != pr->pending_head)
2989         destroy_pending_message_list_entry (pr->pending_head);
2990       if (pr->qe != NULL)
2991         {
2992           if (pr->client_request_list != NULL)
2993             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2994                                         GNUNET_YES);
2995           GNUNET_DATASTORE_cancel (pr->qe);
2996           pr->qe = NULL;
2997         }
2998       pr->do_remove = GNUNET_YES;
2999       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3000         {
3001           GNUNET_SCHEDULER_cancel (sched,
3002                                    pr->task);
3003           pr->task = GNUNET_SCHEDULER_NO_TASK;
3004         }
3005       GNUNET_break (GNUNET_YES ==
3006                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3007                                                           key,
3008                                                           pr));
3009       GNUNET_LOAD_update (rt_entry_lifetime,
3010                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3011       break;
3012     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3013       GNUNET_STATISTICS_update (stats,
3014                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3015                                 1,
3016                                 GNUNET_NO);
3017 #if DEBUG_FS
3018       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3019                   "Duplicate response `%s', discarding.\n",
3020                   GNUNET_h2s (&mhash));
3021 #endif
3022       return GNUNET_YES; /* duplicate */
3023     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3024       return GNUNET_YES; /* wrong namespace */  
3025     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3026       GNUNET_break (0);
3027       return GNUNET_YES;
3028     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3029       GNUNET_break (0);
3030       return GNUNET_YES;
3031     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3032       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3033                   _("Unsupported block type %u\n"),
3034                   prq->type);
3035       return GNUNET_NO;
3036     }
3037   if (pr->client_request_list != NULL)
3038     {
3039       if (pr->replies_seen_size == pr->replies_seen_off)
3040         GNUNET_array_grow (pr->replies_seen,
3041                            pr->replies_seen_size,
3042                            pr->replies_seen_size * 2 + 4);      
3043       GNUNET_CRYPTO_hash (prq->data,
3044                           prq->size,
3045                           &pr->replies_seen[pr->replies_seen_off++]);         
3046       refresh_bloomfilter (pr);
3047     }
3048   if (NULL == prq->sender)
3049     {
3050 #if DEBUG_FS
3051       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3052                   "Found result for query `%s' in local datastore\n",
3053                   GNUNET_h2s (key));
3054 #endif
3055       GNUNET_STATISTICS_update (stats,
3056                                 gettext_noop ("# results found locally"),
3057                                 1,
3058                                 GNUNET_NO);      
3059     }
3060   prq->priority += pr->remaining_priority;
3061   pr->remaining_priority = 0;
3062   pr->results_found++;
3063   prq->request_found = GNUNET_YES;
3064   if (NULL != pr->client_request_list)
3065     {
3066       GNUNET_STATISTICS_update (stats,
3067                                 gettext_noop ("# replies received for local clients"),
3068                                 1,
3069                                 GNUNET_NO);
3070       cl = pr->client_request_list->client_list;
3071       msize = sizeof (struct PutMessage) + prq->size;
3072       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3073       creply->msize = msize;
3074       creply->client_list = cl;
3075       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3076                                          cl->res_tail,
3077                                          cl->res_tail,
3078                                          creply);      
3079       pm = (struct PutMessage*) &creply[1];
3080       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3081       pm->header.size = htons (msize);
3082       pm->type = htonl (prq->type);
3083       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3084       memcpy (&pm[1], prq->data, prq->size);      
3085       if (NULL == cl->th)
3086         {
3087 #if DEBUG_FS
3088           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3089                       "Transmitting result for query `%s' to client\n",
3090                       GNUNET_h2s (key));
3091 #endif  
3092           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3093                                                         msize,
3094                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3095                                                         &transmit_to_client,
3096                                                         cl);
3097         }
3098       GNUNET_break (cl->th != NULL);
3099       if (pr->do_remove)                
3100         {
3101           prq->finished = GNUNET_YES;
3102           destroy_pending_request (pr);         
3103         }
3104     }
3105   else
3106     {
3107       cp = pr->cp;
3108 #if DEBUG_FS
3109       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3110                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3111                   GNUNET_h2s (key),
3112                   (unsigned int) cp->pid);
3113 #endif  
3114       GNUNET_STATISTICS_update (stats,
3115                                 gettext_noop ("# replies received for other peers"),
3116                                 1,
3117                                 GNUNET_NO);
3118       msize = sizeof (struct PutMessage) + prq->size;
3119       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3120       reply->cont = &transmit_reply_continuation;
3121       reply->cont_cls = pr;
3122       reply->msize = msize;
3123       reply->priority = UINT32_MAX; /* send replies first! */
3124       pm = (struct PutMessage*) &reply[1];
3125       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3126       pm->header.size = htons (msize);
3127       pm->type = htonl (prq->type);
3128       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3129       memcpy (&pm[1], prq->data, prq->size);
3130       add_to_pending_messages_for_peer (cp, reply, pr);
3131     }
3132   return GNUNET_YES;
3133 }
3134
3135
3136 /**
3137  * Iterator called on each result obtained for a DHT
3138  * operation that expects a reply
3139  *
3140  * @param cls closure
3141  * @param exp when will this value expire
3142  * @param key key of the result
3143  * @param get_path NULL-terminated array of pointers
3144  *                 to the peers on reverse GET path (or NULL if not recorded)
3145  * @param put_path NULL-terminated array of pointers
3146  *                 to the peers on the PUT path (or NULL if not recorded)
3147  * @param type type of the result
3148  * @param size number of bytes in data
3149  * @param data pointer to the result data
3150  */
3151 static void
3152 process_dht_reply (void *cls,
3153                    struct GNUNET_TIME_Absolute exp,
3154                    const GNUNET_HashCode * key,
3155                    const struct GNUNET_PeerIdentity * const *get_path,
3156                    const struct GNUNET_PeerIdentity * const *put_path,
3157                    enum GNUNET_BLOCK_Type type,
3158                    size_t size,
3159                    const void *data)
3160 {
3161   struct PendingRequest *pr = cls;
3162   struct ProcessReplyClosure prq;
3163
3164   memset (&prq, 0, sizeof (prq));
3165   prq.data = data;
3166   prq.expiration = exp;
3167   prq.size = size;  
3168   prq.type = type;
3169   process_reply (&prq, key, pr);
3170 }
3171
3172
3173
3174 /**
3175  * Continuation called to notify client about result of the
3176  * operation.
3177  *
3178  * @param cls closure
3179  * @param success GNUNET_SYSERR on failure
3180  * @param msg NULL on success, otherwise an error message
3181  */
3182 static void 
3183 put_migration_continuation (void *cls,
3184                             int success,
3185                             const char *msg)
3186 {
3187   struct GNUNET_TIME_Absolute *start = cls;
3188   struct GNUNET_TIME_Relative delay;
3189   
3190   delay = GNUNET_TIME_absolute_get_duration (*start);
3191   GNUNET_free (start);
3192   GNUNET_LOAD_update (datastore_put_load,
3193                       delay.value);
3194   if (GNUNET_OK == success)
3195     return;
3196   GNUNET_STATISTICS_update (stats,
3197                             gettext_noop ("# datastore 'put' failures"),
3198                             1,
3199                             GNUNET_NO);
3200 }
3201
3202
3203 /**
3204  * Handle P2P "PUT" message.
3205  *
3206  * @param cls closure, always NULL
3207  * @param other the other peer involved (sender or receiver, NULL
3208  *        for loopback messages where we are both sender and receiver)
3209  * @param message the actual message
3210  * @param latency reported latency of the connection with 'other'
3211  * @param distance reported distance (DV) to 'other' 
3212  * @return GNUNET_OK to keep the connection open,
3213  *         GNUNET_SYSERR to close it (signal serious error)
3214  */
3215 static int
3216 handle_p2p_put (void *cls,
3217                 const struct GNUNET_PeerIdentity *other,
3218                 const struct GNUNET_MessageHeader *message,
3219                 struct GNUNET_TIME_Relative latency,
3220                 uint32_t distance)
3221 {
3222   const struct PutMessage *put;
3223   uint16_t msize;
3224   size_t dsize;
3225   enum GNUNET_BLOCK_Type type;
3226   struct GNUNET_TIME_Absolute expiration;
3227   GNUNET_HashCode query;
3228   struct ProcessReplyClosure prq;
3229   struct GNUNET_TIME_Absolute *start;
3230   struct GNUNET_TIME_Relative block_time;  
3231   double putl;
3232   struct ConnectedPeer *cp; 
3233   struct PendingMessage *pm;
3234   struct MigrationStopMessage *msm;
3235
3236   msize = ntohs (message->size);
3237   if (msize < sizeof (struct PutMessage))
3238     {
3239       GNUNET_break_op(0);
3240       return GNUNET_SYSERR;
3241     }
3242   put = (const struct PutMessage*) message;
3243   dsize = msize - sizeof (struct PutMessage);
3244   type = ntohl (put->type);
3245   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3246
3247   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3248     return GNUNET_SYSERR;
3249   if (GNUNET_OK !=
3250       GNUNET_BLOCK_get_key (block_ctx,
3251                             type,
3252                             &put[1],
3253                             dsize,
3254                             &query))
3255     {
3256       GNUNET_break_op (0);
3257       return GNUNET_SYSERR;
3258     }
3259 #if DEBUG_FS
3260   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3261               "Received result for query `%s' from peer `%4s'\n",
3262               GNUNET_h2s (&query),
3263               GNUNET_i2s (other));
3264 #endif
3265   GNUNET_STATISTICS_update (stats,
3266                             gettext_noop ("# replies received (overall)"),
3267                             1,
3268                             GNUNET_NO);
3269   /* now, lookup 'query' */
3270   prq.data = (const void*) &put[1];
3271   if (other != NULL)
3272     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3273                                                     &other->hashPubKey);
3274   else
3275     prq.sender = NULL;
3276   prq.size = dsize;
3277   prq.type = type;
3278   prq.expiration = expiration;
3279   prq.priority = 0;
3280   prq.finished = GNUNET_NO;
3281   prq.request_found = GNUNET_NO;
3282   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3283                                               &query,
3284                                               &process_reply,
3285                                               &prq);
3286   if (prq.sender != NULL)
3287     {
3288       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3289       prq.sender->trust += prq.priority;
3290     }
3291   if ( (GNUNET_YES == active_migration) &&
3292        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3293     {      
3294 #if DEBUG_FS
3295       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3296                   "Replicating result for query `%s' with priority %u\n",
3297                   GNUNET_h2s (&query),
3298                   prq.priority);
3299 #endif
3300       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3301       *start = GNUNET_TIME_absolute_get ();
3302       GNUNET_DATASTORE_put (dsh,
3303                             0, &query, dsize, &put[1],
3304                             type, prq.priority, 1 /* anonymity */, 
3305                             expiration, 
3306                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3307                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3308                             &put_migration_continuation, 
3309                             start);
3310     }
3311   putl = GNUNET_LOAD_get_load (datastore_put_load);
3312   if ( (GNUNET_NO == prq.request_found) &&
3313        ( (GNUNET_YES != active_migration) ||
3314          (putl > 2.0) ) )
3315     {
3316       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3317                                               &other->hashPubKey);
3318       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3319         return GNUNET_OK; /* already blocked */
3320       /* We're too busy; send MigrationStop message! */
3321       if (GNUNET_YES != active_migration) 
3322         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3323       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3324                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3325                                                                                    (unsigned int) (60000 * putl * putl)));
3326       
3327       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3328       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3329                           sizeof (struct MigrationStopMessage));
3330       pm->msize = sizeof (struct MigrationStopMessage);
3331       pm->priority = UINT32_MAX;
3332       msm = (struct MigrationStopMessage*) &pm[1];
3333       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3334       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3335       msm->duration = GNUNET_TIME_relative_hton (block_time);
3336       add_to_pending_messages_for_peer (cp,
3337                                         pm,
3338                                         NULL);
3339     }
3340   return GNUNET_OK;
3341 }
3342
3343
3344 /**
3345  * Handle P2P "MIGRATION_STOP" message.
3346  *
3347  * @param cls closure, always NULL
3348  * @param other the other peer involved (sender or receiver, NULL
3349  *        for loopback messages where we are both sender and receiver)
3350  * @param message the actual message
3351  * @param latency reported latency of the connection with 'other'
3352  * @param distance reported distance (DV) to 'other' 
3353  * @return GNUNET_OK to keep the connection open,
3354  *         GNUNET_SYSERR to close it (signal serious error)
3355  */
3356 static int
3357 handle_p2p_migration_stop (void *cls,
3358                            const struct GNUNET_PeerIdentity *other,
3359                            const struct GNUNET_MessageHeader *message,
3360                            struct GNUNET_TIME_Relative latency,
3361                            uint32_t distance)
3362 {
3363   struct ConnectedPeer *cp; 
3364   const struct MigrationStopMessage *msm;
3365
3366   msm = (const struct MigrationStopMessage*) message;
3367   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3368                                           &other->hashPubKey);
3369   if (cp == NULL)
3370     {
3371       GNUNET_break (0);
3372       return GNUNET_OK;
3373     }
3374   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3375   return GNUNET_OK;
3376 }
3377
3378
3379
3380 /* **************************** P2P GET Handling ************************ */
3381
3382
3383 /**
3384  * Closure for 'check_duplicate_request_{peer,client}'.
3385  */
3386 struct CheckDuplicateRequestClosure
3387 {
3388   /**
3389    * The new request we should check if it already exists.
3390    */
3391   const struct PendingRequest *pr;
3392
3393   /**
3394    * Existing request found by the checker, NULL if none.
3395    */
3396   struct PendingRequest *have;
3397 };
3398
3399
3400 /**
3401  * Iterator over entries in the 'query_request_map' that
3402  * tries to see if we have the same request pending from
3403  * the same client already.
3404  *
3405  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3406  * @param key current key code (query, ignored, must match)
3407  * @param value value in the hash map (a 'struct PendingRequest' 
3408  *              that already exists)
3409  * @return GNUNET_YES if we should continue to
3410  *         iterate (no match yet)
3411  *         GNUNET_NO if not (match found).
3412  */
3413 static int
3414 check_duplicate_request_client (void *cls,
3415                                 const GNUNET_HashCode * key,
3416                                 void *value)
3417 {
3418   struct CheckDuplicateRequestClosure *cdc = cls;
3419   struct PendingRequest *have = value;
3420
3421   if (have->client_request_list == NULL)
3422     return GNUNET_YES;
3423   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3424        (cdc->pr != have) )
3425     {
3426       cdc->have = have;
3427       return GNUNET_NO;
3428     }
3429   return GNUNET_YES;
3430 }
3431
3432
3433 /**
3434  * We're processing (local) results for a search request
3435  * from another peer.  Pass applicable results to the
3436  * peer and if we are done either clean up (operation
3437  * complete) or forward to other peers (more results possible).
3438  *
3439  * @param cls our closure (struct LocalGetContext)
3440  * @param key key for the content
3441  * @param size number of bytes in data
3442  * @param data content stored
3443  * @param type type of the content
3444  * @param priority priority of the content
3445  * @param anonymity anonymity-level for the content
3446  * @param expiration expiration time for the content
3447  * @param uid unique identifier for the datum;
3448  *        maybe 0 if no unique identifier is available
3449  */
3450 static void
3451 process_local_reply (void *cls,
3452                      const GNUNET_HashCode * key,
3453                      size_t size,
3454                      const void *data,
3455                      enum GNUNET_BLOCK_Type type,
3456                      uint32_t priority,
3457                      uint32_t anonymity,
3458                      struct GNUNET_TIME_Absolute
3459                      expiration, 
3460                      uint64_t uid)
3461 {
3462   struct PendingRequest *pr = cls;
3463   struct ProcessReplyClosure prq;
3464   struct CheckDuplicateRequestClosure cdrc;
3465   GNUNET_HashCode query;
3466   unsigned int old_rf;
3467   
3468   if (NULL == key)
3469     {
3470 #if DEBUG_FS > 1
3471       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3472                   "Done processing local replies, forwarding request to other peers.\n");
3473 #endif
3474       pr->qe = NULL;
3475       if (pr->client_request_list != NULL)
3476         {
3477           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3478                                       GNUNET_YES);
3479           /* Figure out if this is a duplicate request and possibly
3480              merge 'struct PendingRequest' entries */
3481           cdrc.have = NULL;
3482           cdrc.pr = pr;
3483           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3484                                                       &pr->query,
3485                                                       &check_duplicate_request_client,
3486                                                       &cdrc);
3487           if (cdrc.have != NULL)
3488             {
3489 #if DEBUG_FS
3490               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3491                           "Received request for block `%s' twice from client, will only request once.\n",
3492                           GNUNET_h2s (&pr->query));
3493 #endif
3494               
3495               destroy_pending_request (pr);
3496               return;
3497             }
3498         }
3499
3500       /* no more results */
3501       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3502         pr->task = GNUNET_SCHEDULER_add_now (sched,
3503                                              &forward_request_task,
3504                                              pr);      
3505       return;
3506     }
3507 #if DEBUG_FS
3508   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3509               "New local response to `%s' of type %u.\n",
3510               GNUNET_h2s (key),
3511               type);
3512 #endif
3513   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3514     {
3515 #if DEBUG_FS
3516       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3517                   "Found ONDEMAND block, performing on-demand encoding\n");
3518 #endif
3519       GNUNET_STATISTICS_update (stats,
3520                                 gettext_noop ("# on-demand blocks matched requests"),
3521                                 1,
3522                                 GNUNET_NO);
3523       if (GNUNET_OK != 
3524           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3525                                             anonymity, expiration, uid, 
3526                                             &process_local_reply,
3527                                             pr))
3528       if (pr->qe != NULL)
3529         {
3530           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3531         }
3532       return;
3533     }
3534   old_rf = pr->results_found;
3535   memset (&prq, 0, sizeof (prq));
3536   prq.data = data;
3537   prq.expiration = expiration;
3538   prq.size = size;  
3539   if (GNUNET_OK != 
3540       GNUNET_BLOCK_get_key (block_ctx,
3541                             type,
3542                             data,
3543                             size,
3544                             &query))
3545     {
3546       GNUNET_break (0);
3547       GNUNET_DATASTORE_remove (dsh,
3548                                key,
3549                                size, data,
3550                                -1, -1, 
3551                                GNUNET_TIME_UNIT_FOREVER_REL,
3552                                NULL, NULL);
3553       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3554       return;
3555     }
3556   prq.type = type;
3557   prq.priority = priority;  
3558   prq.finished = GNUNET_NO;
3559   prq.request_found = GNUNET_NO;
3560   process_reply (&prq, key, pr);
3561   if ( (old_rf == 0) &&
3562        (pr->results_found == 1) )
3563     update_datastore_delays (pr->start_time);
3564   if (prq.finished == GNUNET_YES)
3565     return;
3566   if (pr->qe == NULL)
3567     return; /* done here */
3568   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3569     {
3570       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3571       return;
3572     }
3573   if ( (pr->client_request_list == NULL) &&
3574        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3575          (pr->results_found > 5 + 2 * pr->priority) ) )
3576     {
3577 #if DEBUG_FS > 2
3578       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3579                   "Load too high, done with request\n");
3580 #endif
3581       GNUNET_STATISTICS_update (stats,
3582                                 gettext_noop ("# processing result set cut short due to load"),
3583                                 1,
3584                                 GNUNET_NO);
3585       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3586       return;
3587     }
3588   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3589 }
3590
3591
3592 /**
3593  * We've received a request with the specified priority.  Bound it
3594  * according to how much we trust the given peer.
3595  * 
3596  * @param prio_in requested priority
3597  * @param cp the peer making the request
3598  * @return effective priority
3599  */
3600 static int32_t
3601 bound_priority (uint32_t prio_in,
3602                 struct ConnectedPeer *cp)
3603 {
3604 #define N ((double)128.0)
3605   uint32_t ret;
3606   double rret;
3607   int ld;
3608
3609   ld = test_get_load_too_high (0);
3610   if (ld == GNUNET_SYSERR)
3611     return 0; /* excess resources */
3612   ret = change_host_trust (cp, prio_in);
3613   if (ret > 0)
3614     {
3615       if (ret > current_priorities + N)
3616         rret = current_priorities + N;
3617       else
3618         rret = ret;
3619       current_priorities 
3620         = (current_priorities * (N-1) + rret)/N;
3621     }
3622   if ( (ld == GNUNET_YES) && (ret > 0) )
3623     {
3624       /* try with charging */
3625       ld = test_get_load_too_high (ret);
3626     }
3627   if (ld == GNUNET_YES)
3628     {
3629       /* undo charge */
3630       if (ret != 0)
3631         change_host_trust (cp, -ret);
3632       return -1; /* not enough resources */
3633     }
3634 #undef N
3635   return ret;
3636 }
3637
3638
3639 /**
3640  * Iterator over entries in the 'query_request_map' that
3641  * tries to see if we have the same request pending from
3642  * the same peer already.
3643  *
3644  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3645  * @param key current key code (query, ignored, must match)
3646  * @param value value in the hash map (a 'struct PendingRequest' 
3647  *              that already exists)
3648  * @return GNUNET_YES if we should continue to
3649  *         iterate (no match yet)
3650  *         GNUNET_NO if not (match found).
3651  */
3652 static int
3653 check_duplicate_request_peer (void *cls,
3654                               const GNUNET_HashCode * key,
3655                               void *value)
3656 {
3657   struct CheckDuplicateRequestClosure *cdc = cls;
3658   struct PendingRequest *have = value;
3659
3660   if (cdc->pr->target_pid == have->target_pid)
3661     {
3662       cdc->have = have;
3663       return GNUNET_NO;
3664     }
3665   return GNUNET_YES;
3666 }
3667
3668
3669 /**
3670  * Handle P2P "GET" request.
3671  *
3672  * @param cls closure, always NULL
3673  * @param other the other peer involved (sender or receiver, NULL
3674  *        for loopback messages where we are both sender and receiver)
3675  * @param message the actual message
3676  * @param latency reported latency of the connection with 'other'
3677  * @param distance reported distance (DV) to 'other' 
3678  * @return GNUNET_OK to keep the connection open,
3679  *         GNUNET_SYSERR to close it (signal serious error)
3680  */
3681 static int
3682 handle_p2p_get (void *cls,
3683                 const struct GNUNET_PeerIdentity *other,
3684                 const struct GNUNET_MessageHeader *message,
3685                 struct GNUNET_TIME_Relative latency,
3686                 uint32_t distance)
3687 {
3688   struct PendingRequest *pr;
3689   struct ConnectedPeer *cp;
3690   struct ConnectedPeer *cps;
3691   struct CheckDuplicateRequestClosure cdc;
3692   struct GNUNET_TIME_Relative timeout;
3693   uint16_t msize;
3694   const struct GetMessage *gm;
3695   unsigned int bits;
3696   const GNUNET_HashCode *opt;
3697   uint32_t bm;
3698   size_t bfsize;
3699   uint32_t ttl_decrement;
3700   int32_t priority;
3701   enum GNUNET_BLOCK_Type type;
3702   int have_ns;
3703
3704   msize = ntohs(message->size);
3705   if (msize < sizeof (struct GetMessage))
3706     {
3707       GNUNET_break_op (0);
3708       return GNUNET_SYSERR;
3709     }
3710   gm = (const struct GetMessage*) message;
3711   type = ntohl (gm->type);
3712   bm = ntohl (gm->hash_bitmap);
3713   bits = 0;
3714   while (bm > 0)
3715     {
3716       if (1 == (bm & 1))
3717         bits++;
3718       bm >>= 1;
3719     }
3720   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3721     {
3722       GNUNET_break_op (0);
3723       return GNUNET_SYSERR;
3724     }  
3725   opt = (const GNUNET_HashCode*) &gm[1];
3726   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3727   bm = ntohl (gm->hash_bitmap);
3728   bits = 0;
3729   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3730                                            &other->hashPubKey);
3731   if (NULL == cps)
3732     {
3733       /* peer must have just disconnected */
3734       GNUNET_STATISTICS_update (stats,
3735                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3736                                 1,
3737                                 GNUNET_NO);
3738       return GNUNET_SYSERR;
3739     }
3740   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3741     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3742                                             &opt[bits++]);
3743   else
3744     cp = cps;
3745   if (cp == NULL)
3746     {
3747 #if DEBUG_FS
3748       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3749         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3750                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3751                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3752       
3753       else
3754         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3755                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3756                     GNUNET_i2s (other));
3757 #endif
3758       GNUNET_STATISTICS_update (stats,
3759                                 gettext_noop ("# requests dropped due to missing reverse route"),
3760                                 1,
3761                                 GNUNET_NO);
3762      /* FIXME: try connect? */
3763       return GNUNET_OK;
3764     }
3765   /* note that we can really only check load here since otherwise
3766      peers could find out that we are overloaded by not being
3767      disconnected after sending us a malformed query... */
3768   priority = bound_priority (ntohl (gm->priority), cps);
3769   if (priority < 0)
3770     {
3771 #if DEBUG_FS
3772       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3773                   "Dropping query from `%s', this peer is too busy.\n",
3774                   GNUNET_i2s (other));
3775 #endif
3776       GNUNET_STATISTICS_update (stats,
3777                                 gettext_noop ("# requests dropped due to high load"),
3778                                 1,
3779                                 GNUNET_NO);
3780       return GNUNET_OK;
3781     }
3782 #if DEBUG_FS 
3783   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3784               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3785               GNUNET_h2s (&gm->query),
3786               (unsigned int) type,
3787               GNUNET_i2s (other),
3788               (unsigned int) bm);
3789 #endif
3790   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3791   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3792                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3793   if (have_ns)
3794     {
3795       pr->namespace = (GNUNET_HashCode*) &pr[1];
3796       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3797     }
3798   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3) ||
3799        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
3800         GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
3801     {
3802       /* don't have BW to send to peer, or would likely take longer than we have for it,
3803          so at best indirect the query */
3804       priority = 0;
3805       pr->forward_only = GNUNET_YES;
3806     }
3807   pr->type = type;
3808   pr->mingle = ntohl (gm->filter_mutator);
3809   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3810     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3811   pr->anonymity_level = 1;
3812   pr->priority = (uint32_t) priority;
3813   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3814   pr->query = gm->query;
3815   /* decrement ttl (always) */
3816   ttl_decrement = 2 * TTL_DECREMENT +
3817     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3818                               TTL_DECREMENT);
3819   if ( (pr->ttl < 0) &&
3820        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3821     {
3822 #if DEBUG_FS
3823       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3824                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3825                   GNUNET_i2s (other),
3826                   pr->ttl,
3827                   ttl_decrement);
3828 #endif
3829       GNUNET_STATISTICS_update (stats,
3830                                 gettext_noop ("# requests dropped due TTL underflow"),
3831                                 1,
3832                                 GNUNET_NO);
3833       /* integer underflow => drop (should be very rare)! */      
3834       GNUNET_free (pr);
3835       return GNUNET_OK;
3836     } 
3837   pr->ttl -= ttl_decrement;
3838   pr->start_time = GNUNET_TIME_absolute_get ();
3839
3840   /* get bloom filter */
3841   if (bfsize > 0)
3842     {
3843       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3844                                                   bfsize,
3845                                                   BLOOMFILTER_K);
3846       pr->bf_size = bfsize;
3847     }
3848
3849   cdc.have = NULL;
3850   cdc.pr = pr;
3851   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3852                                               &gm->query,
3853                                               &check_duplicate_request_peer,
3854                                               &cdc);
3855   if (cdc.have != NULL)
3856     {
3857       if (cdc.have->start_time.value + cdc.have->ttl >=
3858           pr->start_time.value + pr->ttl)
3859         {
3860           /* existing request has higher TTL, drop new one! */
3861           cdc.have->priority += pr->priority;
3862           destroy_pending_request (pr);
3863 #if DEBUG_FS
3864           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3865                       "Have existing request with higher TTL, dropping new request.\n",
3866                       GNUNET_i2s (other));
3867 #endif
3868           GNUNET_STATISTICS_update (stats,
3869                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3870                                     1,
3871                                     GNUNET_NO);
3872           return GNUNET_OK;
3873         }
3874       else
3875         {
3876           /* existing request has lower TTL, drop old one! */
3877           pr->priority += cdc.have->priority;
3878           /* Possible optimization: if we have applicable pending
3879              replies in 'cdc.have', we might want to move those over
3880              (this is a really rare special-case, so it is not clear
3881              that this would be worth it) */
3882           destroy_pending_request (cdc.have);
3883           /* keep processing 'pr'! */
3884         }
3885     }
3886
3887   pr->cp = cp;
3888   GNUNET_break (GNUNET_OK ==
3889                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3890                                                    &gm->query,
3891                                                    pr,
3892                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3893   GNUNET_break (GNUNET_OK ==
3894                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3895                                                    &other->hashPubKey,
3896                                                    pr,
3897                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3898   
3899   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3900                                             pr,
3901                                             pr->start_time.value + pr->ttl);
3902
3903   GNUNET_STATISTICS_update (stats,
3904                             gettext_noop ("# P2P searches received"),
3905                             1,
3906                             GNUNET_NO);
3907   GNUNET_STATISTICS_update (stats,
3908                             gettext_noop ("# P2P searches active"),
3909                             1,
3910                             GNUNET_NO);
3911
3912   /* calculate change in traffic preference */
3913   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3914   /* process locally */
3915   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
3916     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3917   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3918                                            (pr->priority + 1)); 
3919   if (GNUNET_YES != pr->forward_only)
3920     pr->qe = GNUNET_DATASTORE_get (dsh,
3921                                    &gm->query,
3922                                    type,                               
3923                                    pr->priority + 1,
3924                                    MAX_DATASTORE_QUEUE,                          
3925                                    timeout,
3926                                    &process_local_reply,
3927                                    pr);
3928   else
3929     GNUNET_STATISTICS_update (stats,
3930                               gettext_noop ("# requests forwarded due to high load"),
3931                               1,
3932                               GNUNET_NO);
3933
3934   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
3935   switch (pr->type)
3936     {
3937     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
3938     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
3939       /* only one result, wait for datastore */
3940       if (GNUNET_YES != pr->forward_only)
3941         break;
3942     default:
3943       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3944         pr->task = GNUNET_SCHEDULER_add_now (sched,
3945                                              &forward_request_task,
3946                                              pr);
3947     }
3948
3949   /* make sure we don't track too many requests */
3950   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3951     {
3952       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3953       GNUNET_assert (pr != NULL);
3954       destroy_pending_request (pr);
3955     }
3956   return GNUNET_OK;
3957 }
3958
3959
3960 /* **************************** CS GET Handling ************************ */
3961
3962
3963 /**
3964  * Handle START_SEARCH-message (search request from client).
3965  *
3966  * @param cls closure
3967  * @param client identification of the client
3968  * @param message the actual message
3969  */
3970 static void
3971 handle_start_search (void *cls,
3972                      struct GNUNET_SERVER_Client *client,
3973                      const struct GNUNET_MessageHeader *message)
3974 {
3975   static GNUNET_HashCode all_zeros;
3976   const struct SearchMessage *sm;
3977   struct ClientList *cl;
3978   struct ClientRequestList *crl;
3979   struct PendingRequest *pr;
3980   uint16_t msize;
3981   unsigned int sc;
3982   enum GNUNET_BLOCK_Type type;
3983
3984   msize = ntohs (message->size);
3985   if ( (msize < sizeof (struct SearchMessage)) ||
3986        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3987     {
3988       GNUNET_break (0);
3989       GNUNET_SERVER_receive_done (client,
3990                                   GNUNET_SYSERR);
3991       return;
3992     }
3993   GNUNET_STATISTICS_update (stats,
3994                             gettext_noop ("# client searches received"),
3995                             1,
3996                             GNUNET_NO);
3997   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3998   sm = (const struct SearchMessage*) message;
3999   type = ntohl (sm->type);
4000 #if DEBUG_FS
4001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4002               "Received request for `%s' of type %u from local client\n",
4003               GNUNET_h2s (&sm->query),
4004               (unsigned int) type);
4005 #endif
4006   cl = client_list;
4007   while ( (cl != NULL) &&
4008           (cl->client != client) )
4009     cl = cl->next;
4010   if (cl == NULL)
4011     {
4012       cl = GNUNET_malloc (sizeof (struct ClientList));
4013       cl->client = client;
4014       GNUNET_SERVER_client_keep (client);
4015       cl->next = client_list;
4016       client_list = cl;
4017     }
4018   /* detect duplicate KBLOCK requests */
4019   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4020        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4021        (type == GNUNET_BLOCK_TYPE_ANY) )
4022     {
4023       crl = cl->rl_head;
4024       while ( (crl != NULL) &&
4025               ( (0 != memcmp (&crl->req->query,
4026                               &sm->query,
4027                               sizeof (GNUNET_HashCode))) ||
4028                 (crl->req->type != type) ) )
4029         crl = crl->next;
4030       if (crl != NULL)  
4031         { 
4032 #if DEBUG_FS
4033           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4034                       "Have existing request, merging content-seen lists.\n");
4035 #endif
4036           pr = crl->req;
4037           /* Duplicate request (used to send long list of
4038              known/blocked results); merge 'pr->replies_seen'
4039              and update bloom filter */
4040           GNUNET_array_grow (pr->replies_seen,
4041                              pr->replies_seen_size,
4042                              pr->replies_seen_off + sc);
4043           memcpy (&pr->replies_seen[pr->replies_seen_off],
4044                   &sm[1],
4045                   sc * sizeof (GNUNET_HashCode));
4046           pr->replies_seen_off += sc;
4047           refresh_bloomfilter (pr);
4048           GNUNET_STATISTICS_update (stats,
4049                                     gettext_noop ("# client searches updated (merged content seen list)"),
4050                                     1,
4051                                     GNUNET_NO);
4052           GNUNET_SERVER_receive_done (client,
4053                                       GNUNET_OK);
4054           return;
4055         }
4056     }
4057   GNUNET_STATISTICS_update (stats,
4058                             gettext_noop ("# client searches active"),
4059                             1,
4060                             GNUNET_NO);
4061   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4062                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4063   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4064   memset (crl, 0, sizeof (struct ClientRequestList));
4065   crl->client_list = cl;
4066   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4067                                cl->rl_tail,
4068                                crl);  
4069   crl->req = pr;
4070   pr->type = type;
4071   pr->client_request_list = crl;
4072   GNUNET_array_grow (pr->replies_seen,
4073                      pr->replies_seen_size,
4074                      sc);
4075   memcpy (pr->replies_seen,
4076           &sm[1],
4077           sc * sizeof (GNUNET_HashCode));
4078   pr->replies_seen_off = sc;
4079   pr->anonymity_level = ntohl (sm->anonymity_level); 
4080   pr->start_time = GNUNET_TIME_absolute_get ();
4081   refresh_bloomfilter (pr);
4082   pr->query = sm->query;
4083   if (0 == (1 & ntohl (sm->options)))
4084     pr->local_only = GNUNET_NO;
4085   else
4086     pr->local_only = GNUNET_YES;
4087   switch (type)
4088     {
4089     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4090     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4091       if (0 != memcmp (&sm->target,
4092                        &all_zeros,
4093                        sizeof (GNUNET_HashCode)))
4094         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4095       break;
4096     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4097       pr->namespace = (GNUNET_HashCode*) &pr[1];
4098       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4099       break;
4100     default:
4101       break;
4102     }
4103   GNUNET_break (GNUNET_OK ==
4104                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4105                                                    &sm->query,
4106                                                    pr,
4107                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4108   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4109     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4110   pr->qe = GNUNET_DATASTORE_get (dsh,
4111                                  &sm->query,
4112                                  type,
4113                                  -3, -1,
4114                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4115                                  &process_local_reply,
4116                                  pr);
4117 }
4118
4119
4120 /* **************************** Startup ************************ */
4121
4122 /**
4123  * Process fs requests.
4124  *
4125  * @param s scheduler to use
4126  * @param server the initialized server
4127  * @param c configuration to use
4128  */
4129 static int
4130 main_init (struct GNUNET_SCHEDULER_Handle *s,
4131            struct GNUNET_SERVER_Handle *server,
4132            const struct GNUNET_CONFIGURATION_Handle *c)
4133 {
4134   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4135     {
4136       { &handle_p2p_get, 
4137         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4138       { &handle_p2p_put, 
4139         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4140       { &handle_p2p_migration_stop, 
4141         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4142         sizeof (struct MigrationStopMessage) },
4143       { NULL, 0, 0 }
4144     };
4145   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4146     {&GNUNET_FS_handle_index_start, NULL, 
4147      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4148     {&GNUNET_FS_handle_index_list_get, NULL, 
4149      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4150     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4151      sizeof (struct UnindexMessage) },
4152     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4153      0 },
4154     {NULL, NULL, 0, 0}
4155   };
4156   unsigned long long enc = 128;
4157
4158   sched = s;
4159   cfg = c;
4160   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4161   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4162   if ( (GNUNET_OK !=
4163         GNUNET_CONFIGURATION_get_value_number (cfg,
4164                                                "fs",
4165                                                "MAX_PENDING_REQUESTS",
4166                                                &max_pending_requests)) ||
4167        (GNUNET_OK !=
4168         GNUNET_CONFIGURATION_get_value_number (cfg,
4169                                                "fs",
4170                                                "EXPECTED_NEIGHBOUR_COUNT",
4171                                                &enc)) ||
4172        (GNUNET_OK != 
4173         GNUNET_CONFIGURATION_get_value_time (cfg,
4174                                              "fs",
4175                                              "MIN_MIGRATION_DELAY",
4176                                              &min_migration_delay)) )
4177     {
4178       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4179                   _("Configuration fails to specify certain parameters, assuming default values."));
4180     }
4181   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4182   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4183   rt_entry_lifetime = GNUNET_LOAD_value_init ();
4184   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4185   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4186   core = GNUNET_CORE_connect (sched,
4187                               cfg,
4188                               GNUNET_TIME_UNIT_FOREVER_REL,
4189                               NULL,
4190                               NULL,
4191                               &peer_connect_handler,
4192                               &peer_disconnect_handler,
4193                               NULL,
4194                               NULL, GNUNET_NO,
4195                               NULL, GNUNET_NO,
4196                               p2p_handlers);
4197   if (NULL == core)
4198     {
4199       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4200                   _("Failed to connect to `%s' service.\n"),
4201                   "core");
4202       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4203       connected_peers = NULL;
4204       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4205       query_request_map = NULL;
4206       GNUNET_LOAD_value_free (rt_entry_lifetime);
4207       rt_entry_lifetime = NULL;
4208       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4209       requests_by_expiration_heap = NULL;
4210       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4211       peer_request_map = NULL;
4212       if (dsh != NULL)
4213         {
4214           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4215           dsh = NULL;
4216         }
4217       return GNUNET_SYSERR;
4218     }
4219   /* FIXME: distinguish between sending and storing in options? */
4220   if (active_migration) 
4221     {
4222       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4223                   _("Content migration is enabled, will start to gather data\n"));
4224       consider_migration_gathering ();
4225     }
4226   consider_dht_put_gathering (NULL);
4227   GNUNET_SERVER_disconnect_notify (server, 
4228                                    &handle_client_disconnect,
4229                                    NULL);
4230   GNUNET_assert (GNUNET_OK ==
4231                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4232                                                           "fs",
4233                                                           "TRUST",
4234                                                           &trustDirectory));
4235   GNUNET_DISK_directory_create (trustDirectory);
4236   GNUNET_SCHEDULER_add_with_priority (sched,
4237                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4238                                       &cron_flush_trust, NULL);
4239
4240
4241   GNUNET_SERVER_add_handlers (server, handlers);
4242   GNUNET_SCHEDULER_add_delayed (sched,
4243                                 GNUNET_TIME_UNIT_FOREVER_REL,
4244                                 &shutdown_task,
4245                                 NULL);
4246   return GNUNET_OK;
4247 }
4248
4249
4250 /**
4251  * Process fs requests.
4252  *
4253  * @param cls closure
4254  * @param sched scheduler to use
4255  * @param server the initialized server
4256  * @param cfg configuration to use
4257  */
4258 static void
4259 run (void *cls,
4260      struct GNUNET_SCHEDULER_Handle *sched,
4261      struct GNUNET_SERVER_Handle *server,
4262      const struct GNUNET_CONFIGURATION_Handle *cfg)
4263 {
4264   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4265                                                            "FS",
4266                                                            "ACTIVEMIGRATION");
4267   dsh = GNUNET_DATASTORE_connect (cfg,
4268                                   sched);
4269   if (dsh == NULL)
4270     {
4271       GNUNET_SCHEDULER_shutdown (sched);
4272       return;
4273     }
4274   datastore_get_load = GNUNET_LOAD_value_init ();
4275   datastore_put_load = GNUNET_LOAD_value_init ();
4276   block_cfg = GNUNET_CONFIGURATION_create ();
4277   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4278                                          "block",
4279                                          "PLUGINS",
4280                                          "fs");
4281   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4282   GNUNET_assert (NULL != block_ctx);
4283   dht_handle = GNUNET_DHT_connect (sched,
4284                                    cfg,
4285                                    FS_DHT_HT_SIZE);
4286   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4287        (GNUNET_OK != main_init (sched, server, cfg)) )
4288     {    
4289       GNUNET_SCHEDULER_shutdown (sched);
4290       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4291       dsh = NULL;
4292       GNUNET_DHT_disconnect (dht_handle);
4293       dht_handle = NULL;
4294       GNUNET_BLOCK_context_destroy (block_ctx);
4295       block_ctx = NULL;
4296       GNUNET_CONFIGURATION_destroy (block_cfg);
4297       block_cfg = NULL;
4298       GNUNET_LOAD_value_free (datastore_get_load);
4299       datastore_get_load = NULL;
4300       GNUNET_LOAD_value_free (datastore_put_load);
4301       datastore_put_load = NULL;
4302       return;   
4303     }
4304 }
4305
4306
4307 /**
4308  * The main function for the fs service.
4309  *
4310  * @param argc number of arguments from the command line
4311  * @param argv command line arguments
4312  * @return 0 ok, 1 on error
4313  */
4314 int
4315 main (int argc, char *const *argv)
4316 {
4317   return (GNUNET_OK ==
4318           GNUNET_SERVICE_run (argc,
4319                               argv,
4320                               "fs",
4321                               GNUNET_SERVICE_OPTION_NONE,
4322                               &run, NULL)) ? 0 : 1;
4323 }
4324
4325 /* end of gnunet-service-fs.c */