5118cb56cd61ba8c5dcbb9c9c8604cdbef191b9d
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - introduce random latency in processing
28  * - more statistics
29  */
30 #include "platform.h"
31 #include <float.h>
32 #include "gnunet_constants.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_dht_service.h"
35 #include "gnunet_datastore_service.h"
36 #include "gnunet_load_lib.h"
37 #include "gnunet_peer_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_util_lib.h"
42 #include "gnunet-service-fs_indexing.h"
43 #include "fs.h"
44
45 #define DEBUG_FS GNUNET_NO
46
47 /**
48  * Should we introduce random latency in processing?  Required for proper
49  * implementation of GAP, but can be disabled for performance evaluation of
50  * the basic routing algorithm.
51  */
52 #define SUPPORT_DELAYS GNUNET_NO
53
54 /**
55  * Maximum number of outgoing messages we queue per peer.
56  */
57 #define MAX_QUEUE_PER_PEER 16
58
59 /**
60  * Size for the hash map for DHT requests from the FS
61  * service.  Should be about the number of concurrent
62  * DHT requests we plan to make.
63  */
64 #define FS_DHT_HT_SIZE 1024
65
66 /**
67  * How often do we flush trust values to disk?
68  */
69 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
70
71 /**
72  * How often do we at most PUT content into the DHT?
73  */
74 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
75
76 /**
77  * Inverse of the probability that we will submit the same query
78  * to the same peer again.  If the same peer already got the query
79  * repeatedly recently, the probability is multiplied by the inverse
80  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
81  * plus MAX_CORK_DELAY (so roughly every 3.5s).
82  */
83 #define RETRY_PROBABILITY_INV 3
84
85 /**
86  * What is the maximum delay for a P2P FS message (in our interaction
87  * with core)?  FS-internal delays are another story.  The value is
88  * chosen based on the 32k block size.  Given that peers typcially
89  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
90  * transmit one message even to the lowest-bandwidth peers.
91  */
92 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
93
94 /**
95  * Maximum number of requests (from other peers) that we're
96  * willing to have pending at any given point in time.
97  */
98 static unsigned long long max_pending_requests = (32 * 1024);
99
100
101 /**
102  * Information we keep for each pending reply.  The
103  * actual message follows at the end of this struct.
104  */
105 struct PendingMessage;
106
107 /**
108  * Function called upon completion of a transmission.
109  *
110  * @param cls closure
111  * @param pid ID of receiving peer, 0 on transmission error
112  */
113 typedef void (*TransmissionContinuation)(void * cls, 
114                                          GNUNET_PEER_Id tpid);
115
116
117 /**
118  * Information we keep for each pending message (GET/PUT).  The
119  * actual message follows at the end of this struct.
120  */
121 struct PendingMessage
122 {
123   /**
124    * This is a doubly-linked list of messages to the same peer.
125    */
126   struct PendingMessage *next;
127
128   /**
129    * This is a doubly-linked list of messages to the same peer.
130    */
131   struct PendingMessage *prev;
132
133   /**
134    * Entry in pending message list for this pending message.
135    */ 
136   struct PendingMessageList *pml;  
137
138   /**
139    * Function to call immediately once we have transmitted this
140    * message.
141    */
142   TransmissionContinuation cont;
143
144   /**
145    * Closure for cont.
146    */
147   void *cont_cls;
148
149   /**
150    * Do not transmit this pending message until this deadline.
151    */
152   struct GNUNET_TIME_Absolute delay_until;
153
154   /**
155    * Size of the reply; actual reply message follows
156    * at the end of this struct.
157    */
158   size_t msize;
159   
160   /**
161    * How important is this message for us?
162    */
163   uint32_t priority;
164  
165 };
166
167
168 /**
169  * Information about a peer that we are connected to.
170  * We track data that is useful for determining which
171  * peers should receive our requests.  We also keep
172  * a list of messages to transmit to this peer.
173  */
174 struct ConnectedPeer
175 {
176
177   /**
178    * List of the last clients for which this peer successfully
179    * answered a query.
180    */
181   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
182
183   /**
184    * List of the last PIDs for which
185    * this peer successfully answered a query;
186    * We use 0 to indicate no successful reply.
187    */
188   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
189
190   /**
191    * Average delay between sending the peer a request and
192    * getting a reply (only calculated over the requests for
193    * which we actually got a reply).   Calculated
194    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
195    */ 
196   struct GNUNET_TIME_Relative avg_delay;
197
198   /**
199    * Point in time until which this peer does not want us to migrate content
200    * to it.
201    */
202   struct GNUNET_TIME_Absolute migration_blocked;
203
204   /**
205    * Time until when we blocked this peer from migrating
206    * data to us.
207    */
208   struct GNUNET_TIME_Absolute last_migration_block;
209
210   /**
211    * Handle for an active request for transmission to this
212    * peer, or NULL.
213    */
214   struct GNUNET_CORE_TransmitHandle *cth;
215
216   /**
217    * Messages (replies, queries, content migration) we would like to
218    * send to this peer in the near future.  Sorted by priority, head.
219    */
220   struct PendingMessage *pending_messages_head;
221
222   /**
223    * Messages (replies, queries, content migration) we would like to
224    * send to this peer in the near future.  Sorted by priority, tail.
225    */
226   struct PendingMessage *pending_messages_tail;
227
228   /**
229    * How long does it typically take for us to transmit a message
230    * to this peer?  (delay between the request being issued and
231    * the callback being invoked).
232    */
233   struct GNUNET_LOAD_Value *transmission_delay;
234
235   /**
236    * Time when the last transmission request was issued.
237    */
238   struct GNUNET_TIME_Absolute last_transmission_request_start;
239
240   /**
241    * ID of delay task for scheduling transmission.
242    */
243   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
244
245   /**
246    * Average priority of successful replies.  Calculated
247    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
248    */
249   double avg_priority;
250
251   /**
252    * Increase in traffic preference still to be submitted
253    * to the core service for this peer.
254    */
255   uint64_t inc_preference;
256
257   /**
258    * Trust rating for this peer
259    */
260   uint32_t trust;
261
262   /**
263    * Trust rating for this peer on disk.
264    */
265   uint32_t disk_trust;
266
267   /**
268    * The peer's identity.
269    */
270   GNUNET_PEER_Id pid;  
271
272   /**
273    * Size of the linked list of 'pending_messages'.
274    */
275   unsigned int pending_requests;
276
277   /**
278    * Which offset in "last_p2p_replies" will be updated next?
279    * (we go round-robin).
280    */
281   unsigned int last_p2p_replies_woff;
282
283   /**
284    * Which offset in "last_client_replies" will be updated next?
285    * (we go round-robin).
286    */
287   unsigned int last_client_replies_woff;
288
289 };
290
291
292 /**
293  * Information we keep for each pending request.  We should try to
294  * keep this struct as small as possible since its memory consumption
295  * is key to how many requests we can have pending at once.
296  */
297 struct PendingRequest;
298
299
300 /**
301  * Doubly-linked list of requests we are performing
302  * on behalf of the same client.
303  */
304 struct ClientRequestList
305 {
306
307   /**
308    * This is a doubly-linked list.
309    */
310   struct ClientRequestList *next;
311
312   /**
313    * This is a doubly-linked list.
314    */
315   struct ClientRequestList *prev;
316
317   /**
318    * Request this entry represents.
319    */
320   struct PendingRequest *req;
321
322   /**
323    * Client list this request belongs to.
324    */
325   struct ClientList *client_list;
326
327 };
328
329
330 /**
331  * Replies to be transmitted to the client.  The actual
332  * response message is allocated after this struct.
333  */
334 struct ClientResponseMessage
335 {
336   /**
337    * This is a doubly-linked list.
338    */
339   struct ClientResponseMessage *next;
340
341   /**
342    * This is a doubly-linked list.
343    */
344   struct ClientResponseMessage *prev;
345
346   /**
347    * Client list entry this response belongs to.
348    */
349   struct ClientList *client_list;
350
351   /**
352    * Number of bytes in the response.
353    */
354   size_t msize;
355 };
356
357
358 /**
359  * Linked list of clients we are performing requests
360  * for right now.
361  */
362 struct ClientList
363 {
364   /**
365    * This is a linked list.
366    */
367   struct ClientList *next;
368
369   /**
370    * ID of a client making a request, NULL if this entry is for a
371    * peer.
372    */
373   struct GNUNET_SERVER_Client *client;
374
375   /**
376    * Head of list of requests performed on behalf
377    * of this client right now.
378    */
379   struct ClientRequestList *rl_head;
380
381   /**
382    * Tail of list of requests performed on behalf
383    * of this client right now.
384    */
385   struct ClientRequestList *rl_tail;
386
387   /**
388    * Head of linked list of responses.
389    */
390   struct ClientResponseMessage *res_head;
391
392   /**
393    * Tail of linked list of responses.
394    */
395   struct ClientResponseMessage *res_tail;
396
397   /**
398    * Context for sending replies.
399    */
400   struct GNUNET_CONNECTION_TransmitHandle *th;
401
402 };
403
404
405 /**
406  * Doubly-linked list of messages we are performing
407  * due to a pending request.
408  */
409 struct PendingMessageList
410 {
411
412   /**
413    * This is a doubly-linked list of messages on behalf of the same request.
414    */
415   struct PendingMessageList *next;
416
417   /**
418    * This is a doubly-linked list of messages on behalf of the same request.
419    */
420   struct PendingMessageList *prev;
421
422   /**
423    * Message this entry represents.
424    */
425   struct PendingMessage *pm;
426
427   /**
428    * Request this entry belongs to.
429    */
430   struct PendingRequest *req;
431
432   /**
433    * Peer this message is targeted for.
434    */
435   struct ConnectedPeer *target;
436
437 };
438
439
440 /**
441  * Information we keep for each pending request.  We should try to
442  * keep this struct as small as possible since its memory consumption
443  * is key to how many requests we can have pending at once.
444  */
445 struct PendingRequest
446 {
447
448   /**
449    * If this request was made by a client, this is our entry in the
450    * client request list; otherwise NULL.
451    */
452   struct ClientRequestList *client_request_list;
453
454   /**
455    * Entry of peer responsible for this entry (if this request
456    * was made by a peer).
457    */
458   struct ConnectedPeer *cp;
459
460   /**
461    * If this is a namespace query, pointer to the hash of the public
462    * key of the namespace; otherwise NULL.  Pointer will be to the 
463    * end of this struct (so no need to free it).
464    */
465   const GNUNET_HashCode *namespace;
466
467   /**
468    * Bloomfilter we use to filter out replies that we don't care about
469    * (anymore).  NULL as long as we are interested in all replies.
470    */
471   struct GNUNET_CONTAINER_BloomFilter *bf;
472
473   /**
474    * Context of our GNUNET_CORE_peer_change_preference call.
475    */
476   struct GNUNET_CORE_InformationRequestContext *irc;
477
478   /**
479    * Reference to DHT get operation for this request (or NULL).
480    */
481   struct GNUNET_DHT_GetHandle *dht_get;
482
483   /**
484    * Hash code of all replies that we have seen so far (only valid
485    * if client is not NULL since we only track replies like this for
486    * our own clients).
487    */
488   GNUNET_HashCode *replies_seen;
489
490   /**
491    * Node in the heap representing this entry; NULL
492    * if we have no heap node.
493    */
494   struct GNUNET_CONTAINER_HeapNode *hnode;
495
496   /**
497    * Head of list of messages being performed on behalf of this
498    * request.
499    */
500   struct PendingMessageList *pending_head;
501
502   /**
503    * Tail of list of messages being performed on behalf of this
504    * request.
505    */
506   struct PendingMessageList *pending_tail;
507
508   /**
509    * When did we first see this request (form this peer), or, if our
510    * client is initiating, when did we last initiate a search?
511    */
512   struct GNUNET_TIME_Absolute start_time;
513
514   /**
515    * The query that this request is for.
516    */
517   GNUNET_HashCode query;
518
519   /**
520    * The task responsible for transmitting queries
521    * for this request.
522    */
523   GNUNET_SCHEDULER_TaskIdentifier task;
524
525   /**
526    * (Interned) Peer identifier that identifies a preferred target
527    * for requests.
528    */
529   GNUNET_PEER_Id target_pid;
530
531   /**
532    * (Interned) Peer identifiers of peers that have already
533    * received our query for this content.
534    */
535   GNUNET_PEER_Id *used_pids;
536   
537   /**
538    * Our entry in the queue (non-NULL while we wait for our
539    * turn to interact with the local database).
540    */
541   struct GNUNET_DATASTORE_QueueEntry *qe;
542
543   /**
544    * Size of the 'bf' (in bytes).
545    */
546   size_t bf_size;
547
548   /**
549    * Desired anonymity level; only valid for requests from a local client.
550    */
551   uint32_t anonymity_level;
552
553   /**
554    * How many entries in "used_pids" are actually valid?
555    */
556   unsigned int used_pids_off;
557
558   /**
559    * How long is the "used_pids" array?
560    */
561   unsigned int used_pids_size;
562
563   /**
564    * Number of results found for this request.
565    */
566   unsigned int results_found;
567
568   /**
569    * How many entries in "replies_seen" are actually valid?
570    */
571   unsigned int replies_seen_off;
572
573   /**
574    * How long is the "replies_seen" array?
575    */
576   unsigned int replies_seen_size;
577   
578   /**
579    * Priority with which this request was made.  If one of our clients
580    * made the request, then this is the current priority that we are
581    * using when initiating the request.  This value is used when
582    * we decide to reward other peers with trust for providing a reply.
583    */
584   uint32_t priority;
585
586   /**
587    * Priority points left for us to spend when forwarding this request
588    * to other peers.
589    */
590   uint32_t remaining_priority;
591
592   /**
593    * Number to mingle hashes for bloom-filter tests with.
594    */
595   int32_t mingle;
596
597   /**
598    * TTL with which we saw this request (or, if we initiated, TTL that
599    * we used for the request).
600    */
601   int32_t ttl;
602   
603   /**
604    * Type of the content that this request is for.
605    */
606   enum GNUNET_BLOCK_Type type;
607
608   /**
609    * Remove this request after transmission of the current response.
610    */
611   int8_t do_remove;
612
613   /**
614    * GNUNET_YES if we should not forward this request to other peers.
615    */
616   int8_t local_only;
617
618   /**
619    * GNUNET_YES if we should not forward this request to other peers.
620    */
621   int8_t forward_only;
622
623 };
624
625
626 /**
627  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
628  */
629 struct MigrationReadyBlock
630 {
631
632   /**
633    * This is a doubly-linked list.
634    */
635   struct MigrationReadyBlock *next;
636
637   /**
638    * This is a doubly-linked list.
639    */
640   struct MigrationReadyBlock *prev;
641
642   /**
643    * Query for the block.
644    */
645   GNUNET_HashCode query;
646
647   /**
648    * When does this block expire? 
649    */
650   struct GNUNET_TIME_Absolute expiration;
651
652   /**
653    * Peers we would consider forwarding this
654    * block to.  Zero for empty entries.
655    */
656   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
657
658   /**
659    * Size of the block.
660    */
661   size_t size;
662
663   /**
664    *  Number of targets already used.
665    */
666   unsigned int used_targets;
667
668   /**
669    * Type of the block.
670    */
671   enum GNUNET_BLOCK_Type type;
672 };
673
674
675 /**
676  * Our connection to the datastore.
677  */
678 static struct GNUNET_DATASTORE_Handle *dsh;
679
680 /**
681  * Our block context.
682  */
683 static struct GNUNET_BLOCK_Context *block_ctx;
684
685 /**
686  * Our block configuration.
687  */
688 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
689
690 /**
691  * Our scheduler.
692  */
693 static struct GNUNET_SCHEDULER_Handle *sched;
694
695 /**
696  * Our configuration.
697  */
698 static const struct GNUNET_CONFIGURATION_Handle *cfg;
699
700 /**
701  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
702  */
703 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
704
705 /**
706  * Map of peer identifiers to "struct PendingRequest" (for that peer).
707  */
708 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
709
710 /**
711  * Map of query identifiers to "struct PendingRequest" (for that query).
712  */
713 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
714
715 /**
716  * Heap with the request that will expire next at the top.  Contains
717  * pointers of type "struct PendingRequest*"; these will *also* be
718  * aliased from the "requests_by_peer" data structures and the
719  * "requests_by_query" table.  Note that requests from our clients
720  * don't expire and are thus NOT in the "requests_by_expiration"
721  * (or the "requests_by_peer" tables).
722  */
723 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
724
725 /**
726  * Handle for reporting statistics.
727  */
728 static struct GNUNET_STATISTICS_Handle *stats;
729
730 /**
731  * Linked list of clients we are currently processing requests for.
732  */
733 static struct ClientList *client_list;
734
735 /**
736  * Pointer to handle to the core service (points to NULL until we've
737  * connected to it).
738  */
739 static struct GNUNET_CORE_Handle *core;
740
741 /**
742  * Head of linked list of blocks that can be migrated.
743  */
744 static struct MigrationReadyBlock *mig_head;
745
746 /**
747  * Tail of linked list of blocks that can be migrated.
748  */
749 static struct MigrationReadyBlock *mig_tail;
750
751 /**
752  * Request to datastore for migration (or NULL).
753  */
754 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
755
756 /**
757  * Request to datastore for DHT PUTs (or NULL).
758  */
759 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
760
761 /**
762  * Type we will request for the next DHT PUT round from the datastore.
763  */
764 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
765
766 /**
767  * Where do we store trust information?
768  */
769 static char *trustDirectory;
770
771 /**
772  * ID of task that collects blocks for migration.
773  */
774 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
775
776 /**
777  * ID of task that collects blocks for DHT PUTs.
778  */
779 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
780
781 /**
782  * What is the maximum frequency at which we are allowed to
783  * poll the datastore for migration content?
784  */
785 static struct GNUNET_TIME_Relative min_migration_delay;
786
787 /**
788  * Handle for DHT operations.
789  */
790 static struct GNUNET_DHT_Handle *dht_handle;
791
792 /**
793  * Size of the doubly-linked list of migration blocks.
794  */
795 static unsigned int mig_size;
796
797 /**
798  * Are we allowed to migrate content to this peer.
799  */
800 static int active_migration;
801
802 /**
803  * How many entires with zero anonymity do we currently estimate
804  * to have in the database?
805  */
806 static unsigned int zero_anonymity_count_estimate;
807
808 /**
809  * Typical priorities we're seeing from other peers right now.  Since
810  * most priorities will be zero, this value is the weighted average of
811  * non-zero priorities seen "recently".  In order to ensure that new
812  * values do not dramatically change the ratio, values are first
813  * "capped" to a reasonable range (+N of the current value) and then
814  * averaged into the existing value by a ratio of 1:N.  Hence
815  * receiving the largest possible priority can still only raise our
816  * "current_priorities" by at most 1.
817  */
818 static double current_priorities;
819
820 /**
821  * Datastore 'GET' load tracking.
822  */
823 static struct GNUNET_LOAD_Value *datastore_get_load;
824
825 /**
826  * Datastore 'PUT' load tracking.
827  */
828 static struct GNUNET_LOAD_Value *datastore_put_load;
829
830 /**
831  * How long do requests typically stay in the routing table?
832  */
833 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
834
835 /**
836  * We've just now completed a datastore request.  Update our
837  * datastore load calculations.
838  *
839  * @param start time when the datastore request was issued
840  */
841 static void
842 update_datastore_delays (struct GNUNET_TIME_Absolute start)
843 {
844   struct GNUNET_TIME_Relative delay;
845
846   delay = GNUNET_TIME_absolute_get_duration (start);
847   GNUNET_LOAD_update (datastore_get_load,
848                       delay.value);
849 }
850
851
852 /**
853  * Get the filename under which we would store the GNUNET_HELLO_Message
854  * for the given host and protocol.
855  * @return filename of the form DIRECTORY/HOSTID
856  */
857 static char *
858 get_trust_filename (const struct GNUNET_PeerIdentity *id)
859 {
860   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
861   char *fn;
862
863   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
864   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
865   return fn;
866 }
867
868
869
870 /**
871  * Transmit messages by copying it to the target buffer
872  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
873  * for writing in the meantime.  In that case, do nothing
874  * (the disconnect or shutdown handler will take care of the rest).
875  * If we were able to transmit messages and there are still more
876  * pending, ask core again for further calls to this function.
877  *
878  * @param cls closure, pointer to the 'struct ConnectedPeer*'
879  * @param size number of bytes available in buf
880  * @param buf where the callee should write the message
881  * @return number of bytes written to buf
882  */
883 static size_t
884 transmit_to_peer (void *cls,
885                   size_t size, void *buf);
886
887
888 /* ******************* clean up functions ************************ */
889
890 /**
891  * Delete the given migration block.
892  *
893  * @param mb block to delete
894  */
895 static void
896 delete_migration_block (struct MigrationReadyBlock *mb)
897 {
898   GNUNET_CONTAINER_DLL_remove (mig_head,
899                                mig_tail,
900                                mb);
901   GNUNET_PEER_decrement_rcs (mb->target_list,
902                              MIGRATION_LIST_SIZE);
903   mig_size--;
904   GNUNET_free (mb);
905 }
906
907
908 /**
909  * Compare the distance of two peers to a key.
910  *
911  * @param key key
912  * @param p1 first peer
913  * @param p2 second peer
914  * @return GNUNET_YES if P1 is closer to key than P2
915  */
916 static int
917 is_closer (const GNUNET_HashCode *key,
918            const struct GNUNET_PeerIdentity *p1,
919            const struct GNUNET_PeerIdentity *p2)
920 {
921   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
922                                     &p2->hashPubKey,
923                                     key);
924 }
925
926
927 /**
928  * Consider migrating content to a given peer.
929  *
930  * @param cls 'struct MigrationReadyBlock*' to select
931  *            targets for (or NULL for none)
932  * @param key ID of the peer 
933  * @param value 'struct ConnectedPeer' of the peer
934  * @return GNUNET_YES (always continue iteration)
935  */
936 static int
937 consider_migration (void *cls,
938                     const GNUNET_HashCode *key,
939                     void *value)
940 {
941   struct MigrationReadyBlock *mb = cls;
942   struct ConnectedPeer *cp = value;
943   struct MigrationReadyBlock *pos;
944   struct GNUNET_PeerIdentity cppid;
945   struct GNUNET_PeerIdentity otherpid;
946   struct GNUNET_PeerIdentity worstpid;
947   size_t msize;
948   unsigned int i;
949   unsigned int repl;
950   
951   /* consider 'cp' as a migration target for mb */
952   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
953     return GNUNET_YES; /* peer has requested no migration! */
954   if (mb != NULL)
955     {
956       GNUNET_PEER_resolve (cp->pid,
957                            &cppid);
958       repl = MIGRATION_LIST_SIZE;
959       for (i=0;i<MIGRATION_LIST_SIZE;i++)
960         {
961           if (mb->target_list[i] == 0)
962             {
963               mb->target_list[i] = cp->pid;
964               GNUNET_PEER_change_rc (mb->target_list[i], 1);
965               repl = MIGRATION_LIST_SIZE;
966               break;
967             }
968           GNUNET_PEER_resolve (mb->target_list[i],
969                                &otherpid);
970           if ( (repl == MIGRATION_LIST_SIZE) &&
971                is_closer (&mb->query,
972                           &cppid,
973                           &otherpid)) 
974             {
975               repl = i;
976               worstpid = otherpid;
977             }
978           else if ( (repl != MIGRATION_LIST_SIZE) &&
979                     (is_closer (&mb->query,
980                                 &worstpid,
981                                 &otherpid) ) )
982             {
983               repl = i;
984               worstpid = otherpid;
985             }       
986         }
987       if (repl != MIGRATION_LIST_SIZE) 
988         {
989           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
990           mb->target_list[repl] = cp->pid;
991           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
992         }
993     }
994
995   /* consider scheduling transmission to cp for content migration */
996   if (cp->cth != NULL)        
997     return GNUNET_YES; 
998   msize = 0;
999   pos = mig_head;
1000   while (pos != NULL)
1001     {
1002       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1003         {
1004           if (cp->pid == pos->target_list[i])
1005             {
1006               if (msize == 0)
1007                 msize = pos->size;
1008               else
1009                 msize = GNUNET_MIN (msize,
1010                                     pos->size);
1011               break;
1012             }
1013         }
1014       pos = pos->next;
1015     }
1016   if (msize == 0)
1017     return GNUNET_YES; /* no content available */
1018 #if DEBUG_FS
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020               "Trying to migrate at least %u bytes to peer `%s'\n",
1021               msize,
1022               GNUNET_h2s (key));
1023 #endif
1024   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1025     {
1026       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1027       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1028     }
1029   cp->cth 
1030     = GNUNET_CORE_notify_transmit_ready (core,
1031                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1032                                          (const struct GNUNET_PeerIdentity*) key,
1033                                          msize + sizeof (struct PutMessage),
1034                                          &transmit_to_peer,
1035                                          cp);
1036   return GNUNET_YES;
1037 }
1038
1039
1040 /**
1041  * Task that is run periodically to obtain blocks for content
1042  * migration
1043  * 
1044  * @param cls unused
1045  * @param tc scheduler context (also unused)
1046  */
1047 static void
1048 gather_migration_blocks (void *cls,
1049                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1050
1051
1052
1053
1054 /**
1055  * Task that is run periodically to obtain blocks for DHT PUTs.
1056  * 
1057  * @param cls type of blocks to gather
1058  * @param tc scheduler context (unused)
1059  */
1060 static void
1061 gather_dht_put_blocks (void *cls,
1062                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1063
1064
1065 /**
1066  * If the migration task is not currently running, consider
1067  * (re)scheduling it with the appropriate delay.
1068  */
1069 static void
1070 consider_migration_gathering ()
1071 {
1072   struct GNUNET_TIME_Relative delay;
1073
1074   if (dsh == NULL)
1075     return;
1076   if (mig_qe != NULL)
1077     return;
1078   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1079     return;
1080   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1081                                          mig_size);
1082   delay = GNUNET_TIME_relative_divide (delay,
1083                                        MAX_MIGRATION_QUEUE);
1084   delay = GNUNET_TIME_relative_max (delay,
1085                                     min_migration_delay);
1086   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1087                                            delay,
1088                                            &gather_migration_blocks,
1089                                            NULL);
1090 }
1091
1092
1093 /**
1094  * If the DHT PUT gathering task is not currently running, consider
1095  * (re)scheduling it with the appropriate delay.
1096  */
1097 static void
1098 consider_dht_put_gathering (void *cls)
1099 {
1100   struct GNUNET_TIME_Relative delay;
1101
1102   if (dsh == NULL)
1103     return;
1104   if (dht_qe != NULL)
1105     return;
1106   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1107     return;
1108   if (zero_anonymity_count_estimate > 0)
1109     {
1110       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1111                                            zero_anonymity_count_estimate);
1112       delay = GNUNET_TIME_relative_min (delay,
1113                                         MAX_DHT_PUT_FREQ);
1114     }
1115   else
1116     {
1117       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1118          (hopefully) appear */
1119       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1120     }
1121   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1122                                            delay,
1123                                            &gather_dht_put_blocks,
1124                                            cls);
1125 }
1126
1127
1128 /**
1129  * Process content offered for migration.
1130  *
1131  * @param cls closure
1132  * @param key key for the content
1133  * @param size number of bytes in data
1134  * @param data content stored
1135  * @param type type of the content
1136  * @param priority priority of the content
1137  * @param anonymity anonymity-level for the content
1138  * @param expiration expiration time for the content
1139  * @param uid unique identifier for the datum;
1140  *        maybe 0 if no unique identifier is available
1141  */
1142 static void
1143 process_migration_content (void *cls,
1144                            const GNUNET_HashCode * key,
1145                            size_t size,
1146                            const void *data,
1147                            enum GNUNET_BLOCK_Type type,
1148                            uint32_t priority,
1149                            uint32_t anonymity,
1150                            struct GNUNET_TIME_Absolute
1151                            expiration, uint64_t uid)
1152 {
1153   struct MigrationReadyBlock *mb;
1154   
1155   if (key == NULL)
1156     {
1157       mig_qe = NULL;
1158       if (mig_size < MAX_MIGRATION_QUEUE)  
1159         consider_migration_gathering ();
1160       return;
1161     }
1162   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1163     {
1164       if (GNUNET_OK !=
1165           GNUNET_FS_handle_on_demand_block (key, size, data,
1166                                             type, priority, anonymity,
1167                                             expiration, uid, 
1168                                             &process_migration_content,
1169                                             NULL))
1170         {
1171           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1172         }
1173       return;
1174     }
1175 #if DEBUG_FS
1176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177               "Retrieved block `%s' of type %u for migration\n",
1178               GNUNET_h2s (key),
1179               type);
1180 #endif
1181   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1182   mb->query = *key;
1183   mb->expiration = expiration;
1184   mb->size = size;
1185   mb->type = type;
1186   memcpy (&mb[1], data, size);
1187   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1188                                      mig_tail,
1189                                      mig_tail,
1190                                      mb);
1191   mig_size++;
1192   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1193                                          &consider_migration,
1194                                          mb);
1195   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1196 }
1197
1198
1199 /**
1200  * Function called upon completion of the DHT PUT operation.
1201  */
1202 static void
1203 dht_put_continuation (void *cls,
1204                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1205 {
1206   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1207 }
1208
1209
1210 /**
1211  * Store content in DHT.
1212  *
1213  * @param cls closure
1214  * @param key key for the content
1215  * @param size number of bytes in data
1216  * @param data content stored
1217  * @param type type of the content
1218  * @param priority priority of the content
1219  * @param anonymity anonymity-level for the content
1220  * @param expiration expiration time for the content
1221  * @param uid unique identifier for the datum;
1222  *        maybe 0 if no unique identifier is available
1223  */
1224 static void
1225 process_dht_put_content (void *cls,
1226                          const GNUNET_HashCode * key,
1227                          size_t size,
1228                          const void *data,
1229                          enum GNUNET_BLOCK_Type type,
1230                          uint32_t priority,
1231                          uint32_t anonymity,
1232                          struct GNUNET_TIME_Absolute
1233                          expiration, uint64_t uid)
1234
1235   static unsigned int counter;
1236   static GNUNET_HashCode last_vhash;
1237   static GNUNET_HashCode vhash;
1238
1239   if (key == NULL)
1240     {
1241       dht_qe = NULL;
1242       consider_dht_put_gathering (cls);
1243       return;
1244     }
1245   /* slightly funky code to estimate the total number of values with zero
1246      anonymity from the maximum observed length of a monotonically increasing 
1247      sequence of hashes over the contents */
1248   GNUNET_CRYPTO_hash (data, size, &vhash);
1249   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1250     {
1251       if (zero_anonymity_count_estimate > 0)
1252         zero_anonymity_count_estimate /= 2;
1253       counter = 0;
1254     }
1255   last_vhash = vhash;
1256   if (counter < 31)
1257     counter++;
1258   if (zero_anonymity_count_estimate < (1 << counter))
1259     zero_anonymity_count_estimate = (1 << counter);
1260 #if DEBUG_FS
1261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262               "Retrieved block `%s' of type %u for DHT PUT\n",
1263               GNUNET_h2s (key),
1264               type);
1265 #endif
1266   GNUNET_DHT_put (dht_handle,
1267                   key,
1268                   GNUNET_DHT_RO_NONE,
1269                   type,
1270                   size,
1271                   data,
1272                   expiration,
1273                   GNUNET_TIME_UNIT_FOREVER_REL,
1274                   &dht_put_continuation,
1275                   cls);
1276 }
1277
1278
1279 /**
1280  * Task that is run periodically to obtain blocks for content
1281  * migration
1282  * 
1283  * @param cls unused
1284  * @param tc scheduler context (also unused)
1285  */
1286 static void
1287 gather_migration_blocks (void *cls,
1288                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1289 {
1290   mig_task = GNUNET_SCHEDULER_NO_TASK;
1291   if (dsh != NULL)
1292     {
1293       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1294                                             GNUNET_TIME_UNIT_FOREVER_REL,
1295                                             &process_migration_content, NULL);
1296       GNUNET_assert (mig_qe != NULL);
1297     }
1298 }
1299
1300
1301 /**
1302  * Task that is run periodically to obtain blocks for DHT PUTs.
1303  * 
1304  * @param cls type of blocks to gather
1305  * @param tc scheduler context (unused)
1306  */
1307 static void
1308 gather_dht_put_blocks (void *cls,
1309                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1310 {
1311   dht_task = GNUNET_SCHEDULER_NO_TASK;
1312   if (dsh != NULL)
1313     {
1314       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1315         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1316       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1317                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1318                                                     dht_put_type++,
1319                                                     &process_dht_put_content, NULL);
1320       GNUNET_assert (dht_qe != NULL);
1321     }
1322 }
1323
1324
1325 /**
1326  * We're done with a particular message list entry.
1327  * Free all associated resources.
1328  * 
1329  * @param pml entry to destroy
1330  */
1331 static void
1332 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1333 {
1334   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1335                                pml->req->pending_tail,
1336                                pml);
1337   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1338                                pml->target->pending_messages_tail,
1339                                pml->pm);
1340   pml->target->pending_requests--;
1341   GNUNET_free (pml->pm);
1342   GNUNET_free (pml);
1343 }
1344
1345
1346 /**
1347  * Destroy the given pending message (and call the respective
1348  * continuation).
1349  *
1350  * @param pm message to destroy
1351  * @param tpid id of peer that the message was delivered to, or 0 for none
1352  */
1353 static void
1354 destroy_pending_message (struct PendingMessage *pm,
1355                          GNUNET_PEER_Id tpid)
1356 {
1357   struct PendingMessageList *pml = pm->pml;
1358   TransmissionContinuation cont;
1359   void *cont_cls;
1360
1361   cont = pm->cont;
1362   cont_cls = pm->cont_cls;
1363   if (pml != NULL)
1364     {
1365       GNUNET_assert (pml->pm == pm);
1366       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1367       destroy_pending_message_list_entry (pml);
1368     }
1369   else
1370     {
1371       GNUNET_free (pm);
1372     }
1373   if (cont != NULL)
1374     cont (cont_cls, tpid);  
1375 }
1376
1377
1378 /**
1379  * We're done processing a particular request.
1380  * Free all associated resources.
1381  *
1382  * @param pr request to destroy
1383  */
1384 static void
1385 destroy_pending_request (struct PendingRequest *pr)
1386 {
1387   struct GNUNET_PeerIdentity pid;
1388
1389   if (pr->hnode != NULL)
1390     {
1391       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1392                                          pr->hnode);
1393       pr->hnode = NULL;
1394     }
1395   if (NULL == pr->client_request_list)
1396     {
1397       GNUNET_STATISTICS_update (stats,
1398                                 gettext_noop ("# P2P searches active"),
1399                                 -1,
1400                                 GNUNET_NO);
1401     }
1402   else
1403     {
1404       GNUNET_STATISTICS_update (stats,
1405                                 gettext_noop ("# client searches active"),
1406                                 -1,
1407                                 GNUNET_NO);
1408     }
1409   if (GNUNET_YES == 
1410       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1411                                             &pr->query,
1412                                             pr))
1413     {
1414       GNUNET_LOAD_update (rt_entry_lifetime,
1415                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1416     }
1417   if (pr->qe != NULL)
1418      {
1419       GNUNET_DATASTORE_cancel (pr->qe);
1420       pr->qe = NULL;
1421     }
1422   if (pr->dht_get != NULL)
1423     {
1424       GNUNET_DHT_get_stop (pr->dht_get);
1425       pr->dht_get = NULL;
1426     }
1427   if (pr->client_request_list != NULL)
1428     {
1429       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1430                                    pr->client_request_list->client_list->rl_tail,
1431                                    pr->client_request_list);
1432       GNUNET_free (pr->client_request_list);
1433       pr->client_request_list = NULL;
1434     }
1435   if (pr->cp != NULL)
1436     {
1437       GNUNET_PEER_resolve (pr->cp->pid,
1438                            &pid);
1439       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1440                                                    &pid.hashPubKey,
1441                                                    pr);
1442       pr->cp = NULL;
1443     }
1444   if (pr->bf != NULL)
1445     {
1446       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1447       pr->bf = NULL;
1448     }
1449   if (pr->irc != NULL)
1450     {
1451       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1452       pr->irc = NULL;
1453     }
1454   if (pr->replies_seen != NULL)
1455     {
1456       GNUNET_free (pr->replies_seen);
1457       pr->replies_seen = NULL;
1458     }
1459   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1460     {
1461       GNUNET_SCHEDULER_cancel (sched,
1462                                pr->task);
1463       pr->task = GNUNET_SCHEDULER_NO_TASK;
1464     }
1465   while (NULL != pr->pending_head)    
1466     destroy_pending_message_list_entry (pr->pending_head);
1467   GNUNET_PEER_change_rc (pr->target_pid, -1);
1468   if (pr->used_pids != NULL)
1469     {
1470       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1471       GNUNET_free (pr->used_pids);
1472       pr->used_pids_off = 0;
1473       pr->used_pids_size = 0;
1474       pr->used_pids = NULL;
1475     }
1476   GNUNET_free (pr);
1477 }
1478
1479
1480 /**
1481  * Method called whenever a given peer connects.
1482  *
1483  * @param cls closure, not used
1484  * @param peer peer identity this notification is about
1485  * @param latency reported latency of the connection with 'other'
1486  * @param distance reported distance (DV) to 'other' 
1487  */
1488 static void 
1489 peer_connect_handler (void *cls,
1490                       const struct
1491                       GNUNET_PeerIdentity * peer,
1492                       struct GNUNET_TIME_Relative latency,
1493                       uint32_t distance)
1494 {
1495   struct ConnectedPeer *cp;
1496   struct MigrationReadyBlock *pos;
1497   char *fn;
1498   uint32_t trust;
1499   
1500   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1501   cp->transmission_delay = GNUNET_LOAD_value_init ();
1502   cp->pid = GNUNET_PEER_intern (peer);
1503
1504   fn = get_trust_filename (peer);
1505   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1506       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1507     cp->disk_trust = cp->trust = ntohl (trust);
1508   GNUNET_free (fn);
1509
1510   GNUNET_break (GNUNET_OK ==
1511                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1512                                                    &peer->hashPubKey,
1513                                                    cp,
1514                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1515
1516   pos = mig_head;
1517   while (NULL != pos)
1518     {
1519       (void) consider_migration (pos, &peer->hashPubKey, cp);
1520       pos = pos->next;
1521     }
1522 }
1523
1524
1525 /**
1526  * Increase the host credit by a value.
1527  *
1528  * @param host which peer to change the trust value on
1529  * @param value is the int value by which the
1530  *  host credit is to be increased or decreased
1531  * @returns the actual change in trust (positive or negative)
1532  */
1533 static int
1534 change_host_trust (struct ConnectedPeer *host, int value)
1535 {
1536   unsigned int old_trust;
1537
1538   if (value == 0)
1539     return 0;
1540   GNUNET_assert (host != NULL);
1541   old_trust = host->trust;
1542   if (value > 0)
1543     {
1544       if (host->trust + value < host->trust)
1545         {
1546           value = UINT32_MAX - host->trust;
1547           host->trust = UINT32_MAX;
1548         }
1549       else
1550         host->trust += value;
1551     }
1552   else
1553     {
1554       if (host->trust < -value)
1555         {
1556           value = -host->trust;
1557           host->trust = 0;
1558         }
1559       else
1560         host->trust += value;
1561     }
1562   return value;
1563 }
1564
1565
1566 /**
1567  * Write host-trust information to a file - flush the buffer entry!
1568  */
1569 static int
1570 flush_trust (void *cls,
1571              const GNUNET_HashCode *key,
1572              void *value)
1573 {
1574   struct ConnectedPeer *host = value;
1575   char *fn;
1576   uint32_t trust;
1577   struct GNUNET_PeerIdentity pid;
1578
1579   if (host->trust == host->disk_trust)
1580     return GNUNET_OK;                     /* unchanged */
1581   GNUNET_PEER_resolve (host->pid,
1582                        &pid);
1583   fn = get_trust_filename (&pid);
1584   if (host->trust == 0)
1585     {
1586       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1587         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1588                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1589     }
1590   else
1591     {
1592       trust = htonl (host->trust);
1593       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1594                                                     sizeof(uint32_t),
1595                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1596                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1597         host->disk_trust = host->trust;
1598     }
1599   GNUNET_free (fn);
1600   return GNUNET_OK;
1601 }
1602
1603 /**
1604  * Call this method periodically to scan data/hosts for new hosts.
1605  */
1606 static void
1607 cron_flush_trust (void *cls,
1608                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1609 {
1610
1611   if (NULL == connected_peers)
1612     return;
1613   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1614                                          &flush_trust,
1615                                          NULL);
1616   if (NULL == tc)
1617     return;
1618   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1619     return;
1620   GNUNET_SCHEDULER_add_delayed (tc->sched,
1621                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1622 }
1623
1624
1625 /**
1626  * Free (each) request made by the peer.
1627  *
1628  * @param cls closure, points to peer that the request belongs to
1629  * @param key current key code
1630  * @param value value in the hash map
1631  * @return GNUNET_YES (we should continue to iterate)
1632  */
1633 static int
1634 destroy_request (void *cls,
1635                  const GNUNET_HashCode * key,
1636                  void *value)
1637 {
1638   const struct GNUNET_PeerIdentity * peer = cls;
1639   struct PendingRequest *pr = value;
1640   
1641   GNUNET_break (GNUNET_YES ==
1642                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1643                                                       &peer->hashPubKey,
1644                                                       pr));
1645   destroy_pending_request (pr);
1646   return GNUNET_YES;
1647 }
1648
1649
1650 /**
1651  * Method called whenever a peer disconnects.
1652  *
1653  * @param cls closure, not used
1654  * @param peer peer identity this notification is about
1655  */
1656 static void
1657 peer_disconnect_handler (void *cls,
1658                          const struct
1659                          GNUNET_PeerIdentity * peer)
1660 {
1661   struct ConnectedPeer *cp;
1662   struct PendingMessage *pm;
1663   unsigned int i;
1664   struct MigrationReadyBlock *pos;
1665   struct MigrationReadyBlock *next;
1666
1667   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1668                                               &peer->hashPubKey,
1669                                               &destroy_request,
1670                                               (void*) peer);
1671   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1672                                           &peer->hashPubKey);
1673   if (cp == NULL)
1674     return;
1675   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1676     {
1677       if (NULL != cp->last_client_replies[i])
1678         {
1679           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1680           cp->last_client_replies[i] = NULL;
1681         }
1682     }
1683   GNUNET_break (GNUNET_YES ==
1684                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1685                                                       &peer->hashPubKey,
1686                                                       cp));
1687   /* remove this peer from migration considerations; schedule
1688      alternatives */
1689   next = mig_head;
1690   while (NULL != (pos = next))
1691     {
1692       next = pos->next;
1693       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1694         {
1695           if (pos->target_list[i] == cp->pid)
1696             {
1697               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1698               pos->target_list[i] = 0;
1699             }
1700          }
1701       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1702         {
1703           delete_migration_block (pos);
1704           consider_migration_gathering ();
1705           continue;
1706         }
1707       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1708                                              &consider_migration,
1709                                              pos);
1710     }
1711   GNUNET_PEER_change_rc (cp->pid, -1);
1712   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1713   if (NULL != cp->cth)
1714     {
1715       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1716       cp->cth = NULL;
1717     }
1718   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1719     {
1720       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1721       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1722     }
1723   while (NULL != (pm = cp->pending_messages_head))
1724     destroy_pending_message (pm, 0 /* delivery failed */);
1725   GNUNET_LOAD_value_free (cp->transmission_delay);
1726   GNUNET_break (0 == cp->pending_requests);
1727   GNUNET_free (cp);
1728 }
1729
1730
1731 /**
1732  * Iterator over hash map entries that removes all occurences
1733  * of the given 'client' from the 'last_client_replies' of the
1734  * given connected peer.
1735  *
1736  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1737  * @param key current key code (unused)
1738  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1739  * @return GNUNET_YES (we should continue to iterate)
1740  */
1741 static int
1742 remove_client_from_last_client_replies (void *cls,
1743                                         const GNUNET_HashCode * key,
1744                                         void *value)
1745 {
1746   struct GNUNET_SERVER_Client *client = cls;
1747   struct ConnectedPeer *cp = value;
1748   unsigned int i;
1749
1750   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1751     {
1752       if (cp->last_client_replies[i] == client)
1753         {
1754           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1755           cp->last_client_replies[i] = NULL;
1756         }
1757     }  
1758   return GNUNET_YES;
1759 }
1760
1761
1762 /**
1763  * A client disconnected.  Remove all of its pending queries.
1764  *
1765  * @param cls closure, NULL
1766  * @param client identification of the client
1767  */
1768 static void
1769 handle_client_disconnect (void *cls,
1770                           struct GNUNET_SERVER_Client
1771                           * client)
1772 {
1773   struct ClientList *pos;
1774   struct ClientList *prev;
1775   struct ClientRequestList *rcl;
1776   struct ClientResponseMessage *creply;
1777
1778   if (client == NULL)
1779     return;
1780   prev = NULL;
1781   pos = client_list;
1782   while ( (NULL != pos) &&
1783           (pos->client != client) )
1784     {
1785       prev = pos;
1786       pos = pos->next;
1787     }
1788   if (pos == NULL)
1789     return; /* no requests pending for this client */
1790   while (NULL != (rcl = pos->rl_head))
1791     {
1792       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1793                   "Destroying pending request `%s' on disconnect\n",
1794                   GNUNET_h2s (&rcl->req->query));
1795       destroy_pending_request (rcl->req);
1796     }
1797   if (prev == NULL)
1798     client_list = pos->next;
1799   else
1800     prev->next = pos->next;
1801   if (pos->th != NULL)
1802     {
1803       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1804       pos->th = NULL;
1805     }
1806   while (NULL != (creply = pos->res_head))
1807     {
1808       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1809                                    pos->res_tail,
1810                                    creply);
1811       GNUNET_free (creply);
1812     }    
1813   GNUNET_SERVER_client_drop (pos->client);
1814   GNUNET_free (pos);
1815   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1816                                          &remove_client_from_last_client_replies,
1817                                          client);
1818 }
1819
1820
1821 /**
1822  * Iterator to free peer entries.
1823  *
1824  * @param cls closure, unused
1825  * @param key current key code
1826  * @param value value in the hash map (peer entry)
1827  * @return GNUNET_YES (we should continue to iterate)
1828  */
1829 static int 
1830 clean_peer (void *cls,
1831             const GNUNET_HashCode * key,
1832             void *value)
1833 {
1834   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1835   return GNUNET_YES;
1836 }
1837
1838
1839 /**
1840  * Task run during shutdown.
1841  *
1842  * @param cls unused
1843  * @param tc unused
1844  */
1845 static void
1846 shutdown_task (void *cls,
1847                const struct GNUNET_SCHEDULER_TaskContext *tc)
1848 {
1849   if (mig_qe != NULL)
1850     {
1851       GNUNET_DATASTORE_cancel (mig_qe);
1852       mig_qe = NULL;
1853     }
1854   if (dht_qe != NULL)
1855     {
1856       GNUNET_DATASTORE_cancel (dht_qe);
1857       dht_qe = NULL;
1858     }
1859   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1860     {
1861       GNUNET_SCHEDULER_cancel (sched, mig_task);
1862       mig_task = GNUNET_SCHEDULER_NO_TASK;
1863     }
1864   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1865     {
1866       GNUNET_SCHEDULER_cancel (sched, dht_task);
1867       dht_task = GNUNET_SCHEDULER_NO_TASK;
1868     }
1869   while (client_list != NULL)
1870     handle_client_disconnect (NULL,
1871                               client_list->client);
1872   cron_flush_trust (NULL, NULL);
1873   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1874                                          &clean_peer,
1875                                          NULL);
1876   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1877   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1878   requests_by_expiration_heap = 0;
1879   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1880   connected_peers = NULL;
1881   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1882   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1883   query_request_map = NULL;
1884   GNUNET_LOAD_value_free (rt_entry_lifetime);
1885   rt_entry_lifetime = NULL;
1886   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1887   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1888   peer_request_map = NULL;
1889   GNUNET_assert (NULL != core);
1890   GNUNET_CORE_disconnect (core);
1891   core = NULL;
1892   if (stats != NULL)
1893     {
1894       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1895       stats = NULL;
1896     }
1897   if (dsh != NULL)
1898     {
1899       GNUNET_DATASTORE_disconnect (dsh,
1900                                    GNUNET_NO);
1901       dsh = NULL;
1902     }
1903   while (mig_head != NULL)
1904     delete_migration_block (mig_head);
1905   GNUNET_assert (0 == mig_size);
1906   GNUNET_DHT_disconnect (dht_handle);
1907   dht_handle = NULL;
1908   GNUNET_LOAD_value_free (datastore_get_load);
1909   datastore_get_load = NULL;
1910   GNUNET_LOAD_value_free (datastore_put_load);
1911   datastore_put_load = NULL;
1912   GNUNET_BLOCK_context_destroy (block_ctx);
1913   block_ctx = NULL;
1914   GNUNET_CONFIGURATION_destroy (block_cfg);
1915   block_cfg = NULL;
1916   sched = NULL;
1917   cfg = NULL;  
1918   GNUNET_free_non_null (trustDirectory);
1919   trustDirectory = NULL;
1920 }
1921
1922
1923 /* ******************* Utility functions  ******************** */
1924
1925
1926 /**
1927  * We've had to delay a request for transmission to core, but now
1928  * we should be ready.  Run it.
1929  *
1930  * @param cls the 'struct ConnectedPeer' for which a request was delayed
1931  * @param tc task context (unused)
1932  */
1933 static void
1934 delayed_transmission_request (void *cls,
1935                               const struct GNUNET_SCHEDULER_TaskContext *tc)
1936 {
1937   struct ConnectedPeer *cp = cls;
1938   struct GNUNET_PeerIdentity pid;
1939   struct PendingMessage *pm;
1940
1941   pm = cp->pending_messages_head;
1942   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1943   GNUNET_assert (cp->cth == NULL);
1944   if (pm == NULL)
1945     return;
1946   GNUNET_PEER_resolve (cp->pid,
1947                        &pid);
1948   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
1949   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1950                                                pm->priority,
1951                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1952                                                &pid,
1953                                                pm->msize,
1954                                                &transmit_to_peer,
1955                                                cp);
1956 }
1957
1958
1959 /**
1960  * Transmit messages by copying it to the target buffer
1961  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1962  * for writing in the meantime.  In that case, do nothing
1963  * (the disconnect or shutdown handler will take care of the rest).
1964  * If we were able to transmit messages and there are still more
1965  * pending, ask core again for further calls to this function.
1966  *
1967  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1968  * @param size number of bytes available in buf
1969  * @param buf where the callee should write the message
1970  * @return number of bytes written to buf
1971  */
1972 static size_t
1973 transmit_to_peer (void *cls,
1974                   size_t size, void *buf)
1975 {
1976   struct ConnectedPeer *cp = cls;
1977   char *cbuf = buf;
1978   struct PendingMessage *pm;
1979   struct PendingMessage *next_pm;
1980   struct GNUNET_TIME_Absolute now;
1981   struct GNUNET_TIME_Relative min_delay;
1982   struct MigrationReadyBlock *mb;
1983   struct MigrationReadyBlock *next;
1984   struct PutMessage migm;
1985   size_t msize;
1986   unsigned int i;
1987   struct GNUNET_PeerIdentity pid;
1988  
1989   cp->cth = NULL;
1990   if (NULL == buf)
1991     {
1992 #if DEBUG_FS
1993       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1994                   "Dropping message, core too busy.\n");
1995 #endif
1996       GNUNET_LOAD_update (cp->transmission_delay,
1997                           UINT64_MAX);
1998       return 0;
1999     }  
2000   GNUNET_LOAD_update (cp->transmission_delay,
2001                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2002   now = GNUNET_TIME_absolute_get ();
2003   msize = 0;
2004   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2005   next_pm = cp->pending_messages_head;
2006   while ( (NULL != (pm = next_pm) ) &&
2007           (pm->msize <= size) )
2008     {
2009       next_pm = pm->next;
2010       if (pm->delay_until.value > now.value)
2011         {
2012           min_delay = GNUNET_TIME_relative_min (min_delay,
2013                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2014           continue;
2015         }
2016       memcpy (&cbuf[msize], &pm[1], pm->msize);
2017       msize += pm->msize;
2018       size -= pm->msize;
2019       GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2020                                    cp->pending_messages_tail,
2021                                    pm);
2022       if (NULL == pm->pml)
2023         cp->pending_requests--;
2024       destroy_pending_message (pm, cp->pid);
2025     }
2026   if (pm != NULL)
2027     min_delay = GNUNET_TIME_UNIT_ZERO;
2028   if (NULL != cp->pending_messages_head)
2029     {     
2030       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2031       cp->delayed_transmission_request_task
2032         = GNUNET_SCHEDULER_add_delayed (sched,
2033                                         min_delay,
2034                                         &delayed_transmission_request,
2035                                         cp);
2036     }
2037   if (pm == NULL)
2038     {      
2039       GNUNET_PEER_resolve (cp->pid,
2040                            &pid);
2041       next = mig_head;
2042       while (NULL != (mb = next))
2043         {
2044           next = mb->next;
2045           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2046             {
2047               if ( (cp->pid == mb->target_list[i]) &&
2048                    (mb->size + sizeof (migm) <= size) )
2049                 {
2050                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2051                   mb->target_list[i] = 0;
2052                   mb->used_targets++;
2053                   memset (&migm, 0, sizeof (migm));
2054                   migm.header.size = htons (sizeof (migm) + mb->size);
2055                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2056                   migm.type = htonl (mb->type);
2057                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2058                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2059                   msize += sizeof (migm);
2060                   size -= sizeof (migm);
2061                   memcpy (&cbuf[msize], &mb[1], mb->size);
2062                   msize += mb->size;
2063                   size -= mb->size;
2064 #if DEBUG_FS
2065                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2066                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2067                               GNUNET_h2s (&mb->query),
2068                               (unsigned int) mb->size,
2069                               GNUNET_i2s (&pid));
2070 #endif    
2071                   break;
2072                 }
2073               else
2074                 {
2075 #if DEBUG_FS
2076                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2077                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2078                               GNUNET_h2s (&mb->query),
2079                               (unsigned int) mb->size,
2080                               GNUNET_i2s (&pid));
2081 #endif    
2082                 }
2083             }
2084           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2085                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2086             {
2087               delete_migration_block (mb);
2088               consider_migration_gathering ();
2089             }
2090         }
2091       consider_migration (NULL, 
2092                           &pid.hashPubKey,
2093                           cp);
2094     }
2095 #if DEBUG_FS
2096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2097               "Transmitting %u bytes to peer with PID %u\n",
2098               (unsigned int) msize,
2099               (unsigned int) cp->pid);
2100 #endif
2101   return msize;
2102 }
2103
2104
2105 /**
2106  * Add a message to the set of pending messages for the given peer.
2107  *
2108  * @param cp peer to send message to
2109  * @param pm message to queue
2110  * @param pr request on which behalf this message is being queued
2111  */
2112 static void
2113 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2114                                   struct PendingMessage *pm,
2115                                   struct PendingRequest *pr)
2116 {
2117   struct PendingMessage *pos;
2118   struct PendingMessageList *pml;
2119   struct GNUNET_PeerIdentity pid;
2120
2121   GNUNET_assert (pm->next == NULL);
2122   GNUNET_assert (pm->pml == NULL);    
2123   if (pr != NULL)
2124     {
2125       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2126       pml->req = pr;
2127       pml->target = cp;
2128       pml->pm = pm;
2129       pm->pml = pml;  
2130       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2131                                    pr->pending_tail,
2132                                    pml);
2133     }
2134   pos = cp->pending_messages_head;
2135   while ( (pos != NULL) &&
2136           (pm->priority < pos->priority) )
2137     pos = pos->next;    
2138   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2139                                      cp->pending_messages_tail,
2140                                      pos,
2141                                      pm);
2142   cp->pending_requests++;
2143   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2144     destroy_pending_message (cp->pending_messages_tail, 0);  
2145   GNUNET_PEER_resolve (cp->pid, &pid);
2146   if (NULL != cp->cth)
2147     {
2148       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2149       cp->cth = NULL;
2150     }
2151   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2152     {
2153       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2154       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2155     }
2156   /* need to schedule transmission */
2157   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2158   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2159                                                cp->pending_messages_head->priority,
2160                                                MAX_TRANSMIT_DELAY,
2161                                                &pid,
2162                                                cp->pending_messages_head->msize,
2163                                                &transmit_to_peer,
2164                                                cp);
2165   if (cp->cth == NULL)
2166     {
2167 #if DEBUG_FS
2168       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2169                   "Failed to schedule transmission with core!\n");
2170 #endif
2171       GNUNET_STATISTICS_update (stats,
2172                                 gettext_noop ("# CORE transmission failures"),
2173                                 1,
2174                                 GNUNET_NO);
2175     }
2176 }
2177
2178
2179 /**
2180  * Test if the DATABASE (GET) load on this peer is too high
2181  * to even consider processing the query at
2182  * all.  
2183  * 
2184  * @return GNUNET_YES if the load is too high to do anything (load high)
2185  *         GNUNET_NO to process normally (load normal)
2186  *         GNUNET_SYSERR to process for free (load low)
2187  */
2188 static int
2189 test_get_load_too_high (uint32_t priority)
2190 {
2191   double ld;
2192
2193   ld = GNUNET_LOAD_get_load (datastore_get_load);
2194   if (ld < 1)
2195     {
2196       GNUNET_STATISTICS_update (stats,
2197                                 gettext_noop ("# requests done for free (low load)"),
2198                                 1,
2199                                 GNUNET_NO);
2200       return GNUNET_SYSERR;
2201     }
2202   if (ld <= priority)
2203     {
2204       GNUNET_STATISTICS_update (stats,
2205                                 gettext_noop ("# requests done for a price (normal load)"),
2206                                 1,
2207                                 GNUNET_NO);
2208       return GNUNET_NO;
2209     }
2210   GNUNET_STATISTICS_update (stats,
2211                             gettext_noop ("# requests dropped due to high load"),
2212                             1,
2213                             GNUNET_NO);
2214   return GNUNET_YES;
2215 }
2216
2217
2218
2219
2220 /**
2221  * Test if the DATABASE (PUT) load on this peer is too high
2222  * to even consider processing the query at
2223  * all.  
2224  * 
2225  * @return GNUNET_YES if the load is too high to do anything (load high)
2226  *         GNUNET_NO to process normally (load normal or low)
2227  */
2228 static int
2229 test_put_load_too_high (uint32_t priority)
2230 {
2231   double ld;
2232
2233   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2234     return GNUNET_NO; /* very fast */
2235   ld = GNUNET_LOAD_get_load (datastore_put_load);
2236   if ( (ld < 1) || (ld < priority) )
2237     return GNUNET_NO;
2238   GNUNET_STATISTICS_update (stats,
2239                             gettext_noop ("# storage requests dropped due to high load"),
2240                             1,
2241                             GNUNET_NO);
2242   return GNUNET_YES;
2243 }
2244
2245
2246 /* ******************* Pending Request Refresh Task ******************** */
2247
2248
2249
2250 /**
2251  * We use a random delay to make the timing of requests less
2252  * predictable.  This function returns such a random delay.  We add a base
2253  * delay of MAX_CORK_DELAY (1s).
2254  *
2255  * FIXME: make schedule dependent on the specifics of the request?
2256  * Or bandwidth and number of connected peers and load?
2257  *
2258  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2259  */
2260 static struct GNUNET_TIME_Relative
2261 get_processing_delay ()
2262 {
2263   return 
2264     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2265                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2266                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2267                                                                                        TTL_DECREMENT)));
2268 }
2269
2270
2271 /**
2272  * We're processing a GET request from another peer and have decided
2273  * to forward it to other peers.  This function is called periodically
2274  * and should forward the request to other peers until we have all
2275  * possible replies.  If we have transmitted the *only* reply to
2276  * the initiator we should destroy the pending request.  If we have
2277  * many replies in the queue to the initiator, we should delay sending
2278  * out more queries until the reply queue has shrunk some.
2279  *
2280  * @param cls our "struct ProcessGetContext *"
2281  * @param tc unused
2282  */
2283 static void
2284 forward_request_task (void *cls,
2285                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2286
2287
2288 /**
2289  * Function called after we either failed or succeeded
2290  * at transmitting a query to a peer.  
2291  *
2292  * @param cls the requests "struct PendingRequest*"
2293  * @param tpid ID of receiving peer, 0 on transmission error
2294  */
2295 static void
2296 transmit_query_continuation (void *cls,
2297                              GNUNET_PEER_Id tpid)
2298 {
2299   struct PendingRequest *pr = cls;
2300
2301   GNUNET_STATISTICS_update (stats,
2302                             gettext_noop ("# queries scheduled for forwarding"),
2303                             -1,
2304                             GNUNET_NO);
2305   if (tpid == 0)   
2306     {
2307 #if DEBUG_FS
2308       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2309                   "Transmission of request failed, will try again later.\n");
2310 #endif
2311       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2312         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2313                                                  get_processing_delay (),
2314                                                  &forward_request_task,
2315                                                  pr); 
2316       return;    
2317     }
2318   GNUNET_STATISTICS_update (stats,
2319                             gettext_noop ("# queries forwarded"),
2320                             1,
2321                             GNUNET_NO);
2322   GNUNET_PEER_change_rc (tpid, 1);
2323   if (pr->used_pids_off == pr->used_pids_size)
2324     GNUNET_array_grow (pr->used_pids,
2325                        pr->used_pids_size,
2326                        pr->used_pids_size * 2 + 2);
2327   pr->used_pids[pr->used_pids_off++] = tpid;
2328   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2329     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2330                                              get_processing_delay (),
2331                                              &forward_request_task,
2332                                              pr);
2333 }
2334
2335
2336 /**
2337  * How many bytes should a bloomfilter be if we have already seen
2338  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2339  * of bits set per entry.  Furthermore, we should not re-size the
2340  * filter too often (to keep it cheap).
2341  *
2342  * Since other peers will also add entries but not resize the filter,
2343  * we should generally pick a slightly larger size than what the
2344  * strict math would suggest.
2345  *
2346  * @return must be a power of two and smaller or equal to 2^15.
2347  */
2348 static size_t
2349 compute_bloomfilter_size (unsigned int entry_count)
2350 {
2351   size_t size;
2352   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2353   uint16_t max = 1 << 15;
2354
2355   if (entry_count > max)
2356     return max;
2357   size = 8;
2358   while ((size < max) && (size < ideal))
2359     size *= 2;
2360   if (size > max)
2361     return max;
2362   return size;
2363 }
2364
2365
2366 /**
2367  * Recalculate our bloom filter for filtering replies.  This function
2368  * will create a new bloom filter from scratch, so it should only be
2369  * called if we have no bloomfilter at all (and hence can create a
2370  * fresh one of minimal size without problems) OR if our peer is the
2371  * initiator (in which case we may resize to larger than mimimum size).
2372  *
2373  * @param pr request for which the BF is to be recomputed
2374  */
2375 static void
2376 refresh_bloomfilter (struct PendingRequest *pr)
2377 {
2378   unsigned int i;
2379   size_t nsize;
2380   GNUNET_HashCode mhash;
2381
2382   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2383   if (nsize == pr->bf_size)
2384     return; /* size not changed */
2385   if (pr->bf != NULL)
2386     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2387   pr->bf_size = nsize;
2388   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2389   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2390                                               pr->bf_size,
2391                                               BLOOMFILTER_K);
2392   for (i=0;i<pr->replies_seen_off;i++)
2393     {
2394       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2395                                 pr->mingle,
2396                                 &mhash);
2397       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2398     }
2399 }
2400
2401
2402 /**
2403  * Function called after we've tried to reserve a certain amount of
2404  * bandwidth for a reply.  Check if we succeeded and if so send our
2405  * query.
2406  *
2407  * @param cls the requests "struct PendingRequest*"
2408  * @param peer identifies the peer
2409  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2410  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2411  * @param amount set to the amount that was actually reserved or unreserved
2412  * @param preference current traffic preference for the given peer
2413  */
2414 static void
2415 target_reservation_cb (void *cls,
2416                        const struct
2417                        GNUNET_PeerIdentity * peer,
2418                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2419                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2420                        int amount,
2421                        uint64_t preference)
2422 {
2423   struct PendingRequest *pr = cls;
2424   struct ConnectedPeer *cp;
2425   struct PendingMessage *pm;
2426   struct GetMessage *gm;
2427   GNUNET_HashCode *ext;
2428   char *bfdata;
2429   size_t msize;
2430   unsigned int k;
2431   int no_route;
2432   uint32_t bm;
2433
2434   pr->irc = NULL;
2435   if (peer == NULL)
2436     {
2437       /* error in communication with core, try again later */
2438       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2439         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2440                                                  get_processing_delay (),
2441                                                  &forward_request_task,
2442                                                  pr);
2443       return;
2444     }
2445   // (3) transmit, update ttl/priority
2446   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2447                                           &peer->hashPubKey);
2448   if (cp == NULL)
2449     {
2450       /* Peer must have just left */
2451 #if DEBUG_FS
2452       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2453                   "Selected peer disconnected!\n");
2454 #endif
2455       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2456         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2457                                                  get_processing_delay (),
2458                                                  &forward_request_task,
2459                                                  pr);
2460       return;
2461     }
2462   no_route = GNUNET_NO;
2463   if (amount == 0)
2464     {
2465       if (pr->cp == NULL)
2466         {
2467 #if DEBUG_FS > 1
2468           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2469                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2470                       amount,
2471                       DBLOCK_SIZE);
2472 #endif
2473           GNUNET_STATISTICS_update (stats,
2474                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2475                                     1,
2476                                     GNUNET_NO);
2477           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2478             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2479                                                      get_processing_delay (),
2480                                                      &forward_request_task,
2481                                                      pr);
2482           return;  /* this target round failed */
2483         }
2484       no_route = GNUNET_YES;
2485     }
2486   
2487   GNUNET_STATISTICS_update (stats,
2488                             gettext_noop ("# queries scheduled for forwarding"),
2489                             1,
2490                             GNUNET_NO);
2491   /* build message and insert message into priority queue */
2492 #if DEBUG_FS
2493   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2494               "Forwarding request `%s' to `%4s'!\n",
2495               GNUNET_h2s (&pr->query),
2496               GNUNET_i2s (peer));
2497 #endif
2498   k = 0;
2499   bm = 0;
2500   if (GNUNET_YES == no_route)
2501     {
2502       bm |= GET_MESSAGE_BIT_RETURN_TO;
2503       k++;      
2504     }
2505   if (pr->namespace != NULL)
2506     {
2507       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2508       k++;
2509     }
2510   if (pr->target_pid != 0)
2511     {
2512       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2513       k++;
2514     }
2515   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2516   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2517   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2518   pm->msize = msize;
2519   gm = (struct GetMessage*) &pm[1];
2520   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2521   gm->header.size = htons (msize);
2522   gm->type = htonl (pr->type);
2523   pr->remaining_priority /= 2;
2524   gm->priority = htonl (pr->remaining_priority);
2525   gm->ttl = htonl (pr->ttl);
2526   gm->filter_mutator = htonl(pr->mingle); 
2527   gm->hash_bitmap = htonl (bm);
2528   gm->query = pr->query;
2529   ext = (GNUNET_HashCode*) &gm[1];
2530   k = 0;
2531   if (GNUNET_YES == no_route)
2532     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2533   if (pr->namespace != NULL)
2534     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2535   if (pr->target_pid != 0)
2536     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2537   bfdata = (char *) &ext[k];
2538   if (pr->bf != NULL)
2539     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2540                                                bfdata,
2541                                                pr->bf_size);
2542   pm->cont = &transmit_query_continuation;
2543   pm->cont_cls = pr;
2544   add_to_pending_messages_for_peer (cp, pm, pr);
2545 }
2546
2547
2548 /**
2549  * Closure used for "target_peer_select_cb".
2550  */
2551 struct PeerSelectionContext 
2552 {
2553   /**
2554    * The request for which we are selecting
2555    * peers.
2556    */
2557   struct PendingRequest *pr;
2558
2559   /**
2560    * Current "prime" target.
2561    */
2562   struct GNUNET_PeerIdentity target;
2563
2564   /**
2565    * How much do we like this target?
2566    */
2567   double target_score;
2568
2569 };
2570
2571
2572 /**
2573  * Function called for each connected peer to determine
2574  * which one(s) would make good targets for forwarding.
2575  *
2576  * @param cls closure (struct PeerSelectionContext)
2577  * @param key current key code (peer identity)
2578  * @param value value in the hash map (struct ConnectedPeer)
2579  * @return GNUNET_YES if we should continue to
2580  *         iterate,
2581  *         GNUNET_NO if not.
2582  */
2583 static int
2584 target_peer_select_cb (void *cls,
2585                        const GNUNET_HashCode * key,
2586                        void *value)
2587 {
2588   struct PeerSelectionContext *psc = cls;
2589   struct ConnectedPeer *cp = value;
2590   struct PendingRequest *pr = psc->pr;
2591   double score;
2592   unsigned int i;
2593   unsigned int pc;
2594
2595   /* 1) check that this peer is not the initiator */
2596   if (cp == pr->cp)
2597     {
2598 #if DEBUG_FS
2599       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2600                   "Skipping initiator in forwarding selection\n");
2601 #endif
2602       return GNUNET_YES; /* skip */        
2603     }
2604
2605   /* 2) check if we have already (recently) forwarded to this peer */
2606   pc = 0;
2607   for (i=0;i<pr->used_pids_off;i++)
2608     if (pr->used_pids[i] == cp->pid) 
2609       {
2610         pc++;
2611         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2612                                            RETRY_PROBABILITY_INV))
2613           {
2614 #if DEBUG_FS
2615             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2616                         "NOT re-trying query that was previously transmitted %u times\n",
2617                         (unsigned int) pr->used_pids_off);
2618 #endif
2619             return GNUNET_YES; /* skip */
2620           }
2621       }
2622 #if DEBUG_FS
2623   if (0 < pc)
2624     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2625                 "Re-trying query that was previously transmitted %u times to this peer\n",
2626                 (unsigned int) pc);
2627 #endif
2628   /* 3) calculate how much we'd like to forward to this peer,
2629      starting with a random value that is strong enough
2630      to at least give any peer a chance sometimes 
2631      (compared to the other factors that come later) */
2632   /* 3a) count successful (recent) routes from cp for same source */
2633   if (pr->cp != NULL)
2634     {
2635       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2636                                         P2P_SUCCESS_LIST_SIZE);
2637       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2638         if (cp->last_p2p_replies[i] == pr->cp->pid)
2639           score += 1.0; /* likely successful based on hot path */
2640     }
2641   else
2642     {
2643       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2644                                         CS2P_SUCCESS_LIST_SIZE);
2645       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2646         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2647           score += 1.0; /* likely successful based on hot path */
2648     }
2649   /* 3b) include latency */
2650   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2651     score += 1.0; /* likely fast based on latency */
2652   /* 3c) include priorities */
2653   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2654     score += 1.0; /* likely successful based on priorities */
2655   /* 3d) penalize for queue size */  
2656   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2657   /* 3e) include peer proximity */
2658   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2659                                                     &pr->query)) / (double) UINT32_MAX);
2660   /* 4) super-bonus for being the known target */
2661   if (pr->target_pid == cp->pid)
2662     score += 100.0;
2663   /* store best-fit in closure */
2664 #if DEBUG_FS
2665   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2666               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2667               GNUNET_h2s (key),
2668               score,
2669               psc->target_score);
2670 #endif  
2671   score++; /* avoid zero */
2672   if (score > psc->target_score)
2673     {
2674       psc->target_score = score;
2675       psc->target.hashPubKey = *key; 
2676     }
2677   return GNUNET_YES;
2678 }
2679   
2680
2681 /**
2682  * The priority level imposes a bound on the maximum
2683  * value for the ttl that can be requested.
2684  *
2685  * @param ttl_in requested ttl
2686  * @param prio given priority
2687  * @return ttl_in if ttl_in is below the limit,
2688  *         otherwise the ttl-limit for the given priority
2689  */
2690 static int32_t
2691 bound_ttl (int32_t ttl_in, uint32_t prio)
2692 {
2693   unsigned long long allowed;
2694
2695   if (ttl_in <= 0)
2696     return ttl_in;
2697   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2698   if (ttl_in > allowed)      
2699     {
2700       if (allowed >= (1 << 30))
2701         return 1 << 30;
2702       return allowed;
2703     }
2704   return ttl_in;
2705 }
2706
2707
2708 /**
2709  * Iterator called on each result obtained for a DHT
2710  * operation that expects a reply
2711  *
2712  * @param cls closure
2713  * @param exp when will this value expire
2714  * @param key key of the result
2715  * @param get_path NULL-terminated array of pointers
2716  *                 to the peers on reverse GET path (or NULL if not recorded)
2717  * @param put_path NULL-terminated array of pointers
2718  *                 to the peers on the PUT path (or NULL if not recorded)
2719  * @param type type of the result
2720  * @param size number of bytes in data
2721  * @param data pointer to the result data
2722  */
2723 static void
2724 process_dht_reply (void *cls,
2725                    struct GNUNET_TIME_Absolute exp,
2726                    const GNUNET_HashCode * key,
2727                    const struct GNUNET_PeerIdentity * const *get_path,
2728                    const struct GNUNET_PeerIdentity * const *put_path,
2729                    enum GNUNET_BLOCK_Type type,
2730                    size_t size,
2731                    const void *data);
2732
2733
2734 /**
2735  * We're processing a GET request and have decided
2736  * to forward it to other peers.  This function is called periodically
2737  * and should forward the request to other peers until we have all
2738  * possible replies.  If we have transmitted the *only* reply to
2739  * the initiator we should destroy the pending request.  If we have
2740  * many replies in the queue to the initiator, we should delay sending
2741  * out more queries until the reply queue has shrunk some.
2742  *
2743  * @param cls our "struct ProcessGetContext *"
2744  * @param tc unused
2745  */
2746 static void
2747 forward_request_task (void *cls,
2748                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2749 {
2750   struct PendingRequest *pr = cls;
2751   struct PeerSelectionContext psc;
2752   struct ConnectedPeer *cp; 
2753   struct GNUNET_TIME_Relative delay;
2754
2755   pr->task = GNUNET_SCHEDULER_NO_TASK;
2756   if (pr->irc != NULL)
2757     {
2758 #if DEBUG_FS
2759       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2760                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2761                   GNUNET_h2s (&pr->query));
2762 #endif
2763       return; /* already pending */
2764     }
2765   if (GNUNET_YES == pr->local_only)
2766     return; /* configured to not do P2P search */
2767   /* (0) try DHT */
2768   if ( (0 == pr->anonymity_level) &&
2769        (GNUNET_YES != pr->forward_only) &&
2770        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2771        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2772     {
2773       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2774                                           GNUNET_TIME_UNIT_FOREVER_REL,
2775                                           pr->type,
2776                                           &pr->query,
2777                                           GNUNET_DHT_RO_NONE,
2778                                           pr->bf,
2779                                           pr->mingle,
2780                                           pr->namespace,
2781                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2782                                           &process_dht_reply,
2783                                           pr);
2784     }
2785   /* (1) select target */
2786   psc.pr = pr;
2787   psc.target_score = -DBL_MAX;
2788   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2789                                          &target_peer_select_cb,
2790                                          &psc);  
2791   if (psc.target_score == -DBL_MAX)
2792     {
2793       delay = get_processing_delay ();
2794 #if DEBUG_FS 
2795       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2796                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2797                   GNUNET_h2s (&pr->query),
2798                   delay.value);
2799 #endif
2800       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2801                                                delay,
2802                                                &forward_request_task,
2803                                                pr);
2804       return; /* nobody selected */
2805     }
2806   /* (3) update TTL/priority */
2807   if (pr->client_request_list != NULL)
2808     {
2809       /* FIXME: use better algorithm!? */
2810       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2811                                          4))
2812         pr->priority++;
2813       /* bound priority we use by priorities we see from other peers
2814          rounded up (must round up so that we can see non-zero
2815          priorities, but round up as little as possible to make it
2816          plausible that we forwarded another peers request) */
2817       if (pr->priority > current_priorities + 1.0)
2818         pr->priority = (uint32_t) current_priorities + 1.0;
2819       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2820                            pr->priority);
2821 #if DEBUG_FS
2822       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2823                   "Trying query `%s' with priority %u and TTL %d.\n",
2824                   GNUNET_h2s (&pr->query),
2825                   pr->priority,
2826                   pr->ttl);
2827 #endif
2828     }
2829
2830   /* (3) reserve reply bandwidth */
2831   if (GNUNET_NO == pr->forward_only)
2832     {
2833       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2834                                               &psc.target.hashPubKey);
2835       GNUNET_assert (NULL != cp);
2836       pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2837                                                     &psc.target,
2838                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2839                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2840                                                     DBLOCK_SIZE * 2, 
2841                                                     cp->inc_preference,
2842                                                     &target_reservation_cb,
2843                                                     pr);
2844       cp->inc_preference = 0;
2845     }
2846   else
2847     {
2848       /* force forwarding */
2849       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2850       target_reservation_cb (pr, &psc.target,
2851                              zerobw, zerobw, 0, 0.0);
2852     }
2853 }
2854
2855
2856 /* **************************** P2P PUT Handling ************************ */
2857
2858
2859 /**
2860  * Function called after we either failed or succeeded
2861  * at transmitting a reply to a peer.  
2862  *
2863  * @param cls the requests "struct PendingRequest*"
2864  * @param tpid ID of receiving peer, 0 on transmission error
2865  */
2866 static void
2867 transmit_reply_continuation (void *cls,
2868                              GNUNET_PEER_Id tpid)
2869 {
2870   struct PendingRequest *pr = cls;
2871   
2872   switch (pr->type)
2873     {
2874     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2875     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2876       /* only one reply expected, done with the request! */
2877       destroy_pending_request (pr);
2878       break;
2879     case GNUNET_BLOCK_TYPE_ANY:
2880     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2881     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2882       break;
2883     default:
2884       GNUNET_break (0);
2885       break;
2886     }
2887 }
2888
2889
2890 /**
2891  * Transmit the given message by copying it to the target buffer
2892  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2893  * for writing in the meantime.  In that case, do nothing
2894  * (the disconnect or shutdown handler will take care of the rest).
2895  * If we were able to transmit messages and there are still more
2896  * pending, ask core again for further calls to this function.
2897  *
2898  * @param cls closure, pointer to the 'struct ClientList*'
2899  * @param size number of bytes available in buf
2900  * @param buf where the callee should write the message
2901  * @return number of bytes written to buf
2902  */
2903 static size_t
2904 transmit_to_client (void *cls,
2905                   size_t size, void *buf)
2906 {
2907   struct ClientList *cl = cls;
2908   char *cbuf = buf;
2909   struct ClientResponseMessage *creply;
2910   size_t msize;
2911   
2912   cl->th = NULL;
2913   if (NULL == buf)
2914     {
2915 #if DEBUG_FS
2916       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2917                   "Not sending reply, client communication problem.\n");
2918 #endif
2919       return 0;
2920     }
2921   msize = 0;
2922   while ( (NULL != (creply = cl->res_head) ) &&
2923           (creply->msize <= size) )
2924     {
2925       memcpy (&cbuf[msize], &creply[1], creply->msize);
2926       msize += creply->msize;
2927       size -= creply->msize;
2928       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2929                                    cl->res_tail,
2930                                    creply);
2931       GNUNET_free (creply);
2932     }
2933   if (NULL != creply)
2934     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2935                                                   creply->msize,
2936                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2937                                                   &transmit_to_client,
2938                                                   cl);
2939 #if DEBUG_FS
2940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2941               "Transmitted %u bytes to client\n",
2942               (unsigned int) msize);
2943 #endif
2944   return msize;
2945 }
2946
2947
2948 /**
2949  * Closure for "process_reply" function.
2950  */
2951 struct ProcessReplyClosure
2952 {
2953   /**
2954    * The data for the reply.
2955    */
2956   const void *data;
2957
2958   /**
2959    * Who gave us this reply? NULL for local host (or DHT)
2960    */
2961   struct ConnectedPeer *sender;
2962
2963   /**
2964    * When the reply expires.
2965    */
2966   struct GNUNET_TIME_Absolute expiration;
2967
2968   /**
2969    * Size of data.
2970    */
2971   size_t size;
2972
2973   /**
2974    * Type of the block.
2975    */
2976   enum GNUNET_BLOCK_Type type;
2977
2978   /**
2979    * How much was this reply worth to us?
2980    */
2981   uint32_t priority;
2982
2983   /**
2984    * Evaluation result (returned).
2985    */
2986   enum GNUNET_BLOCK_EvaluationResult eval;
2987
2988   /**
2989    * Did we finish processing the associated request?
2990    */ 
2991   int finished;
2992
2993   /**
2994    * Did we find a matching request?
2995    */
2996   int request_found;
2997 };
2998
2999
3000 /**
3001  * We have received a reply; handle it!
3002  *
3003  * @param cls response (struct ProcessReplyClosure)
3004  * @param key our query
3005  * @param value value in the hash map (info about the query)
3006  * @return GNUNET_YES (we should continue to iterate)
3007  */
3008 static int
3009 process_reply (void *cls,
3010                const GNUNET_HashCode * key,
3011                void *value)
3012 {
3013   struct ProcessReplyClosure *prq = cls;
3014   struct PendingRequest *pr = value;
3015   struct PendingMessage *reply;
3016   struct ClientResponseMessage *creply;
3017   struct ClientList *cl;
3018   struct PutMessage *pm;
3019   struct ConnectedPeer *cp;
3020   struct GNUNET_TIME_Relative cur_delay;
3021   size_t msize;
3022
3023 #if DEBUG_FS
3024   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3025               "Matched result (type %u) for query `%s' with pending request\n",
3026               (unsigned int) prq->type,
3027               GNUNET_h2s (key));
3028 #endif  
3029   GNUNET_STATISTICS_update (stats,
3030                             gettext_noop ("# replies received and matched"),
3031                             1,
3032                             GNUNET_NO);
3033   if (prq->sender != NULL)
3034     {
3035       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
3036       prq->sender->avg_delay.value
3037         = (prq->sender->avg_delay.value * 
3038            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
3039       prq->sender->avg_priority
3040         = (prq->sender->avg_priority * 
3041            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3042       if (pr->cp != NULL)
3043         {
3044           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3045                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3046                                  -1);
3047           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3048           prq->sender->last_p2p_replies
3049             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3050             = pr->cp->pid;
3051         }
3052       else
3053         {
3054           if (NULL != prq->sender->last_client_replies
3055               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3056             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3057                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3058           prq->sender->last_client_replies
3059             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3060             = pr->client_request_list->client_list->client;
3061           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3062         }
3063     }
3064   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3065                                      prq->type,
3066                                      key,
3067                                      &pr->bf,
3068                                      pr->mingle,
3069                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3070                                      prq->data,
3071                                      prq->size);
3072   switch (prq->eval)
3073     {
3074     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3075       break;
3076     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3077       while (NULL != pr->pending_head)
3078         destroy_pending_message_list_entry (pr->pending_head);
3079       if (pr->qe != NULL)
3080         {
3081           if (pr->client_request_list != NULL)
3082             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3083                                         GNUNET_YES);
3084           GNUNET_DATASTORE_cancel (pr->qe);
3085           pr->qe = NULL;
3086         }
3087       pr->do_remove = GNUNET_YES;
3088       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3089         {
3090           GNUNET_SCHEDULER_cancel (sched,
3091                                    pr->task);
3092           pr->task = GNUNET_SCHEDULER_NO_TASK;
3093         }
3094       GNUNET_break (GNUNET_YES ==
3095                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3096                                                           key,
3097                                                           pr));
3098       GNUNET_LOAD_update (rt_entry_lifetime,
3099                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3100       break;
3101     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3102       GNUNET_STATISTICS_update (stats,
3103                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3104                                 1,
3105                                 GNUNET_NO);
3106 #if DEBUG_FS
3107       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3108                   "Duplicate response `%s', discarding.\n",
3109                   GNUNET_h2s (&mhash));
3110 #endif
3111       return GNUNET_YES; /* duplicate */
3112     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3113       return GNUNET_YES; /* wrong namespace */  
3114     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3115       GNUNET_break (0);
3116       return GNUNET_YES;
3117     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3118       GNUNET_break (0);
3119       return GNUNET_YES;
3120     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3121       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3122                   _("Unsupported block type %u\n"),
3123                   prq->type);
3124       return GNUNET_NO;
3125     }
3126   if (pr->client_request_list != NULL)
3127     {
3128       if (pr->replies_seen_size == pr->replies_seen_off)
3129         GNUNET_array_grow (pr->replies_seen,
3130                            pr->replies_seen_size,
3131                            pr->replies_seen_size * 2 + 4);      
3132       GNUNET_CRYPTO_hash (prq->data,
3133                           prq->size,
3134                           &pr->replies_seen[pr->replies_seen_off++]);         
3135       refresh_bloomfilter (pr);
3136     }
3137   if (NULL == prq->sender)
3138     {
3139 #if DEBUG_FS
3140       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3141                   "Found result for query `%s' in local datastore\n",
3142                   GNUNET_h2s (key));
3143 #endif
3144       GNUNET_STATISTICS_update (stats,
3145                                 gettext_noop ("# results found locally"),
3146                                 1,
3147                                 GNUNET_NO);      
3148     }
3149   prq->priority += pr->remaining_priority;
3150   pr->remaining_priority = 0;
3151   pr->results_found++;
3152   prq->request_found = GNUNET_YES;
3153   if (NULL != pr->client_request_list)
3154     {
3155       GNUNET_STATISTICS_update (stats,
3156                                 gettext_noop ("# replies received for local clients"),
3157                                 1,
3158                                 GNUNET_NO);
3159       cl = pr->client_request_list->client_list;
3160       msize = sizeof (struct PutMessage) + prq->size;
3161       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3162       creply->msize = msize;
3163       creply->client_list = cl;
3164       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3165                                          cl->res_tail,
3166                                          cl->res_tail,
3167                                          creply);      
3168       pm = (struct PutMessage*) &creply[1];
3169       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3170       pm->header.size = htons (msize);
3171       pm->type = htonl (prq->type);
3172       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3173       memcpy (&pm[1], prq->data, prq->size);      
3174       if (NULL == cl->th)
3175         {
3176 #if DEBUG_FS
3177           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3178                       "Transmitting result for query `%s' to client\n",
3179                       GNUNET_h2s (key));
3180 #endif  
3181           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3182                                                         msize,
3183                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3184                                                         &transmit_to_client,
3185                                                         cl);
3186         }
3187       GNUNET_break (cl->th != NULL);
3188       if (pr->do_remove)                
3189         {
3190           prq->finished = GNUNET_YES;
3191           destroy_pending_request (pr);         
3192         }
3193     }
3194   else
3195     {
3196       cp = pr->cp;
3197 #if DEBUG_FS
3198       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3199                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3200                   GNUNET_h2s (key),
3201                   (unsigned int) cp->pid);
3202 #endif  
3203       GNUNET_STATISTICS_update (stats,
3204                                 gettext_noop ("# replies received for other peers"),
3205                                 1,
3206                                 GNUNET_NO);
3207       msize = sizeof (struct PutMessage) + prq->size;
3208       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3209       reply->cont = &transmit_reply_continuation;
3210       reply->cont_cls = pr;
3211 #if SUPPORT_DELAYS
3212       reply->delay_until 
3213         = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3214                                                                            GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3215                                                                                                      TTL_DECREMENT)));
3216 #endif
3217       reply->msize = msize;
3218       reply->priority = UINT32_MAX; /* send replies first! */
3219       pm = (struct PutMessage*) &reply[1];
3220       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3221       pm->header.size = htons (msize);
3222       pm->type = htonl (prq->type);
3223       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3224       memcpy (&pm[1], prq->data, prq->size);
3225       add_to_pending_messages_for_peer (cp, reply, pr);
3226     }
3227   return GNUNET_YES;
3228 }
3229
3230
3231 /**
3232  * Iterator called on each result obtained for a DHT
3233  * operation that expects a reply
3234  *
3235  * @param cls closure
3236  * @param exp when will this value expire
3237  * @param key key of the result
3238  * @param get_path NULL-terminated array of pointers
3239  *                 to the peers on reverse GET path (or NULL if not recorded)
3240  * @param put_path NULL-terminated array of pointers
3241  *                 to the peers on the PUT path (or NULL if not recorded)
3242  * @param type type of the result
3243  * @param size number of bytes in data
3244  * @param data pointer to the result data
3245  */
3246 static void
3247 process_dht_reply (void *cls,
3248                    struct GNUNET_TIME_Absolute exp,
3249                    const GNUNET_HashCode * key,
3250                    const struct GNUNET_PeerIdentity * const *get_path,
3251                    const struct GNUNET_PeerIdentity * const *put_path,
3252                    enum GNUNET_BLOCK_Type type,
3253                    size_t size,
3254                    const void *data)
3255 {
3256   struct PendingRequest *pr = cls;
3257   struct ProcessReplyClosure prq;
3258
3259   memset (&prq, 0, sizeof (prq));
3260   prq.data = data;
3261   prq.expiration = exp;
3262   prq.size = size;  
3263   prq.type = type;
3264   process_reply (&prq, key, pr);
3265 }
3266
3267
3268
3269 /**
3270  * Continuation called to notify client about result of the
3271  * operation.
3272  *
3273  * @param cls closure
3274  * @param success GNUNET_SYSERR on failure
3275  * @param msg NULL on success, otherwise an error message
3276  */
3277 static void 
3278 put_migration_continuation (void *cls,
3279                             int success,
3280                             const char *msg)
3281 {
3282   struct GNUNET_TIME_Absolute *start = cls;
3283   struct GNUNET_TIME_Relative delay;
3284   
3285   delay = GNUNET_TIME_absolute_get_duration (*start);
3286   GNUNET_free (start);
3287   GNUNET_LOAD_update (datastore_put_load,
3288                       delay.value);
3289   if (GNUNET_OK == success)
3290     return;
3291   GNUNET_STATISTICS_update (stats,
3292                             gettext_noop ("# datastore 'put' failures"),
3293                             1,
3294                             GNUNET_NO);
3295 }
3296
3297
3298 /**
3299  * Handle P2P "PUT" message.
3300  *
3301  * @param cls closure, always NULL
3302  * @param other the other peer involved (sender or receiver, NULL
3303  *        for loopback messages where we are both sender and receiver)
3304  * @param message the actual message
3305  * @param latency reported latency of the connection with 'other'
3306  * @param distance reported distance (DV) to 'other' 
3307  * @return GNUNET_OK to keep the connection open,
3308  *         GNUNET_SYSERR to close it (signal serious error)
3309  */
3310 static int
3311 handle_p2p_put (void *cls,
3312                 const struct GNUNET_PeerIdentity *other,
3313                 const struct GNUNET_MessageHeader *message,
3314                 struct GNUNET_TIME_Relative latency,
3315                 uint32_t distance)
3316 {
3317   const struct PutMessage *put;
3318   uint16_t msize;
3319   size_t dsize;
3320   enum GNUNET_BLOCK_Type type;
3321   struct GNUNET_TIME_Absolute expiration;
3322   GNUNET_HashCode query;
3323   struct ProcessReplyClosure prq;
3324   struct GNUNET_TIME_Absolute *start;
3325   struct GNUNET_TIME_Relative block_time;  
3326   double putl;
3327   struct ConnectedPeer *cp; 
3328   struct PendingMessage *pm;
3329   struct MigrationStopMessage *msm;
3330
3331   msize = ntohs (message->size);
3332   if (msize < sizeof (struct PutMessage))
3333     {
3334       GNUNET_break_op(0);
3335       return GNUNET_SYSERR;
3336     }
3337   put = (const struct PutMessage*) message;
3338   dsize = msize - sizeof (struct PutMessage);
3339   type = ntohl (put->type);
3340   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3341
3342   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3343     return GNUNET_SYSERR;
3344   if (GNUNET_OK !=
3345       GNUNET_BLOCK_get_key (block_ctx,
3346                             type,
3347                             &put[1],
3348                             dsize,
3349                             &query))
3350     {
3351       GNUNET_break_op (0);
3352       return GNUNET_SYSERR;
3353     }
3354 #if DEBUG_FS
3355   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3356               "Received result for query `%s' from peer `%4s'\n",
3357               GNUNET_h2s (&query),
3358               GNUNET_i2s (other));
3359 #endif
3360   GNUNET_STATISTICS_update (stats,
3361                             gettext_noop ("# replies received (overall)"),
3362                             1,
3363                             GNUNET_NO);
3364   /* now, lookup 'query' */
3365   prq.data = (const void*) &put[1];
3366   if (other != NULL)
3367     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3368                                                     &other->hashPubKey);
3369   else
3370     prq.sender = NULL;
3371   prq.size = dsize;
3372   prq.type = type;
3373   prq.expiration = expiration;
3374   prq.priority = 0;
3375   prq.finished = GNUNET_NO;
3376   prq.request_found = GNUNET_NO;
3377   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3378                                               &query,
3379                                               &process_reply,
3380                                               &prq);
3381   if (prq.sender != NULL)
3382     {
3383       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3384       prq.sender->trust += prq.priority;
3385     }
3386   if ( (GNUNET_YES == active_migration) &&
3387        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3388     {      
3389 #if DEBUG_FS
3390       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3391                   "Replicating result for query `%s' with priority %u\n",
3392                   GNUNET_h2s (&query),
3393                   prq.priority);
3394 #endif
3395       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3396       *start = GNUNET_TIME_absolute_get ();
3397       GNUNET_DATASTORE_put (dsh,
3398                             0, &query, dsize, &put[1],
3399                             type, prq.priority, 1 /* anonymity */, 
3400                             expiration, 
3401                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3402                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3403                             &put_migration_continuation, 
3404                             start);
3405     }
3406   putl = GNUNET_LOAD_get_load (datastore_put_load);
3407   if ( (GNUNET_NO == prq.request_found) &&
3408        ( (GNUNET_YES != active_migration) ||
3409          (putl > 2.0) ) )
3410     {
3411       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3412                                               &other->hashPubKey);
3413       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3414         return GNUNET_OK; /* already blocked */
3415       /* We're too busy; send MigrationStop message! */
3416       if (GNUNET_YES != active_migration) 
3417         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3418       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3419                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3420                                                                                    (unsigned int) (60000 * putl * putl)));
3421       
3422       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3423       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3424                           sizeof (struct MigrationStopMessage));
3425       pm->msize = sizeof (struct MigrationStopMessage);
3426       pm->priority = UINT32_MAX;
3427       msm = (struct MigrationStopMessage*) &pm[1];
3428       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3429       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3430       msm->duration = GNUNET_TIME_relative_hton (block_time);
3431       add_to_pending_messages_for_peer (cp,
3432                                         pm,
3433                                         NULL);
3434     }
3435   return GNUNET_OK;
3436 }
3437
3438
3439 /**
3440  * Handle P2P "MIGRATION_STOP" message.
3441  *
3442  * @param cls closure, always NULL
3443  * @param other the other peer involved (sender or receiver, NULL
3444  *        for loopback messages where we are both sender and receiver)
3445  * @param message the actual message
3446  * @param latency reported latency of the connection with 'other'
3447  * @param distance reported distance (DV) to 'other' 
3448  * @return GNUNET_OK to keep the connection open,
3449  *         GNUNET_SYSERR to close it (signal serious error)
3450  */
3451 static int
3452 handle_p2p_migration_stop (void *cls,
3453                            const struct GNUNET_PeerIdentity *other,
3454                            const struct GNUNET_MessageHeader *message,
3455                            struct GNUNET_TIME_Relative latency,
3456                            uint32_t distance)
3457 {
3458   struct ConnectedPeer *cp; 
3459   const struct MigrationStopMessage *msm;
3460
3461   msm = (const struct MigrationStopMessage*) message;
3462   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3463                                           &other->hashPubKey);
3464   if (cp == NULL)
3465     {
3466       GNUNET_break (0);
3467       return GNUNET_OK;
3468     }
3469   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3470   return GNUNET_OK;
3471 }
3472
3473
3474
3475 /* **************************** P2P GET Handling ************************ */
3476
3477
3478 /**
3479  * Closure for 'check_duplicate_request_{peer,client}'.
3480  */
3481 struct CheckDuplicateRequestClosure
3482 {
3483   /**
3484    * The new request we should check if it already exists.
3485    */
3486   const struct PendingRequest *pr;
3487
3488   /**
3489    * Existing request found by the checker, NULL if none.
3490    */
3491   struct PendingRequest *have;
3492 };
3493
3494
3495 /**
3496  * Iterator over entries in the 'query_request_map' that
3497  * tries to see if we have the same request pending from
3498  * the same client already.
3499  *
3500  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3501  * @param key current key code (query, ignored, must match)
3502  * @param value value in the hash map (a 'struct PendingRequest' 
3503  *              that already exists)
3504  * @return GNUNET_YES if we should continue to
3505  *         iterate (no match yet)
3506  *         GNUNET_NO if not (match found).
3507  */
3508 static int
3509 check_duplicate_request_client (void *cls,
3510                                 const GNUNET_HashCode * key,
3511                                 void *value)
3512 {
3513   struct CheckDuplicateRequestClosure *cdc = cls;
3514   struct PendingRequest *have = value;
3515
3516   if (have->client_request_list == NULL)
3517     return GNUNET_YES;
3518   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3519        (cdc->pr != have) )
3520     {
3521       cdc->have = have;
3522       return GNUNET_NO;
3523     }
3524   return GNUNET_YES;
3525 }
3526
3527
3528 /**
3529  * We're processing (local) results for a search request
3530  * from another peer.  Pass applicable results to the
3531  * peer and if we are done either clean up (operation
3532  * complete) or forward to other peers (more results possible).
3533  *
3534  * @param cls our closure (struct LocalGetContext)
3535  * @param key key for the content
3536  * @param size number of bytes in data
3537  * @param data content stored
3538  * @param type type of the content
3539  * @param priority priority of the content
3540  * @param anonymity anonymity-level for the content
3541  * @param expiration expiration time for the content
3542  * @param uid unique identifier for the datum;
3543  *        maybe 0 if no unique identifier is available
3544  */
3545 static void
3546 process_local_reply (void *cls,
3547                      const GNUNET_HashCode * key,
3548                      size_t size,
3549                      const void *data,
3550                      enum GNUNET_BLOCK_Type type,
3551                      uint32_t priority,
3552                      uint32_t anonymity,
3553                      struct GNUNET_TIME_Absolute
3554                      expiration, 
3555                      uint64_t uid)
3556 {
3557   struct PendingRequest *pr = cls;
3558   struct ProcessReplyClosure prq;
3559   struct CheckDuplicateRequestClosure cdrc;
3560   GNUNET_HashCode query;
3561   unsigned int old_rf;
3562   
3563   if (NULL == key)
3564     {
3565 #if DEBUG_FS > 1
3566       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3567                   "Done processing local replies, forwarding request to other peers.\n");
3568 #endif
3569       pr->qe = NULL;
3570       if (pr->client_request_list != NULL)
3571         {
3572           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3573                                       GNUNET_YES);
3574           /* Figure out if this is a duplicate request and possibly
3575              merge 'struct PendingRequest' entries */
3576           cdrc.have = NULL;
3577           cdrc.pr = pr;
3578           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3579                                                       &pr->query,
3580                                                       &check_duplicate_request_client,
3581                                                       &cdrc);
3582           if (cdrc.have != NULL)
3583             {
3584 #if DEBUG_FS
3585               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3586                           "Received request for block `%s' twice from client, will only request once.\n",
3587                           GNUNET_h2s (&pr->query));
3588 #endif
3589               
3590               destroy_pending_request (pr);
3591               return;
3592             }
3593         }
3594
3595       /* no more results */
3596       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3597         pr->task = GNUNET_SCHEDULER_add_now (sched,
3598                                              &forward_request_task,
3599                                              pr);      
3600       return;
3601     }
3602 #if DEBUG_FS
3603   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3604               "New local response to `%s' of type %u.\n",
3605               GNUNET_h2s (key),
3606               type);
3607 #endif
3608   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3609     {
3610 #if DEBUG_FS
3611       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3612                   "Found ONDEMAND block, performing on-demand encoding\n");
3613 #endif
3614       GNUNET_STATISTICS_update (stats,
3615                                 gettext_noop ("# on-demand blocks matched requests"),
3616                                 1,
3617                                 GNUNET_NO);
3618       if (GNUNET_OK != 
3619           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3620                                             anonymity, expiration, uid, 
3621                                             &process_local_reply,
3622                                             pr))
3623       if (pr->qe != NULL)
3624         {
3625           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3626         }
3627       return;
3628     }
3629   old_rf = pr->results_found;
3630   memset (&prq, 0, sizeof (prq));
3631   prq.data = data;
3632   prq.expiration = expiration;
3633   prq.size = size;  
3634   if (GNUNET_OK != 
3635       GNUNET_BLOCK_get_key (block_ctx,
3636                             type,
3637                             data,
3638                             size,
3639                             &query))
3640     {
3641       GNUNET_break (0);
3642       GNUNET_DATASTORE_remove (dsh,
3643                                key,
3644                                size, data,
3645                                -1, -1, 
3646                                GNUNET_TIME_UNIT_FOREVER_REL,
3647                                NULL, NULL);
3648       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3649       return;
3650     }
3651   prq.type = type;
3652   prq.priority = priority;  
3653   prq.finished = GNUNET_NO;
3654   prq.request_found = GNUNET_NO;
3655   if ( (old_rf == 0) &&
3656        (pr->results_found == 0) )
3657     update_datastore_delays (pr->start_time);
3658   process_reply (&prq, key, pr);
3659   if (prq.finished == GNUNET_YES)
3660     return;
3661   if (pr->qe == NULL)
3662     return; /* done here */
3663   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3664     {
3665       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3666       return;
3667     }
3668   if ( (pr->client_request_list == NULL) &&
3669        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3670          (pr->results_found > 5 + 2 * pr->priority) ) )
3671     {
3672 #if DEBUG_FS > 2
3673       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3674                   "Load too high, done with request\n");
3675 #endif
3676       GNUNET_STATISTICS_update (stats,
3677                                 gettext_noop ("# processing result set cut short due to load"),
3678                                 1,
3679                                 GNUNET_NO);
3680       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3681       return;
3682     }
3683   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3684 }
3685
3686
3687 /**
3688  * We've received a request with the specified priority.  Bound it
3689  * according to how much we trust the given peer.
3690  * 
3691  * @param prio_in requested priority
3692  * @param cp the peer making the request
3693  * @return effective priority
3694  */
3695 static int32_t
3696 bound_priority (uint32_t prio_in,
3697                 struct ConnectedPeer *cp)
3698 {
3699 #define N ((double)128.0)
3700   uint32_t ret;
3701   double rret;
3702   int ld;
3703
3704   ld = test_get_load_too_high (0);
3705   if (ld == GNUNET_SYSERR)
3706     return 0; /* excess resources */
3707   ret = change_host_trust (cp, prio_in);
3708   if (ret > 0)
3709     {
3710       if (ret > current_priorities + N)
3711         rret = current_priorities + N;
3712       else
3713         rret = ret;
3714       current_priorities 
3715         = (current_priorities * (N-1) + rret)/N;
3716     }
3717   if ( (ld == GNUNET_YES) && (ret > 0) )
3718     {
3719       /* try with charging */
3720       ld = test_get_load_too_high (ret);
3721     }
3722   if (ld == GNUNET_YES)
3723     {
3724       /* undo charge */
3725       if (ret != 0)
3726         change_host_trust (cp, -ret);
3727       return -1; /* not enough resources */
3728     }
3729 #undef N
3730   return ret;
3731 }
3732
3733
3734 /**
3735  * Iterator over entries in the 'query_request_map' that
3736  * tries to see if we have the same request pending from
3737  * the same peer already.
3738  *
3739  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3740  * @param key current key code (query, ignored, must match)
3741  * @param value value in the hash map (a 'struct PendingRequest' 
3742  *              that already exists)
3743  * @return GNUNET_YES if we should continue to
3744  *         iterate (no match yet)
3745  *         GNUNET_NO if not (match found).
3746  */
3747 static int
3748 check_duplicate_request_peer (void *cls,
3749                               const GNUNET_HashCode * key,
3750                               void *value)
3751 {
3752   struct CheckDuplicateRequestClosure *cdc = cls;
3753   struct PendingRequest *have = value;
3754
3755   if (cdc->pr->target_pid == have->target_pid)
3756     {
3757       cdc->have = have;
3758       return GNUNET_NO;
3759     }
3760   return GNUNET_YES;
3761 }
3762
3763
3764 /**
3765  * Handle P2P "GET" request.
3766  *
3767  * @param cls closure, always NULL
3768  * @param other the other peer involved (sender or receiver, NULL
3769  *        for loopback messages where we are both sender and receiver)
3770  * @param message the actual message
3771  * @param latency reported latency of the connection with 'other'
3772  * @param distance reported distance (DV) to 'other' 
3773  * @return GNUNET_OK to keep the connection open,
3774  *         GNUNET_SYSERR to close it (signal serious error)
3775  */
3776 static int
3777 handle_p2p_get (void *cls,
3778                 const struct GNUNET_PeerIdentity *other,
3779                 const struct GNUNET_MessageHeader *message,
3780                 struct GNUNET_TIME_Relative latency,
3781                 uint32_t distance)
3782 {
3783   struct PendingRequest *pr;
3784   struct ConnectedPeer *cp;
3785   struct ConnectedPeer *cps;
3786   struct CheckDuplicateRequestClosure cdc;
3787   struct GNUNET_TIME_Relative timeout;
3788   uint16_t msize;
3789   const struct GetMessage *gm;
3790   unsigned int bits;
3791   const GNUNET_HashCode *opt;
3792   uint32_t bm;
3793   size_t bfsize;
3794   uint32_t ttl_decrement;
3795   int32_t priority;
3796   enum GNUNET_BLOCK_Type type;
3797   int have_ns;
3798
3799   msize = ntohs(message->size);
3800   if (msize < sizeof (struct GetMessage))
3801     {
3802       GNUNET_break_op (0);
3803       return GNUNET_SYSERR;
3804     }
3805   gm = (const struct GetMessage*) message;
3806   type = ntohl (gm->type);
3807   bm = ntohl (gm->hash_bitmap);
3808   bits = 0;
3809   while (bm > 0)
3810     {
3811       if (1 == (bm & 1))
3812         bits++;
3813       bm >>= 1;
3814     }
3815   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3816     {
3817       GNUNET_break_op (0);
3818       return GNUNET_SYSERR;
3819     }  
3820   opt = (const GNUNET_HashCode*) &gm[1];
3821   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3822   bm = ntohl (gm->hash_bitmap);
3823   bits = 0;
3824   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3825                                            &other->hashPubKey);
3826   if (NULL == cps)
3827     {
3828       /* peer must have just disconnected */
3829       GNUNET_STATISTICS_update (stats,
3830                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3831                                 1,
3832                                 GNUNET_NO);
3833       return GNUNET_SYSERR;
3834     }
3835   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3836     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3837                                             &opt[bits++]);
3838   else
3839     cp = cps;
3840   if (cp == NULL)
3841     {
3842 #if DEBUG_FS
3843       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3844         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3845                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3846                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3847       
3848       else
3849         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3850                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3851                     GNUNET_i2s (other));
3852 #endif
3853       GNUNET_STATISTICS_update (stats,
3854                                 gettext_noop ("# requests dropped due to missing reverse route"),
3855                                 1,
3856                                 GNUNET_NO);
3857      /* FIXME: try connect? */
3858       return GNUNET_OK;
3859     }
3860   /* note that we can really only check load here since otherwise
3861      peers could find out that we are overloaded by not being
3862      disconnected after sending us a malformed query... */
3863   priority = bound_priority (ntohl (gm->priority), cps);
3864   if (priority < 0)
3865     {
3866 #if DEBUG_FS
3867       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3868                   "Dropping query from `%s', this peer is too busy.\n",
3869                   GNUNET_i2s (other));
3870 #endif
3871       GNUNET_STATISTICS_update (stats,
3872                                 gettext_noop ("# requests dropped due to high load"),
3873                                 1,
3874                                 GNUNET_NO);
3875       return GNUNET_OK;
3876     }
3877 #if DEBUG_FS 
3878   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3879               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3880               GNUNET_h2s (&gm->query),
3881               (unsigned int) type,
3882               GNUNET_i2s (other),
3883               (unsigned int) bm);
3884 #endif
3885   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3886   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3887                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3888   if (have_ns)
3889     {
3890       pr->namespace = (GNUNET_HashCode*) &pr[1];
3891       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3892     }
3893   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3) ||
3894        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
3895         GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
3896     {
3897       /* don't have BW to send to peer, or would likely take longer than we have for it,
3898          so at best indirect the query */
3899       priority = 0;
3900       pr->forward_only = GNUNET_YES;
3901     }
3902   pr->type = type;
3903   pr->mingle = ntohl (gm->filter_mutator);
3904   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3905     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3906   pr->anonymity_level = 1;
3907   pr->priority = (uint32_t) priority;
3908   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3909   pr->query = gm->query;
3910   /* decrement ttl (always) */
3911   ttl_decrement = 2 * TTL_DECREMENT +
3912     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3913                               TTL_DECREMENT);
3914   if ( (pr->ttl < 0) &&
3915        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3916     {
3917 #if DEBUG_FS
3918       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3919                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3920                   GNUNET_i2s (other),
3921                   pr->ttl,
3922                   ttl_decrement);
3923 #endif
3924       GNUNET_STATISTICS_update (stats,
3925                                 gettext_noop ("# requests dropped due TTL underflow"),
3926                                 1,
3927                                 GNUNET_NO);
3928       /* integer underflow => drop (should be very rare)! */      
3929       GNUNET_free (pr);
3930       return GNUNET_OK;
3931     } 
3932   pr->ttl -= ttl_decrement;
3933   pr->start_time = GNUNET_TIME_absolute_get ();
3934
3935   /* get bloom filter */
3936   if (bfsize > 0)
3937     {
3938       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3939                                                   bfsize,
3940                                                   BLOOMFILTER_K);
3941       pr->bf_size = bfsize;
3942     }
3943
3944   cdc.have = NULL;
3945   cdc.pr = pr;
3946   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3947                                               &gm->query,
3948                                               &check_duplicate_request_peer,
3949                                               &cdc);
3950   if (cdc.have != NULL)
3951     {
3952       if (cdc.have->start_time.value + cdc.have->ttl >=
3953           pr->start_time.value + pr->ttl)
3954         {
3955           /* existing request has higher TTL, drop new one! */
3956           cdc.have->priority += pr->priority;
3957           destroy_pending_request (pr);
3958 #if DEBUG_FS
3959           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3960                       "Have existing request with higher TTL, dropping new request.\n",
3961                       GNUNET_i2s (other));
3962 #endif
3963           GNUNET_STATISTICS_update (stats,
3964                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3965                                     1,
3966                                     GNUNET_NO);
3967           return GNUNET_OK;
3968         }
3969       else
3970         {
3971           /* existing request has lower TTL, drop old one! */
3972           pr->priority += cdc.have->priority;
3973           /* Possible optimization: if we have applicable pending
3974              replies in 'cdc.have', we might want to move those over
3975              (this is a really rare special-case, so it is not clear
3976              that this would be worth it) */
3977           destroy_pending_request (cdc.have);
3978           /* keep processing 'pr'! */
3979         }
3980     }
3981
3982   pr->cp = cp;
3983   GNUNET_break (GNUNET_OK ==
3984                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3985                                                    &gm->query,
3986                                                    pr,
3987                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3988   GNUNET_break (GNUNET_OK ==
3989                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3990                                                    &other->hashPubKey,
3991                                                    pr,
3992                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3993   
3994   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3995                                             pr,
3996                                             pr->start_time.value + pr->ttl);
3997
3998   GNUNET_STATISTICS_update (stats,
3999                             gettext_noop ("# P2P searches received"),
4000                             1,
4001                             GNUNET_NO);
4002   GNUNET_STATISTICS_update (stats,
4003                             gettext_noop ("# P2P searches active"),
4004                             1,
4005                             GNUNET_NO);
4006
4007   /* calculate change in traffic preference */
4008   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4009   /* process locally */
4010   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4011     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4012   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4013                                            (pr->priority + 1)); 
4014   if (GNUNET_YES != pr->forward_only)
4015     pr->qe = GNUNET_DATASTORE_get (dsh,
4016                                    &gm->query,
4017                                    type,                               
4018                                    pr->priority + 1,
4019                                    MAX_DATASTORE_QUEUE,                          
4020                                    timeout,
4021                                    &process_local_reply,
4022                                    pr);
4023   else
4024     GNUNET_STATISTICS_update (stats,
4025                               gettext_noop ("# requests forwarded due to high load"),
4026                               1,
4027                               GNUNET_NO);
4028
4029   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4030   switch (pr->type)
4031     {
4032     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4033     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4034       /* only one result, wait for datastore */
4035       if (GNUNET_YES != pr->forward_only)
4036         break;
4037     default:
4038       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4039         pr->task = GNUNET_SCHEDULER_add_now (sched,
4040                                              &forward_request_task,
4041                                              pr);
4042     }
4043
4044   /* make sure we don't track too many requests */
4045   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4046     {
4047       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4048       GNUNET_assert (pr != NULL);
4049       destroy_pending_request (pr);
4050     }
4051   return GNUNET_OK;
4052 }
4053
4054
4055 /* **************************** CS GET Handling ************************ */
4056
4057
4058 /**
4059  * Handle START_SEARCH-message (search request from client).
4060  *
4061  * @param cls closure
4062  * @param client identification of the client
4063  * @param message the actual message
4064  */
4065 static void
4066 handle_start_search (void *cls,
4067                      struct GNUNET_SERVER_Client *client,
4068                      const struct GNUNET_MessageHeader *message)
4069 {
4070   static GNUNET_HashCode all_zeros;
4071   const struct SearchMessage *sm;
4072   struct ClientList *cl;
4073   struct ClientRequestList *crl;
4074   struct PendingRequest *pr;
4075   uint16_t msize;
4076   unsigned int sc;
4077   enum GNUNET_BLOCK_Type type;
4078
4079   msize = ntohs (message->size);
4080   if ( (msize < sizeof (struct SearchMessage)) ||
4081        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4082     {
4083       GNUNET_break (0);
4084       GNUNET_SERVER_receive_done (client,
4085                                   GNUNET_SYSERR);
4086       return;
4087     }
4088   GNUNET_STATISTICS_update (stats,
4089                             gettext_noop ("# client searches received"),
4090                             1,
4091                             GNUNET_NO);
4092   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4093   sm = (const struct SearchMessage*) message;
4094   type = ntohl (sm->type);
4095 #if DEBUG_FS
4096   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4097               "Received request for `%s' of type %u from local client\n",
4098               GNUNET_h2s (&sm->query),
4099               (unsigned int) type);
4100 #endif
4101   cl = client_list;
4102   while ( (cl != NULL) &&
4103           (cl->client != client) )
4104     cl = cl->next;
4105   if (cl == NULL)
4106     {
4107       cl = GNUNET_malloc (sizeof (struct ClientList));
4108       cl->client = client;
4109       GNUNET_SERVER_client_keep (client);
4110       cl->next = client_list;
4111       client_list = cl;
4112     }
4113   /* detect duplicate KBLOCK requests */
4114   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4115        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4116        (type == GNUNET_BLOCK_TYPE_ANY) )
4117     {
4118       crl = cl->rl_head;
4119       while ( (crl != NULL) &&
4120               ( (0 != memcmp (&crl->req->query,
4121                               &sm->query,
4122                               sizeof (GNUNET_HashCode))) ||
4123                 (crl->req->type != type) ) )
4124         crl = crl->next;
4125       if (crl != NULL)  
4126         { 
4127 #if DEBUG_FS
4128           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4129                       "Have existing request, merging content-seen lists.\n");
4130 #endif
4131           pr = crl->req;
4132           /* Duplicate request (used to send long list of
4133              known/blocked results); merge 'pr->replies_seen'
4134              and update bloom filter */
4135           GNUNET_array_grow (pr->replies_seen,
4136                              pr->replies_seen_size,
4137                              pr->replies_seen_off + sc);
4138           memcpy (&pr->replies_seen[pr->replies_seen_off],
4139                   &sm[1],
4140                   sc * sizeof (GNUNET_HashCode));
4141           pr->replies_seen_off += sc;
4142           refresh_bloomfilter (pr);
4143           GNUNET_STATISTICS_update (stats,
4144                                     gettext_noop ("# client searches updated (merged content seen list)"),
4145                                     1,
4146                                     GNUNET_NO);
4147           GNUNET_SERVER_receive_done (client,
4148                                       GNUNET_OK);
4149           return;
4150         }
4151     }
4152   GNUNET_STATISTICS_update (stats,
4153                             gettext_noop ("# client searches active"),
4154                             1,
4155                             GNUNET_NO);
4156   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4157                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4158   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4159   memset (crl, 0, sizeof (struct ClientRequestList));
4160   crl->client_list = cl;
4161   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4162                                cl->rl_tail,
4163                                crl);  
4164   crl->req = pr;
4165   pr->type = type;
4166   pr->client_request_list = crl;
4167   GNUNET_array_grow (pr->replies_seen,
4168                      pr->replies_seen_size,
4169                      sc);
4170   memcpy (pr->replies_seen,
4171           &sm[1],
4172           sc * sizeof (GNUNET_HashCode));
4173   pr->replies_seen_off = sc;
4174   pr->anonymity_level = ntohl (sm->anonymity_level); 
4175   pr->start_time = GNUNET_TIME_absolute_get ();
4176   refresh_bloomfilter (pr);
4177   pr->query = sm->query;
4178   if (0 == (1 & ntohl (sm->options)))
4179     pr->local_only = GNUNET_NO;
4180   else
4181     pr->local_only = GNUNET_YES;
4182   switch (type)
4183     {
4184     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4185     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4186       if (0 != memcmp (&sm->target,
4187                        &all_zeros,
4188                        sizeof (GNUNET_HashCode)))
4189         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4190       break;
4191     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4192       pr->namespace = (GNUNET_HashCode*) &pr[1];
4193       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4194       break;
4195     default:
4196       break;
4197     }
4198   GNUNET_break (GNUNET_OK ==
4199                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4200                                                    &sm->query,
4201                                                    pr,
4202                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4203   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4204     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4205   pr->qe = GNUNET_DATASTORE_get (dsh,
4206                                  &sm->query,
4207                                  type,
4208                                  -3, -1,
4209                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4210                                  &process_local_reply,
4211                                  pr);
4212 }
4213
4214
4215 /* **************************** Startup ************************ */
4216
4217 /**
4218  * Process fs requests.
4219  *
4220  * @param s scheduler to use
4221  * @param server the initialized server
4222  * @param c configuration to use
4223  */
4224 static int
4225 main_init (struct GNUNET_SCHEDULER_Handle *s,
4226            struct GNUNET_SERVER_Handle *server,
4227            const struct GNUNET_CONFIGURATION_Handle *c)
4228 {
4229   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4230     {
4231       { &handle_p2p_get, 
4232         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4233       { &handle_p2p_put, 
4234         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4235       { &handle_p2p_migration_stop, 
4236         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4237         sizeof (struct MigrationStopMessage) },
4238       { NULL, 0, 0 }
4239     };
4240   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4241     {&GNUNET_FS_handle_index_start, NULL, 
4242      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4243     {&GNUNET_FS_handle_index_list_get, NULL, 
4244      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4245     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4246      sizeof (struct UnindexMessage) },
4247     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4248      0 },
4249     {NULL, NULL, 0, 0}
4250   };
4251   unsigned long long enc = 128;
4252
4253   sched = s;
4254   cfg = c;
4255   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4256   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4257   if ( (GNUNET_OK !=
4258         GNUNET_CONFIGURATION_get_value_number (cfg,
4259                                                "fs",
4260                                                "MAX_PENDING_REQUESTS",
4261                                                &max_pending_requests)) ||
4262        (GNUNET_OK !=
4263         GNUNET_CONFIGURATION_get_value_number (cfg,
4264                                                "fs",
4265                                                "EXPECTED_NEIGHBOUR_COUNT",
4266                                                &enc)) ||
4267        (GNUNET_OK != 
4268         GNUNET_CONFIGURATION_get_value_time (cfg,
4269                                              "fs",
4270                                              "MIN_MIGRATION_DELAY",
4271                                              &min_migration_delay)) )
4272     {
4273       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4274                   _("Configuration fails to specify certain parameters, assuming default values."));
4275     }
4276   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4277   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4278   rt_entry_lifetime = GNUNET_LOAD_value_init ();
4279   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4280   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4281   core = GNUNET_CORE_connect (sched,
4282                               cfg,
4283                               GNUNET_TIME_UNIT_FOREVER_REL,
4284                               NULL,
4285                               NULL,
4286                               &peer_connect_handler,
4287                               &peer_disconnect_handler,
4288                               NULL,
4289                               NULL, GNUNET_NO,
4290                               NULL, GNUNET_NO,
4291                               p2p_handlers);
4292   if (NULL == core)
4293     {
4294       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4295                   _("Failed to connect to `%s' service.\n"),
4296                   "core");
4297       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4298       connected_peers = NULL;
4299       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4300       query_request_map = NULL;
4301       GNUNET_LOAD_value_free (rt_entry_lifetime);
4302       rt_entry_lifetime = NULL;
4303       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4304       requests_by_expiration_heap = NULL;
4305       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4306       peer_request_map = NULL;
4307       if (dsh != NULL)
4308         {
4309           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4310           dsh = NULL;
4311         }
4312       return GNUNET_SYSERR;
4313     }
4314   /* FIXME: distinguish between sending and storing in options? */
4315   if (active_migration) 
4316     {
4317       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4318                   _("Content migration is enabled, will start to gather data\n"));
4319       consider_migration_gathering ();
4320     }
4321   consider_dht_put_gathering (NULL);
4322   GNUNET_SERVER_disconnect_notify (server, 
4323                                    &handle_client_disconnect,
4324                                    NULL);
4325   GNUNET_assert (GNUNET_OK ==
4326                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4327                                                           "fs",
4328                                                           "TRUST",
4329                                                           &trustDirectory));
4330   GNUNET_DISK_directory_create (trustDirectory);
4331   GNUNET_SCHEDULER_add_with_priority (sched,
4332                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4333                                       &cron_flush_trust, NULL);
4334
4335
4336   GNUNET_SERVER_add_handlers (server, handlers);
4337   GNUNET_SCHEDULER_add_delayed (sched,
4338                                 GNUNET_TIME_UNIT_FOREVER_REL,
4339                                 &shutdown_task,
4340                                 NULL);
4341   return GNUNET_OK;
4342 }
4343
4344
4345 /**
4346  * Process fs requests.
4347  *
4348  * @param cls closure
4349  * @param sched scheduler to use
4350  * @param server the initialized server
4351  * @param cfg configuration to use
4352  */
4353 static void
4354 run (void *cls,
4355      struct GNUNET_SCHEDULER_Handle *sched,
4356      struct GNUNET_SERVER_Handle *server,
4357      const struct GNUNET_CONFIGURATION_Handle *cfg)
4358 {
4359   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4360                                                            "FS",
4361                                                            "ACTIVEMIGRATION");
4362   dsh = GNUNET_DATASTORE_connect (cfg,
4363                                   sched);
4364   if (dsh == NULL)
4365     {
4366       GNUNET_SCHEDULER_shutdown (sched);
4367       return;
4368     }
4369   datastore_get_load = GNUNET_LOAD_value_init ();
4370   datastore_put_load = GNUNET_LOAD_value_init ();
4371   block_cfg = GNUNET_CONFIGURATION_create ();
4372   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4373                                          "block",
4374                                          "PLUGINS",
4375                                          "fs");
4376   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4377   GNUNET_assert (NULL != block_ctx);
4378   dht_handle = GNUNET_DHT_connect (sched,
4379                                    cfg,
4380                                    FS_DHT_HT_SIZE);
4381   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4382        (GNUNET_OK != main_init (sched, server, cfg)) )
4383     {    
4384       GNUNET_SCHEDULER_shutdown (sched);
4385       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4386       dsh = NULL;
4387       GNUNET_DHT_disconnect (dht_handle);
4388       dht_handle = NULL;
4389       GNUNET_BLOCK_context_destroy (block_ctx);
4390       block_ctx = NULL;
4391       GNUNET_CONFIGURATION_destroy (block_cfg);
4392       block_cfg = NULL;
4393       GNUNET_LOAD_value_free (datastore_get_load);
4394       datastore_get_load = NULL;
4395       GNUNET_LOAD_value_free (datastore_put_load);
4396       datastore_put_load = NULL;
4397       return;   
4398     }
4399 }
4400
4401
4402 /**
4403  * The main function for the fs service.
4404  *
4405  * @param argc number of arguments from the command line
4406  * @param argv command line arguments
4407  * @return 0 ok, 1 on error
4408  */
4409 int
4410 main (int argc, char *const *argv)
4411 {
4412   return (GNUNET_OK ==
4413           GNUNET_SERVICE_run (argc,
4414                               argv,
4415                               "fs",
4416                               GNUNET_SERVICE_OPTION_NONE,
4417                               &run, NULL)) ? 0 : 1;
4418 }
4419
4420 /* end of gnunet-service-fs.c */