fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - introduce random latency in processing
28  * - more statistics
29  */
30 #include "platform.h"
31 #include <float.h>
32 #include "gnunet_constants.h"
33 #include "gnunet_core_service.h"
34 #include "gnunet_dht_service.h"
35 #include "gnunet_datastore_service.h"
36 #include "gnunet_load_lib.h"
37 #include "gnunet_peer_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_signatures.h"
40 #include "gnunet_statistics_service.h"
41 #include "gnunet_util_lib.h"
42 #include "gnunet-service-fs_indexing.h"
43 #include "fs.h"
44
45 #define DEBUG_FS GNUNET_NO
46
47 /**
48  * Should we introduce random latency in processing?  Required for proper
49  * implementation of GAP, but can be disabled for performance evaluation of
50  * the basic routing algorithm.
51  */
52 #define SUPPORT_DELAYS GNUNET_NO
53
54 /**
55  * Maximum number of outgoing messages we queue per peer.
56  */
57 #define MAX_QUEUE_PER_PEER 16
58
59 /**
60  * Size for the hash map for DHT requests from the FS
61  * service.  Should be about the number of concurrent
62  * DHT requests we plan to make.
63  */
64 #define FS_DHT_HT_SIZE 1024
65
66 /**
67  * How often do we flush trust values to disk?
68  */
69 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
70
71 /**
72  * How often do we at most PUT content into the DHT?
73  */
74 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
75
76 /**
77  * Inverse of the probability that we will submit the same query
78  * to the same peer again.  If the same peer already got the query
79  * repeatedly recently, the probability is multiplied by the inverse
80  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
81  * plus MAX_CORK_DELAY (so roughly every 3.5s).
82  */
83 #define RETRY_PROBABILITY_INV 3
84
85 /**
86  * What is the maximum delay for a P2P FS message (in our interaction
87  * with core)?  FS-internal delays are another story.  The value is
88  * chosen based on the 32k block size.  Given that peers typcially
89  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
90  * transmit one message even to the lowest-bandwidth peers.
91  */
92 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
93
94 /**
95  * Maximum number of requests (from other peers) that we're
96  * willing to have pending at any given point in time.
97  */
98 static unsigned long long max_pending_requests = (32 * 1024);
99
100
101 /**
102  * Information we keep for each pending reply.  The
103  * actual message follows at the end of this struct.
104  */
105 struct PendingMessage;
106
107 /**
108  * Function called upon completion of a transmission.
109  *
110  * @param cls closure
111  * @param pid ID of receiving peer, 0 on transmission error
112  */
113 typedef void (*TransmissionContinuation)(void * cls, 
114                                          GNUNET_PEER_Id tpid);
115
116
117 /**
118  * Information we keep for each pending message (GET/PUT).  The
119  * actual message follows at the end of this struct.
120  */
121 struct PendingMessage
122 {
123   /**
124    * This is a doubly-linked list of messages to the same peer.
125    */
126   struct PendingMessage *next;
127
128   /**
129    * This is a doubly-linked list of messages to the same peer.
130    */
131   struct PendingMessage *prev;
132
133   /**
134    * Entry in pending message list for this pending message.
135    */ 
136   struct PendingMessageList *pml;  
137
138   /**
139    * Function to call immediately once we have transmitted this
140    * message.
141    */
142   TransmissionContinuation cont;
143
144   /**
145    * Closure for cont.
146    */
147   void *cont_cls;
148
149   /**
150    * Do not transmit this pending message until this deadline.
151    */
152   struct GNUNET_TIME_Absolute delay_until;
153
154   /**
155    * Size of the reply; actual reply message follows
156    * at the end of this struct.
157    */
158   size_t msize;
159   
160   /**
161    * How important is this message for us?
162    */
163   uint32_t priority;
164  
165 };
166
167
168 /**
169  * Information about a peer that we are connected to.
170  * We track data that is useful for determining which
171  * peers should receive our requests.  We also keep
172  * a list of messages to transmit to this peer.
173  */
174 struct ConnectedPeer
175 {
176
177   /**
178    * List of the last clients for which this peer successfully
179    * answered a query.
180    */
181   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
182
183   /**
184    * List of the last PIDs for which
185    * this peer successfully answered a query;
186    * We use 0 to indicate no successful reply.
187    */
188   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
189
190   /**
191    * Average delay between sending the peer a request and
192    * getting a reply (only calculated over the requests for
193    * which we actually got a reply).   Calculated
194    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
195    */ 
196   struct GNUNET_TIME_Relative avg_delay;
197
198   /**
199    * Point in time until which this peer does not want us to migrate content
200    * to it.
201    */
202   struct GNUNET_TIME_Absolute migration_blocked;
203
204   /**
205    * Time until when we blocked this peer from migrating
206    * data to us.
207    */
208   struct GNUNET_TIME_Absolute last_migration_block;
209
210   /**
211    * Handle for an active request for transmission to this
212    * peer, or NULL.
213    */
214   struct GNUNET_CORE_TransmitHandle *cth;
215
216   /**
217    * Messages (replies, queries, content migration) we would like to
218    * send to this peer in the near future.  Sorted by priority, head.
219    */
220   struct PendingMessage *pending_messages_head;
221
222   /**
223    * Messages (replies, queries, content migration) we would like to
224    * send to this peer in the near future.  Sorted by priority, tail.
225    */
226   struct PendingMessage *pending_messages_tail;
227
228   /**
229    * How long does it typically take for us to transmit a message
230    * to this peer?  (delay between the request being issued and
231    * the callback being invoked).
232    */
233   struct GNUNET_LOAD_Value *transmission_delay;
234
235   /**
236    * Time when the last transmission request was issued.
237    */
238   struct GNUNET_TIME_Absolute last_transmission_request_start;
239
240   /**
241    * ID of delay task for scheduling transmission.
242    */
243   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
244
245   /**
246    * Average priority of successful replies.  Calculated
247    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
248    */
249   double avg_priority;
250
251   /**
252    * Increase in traffic preference still to be submitted
253    * to the core service for this peer.
254    */
255   uint64_t inc_preference;
256
257   /**
258    * Trust rating for this peer
259    */
260   uint32_t trust;
261
262   /**
263    * Trust rating for this peer on disk.
264    */
265   uint32_t disk_trust;
266
267   /**
268    * The peer's identity.
269    */
270   GNUNET_PEER_Id pid;  
271
272   /**
273    * Size of the linked list of 'pending_messages'.
274    */
275   unsigned int pending_requests;
276
277   /**
278    * Which offset in "last_p2p_replies" will be updated next?
279    * (we go round-robin).
280    */
281   unsigned int last_p2p_replies_woff;
282
283   /**
284    * Which offset in "last_client_replies" will be updated next?
285    * (we go round-robin).
286    */
287   unsigned int last_client_replies_woff;
288
289 };
290
291
292 /**
293  * Information we keep for each pending request.  We should try to
294  * keep this struct as small as possible since its memory consumption
295  * is key to how many requests we can have pending at once.
296  */
297 struct PendingRequest;
298
299
300 /**
301  * Doubly-linked list of requests we are performing
302  * on behalf of the same client.
303  */
304 struct ClientRequestList
305 {
306
307   /**
308    * This is a doubly-linked list.
309    */
310   struct ClientRequestList *next;
311
312   /**
313    * This is a doubly-linked list.
314    */
315   struct ClientRequestList *prev;
316
317   /**
318    * Request this entry represents.
319    */
320   struct PendingRequest *req;
321
322   /**
323    * Client list this request belongs to.
324    */
325   struct ClientList *client_list;
326
327 };
328
329
330 /**
331  * Replies to be transmitted to the client.  The actual
332  * response message is allocated after this struct.
333  */
334 struct ClientResponseMessage
335 {
336   /**
337    * This is a doubly-linked list.
338    */
339   struct ClientResponseMessage *next;
340
341   /**
342    * This is a doubly-linked list.
343    */
344   struct ClientResponseMessage *prev;
345
346   /**
347    * Client list entry this response belongs to.
348    */
349   struct ClientList *client_list;
350
351   /**
352    * Number of bytes in the response.
353    */
354   size_t msize;
355 };
356
357
358 /**
359  * Linked list of clients we are performing requests
360  * for right now.
361  */
362 struct ClientList
363 {
364   /**
365    * This is a linked list.
366    */
367   struct ClientList *next;
368
369   /**
370    * ID of a client making a request, NULL if this entry is for a
371    * peer.
372    */
373   struct GNUNET_SERVER_Client *client;
374
375   /**
376    * Head of list of requests performed on behalf
377    * of this client right now.
378    */
379   struct ClientRequestList *rl_head;
380
381   /**
382    * Tail of list of requests performed on behalf
383    * of this client right now.
384    */
385   struct ClientRequestList *rl_tail;
386
387   /**
388    * Head of linked list of responses.
389    */
390   struct ClientResponseMessage *res_head;
391
392   /**
393    * Tail of linked list of responses.
394    */
395   struct ClientResponseMessage *res_tail;
396
397   /**
398    * Context for sending replies.
399    */
400   struct GNUNET_CONNECTION_TransmitHandle *th;
401
402 };
403
404
405 /**
406  * Doubly-linked list of messages we are performing
407  * due to a pending request.
408  */
409 struct PendingMessageList
410 {
411
412   /**
413    * This is a doubly-linked list of messages on behalf of the same request.
414    */
415   struct PendingMessageList *next;
416
417   /**
418    * This is a doubly-linked list of messages on behalf of the same request.
419    */
420   struct PendingMessageList *prev;
421
422   /**
423    * Message this entry represents.
424    */
425   struct PendingMessage *pm;
426
427   /**
428    * Request this entry belongs to.
429    */
430   struct PendingRequest *req;
431
432   /**
433    * Peer this message is targeted for.
434    */
435   struct ConnectedPeer *target;
436
437 };
438
439
440 /**
441  * Information we keep for each pending request.  We should try to
442  * keep this struct as small as possible since its memory consumption
443  * is key to how many requests we can have pending at once.
444  */
445 struct PendingRequest
446 {
447
448   /**
449    * If this request was made by a client, this is our entry in the
450    * client request list; otherwise NULL.
451    */
452   struct ClientRequestList *client_request_list;
453
454   /**
455    * Entry of peer responsible for this entry (if this request
456    * was made by a peer).
457    */
458   struct ConnectedPeer *cp;
459
460   /**
461    * If this is a namespace query, pointer to the hash of the public
462    * key of the namespace; otherwise NULL.  Pointer will be to the 
463    * end of this struct (so no need to free it).
464    */
465   const GNUNET_HashCode *namespace;
466
467   /**
468    * Bloomfilter we use to filter out replies that we don't care about
469    * (anymore).  NULL as long as we are interested in all replies.
470    */
471   struct GNUNET_CONTAINER_BloomFilter *bf;
472
473   /**
474    * Context of our GNUNET_CORE_peer_change_preference call.
475    */
476   struct GNUNET_CORE_InformationRequestContext *irc;
477
478   /**
479    * Reference to DHT get operation for this request (or NULL).
480    */
481   struct GNUNET_DHT_GetHandle *dht_get;
482
483   /**
484    * Hash code of all replies that we have seen so far (only valid
485    * if client is not NULL since we only track replies like this for
486    * our own clients).
487    */
488   GNUNET_HashCode *replies_seen;
489
490   /**
491    * Node in the heap representing this entry; NULL
492    * if we have no heap node.
493    */
494   struct GNUNET_CONTAINER_HeapNode *hnode;
495
496   /**
497    * Head of list of messages being performed on behalf of this
498    * request.
499    */
500   struct PendingMessageList *pending_head;
501
502   /**
503    * Tail of list of messages being performed on behalf of this
504    * request.
505    */
506   struct PendingMessageList *pending_tail;
507
508   /**
509    * When did we first see this request (form this peer), or, if our
510    * client is initiating, when did we last initiate a search?
511    */
512   struct GNUNET_TIME_Absolute start_time;
513
514   /**
515    * The query that this request is for.
516    */
517   GNUNET_HashCode query;
518
519   /**
520    * The task responsible for transmitting queries
521    * for this request.
522    */
523   GNUNET_SCHEDULER_TaskIdentifier task;
524
525   /**
526    * (Interned) Peer identifier that identifies a preferred target
527    * for requests.
528    */
529   GNUNET_PEER_Id target_pid;
530
531   /**
532    * (Interned) Peer identifiers of peers that have already
533    * received our query for this content.
534    */
535   GNUNET_PEER_Id *used_pids;
536   
537   /**
538    * Our entry in the queue (non-NULL while we wait for our
539    * turn to interact with the local database).
540    */
541   struct GNUNET_DATASTORE_QueueEntry *qe;
542
543   /**
544    * Size of the 'bf' (in bytes).
545    */
546   size_t bf_size;
547
548   /**
549    * Desired anonymity level; only valid for requests from a local client.
550    */
551   uint32_t anonymity_level;
552
553   /**
554    * How many entries in "used_pids" are actually valid?
555    */
556   unsigned int used_pids_off;
557
558   /**
559    * How long is the "used_pids" array?
560    */
561   unsigned int used_pids_size;
562
563   /**
564    * Number of results found for this request.
565    */
566   unsigned int results_found;
567
568   /**
569    * How many entries in "replies_seen" are actually valid?
570    */
571   unsigned int replies_seen_off;
572
573   /**
574    * How long is the "replies_seen" array?
575    */
576   unsigned int replies_seen_size;
577   
578   /**
579    * Priority with which this request was made.  If one of our clients
580    * made the request, then this is the current priority that we are
581    * using when initiating the request.  This value is used when
582    * we decide to reward other peers with trust for providing a reply.
583    */
584   uint32_t priority;
585
586   /**
587    * Priority points left for us to spend when forwarding this request
588    * to other peers.
589    */
590   uint32_t remaining_priority;
591
592   /**
593    * Number to mingle hashes for bloom-filter tests with.
594    */
595   int32_t mingle;
596
597   /**
598    * TTL with which we saw this request (or, if we initiated, TTL that
599    * we used for the request).
600    */
601   int32_t ttl;
602   
603   /**
604    * Type of the content that this request is for.
605    */
606   enum GNUNET_BLOCK_Type type;
607
608   /**
609    * Remove this request after transmission of the current response.
610    */
611   int8_t do_remove;
612
613   /**
614    * GNUNET_YES if we should not forward this request to other peers.
615    */
616   int8_t local_only;
617
618   /**
619    * GNUNET_YES if we should not forward this request to other peers.
620    */
621   int8_t forward_only;
622
623 };
624
625
626 /**
627  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
628  */
629 struct MigrationReadyBlock
630 {
631
632   /**
633    * This is a doubly-linked list.
634    */
635   struct MigrationReadyBlock *next;
636
637   /**
638    * This is a doubly-linked list.
639    */
640   struct MigrationReadyBlock *prev;
641
642   /**
643    * Query for the block.
644    */
645   GNUNET_HashCode query;
646
647   /**
648    * When does this block expire? 
649    */
650   struct GNUNET_TIME_Absolute expiration;
651
652   /**
653    * Peers we would consider forwarding this
654    * block to.  Zero for empty entries.
655    */
656   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
657
658   /**
659    * Size of the block.
660    */
661   size_t size;
662
663   /**
664    *  Number of targets already used.
665    */
666   unsigned int used_targets;
667
668   /**
669    * Type of the block.
670    */
671   enum GNUNET_BLOCK_Type type;
672 };
673
674
675 /**
676  * Our connection to the datastore.
677  */
678 static struct GNUNET_DATASTORE_Handle *dsh;
679
680 /**
681  * Our block context.
682  */
683 static struct GNUNET_BLOCK_Context *block_ctx;
684
685 /**
686  * Our block configuration.
687  */
688 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
689
690 /**
691  * Our scheduler.
692  */
693 static struct GNUNET_SCHEDULER_Handle *sched;
694
695 /**
696  * Our configuration.
697  */
698 static const struct GNUNET_CONFIGURATION_Handle *cfg;
699
700 /**
701  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
702  */
703 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
704
705 /**
706  * Map of peer identifiers to "struct PendingRequest" (for that peer).
707  */
708 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
709
710 /**
711  * Map of query identifiers to "struct PendingRequest" (for that query).
712  */
713 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
714
715 /**
716  * Heap with the request that will expire next at the top.  Contains
717  * pointers of type "struct PendingRequest*"; these will *also* be
718  * aliased from the "requests_by_peer" data structures and the
719  * "requests_by_query" table.  Note that requests from our clients
720  * don't expire and are thus NOT in the "requests_by_expiration"
721  * (or the "requests_by_peer" tables).
722  */
723 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
724
725 /**
726  * Handle for reporting statistics.
727  */
728 static struct GNUNET_STATISTICS_Handle *stats;
729
730 /**
731  * Linked list of clients we are currently processing requests for.
732  */
733 static struct ClientList *client_list;
734
735 /**
736  * Pointer to handle to the core service (points to NULL until we've
737  * connected to it).
738  */
739 static struct GNUNET_CORE_Handle *core;
740
741 /**
742  * Head of linked list of blocks that can be migrated.
743  */
744 static struct MigrationReadyBlock *mig_head;
745
746 /**
747  * Tail of linked list of blocks that can be migrated.
748  */
749 static struct MigrationReadyBlock *mig_tail;
750
751 /**
752  * Request to datastore for migration (or NULL).
753  */
754 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
755
756 /**
757  * Request to datastore for DHT PUTs (or NULL).
758  */
759 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
760
761 /**
762  * Type we will request for the next DHT PUT round from the datastore.
763  */
764 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
765
766 /**
767  * Where do we store trust information?
768  */
769 static char *trustDirectory;
770
771 /**
772  * ID of task that collects blocks for migration.
773  */
774 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
775
776 /**
777  * ID of task that collects blocks for DHT PUTs.
778  */
779 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
780
781 /**
782  * What is the maximum frequency at which we are allowed to
783  * poll the datastore for migration content?
784  */
785 static struct GNUNET_TIME_Relative min_migration_delay;
786
787 /**
788  * Handle for DHT operations.
789  */
790 static struct GNUNET_DHT_Handle *dht_handle;
791
792 /**
793  * Size of the doubly-linked list of migration blocks.
794  */
795 static unsigned int mig_size;
796
797 /**
798  * Are we allowed to migrate content to this peer.
799  */
800 static int active_migration;
801
802 /**
803  * How many entires with zero anonymity do we currently estimate
804  * to have in the database?
805  */
806 static unsigned int zero_anonymity_count_estimate;
807
808 /**
809  * Typical priorities we're seeing from other peers right now.  Since
810  * most priorities will be zero, this value is the weighted average of
811  * non-zero priorities seen "recently".  In order to ensure that new
812  * values do not dramatically change the ratio, values are first
813  * "capped" to a reasonable range (+N of the current value) and then
814  * averaged into the existing value by a ratio of 1:N.  Hence
815  * receiving the largest possible priority can still only raise our
816  * "current_priorities" by at most 1.
817  */
818 static double current_priorities;
819
820 /**
821  * Datastore 'GET' load tracking.
822  */
823 static struct GNUNET_LOAD_Value *datastore_get_load;
824
825 /**
826  * Datastore 'PUT' load tracking.
827  */
828 static struct GNUNET_LOAD_Value *datastore_put_load;
829
830 /**
831  * How long do requests typically stay in the routing table?
832  */
833 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
834
835 /**
836  * We've just now completed a datastore request.  Update our
837  * datastore load calculations.
838  *
839  * @param start time when the datastore request was issued
840  */
841 static void
842 update_datastore_delays (struct GNUNET_TIME_Absolute start)
843 {
844   struct GNUNET_TIME_Relative delay;
845
846   delay = GNUNET_TIME_absolute_get_duration (start);
847   GNUNET_LOAD_update (datastore_get_load,
848                       delay.value);
849 }
850
851
852 /**
853  * Get the filename under which we would store the GNUNET_HELLO_Message
854  * for the given host and protocol.
855  * @return filename of the form DIRECTORY/HOSTID
856  */
857 static char *
858 get_trust_filename (const struct GNUNET_PeerIdentity *id)
859 {
860   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
861   char *fn;
862
863   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
864   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
865   return fn;
866 }
867
868
869
870 /**
871  * Transmit messages by copying it to the target buffer
872  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
873  * for writing in the meantime.  In that case, do nothing
874  * (the disconnect or shutdown handler will take care of the rest).
875  * If we were able to transmit messages and there are still more
876  * pending, ask core again for further calls to this function.
877  *
878  * @param cls closure, pointer to the 'struct ConnectedPeer*'
879  * @param size number of bytes available in buf
880  * @param buf where the callee should write the message
881  * @return number of bytes written to buf
882  */
883 static size_t
884 transmit_to_peer (void *cls,
885                   size_t size, void *buf);
886
887
888 /* ******************* clean up functions ************************ */
889
890 /**
891  * Delete the given migration block.
892  *
893  * @param mb block to delete
894  */
895 static void
896 delete_migration_block (struct MigrationReadyBlock *mb)
897 {
898   GNUNET_CONTAINER_DLL_remove (mig_head,
899                                mig_tail,
900                                mb);
901   GNUNET_PEER_decrement_rcs (mb->target_list,
902                              MIGRATION_LIST_SIZE);
903   mig_size--;
904   GNUNET_free (mb);
905 }
906
907
908 /**
909  * Compare the distance of two peers to a key.
910  *
911  * @param key key
912  * @param p1 first peer
913  * @param p2 second peer
914  * @return GNUNET_YES if P1 is closer to key than P2
915  */
916 static int
917 is_closer (const GNUNET_HashCode *key,
918            const struct GNUNET_PeerIdentity *p1,
919            const struct GNUNET_PeerIdentity *p2)
920 {
921   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
922                                     &p2->hashPubKey,
923                                     key);
924 }
925
926
927 /**
928  * Consider migrating content to a given peer.
929  *
930  * @param cls 'struct MigrationReadyBlock*' to select
931  *            targets for (or NULL for none)
932  * @param key ID of the peer 
933  * @param value 'struct ConnectedPeer' of the peer
934  * @return GNUNET_YES (always continue iteration)
935  */
936 static int
937 consider_migration (void *cls,
938                     const GNUNET_HashCode *key,
939                     void *value)
940 {
941   struct MigrationReadyBlock *mb = cls;
942   struct ConnectedPeer *cp = value;
943   struct MigrationReadyBlock *pos;
944   struct GNUNET_PeerIdentity cppid;
945   struct GNUNET_PeerIdentity otherpid;
946   struct GNUNET_PeerIdentity worstpid;
947   size_t msize;
948   unsigned int i;
949   unsigned int repl;
950   
951   /* consider 'cp' as a migration target for mb */
952   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
953     return GNUNET_YES; /* peer has requested no migration! */
954   if (mb != NULL)
955     {
956       GNUNET_PEER_resolve (cp->pid,
957                            &cppid);
958       repl = MIGRATION_LIST_SIZE;
959       for (i=0;i<MIGRATION_LIST_SIZE;i++)
960         {
961           if (mb->target_list[i] == 0)
962             {
963               mb->target_list[i] = cp->pid;
964               GNUNET_PEER_change_rc (mb->target_list[i], 1);
965               repl = MIGRATION_LIST_SIZE;
966               break;
967             }
968           GNUNET_PEER_resolve (mb->target_list[i],
969                                &otherpid);
970           if ( (repl == MIGRATION_LIST_SIZE) &&
971                is_closer (&mb->query,
972                           &cppid,
973                           &otherpid)) 
974             {
975               repl = i;
976               worstpid = otherpid;
977             }
978           else if ( (repl != MIGRATION_LIST_SIZE) &&
979                     (is_closer (&mb->query,
980                                 &worstpid,
981                                 &otherpid) ) )
982             {
983               repl = i;
984               worstpid = otherpid;
985             }       
986         }
987       if (repl != MIGRATION_LIST_SIZE) 
988         {
989           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
990           mb->target_list[repl] = cp->pid;
991           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
992         }
993     }
994
995   /* consider scheduling transmission to cp for content migration */
996   if (cp->cth != NULL)        
997     return GNUNET_YES; 
998   msize = 0;
999   pos = mig_head;
1000   while (pos != NULL)
1001     {
1002       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1003         {
1004           if (cp->pid == pos->target_list[i])
1005             {
1006               if (msize == 0)
1007                 msize = pos->size;
1008               else
1009                 msize = GNUNET_MIN (msize,
1010                                     pos->size);
1011               break;
1012             }
1013         }
1014       pos = pos->next;
1015     }
1016   if (msize == 0)
1017     return GNUNET_YES; /* no content available */
1018 #if DEBUG_FS
1019   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1020               "Trying to migrate at least %u bytes to peer `%s'\n",
1021               msize,
1022               GNUNET_h2s (key));
1023 #endif
1024   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1025     {
1026       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1027       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1028     }
1029   cp->cth 
1030     = GNUNET_CORE_notify_transmit_ready (core,
1031                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1032                                          (const struct GNUNET_PeerIdentity*) key,
1033                                          msize + sizeof (struct PutMessage),
1034                                          &transmit_to_peer,
1035                                          cp);
1036   return GNUNET_YES;
1037 }
1038
1039
1040 /**
1041  * Task that is run periodically to obtain blocks for content
1042  * migration
1043  * 
1044  * @param cls unused
1045  * @param tc scheduler context (also unused)
1046  */
1047 static void
1048 gather_migration_blocks (void *cls,
1049                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1050
1051
1052
1053
1054 /**
1055  * Task that is run periodically to obtain blocks for DHT PUTs.
1056  * 
1057  * @param cls type of blocks to gather
1058  * @param tc scheduler context (unused)
1059  */
1060 static void
1061 gather_dht_put_blocks (void *cls,
1062                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1063
1064
1065 /**
1066  * If the migration task is not currently running, consider
1067  * (re)scheduling it with the appropriate delay.
1068  */
1069 static void
1070 consider_migration_gathering ()
1071 {
1072   struct GNUNET_TIME_Relative delay;
1073
1074   if (dsh == NULL)
1075     return;
1076   if (mig_qe != NULL)
1077     return;
1078   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1079     return;
1080   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1081                                          mig_size);
1082   delay = GNUNET_TIME_relative_divide (delay,
1083                                        MAX_MIGRATION_QUEUE);
1084   delay = GNUNET_TIME_relative_max (delay,
1085                                     min_migration_delay);
1086   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1087                                            delay,
1088                                            &gather_migration_blocks,
1089                                            NULL);
1090 }
1091
1092
1093 /**
1094  * If the DHT PUT gathering task is not currently running, consider
1095  * (re)scheduling it with the appropriate delay.
1096  */
1097 static void
1098 consider_dht_put_gathering (void *cls)
1099 {
1100   struct GNUNET_TIME_Relative delay;
1101
1102   if (dsh == NULL)
1103     return;
1104   if (dht_qe != NULL)
1105     return;
1106   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1107     return;
1108   if (zero_anonymity_count_estimate > 0)
1109     {
1110       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1111                                            zero_anonymity_count_estimate);
1112       delay = GNUNET_TIME_relative_min (delay,
1113                                         MAX_DHT_PUT_FREQ);
1114     }
1115   else
1116     {
1117       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1118          (hopefully) appear */
1119       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1120     }
1121   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1122                                            delay,
1123                                            &gather_dht_put_blocks,
1124                                            cls);
1125 }
1126
1127
1128 /**
1129  * Process content offered for migration.
1130  *
1131  * @param cls closure
1132  * @param key key for the content
1133  * @param size number of bytes in data
1134  * @param data content stored
1135  * @param type type of the content
1136  * @param priority priority of the content
1137  * @param anonymity anonymity-level for the content
1138  * @param expiration expiration time for the content
1139  * @param uid unique identifier for the datum;
1140  *        maybe 0 if no unique identifier is available
1141  */
1142 static void
1143 process_migration_content (void *cls,
1144                            const GNUNET_HashCode * key,
1145                            size_t size,
1146                            const void *data,
1147                            enum GNUNET_BLOCK_Type type,
1148                            uint32_t priority,
1149                            uint32_t anonymity,
1150                            struct GNUNET_TIME_Absolute
1151                            expiration, uint64_t uid)
1152 {
1153   struct MigrationReadyBlock *mb;
1154   
1155   if (key == NULL)
1156     {
1157       mig_qe = NULL;
1158       if (mig_size < MAX_MIGRATION_QUEUE)  
1159         consider_migration_gathering ();
1160       return;
1161     }
1162   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1163     {
1164       if (GNUNET_OK !=
1165           GNUNET_FS_handle_on_demand_block (key, size, data,
1166                                             type, priority, anonymity,
1167                                             expiration, uid, 
1168                                             &process_migration_content,
1169                                             NULL))
1170         {
1171           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1172         }
1173       return;
1174     }
1175 #if DEBUG_FS
1176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177               "Retrieved block `%s' of type %u for migration\n",
1178               GNUNET_h2s (key),
1179               type);
1180 #endif
1181   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1182   mb->query = *key;
1183   mb->expiration = expiration;
1184   mb->size = size;
1185   mb->type = type;
1186   memcpy (&mb[1], data, size);
1187   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1188                                      mig_tail,
1189                                      mig_tail,
1190                                      mb);
1191   mig_size++;
1192   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1193                                          &consider_migration,
1194                                          mb);
1195   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1196 }
1197
1198
1199 /**
1200  * Function called upon completion of the DHT PUT operation.
1201  */
1202 static void
1203 dht_put_continuation (void *cls,
1204                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1205 {
1206   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1207 }
1208
1209
1210 /**
1211  * Store content in DHT.
1212  *
1213  * @param cls closure
1214  * @param key key for the content
1215  * @param size number of bytes in data
1216  * @param data content stored
1217  * @param type type of the content
1218  * @param priority priority of the content
1219  * @param anonymity anonymity-level for the content
1220  * @param expiration expiration time for the content
1221  * @param uid unique identifier for the datum;
1222  *        maybe 0 if no unique identifier is available
1223  */
1224 static void
1225 process_dht_put_content (void *cls,
1226                          const GNUNET_HashCode * key,
1227                          size_t size,
1228                          const void *data,
1229                          enum GNUNET_BLOCK_Type type,
1230                          uint32_t priority,
1231                          uint32_t anonymity,
1232                          struct GNUNET_TIME_Absolute
1233                          expiration, uint64_t uid)
1234
1235   static unsigned int counter;
1236   static GNUNET_HashCode last_vhash;
1237   static GNUNET_HashCode vhash;
1238
1239   if (key == NULL)
1240     {
1241       dht_qe = NULL;
1242       consider_dht_put_gathering (cls);
1243       return;
1244     }
1245   /* slightly funky code to estimate the total number of values with zero
1246      anonymity from the maximum observed length of a monotonically increasing 
1247      sequence of hashes over the contents */
1248   GNUNET_CRYPTO_hash (data, size, &vhash);
1249   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1250     {
1251       if (zero_anonymity_count_estimate > 0)
1252         zero_anonymity_count_estimate /= 2;
1253       counter = 0;
1254     }
1255   last_vhash = vhash;
1256   if (counter < 31)
1257     counter++;
1258   if (zero_anonymity_count_estimate < (1 << counter))
1259     zero_anonymity_count_estimate = (1 << counter);
1260 #if DEBUG_FS
1261   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1262               "Retrieved block `%s' of type %u for DHT PUT\n",
1263               GNUNET_h2s (key),
1264               type);
1265 #endif
1266   GNUNET_DHT_put (dht_handle,
1267                   key,
1268                   GNUNET_DHT_RO_NONE,
1269                   type,
1270                   size,
1271                   data,
1272                   expiration,
1273                   GNUNET_TIME_UNIT_FOREVER_REL,
1274                   &dht_put_continuation,
1275                   cls);
1276 }
1277
1278
1279 /**
1280  * Task that is run periodically to obtain blocks for content
1281  * migration
1282  * 
1283  * @param cls unused
1284  * @param tc scheduler context (also unused)
1285  */
1286 static void
1287 gather_migration_blocks (void *cls,
1288                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1289 {
1290   mig_task = GNUNET_SCHEDULER_NO_TASK;
1291   if (dsh != NULL)
1292     {
1293       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1294                                             GNUNET_TIME_UNIT_FOREVER_REL,
1295                                             &process_migration_content, NULL);
1296       GNUNET_assert (mig_qe != NULL);
1297     }
1298 }
1299
1300
1301 /**
1302  * Task that is run periodically to obtain blocks for DHT PUTs.
1303  * 
1304  * @param cls type of blocks to gather
1305  * @param tc scheduler context (unused)
1306  */
1307 static void
1308 gather_dht_put_blocks (void *cls,
1309                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1310 {
1311   dht_task = GNUNET_SCHEDULER_NO_TASK;
1312   if (dsh != NULL)
1313     {
1314       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1315         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1316       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1317                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1318                                                     dht_put_type++,
1319                                                     &process_dht_put_content, NULL);
1320       GNUNET_assert (dht_qe != NULL);
1321     }
1322 }
1323
1324
1325 /**
1326  * We're done with a particular message list entry.
1327  * Free all associated resources.
1328  * 
1329  * @param pml entry to destroy
1330  */
1331 static void
1332 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1333 {
1334   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1335                                pml->req->pending_tail,
1336                                pml);
1337   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1338                                pml->target->pending_messages_tail,
1339                                pml->pm);
1340   pml->target->pending_requests--;
1341   GNUNET_free (pml->pm);
1342   GNUNET_free (pml);
1343 }
1344
1345
1346 /**
1347  * Destroy the given pending message (and call the respective
1348  * continuation).
1349  *
1350  * @param pm message to destroy
1351  * @param tpid id of peer that the message was delivered to, or 0 for none
1352  */
1353 static void
1354 destroy_pending_message (struct PendingMessage *pm,
1355                          GNUNET_PEER_Id tpid)
1356 {
1357   struct PendingMessageList *pml = pm->pml;
1358   TransmissionContinuation cont;
1359   void *cont_cls;
1360
1361   cont = pm->cont;
1362   cont_cls = pm->cont_cls;
1363   if (pml != NULL)
1364     {
1365       GNUNET_assert (pml->pm == pm);
1366       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1367       destroy_pending_message_list_entry (pml);
1368     }
1369   else
1370     {
1371       GNUNET_free (pm);
1372     }
1373   if (cont != NULL)
1374     cont (cont_cls, tpid);  
1375 }
1376
1377
1378 /**
1379  * We're done processing a particular request.
1380  * Free all associated resources.
1381  *
1382  * @param pr request to destroy
1383  */
1384 static void
1385 destroy_pending_request (struct PendingRequest *pr)
1386 {
1387   struct GNUNET_PeerIdentity pid;
1388
1389   if (pr->hnode != NULL)
1390     {
1391       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1392                                          pr->hnode);
1393       pr->hnode = NULL;
1394     }
1395   if (NULL == pr->client_request_list)
1396     {
1397       GNUNET_STATISTICS_update (stats,
1398                                 gettext_noop ("# P2P searches active"),
1399                                 -1,
1400                                 GNUNET_NO);
1401     }
1402   else
1403     {
1404       GNUNET_STATISTICS_update (stats,
1405                                 gettext_noop ("# client searches active"),
1406                                 -1,
1407                                 GNUNET_NO);
1408     }
1409   if (GNUNET_YES == 
1410       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1411                                             &pr->query,
1412                                             pr))
1413     {
1414       GNUNET_LOAD_update (rt_entry_lifetime,
1415                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1416     }
1417   if (pr->qe != NULL)
1418      {
1419       GNUNET_DATASTORE_cancel (pr->qe);
1420       pr->qe = NULL;
1421     }
1422   if (pr->dht_get != NULL)
1423     {
1424       GNUNET_DHT_get_stop (pr->dht_get);
1425       pr->dht_get = NULL;
1426     }
1427   if (pr->client_request_list != NULL)
1428     {
1429       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1430                                    pr->client_request_list->client_list->rl_tail,
1431                                    pr->client_request_list);
1432       GNUNET_free (pr->client_request_list);
1433       pr->client_request_list = NULL;
1434     }
1435   if (pr->cp != NULL)
1436     {
1437       GNUNET_PEER_resolve (pr->cp->pid,
1438                            &pid);
1439       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1440                                                    &pid.hashPubKey,
1441                                                    pr);
1442       pr->cp = NULL;
1443     }
1444   if (pr->bf != NULL)
1445     {
1446       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1447       pr->bf = NULL;
1448     }
1449   if (pr->irc != NULL)
1450     {
1451       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1452       pr->irc = NULL;
1453     }
1454   if (pr->replies_seen != NULL)
1455     {
1456       GNUNET_free (pr->replies_seen);
1457       pr->replies_seen = NULL;
1458     }
1459   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1460     {
1461       GNUNET_SCHEDULER_cancel (sched,
1462                                pr->task);
1463       pr->task = GNUNET_SCHEDULER_NO_TASK;
1464     }
1465   while (NULL != pr->pending_head)    
1466     destroy_pending_message_list_entry (pr->pending_head);
1467   GNUNET_PEER_change_rc (pr->target_pid, -1);
1468   if (pr->used_pids != NULL)
1469     {
1470       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1471       GNUNET_free (pr->used_pids);
1472       pr->used_pids_off = 0;
1473       pr->used_pids_size = 0;
1474       pr->used_pids = NULL;
1475     }
1476   GNUNET_free (pr);
1477 }
1478
1479
1480 /**
1481  * Method called whenever a given peer connects.
1482  *
1483  * @param cls closure, not used
1484  * @param peer peer identity this notification is about
1485  * @param latency reported latency of the connection with 'other'
1486  * @param distance reported distance (DV) to 'other' 
1487  */
1488 static void 
1489 peer_connect_handler (void *cls,
1490                       const struct
1491                       GNUNET_PeerIdentity * peer,
1492                       struct GNUNET_TIME_Relative latency,
1493                       uint32_t distance)
1494 {
1495   struct ConnectedPeer *cp;
1496   struct MigrationReadyBlock *pos;
1497   char *fn;
1498   uint32_t trust;
1499   
1500   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1501   cp->transmission_delay = GNUNET_LOAD_value_init ();
1502   cp->pid = GNUNET_PEER_intern (peer);
1503
1504   fn = get_trust_filename (peer);
1505   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1506       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1507     cp->disk_trust = cp->trust = ntohl (trust);
1508   GNUNET_free (fn);
1509
1510   GNUNET_break (GNUNET_OK ==
1511                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1512                                                    &peer->hashPubKey,
1513                                                    cp,
1514                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1515
1516   pos = mig_head;
1517   while (NULL != pos)
1518     {
1519       (void) consider_migration (pos, &peer->hashPubKey, cp);
1520       pos = pos->next;
1521     }
1522 }
1523
1524
1525 /**
1526  * Increase the host credit by a value.
1527  *
1528  * @param host which peer to change the trust value on
1529  * @param value is the int value by which the
1530  *  host credit is to be increased or decreased
1531  * @returns the actual change in trust (positive or negative)
1532  */
1533 static int
1534 change_host_trust (struct ConnectedPeer *host, int value)
1535 {
1536   unsigned int old_trust;
1537
1538   if (value == 0)
1539     return 0;
1540   GNUNET_assert (host != NULL);
1541   old_trust = host->trust;
1542   if (value > 0)
1543     {
1544       if (host->trust + value < host->trust)
1545         {
1546           value = UINT32_MAX - host->trust;
1547           host->trust = UINT32_MAX;
1548         }
1549       else
1550         host->trust += value;
1551     }
1552   else
1553     {
1554       if (host->trust < -value)
1555         {
1556           value = -host->trust;
1557           host->trust = 0;
1558         }
1559       else
1560         host->trust += value;
1561     }
1562   return value;
1563 }
1564
1565
1566 /**
1567  * Write host-trust information to a file - flush the buffer entry!
1568  */
1569 static int
1570 flush_trust (void *cls,
1571              const GNUNET_HashCode *key,
1572              void *value)
1573 {
1574   struct ConnectedPeer *host = value;
1575   char *fn;
1576   uint32_t trust;
1577   struct GNUNET_PeerIdentity pid;
1578
1579   if (host->trust == host->disk_trust)
1580     return GNUNET_OK;                     /* unchanged */
1581   GNUNET_PEER_resolve (host->pid,
1582                        &pid);
1583   fn = get_trust_filename (&pid);
1584   if (host->trust == 0)
1585     {
1586       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1587         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1588                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1589     }
1590   else
1591     {
1592       trust = htonl (host->trust);
1593       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1594                                                     sizeof(uint32_t),
1595                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1596                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1597         host->disk_trust = host->trust;
1598     }
1599   GNUNET_free (fn);
1600   return GNUNET_OK;
1601 }
1602
1603 /**
1604  * Call this method periodically to scan data/hosts for new hosts.
1605  */
1606 static void
1607 cron_flush_trust (void *cls,
1608                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1609 {
1610
1611   if (NULL == connected_peers)
1612     return;
1613   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1614                                          &flush_trust,
1615                                          NULL);
1616   if (NULL == tc)
1617     return;
1618   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1619     return;
1620   GNUNET_SCHEDULER_add_delayed (tc->sched,
1621                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1622 }
1623
1624
1625 /**
1626  * Free (each) request made by the peer.
1627  *
1628  * @param cls closure, points to peer that the request belongs to
1629  * @param key current key code
1630  * @param value value in the hash map
1631  * @return GNUNET_YES (we should continue to iterate)
1632  */
1633 static int
1634 destroy_request (void *cls,
1635                  const GNUNET_HashCode * key,
1636                  void *value)
1637 {
1638   const struct GNUNET_PeerIdentity * peer = cls;
1639   struct PendingRequest *pr = value;
1640   
1641   GNUNET_break (GNUNET_YES ==
1642                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1643                                                       &peer->hashPubKey,
1644                                                       pr));
1645   destroy_pending_request (pr);
1646   return GNUNET_YES;
1647 }
1648
1649
1650 /**
1651  * Method called whenever a peer disconnects.
1652  *
1653  * @param cls closure, not used
1654  * @param peer peer identity this notification is about
1655  */
1656 static void
1657 peer_disconnect_handler (void *cls,
1658                          const struct
1659                          GNUNET_PeerIdentity * peer)
1660 {
1661   struct ConnectedPeer *cp;
1662   struct PendingMessage *pm;
1663   unsigned int i;
1664   struct MigrationReadyBlock *pos;
1665   struct MigrationReadyBlock *next;
1666
1667   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1668                                               &peer->hashPubKey,
1669                                               &destroy_request,
1670                                               (void*) peer);
1671   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1672                                           &peer->hashPubKey);
1673   if (cp == NULL)
1674     return;
1675   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1676     {
1677       if (NULL != cp->last_client_replies[i])
1678         {
1679           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1680           cp->last_client_replies[i] = NULL;
1681         }
1682     }
1683   GNUNET_break (GNUNET_YES ==
1684                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1685                                                       &peer->hashPubKey,
1686                                                       cp));
1687   /* remove this peer from migration considerations; schedule
1688      alternatives */
1689   next = mig_head;
1690   while (NULL != (pos = next))
1691     {
1692       next = pos->next;
1693       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1694         {
1695           if (pos->target_list[i] == cp->pid)
1696             {
1697               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1698               pos->target_list[i] = 0;
1699             }
1700          }
1701       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1702         {
1703           delete_migration_block (pos);
1704           consider_migration_gathering ();
1705           continue;
1706         }
1707       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1708                                              &consider_migration,
1709                                              pos);
1710     }
1711   GNUNET_PEER_change_rc (cp->pid, -1);
1712   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1713   if (NULL != cp->cth)
1714     {
1715       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1716       cp->cth = NULL;
1717     }
1718   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1719     {
1720       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1721       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1722     }
1723   while (NULL != (pm = cp->pending_messages_head))
1724     destroy_pending_message (pm, 0 /* delivery failed */);
1725   GNUNET_LOAD_value_free (cp->transmission_delay);
1726   GNUNET_break (0 == cp->pending_requests);
1727   GNUNET_free (cp);
1728 }
1729
1730
1731 /**
1732  * Iterator over hash map entries that removes all occurences
1733  * of the given 'client' from the 'last_client_replies' of the
1734  * given connected peer.
1735  *
1736  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1737  * @param key current key code (unused)
1738  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1739  * @return GNUNET_YES (we should continue to iterate)
1740  */
1741 static int
1742 remove_client_from_last_client_replies (void *cls,
1743                                         const GNUNET_HashCode * key,
1744                                         void *value)
1745 {
1746   struct GNUNET_SERVER_Client *client = cls;
1747   struct ConnectedPeer *cp = value;
1748   unsigned int i;
1749
1750   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1751     {
1752       if (cp->last_client_replies[i] == client)
1753         {
1754           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1755           cp->last_client_replies[i] = NULL;
1756         }
1757     }  
1758   return GNUNET_YES;
1759 }
1760
1761
1762 /**
1763  * A client disconnected.  Remove all of its pending queries.
1764  *
1765  * @param cls closure, NULL
1766  * @param client identification of the client
1767  */
1768 static void
1769 handle_client_disconnect (void *cls,
1770                           struct GNUNET_SERVER_Client
1771                           * client)
1772 {
1773   struct ClientList *pos;
1774   struct ClientList *prev;
1775   struct ClientRequestList *rcl;
1776   struct ClientResponseMessage *creply;
1777
1778   if (client == NULL)
1779     return;
1780   prev = NULL;
1781   pos = client_list;
1782   while ( (NULL != pos) &&
1783           (pos->client != client) )
1784     {
1785       prev = pos;
1786       pos = pos->next;
1787     }
1788   if (pos == NULL)
1789     return; /* no requests pending for this client */
1790   while (NULL != (rcl = pos->rl_head))
1791     {
1792       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1793                   "Destroying pending request `%s' on disconnect\n",
1794                   GNUNET_h2s (&rcl->req->query));
1795       destroy_pending_request (rcl->req);
1796     }
1797   if (prev == NULL)
1798     client_list = pos->next;
1799   else
1800     prev->next = pos->next;
1801   if (pos->th != NULL)
1802     {
1803       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1804       pos->th = NULL;
1805     }
1806   while (NULL != (creply = pos->res_head))
1807     {
1808       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1809                                    pos->res_tail,
1810                                    creply);
1811       GNUNET_free (creply);
1812     }    
1813   GNUNET_SERVER_client_drop (pos->client);
1814   GNUNET_free (pos);
1815   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1816                                          &remove_client_from_last_client_replies,
1817                                          client);
1818 }
1819
1820
1821 /**
1822  * Iterator to free peer entries.
1823  *
1824  * @param cls closure, unused
1825  * @param key current key code
1826  * @param value value in the hash map (peer entry)
1827  * @return GNUNET_YES (we should continue to iterate)
1828  */
1829 static int 
1830 clean_peer (void *cls,
1831             const GNUNET_HashCode * key,
1832             void *value)
1833 {
1834   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1835   return GNUNET_YES;
1836 }
1837
1838
1839 /**
1840  * Task run during shutdown.
1841  *
1842  * @param cls unused
1843  * @param tc unused
1844  */
1845 static void
1846 shutdown_task (void *cls,
1847                const struct GNUNET_SCHEDULER_TaskContext *tc)
1848 {
1849   if (mig_qe != NULL)
1850     {
1851       GNUNET_DATASTORE_cancel (mig_qe);
1852       mig_qe = NULL;
1853     }
1854   if (dht_qe != NULL)
1855     {
1856       GNUNET_DATASTORE_cancel (dht_qe);
1857       dht_qe = NULL;
1858     }
1859   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1860     {
1861       GNUNET_SCHEDULER_cancel (sched, mig_task);
1862       mig_task = GNUNET_SCHEDULER_NO_TASK;
1863     }
1864   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1865     {
1866       GNUNET_SCHEDULER_cancel (sched, dht_task);
1867       dht_task = GNUNET_SCHEDULER_NO_TASK;
1868     }
1869   while (client_list != NULL)
1870     handle_client_disconnect (NULL,
1871                               client_list->client);
1872   cron_flush_trust (NULL, NULL);
1873   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1874                                          &clean_peer,
1875                                          NULL);
1876   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1877   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1878   requests_by_expiration_heap = 0;
1879   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1880   connected_peers = NULL;
1881   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1882   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1883   query_request_map = NULL;
1884   GNUNET_LOAD_value_free (rt_entry_lifetime);
1885   rt_entry_lifetime = NULL;
1886   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1887   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1888   peer_request_map = NULL;
1889   GNUNET_assert (NULL != core);
1890   GNUNET_CORE_disconnect (core);
1891   core = NULL;
1892   if (stats != NULL)
1893     {
1894       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1895       stats = NULL;
1896     }
1897   if (dsh != NULL)
1898     {
1899       GNUNET_DATASTORE_disconnect (dsh,
1900                                    GNUNET_NO);
1901       dsh = NULL;
1902     }
1903   while (mig_head != NULL)
1904     delete_migration_block (mig_head);
1905   GNUNET_assert (0 == mig_size);
1906   GNUNET_DHT_disconnect (dht_handle);
1907   dht_handle = NULL;
1908   GNUNET_LOAD_value_free (datastore_get_load);
1909   datastore_get_load = NULL;
1910   GNUNET_LOAD_value_free (datastore_put_load);
1911   datastore_put_load = NULL;
1912   GNUNET_BLOCK_context_destroy (block_ctx);
1913   block_ctx = NULL;
1914   GNUNET_CONFIGURATION_destroy (block_cfg);
1915   block_cfg = NULL;
1916   sched = NULL;
1917   cfg = NULL;  
1918   GNUNET_free_non_null (trustDirectory);
1919   trustDirectory = NULL;
1920 }
1921
1922
1923 /* ******************* Utility functions  ******************** */
1924
1925
1926 /**
1927  * We've had to delay a request for transmission to core, but now
1928  * we should be ready.  Run it.
1929  *
1930  * @param cls the 'struct ConnectedPeer' for which a request was delayed
1931  * @param tc task context (unused)
1932  */
1933 static void
1934 delayed_transmission_request (void *cls,
1935                               const struct GNUNET_SCHEDULER_TaskContext *tc)
1936 {
1937   struct ConnectedPeer *cp = cls;
1938   struct GNUNET_PeerIdentity pid;
1939   struct PendingMessage *pm;
1940
1941   pm = cp->pending_messages_head;
1942   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1943   GNUNET_assert (cp->cth == NULL);
1944   if (pm == NULL)
1945     return;
1946   GNUNET_PEER_resolve (cp->pid,
1947                        &pid);
1948   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
1949   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1950                                                pm->priority,
1951                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1952                                                &pid,
1953                                                pm->msize,
1954                                                &transmit_to_peer,
1955                                                cp);
1956 }
1957
1958
1959 /**
1960  * Transmit messages by copying it to the target buffer
1961  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1962  * for writing in the meantime.  In that case, do nothing
1963  * (the disconnect or shutdown handler will take care of the rest).
1964  * If we were able to transmit messages and there are still more
1965  * pending, ask core again for further calls to this function.
1966  *
1967  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1968  * @param size number of bytes available in buf
1969  * @param buf where the callee should write the message
1970  * @return number of bytes written to buf
1971  */
1972 static size_t
1973 transmit_to_peer (void *cls,
1974                   size_t size, void *buf)
1975 {
1976   struct ConnectedPeer *cp = cls;
1977   char *cbuf = buf;
1978   struct PendingMessage *pm;
1979   struct PendingMessage *next_pm;
1980   struct GNUNET_TIME_Absolute now;
1981   struct GNUNET_TIME_Relative min_delay;
1982   struct MigrationReadyBlock *mb;
1983   struct MigrationReadyBlock *next;
1984   struct PutMessage migm;
1985   size_t msize;
1986   unsigned int i;
1987   struct GNUNET_PeerIdentity pid;
1988  
1989   cp->cth = NULL;
1990   if (NULL == buf)
1991     {
1992 #if DEBUG_FS
1993       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1994                   "Dropping message, core too busy.\n");
1995 #endif
1996       GNUNET_LOAD_update (cp->transmission_delay,
1997                           UINT64_MAX);
1998       return 0;
1999     }  
2000   GNUNET_LOAD_update (cp->transmission_delay,
2001                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2002   now = GNUNET_TIME_absolute_get ();
2003   msize = 0;
2004   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2005   next_pm = cp->pending_messages_head;
2006   while ( (NULL != (pm = next_pm) ) &&
2007           (pm->msize <= size) )
2008     {
2009       next_pm = pm->next;
2010       if (pm->delay_until.value > now.value)
2011         {
2012           min_delay = GNUNET_TIME_relative_min (min_delay,
2013                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2014           continue;
2015         }
2016       memcpy (&cbuf[msize], &pm[1], pm->msize);
2017       msize += pm->msize;
2018       size -= pm->msize;
2019       if (NULL == pm->pml)
2020         {
2021           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2022                                        cp->pending_messages_tail,
2023                                        pm);
2024           cp->pending_requests--;
2025         }
2026       destroy_pending_message (pm, cp->pid);
2027     }
2028   if (pm != NULL)
2029     min_delay = GNUNET_TIME_UNIT_ZERO;
2030   if (NULL != cp->pending_messages_head)
2031     {     
2032       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2033       cp->delayed_transmission_request_task
2034         = GNUNET_SCHEDULER_add_delayed (sched,
2035                                         min_delay,
2036                                         &delayed_transmission_request,
2037                                         cp);
2038     }
2039   if (pm == NULL)
2040     {      
2041       GNUNET_PEER_resolve (cp->pid,
2042                            &pid);
2043       next = mig_head;
2044       while (NULL != (mb = next))
2045         {
2046           next = mb->next;
2047           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2048             {
2049               if ( (cp->pid == mb->target_list[i]) &&
2050                    (mb->size + sizeof (migm) <= size) )
2051                 {
2052                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2053                   mb->target_list[i] = 0;
2054                   mb->used_targets++;
2055                   memset (&migm, 0, sizeof (migm));
2056                   migm.header.size = htons (sizeof (migm) + mb->size);
2057                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2058                   migm.type = htonl (mb->type);
2059                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2060                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2061                   msize += sizeof (migm);
2062                   size -= sizeof (migm);
2063                   memcpy (&cbuf[msize], &mb[1], mb->size);
2064                   msize += mb->size;
2065                   size -= mb->size;
2066 #if DEBUG_FS
2067                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2068                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2069                               GNUNET_h2s (&mb->query),
2070                               (unsigned int) mb->size,
2071                               GNUNET_i2s (&pid));
2072 #endif    
2073                   break;
2074                 }
2075               else
2076                 {
2077 #if DEBUG_FS
2078                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2079                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2080                               GNUNET_h2s (&mb->query),
2081                               (unsigned int) mb->size,
2082                               GNUNET_i2s (&pid));
2083 #endif    
2084                 }
2085             }
2086           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2087                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2088             {
2089               delete_migration_block (mb);
2090               consider_migration_gathering ();
2091             }
2092         }
2093       consider_migration (NULL, 
2094                           &pid.hashPubKey,
2095                           cp);
2096     }
2097 #if DEBUG_FS
2098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2099               "Transmitting %u bytes to peer with PID %u\n",
2100               (unsigned int) msize,
2101               (unsigned int) cp->pid);
2102 #endif
2103   return msize;
2104 }
2105
2106
2107 /**
2108  * Add a message to the set of pending messages for the given peer.
2109  *
2110  * @param cp peer to send message to
2111  * @param pm message to queue
2112  * @param pr request on which behalf this message is being queued
2113  */
2114 static void
2115 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2116                                   struct PendingMessage *pm,
2117                                   struct PendingRequest *pr)
2118 {
2119   struct PendingMessage *pos;
2120   struct PendingMessageList *pml;
2121   struct GNUNET_PeerIdentity pid;
2122
2123   GNUNET_assert (pm->next == NULL);
2124   GNUNET_assert (pm->pml == NULL);    
2125   if (pr != NULL)
2126     {
2127       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2128       pml->req = pr;
2129       pml->target = cp;
2130       pml->pm = pm;
2131       pm->pml = pml;  
2132       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2133                                    pr->pending_tail,
2134                                    pml);
2135     }
2136   pos = cp->pending_messages_head;
2137   while ( (pos != NULL) &&
2138           (pm->priority < pos->priority) )
2139     pos = pos->next;    
2140   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2141                                      cp->pending_messages_tail,
2142                                      pos,
2143                                      pm);
2144   cp->pending_requests++;
2145   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2146     destroy_pending_message (cp->pending_messages_tail, 0);  
2147   GNUNET_PEER_resolve (cp->pid, &pid);
2148   if (NULL != cp->cth)
2149     {
2150       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2151       cp->cth = NULL;
2152     }
2153   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2154     {
2155       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2156       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2157     }
2158   /* need to schedule transmission */
2159   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2160   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2161                                                cp->pending_messages_head->priority,
2162                                                MAX_TRANSMIT_DELAY,
2163                                                &pid,
2164                                                cp->pending_messages_head->msize,
2165                                                &transmit_to_peer,
2166                                                cp);
2167   if (cp->cth == NULL)
2168     {
2169 #if DEBUG_FS
2170       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2171                   "Failed to schedule transmission with core!\n");
2172 #endif
2173       GNUNET_STATISTICS_update (stats,
2174                                 gettext_noop ("# CORE transmission failures"),
2175                                 1,
2176                                 GNUNET_NO);
2177     }
2178 }
2179
2180
2181 /**
2182  * Test if the DATABASE (GET) load on this peer is too high
2183  * to even consider processing the query at
2184  * all.  
2185  * 
2186  * @return GNUNET_YES if the load is too high to do anything (load high)
2187  *         GNUNET_NO to process normally (load normal)
2188  *         GNUNET_SYSERR to process for free (load low)
2189  */
2190 static int
2191 test_get_load_too_high (uint32_t priority)
2192 {
2193   double ld;
2194
2195   ld = GNUNET_LOAD_get_load (datastore_get_load);
2196   if (ld < 1)
2197     {
2198       GNUNET_STATISTICS_update (stats,
2199                                 gettext_noop ("# requests done for free (low load)"),
2200                                 1,
2201                                 GNUNET_NO);
2202       return GNUNET_SYSERR;
2203     }
2204   if (ld <= priority)
2205     {
2206       GNUNET_STATISTICS_update (stats,
2207                                 gettext_noop ("# requests done for a price (normal load)"),
2208                                 1,
2209                                 GNUNET_NO);
2210       return GNUNET_NO;
2211     }
2212   GNUNET_STATISTICS_update (stats,
2213                             gettext_noop ("# requests dropped due to high load"),
2214                             1,
2215                             GNUNET_NO);
2216   return GNUNET_YES;
2217 }
2218
2219
2220
2221
2222 /**
2223  * Test if the DATABASE (PUT) load on this peer is too high
2224  * to even consider processing the query at
2225  * all.  
2226  * 
2227  * @return GNUNET_YES if the load is too high to do anything (load high)
2228  *         GNUNET_NO to process normally (load normal or low)
2229  */
2230 static int
2231 test_put_load_too_high (uint32_t priority)
2232 {
2233   double ld;
2234
2235   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2236     return GNUNET_NO; /* very fast */
2237   ld = GNUNET_LOAD_get_load (datastore_put_load);
2238   if ( (ld < 1) || (ld < priority) )
2239     return GNUNET_NO;
2240   GNUNET_STATISTICS_update (stats,
2241                             gettext_noop ("# storage requests dropped due to high load"),
2242                             1,
2243                             GNUNET_NO);
2244   return GNUNET_YES;
2245 }
2246
2247
2248 /* ******************* Pending Request Refresh Task ******************** */
2249
2250
2251
2252 /**
2253  * We use a random delay to make the timing of requests less
2254  * predictable.  This function returns such a random delay.  We add a base
2255  * delay of MAX_CORK_DELAY (1s).
2256  *
2257  * FIXME: make schedule dependent on the specifics of the request?
2258  * Or bandwidth and number of connected peers and load?
2259  *
2260  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2261  */
2262 static struct GNUNET_TIME_Relative
2263 get_processing_delay ()
2264 {
2265   return 
2266     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2267                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2268                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2269                                                                                        TTL_DECREMENT)));
2270 }
2271
2272
2273 /**
2274  * We're processing a GET request from another peer and have decided
2275  * to forward it to other peers.  This function is called periodically
2276  * and should forward the request to other peers until we have all
2277  * possible replies.  If we have transmitted the *only* reply to
2278  * the initiator we should destroy the pending request.  If we have
2279  * many replies in the queue to the initiator, we should delay sending
2280  * out more queries until the reply queue has shrunk some.
2281  *
2282  * @param cls our "struct ProcessGetContext *"
2283  * @param tc unused
2284  */
2285 static void
2286 forward_request_task (void *cls,
2287                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2288
2289
2290 /**
2291  * Function called after we either failed or succeeded
2292  * at transmitting a query to a peer.  
2293  *
2294  * @param cls the requests "struct PendingRequest*"
2295  * @param tpid ID of receiving peer, 0 on transmission error
2296  */
2297 static void
2298 transmit_query_continuation (void *cls,
2299                              GNUNET_PEER_Id tpid)
2300 {
2301   struct PendingRequest *pr = cls;
2302
2303   GNUNET_STATISTICS_update (stats,
2304                             gettext_noop ("# queries scheduled for forwarding"),
2305                             -1,
2306                             GNUNET_NO);
2307   if (tpid == 0)   
2308     {
2309 #if DEBUG_FS
2310       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2311                   "Transmission of request failed, will try again later.\n");
2312 #endif
2313       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2314         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2315                                                  get_processing_delay (),
2316                                                  &forward_request_task,
2317                                                  pr); 
2318       return;    
2319     }
2320   GNUNET_STATISTICS_update (stats,
2321                             gettext_noop ("# queries forwarded"),
2322                             1,
2323                             GNUNET_NO);
2324   GNUNET_PEER_change_rc (tpid, 1);
2325   if (pr->used_pids_off == pr->used_pids_size)
2326     GNUNET_array_grow (pr->used_pids,
2327                        pr->used_pids_size,
2328                        pr->used_pids_size * 2 + 2);
2329   pr->used_pids[pr->used_pids_off++] = tpid;
2330   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2331     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2332                                              get_processing_delay (),
2333                                              &forward_request_task,
2334                                              pr);
2335 }
2336
2337
2338 /**
2339  * How many bytes should a bloomfilter be if we have already seen
2340  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2341  * of bits set per entry.  Furthermore, we should not re-size the
2342  * filter too often (to keep it cheap).
2343  *
2344  * Since other peers will also add entries but not resize the filter,
2345  * we should generally pick a slightly larger size than what the
2346  * strict math would suggest.
2347  *
2348  * @return must be a power of two and smaller or equal to 2^15.
2349  */
2350 static size_t
2351 compute_bloomfilter_size (unsigned int entry_count)
2352 {
2353   size_t size;
2354   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2355   uint16_t max = 1 << 15;
2356
2357   if (entry_count > max)
2358     return max;
2359   size = 8;
2360   while ((size < max) && (size < ideal))
2361     size *= 2;
2362   if (size > max)
2363     return max;
2364   return size;
2365 }
2366
2367
2368 /**
2369  * Recalculate our bloom filter for filtering replies.  This function
2370  * will create a new bloom filter from scratch, so it should only be
2371  * called if we have no bloomfilter at all (and hence can create a
2372  * fresh one of minimal size without problems) OR if our peer is the
2373  * initiator (in which case we may resize to larger than mimimum size).
2374  *
2375  * @param pr request for which the BF is to be recomputed
2376  */
2377 static void
2378 refresh_bloomfilter (struct PendingRequest *pr)
2379 {
2380   unsigned int i;
2381   size_t nsize;
2382   GNUNET_HashCode mhash;
2383
2384   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2385   if (nsize == pr->bf_size)
2386     return; /* size not changed */
2387   if (pr->bf != NULL)
2388     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2389   pr->bf_size = nsize;
2390   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2391   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2392                                               pr->bf_size,
2393                                               BLOOMFILTER_K);
2394   for (i=0;i<pr->replies_seen_off;i++)
2395     {
2396       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2397                                 pr->mingle,
2398                                 &mhash);
2399       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2400     }
2401 }
2402
2403
2404 /**
2405  * Function called after we've tried to reserve a certain amount of
2406  * bandwidth for a reply.  Check if we succeeded and if so send our
2407  * query.
2408  *
2409  * @param cls the requests "struct PendingRequest*"
2410  * @param peer identifies the peer
2411  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2412  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2413  * @param amount set to the amount that was actually reserved or unreserved
2414  * @param preference current traffic preference for the given peer
2415  */
2416 static void
2417 target_reservation_cb (void *cls,
2418                        const struct
2419                        GNUNET_PeerIdentity * peer,
2420                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2421                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2422                        int amount,
2423                        uint64_t preference)
2424 {
2425   struct PendingRequest *pr = cls;
2426   struct ConnectedPeer *cp;
2427   struct PendingMessage *pm;
2428   struct GetMessage *gm;
2429   GNUNET_HashCode *ext;
2430   char *bfdata;
2431   size_t msize;
2432   unsigned int k;
2433   int no_route;
2434   uint32_t bm;
2435
2436   pr->irc = NULL;
2437   if (peer == NULL)
2438     {
2439       /* error in communication with core, try again later */
2440       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2441         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2442                                                  get_processing_delay (),
2443                                                  &forward_request_task,
2444                                                  pr);
2445       return;
2446     }
2447   // (3) transmit, update ttl/priority
2448   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2449                                           &peer->hashPubKey);
2450   if (cp == NULL)
2451     {
2452       /* Peer must have just left */
2453 #if DEBUG_FS
2454       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2455                   "Selected peer disconnected!\n");
2456 #endif
2457       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2458         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2459                                                  get_processing_delay (),
2460                                                  &forward_request_task,
2461                                                  pr);
2462       return;
2463     }
2464   no_route = GNUNET_NO;
2465   if (amount == 0)
2466     {
2467       if (pr->cp == NULL)
2468         {
2469 #if DEBUG_FS > 1
2470           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2471                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2472                       amount,
2473                       DBLOCK_SIZE);
2474 #endif
2475           GNUNET_STATISTICS_update (stats,
2476                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2477                                     1,
2478                                     GNUNET_NO);
2479           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2480             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2481                                                      get_processing_delay (),
2482                                                      &forward_request_task,
2483                                                      pr);
2484           return;  /* this target round failed */
2485         }
2486       no_route = GNUNET_YES;
2487     }
2488   
2489   GNUNET_STATISTICS_update (stats,
2490                             gettext_noop ("# queries scheduled for forwarding"),
2491                             1,
2492                             GNUNET_NO);
2493   /* build message and insert message into priority queue */
2494 #if DEBUG_FS
2495   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2496               "Forwarding request `%s' to `%4s'!\n",
2497               GNUNET_h2s (&pr->query),
2498               GNUNET_i2s (peer));
2499 #endif
2500   k = 0;
2501   bm = 0;
2502   if (GNUNET_YES == no_route)
2503     {
2504       bm |= GET_MESSAGE_BIT_RETURN_TO;
2505       k++;      
2506     }
2507   if (pr->namespace != NULL)
2508     {
2509       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2510       k++;
2511     }
2512   if (pr->target_pid != 0)
2513     {
2514       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2515       k++;
2516     }
2517   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2518   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2519   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2520   pm->msize = msize;
2521   gm = (struct GetMessage*) &pm[1];
2522   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2523   gm->header.size = htons (msize);
2524   gm->type = htonl (pr->type);
2525   pr->remaining_priority /= 2;
2526   gm->priority = htonl (pr->remaining_priority);
2527   gm->ttl = htonl (pr->ttl);
2528   gm->filter_mutator = htonl(pr->mingle); 
2529   gm->hash_bitmap = htonl (bm);
2530   gm->query = pr->query;
2531   ext = (GNUNET_HashCode*) &gm[1];
2532   k = 0;
2533   if (GNUNET_YES == no_route)
2534     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2535   if (pr->namespace != NULL)
2536     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2537   if (pr->target_pid != 0)
2538     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2539   bfdata = (char *) &ext[k];
2540   if (pr->bf != NULL)
2541     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2542                                                bfdata,
2543                                                pr->bf_size);
2544   pm->cont = &transmit_query_continuation;
2545   pm->cont_cls = pr;
2546   add_to_pending_messages_for_peer (cp, pm, pr);
2547 }
2548
2549
2550 /**
2551  * Closure used for "target_peer_select_cb".
2552  */
2553 struct PeerSelectionContext 
2554 {
2555   /**
2556    * The request for which we are selecting
2557    * peers.
2558    */
2559   struct PendingRequest *pr;
2560
2561   /**
2562    * Current "prime" target.
2563    */
2564   struct GNUNET_PeerIdentity target;
2565
2566   /**
2567    * How much do we like this target?
2568    */
2569   double target_score;
2570
2571 };
2572
2573
2574 /**
2575  * Function called for each connected peer to determine
2576  * which one(s) would make good targets for forwarding.
2577  *
2578  * @param cls closure (struct PeerSelectionContext)
2579  * @param key current key code (peer identity)
2580  * @param value value in the hash map (struct ConnectedPeer)
2581  * @return GNUNET_YES if we should continue to
2582  *         iterate,
2583  *         GNUNET_NO if not.
2584  */
2585 static int
2586 target_peer_select_cb (void *cls,
2587                        const GNUNET_HashCode * key,
2588                        void *value)
2589 {
2590   struct PeerSelectionContext *psc = cls;
2591   struct ConnectedPeer *cp = value;
2592   struct PendingRequest *pr = psc->pr;
2593   double score;
2594   unsigned int i;
2595   unsigned int pc;
2596
2597   /* 1) check that this peer is not the initiator */
2598   if (cp == pr->cp)
2599     {
2600 #if DEBUG_FS
2601       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2602                   "Skipping initiator in forwarding selection\n");
2603 #endif
2604       return GNUNET_YES; /* skip */        
2605     }
2606
2607   /* 2) check if we have already (recently) forwarded to this peer */
2608   pc = 0;
2609   for (i=0;i<pr->used_pids_off;i++)
2610     if (pr->used_pids[i] == cp->pid) 
2611       {
2612         pc++;
2613         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2614                                            RETRY_PROBABILITY_INV))
2615           {
2616 #if DEBUG_FS
2617             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2618                         "NOT re-trying query that was previously transmitted %u times\n",
2619                         (unsigned int) pr->used_pids_off);
2620 #endif
2621             return GNUNET_YES; /* skip */
2622           }
2623       }
2624 #if DEBUG_FS
2625   if (0 < pc)
2626     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2627                 "Re-trying query that was previously transmitted %u times to this peer\n",
2628                 (unsigned int) pc);
2629 #endif
2630   /* 3) calculate how much we'd like to forward to this peer,
2631      starting with a random value that is strong enough
2632      to at least give any peer a chance sometimes 
2633      (compared to the other factors that come later) */
2634   /* 3a) count successful (recent) routes from cp for same source */
2635   if (pr->cp != NULL)
2636     {
2637       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2638                                         P2P_SUCCESS_LIST_SIZE);
2639       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2640         if (cp->last_p2p_replies[i] == pr->cp->pid)
2641           score += 1.0; /* likely successful based on hot path */
2642     }
2643   else
2644     {
2645       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2646                                         CS2P_SUCCESS_LIST_SIZE);
2647       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2648         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2649           score += 1.0; /* likely successful based on hot path */
2650     }
2651   /* 3b) include latency */
2652   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2653     score += 1.0; /* likely fast based on latency */
2654   /* 3c) include priorities */
2655   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2656     score += 1.0; /* likely successful based on priorities */
2657   /* 3d) penalize for queue size */  
2658   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2659   /* 3e) include peer proximity */
2660   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2661                                                     &pr->query)) / (double) UINT32_MAX);
2662   /* 4) super-bonus for being the known target */
2663   if (pr->target_pid == cp->pid)
2664     score += 100.0;
2665   /* store best-fit in closure */
2666 #if DEBUG_FS
2667   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2668               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2669               GNUNET_h2s (key),
2670               score,
2671               psc->target_score);
2672 #endif  
2673   score++; /* avoid zero */
2674   if (score > psc->target_score)
2675     {
2676       psc->target_score = score;
2677       psc->target.hashPubKey = *key; 
2678     }
2679   return GNUNET_YES;
2680 }
2681   
2682
2683 /**
2684  * The priority level imposes a bound on the maximum
2685  * value for the ttl that can be requested.
2686  *
2687  * @param ttl_in requested ttl
2688  * @param prio given priority
2689  * @return ttl_in if ttl_in is below the limit,
2690  *         otherwise the ttl-limit for the given priority
2691  */
2692 static int32_t
2693 bound_ttl (int32_t ttl_in, uint32_t prio)
2694 {
2695   unsigned long long allowed;
2696
2697   if (ttl_in <= 0)
2698     return ttl_in;
2699   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2700   if (ttl_in > allowed)      
2701     {
2702       if (allowed >= (1 << 30))
2703         return 1 << 30;
2704       return allowed;
2705     }
2706   return ttl_in;
2707 }
2708
2709
2710 /**
2711  * Iterator called on each result obtained for a DHT
2712  * operation that expects a reply
2713  *
2714  * @param cls closure
2715  * @param exp when will this value expire
2716  * @param key key of the result
2717  * @param get_path NULL-terminated array of pointers
2718  *                 to the peers on reverse GET path (or NULL if not recorded)
2719  * @param put_path NULL-terminated array of pointers
2720  *                 to the peers on the PUT path (or NULL if not recorded)
2721  * @param type type of the result
2722  * @param size number of bytes in data
2723  * @param data pointer to the result data
2724  */
2725 static void
2726 process_dht_reply (void *cls,
2727                    struct GNUNET_TIME_Absolute exp,
2728                    const GNUNET_HashCode * key,
2729                    const struct GNUNET_PeerIdentity * const *get_path,
2730                    const struct GNUNET_PeerIdentity * const *put_path,
2731                    enum GNUNET_BLOCK_Type type,
2732                    size_t size,
2733                    const void *data);
2734
2735
2736 /**
2737  * We're processing a GET request and have decided
2738  * to forward it to other peers.  This function is called periodically
2739  * and should forward the request to other peers until we have all
2740  * possible replies.  If we have transmitted the *only* reply to
2741  * the initiator we should destroy the pending request.  If we have
2742  * many replies in the queue to the initiator, we should delay sending
2743  * out more queries until the reply queue has shrunk some.
2744  *
2745  * @param cls our "struct ProcessGetContext *"
2746  * @param tc unused
2747  */
2748 static void
2749 forward_request_task (void *cls,
2750                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2751 {
2752   struct PendingRequest *pr = cls;
2753   struct PeerSelectionContext psc;
2754   struct ConnectedPeer *cp; 
2755   struct GNUNET_TIME_Relative delay;
2756
2757   pr->task = GNUNET_SCHEDULER_NO_TASK;
2758   if (pr->irc != NULL)
2759     {
2760 #if DEBUG_FS
2761       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2762                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2763                   GNUNET_h2s (&pr->query));
2764 #endif
2765       return; /* already pending */
2766     }
2767   if (GNUNET_YES == pr->local_only)
2768     return; /* configured to not do P2P search */
2769   /* (0) try DHT */
2770   if ( (0 == pr->anonymity_level) &&
2771        (GNUNET_YES != pr->forward_only) &&
2772        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2773        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2774     {
2775       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2776                                           GNUNET_TIME_UNIT_FOREVER_REL,
2777                                           pr->type,
2778                                           &pr->query,
2779                                           GNUNET_DHT_RO_NONE,
2780                                           pr->bf,
2781                                           pr->mingle,
2782                                           pr->namespace,
2783                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2784                                           &process_dht_reply,
2785                                           pr);
2786     }
2787   /* (1) select target */
2788   psc.pr = pr;
2789   psc.target_score = -DBL_MAX;
2790   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2791                                          &target_peer_select_cb,
2792                                          &psc);  
2793   if (psc.target_score == -DBL_MAX)
2794     {
2795       delay = get_processing_delay ();
2796 #if DEBUG_FS 
2797       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2798                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2799                   GNUNET_h2s (&pr->query),
2800                   delay.value);
2801 #endif
2802       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2803                                                delay,
2804                                                &forward_request_task,
2805                                                pr);
2806       return; /* nobody selected */
2807     }
2808   /* (3) update TTL/priority */
2809   if (pr->client_request_list != NULL)
2810     {
2811       /* FIXME: use better algorithm!? */
2812       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2813                                          4))
2814         pr->priority++;
2815       /* bound priority we use by priorities we see from other peers
2816          rounded up (must round up so that we can see non-zero
2817          priorities, but round up as little as possible to make it
2818          plausible that we forwarded another peers request) */
2819       if (pr->priority > current_priorities + 1.0)
2820         pr->priority = (uint32_t) current_priorities + 1.0;
2821       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2822                            pr->priority);
2823 #if DEBUG_FS
2824       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2825                   "Trying query `%s' with priority %u and TTL %d.\n",
2826                   GNUNET_h2s (&pr->query),
2827                   pr->priority,
2828                   pr->ttl);
2829 #endif
2830     }
2831
2832   /* (3) reserve reply bandwidth */
2833   if (GNUNET_NO == pr->forward_only)
2834     {
2835       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2836                                               &psc.target.hashPubKey);
2837       GNUNET_assert (NULL != cp);
2838       pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2839                                                     &psc.target,
2840                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2841                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2842                                                     DBLOCK_SIZE * 2, 
2843                                                     cp->inc_preference,
2844                                                     &target_reservation_cb,
2845                                                     pr);
2846       cp->inc_preference = 0;
2847     }
2848   else
2849     {
2850       /* force forwarding */
2851       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2852       target_reservation_cb (pr, &psc.target,
2853                              zerobw, zerobw, 0, 0.0);
2854     }
2855 }
2856
2857
2858 /* **************************** P2P PUT Handling ************************ */
2859
2860
2861 /**
2862  * Function called after we either failed or succeeded
2863  * at transmitting a reply to a peer.  
2864  *
2865  * @param cls the requests "struct PendingRequest*"
2866  * @param tpid ID of receiving peer, 0 on transmission error
2867  */
2868 static void
2869 transmit_reply_continuation (void *cls,
2870                              GNUNET_PEER_Id tpid)
2871 {
2872   struct PendingRequest *pr = cls;
2873   
2874   switch (pr->type)
2875     {
2876     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2877     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2878       /* only one reply expected, done with the request! */
2879       destroy_pending_request (pr);
2880       break;
2881     case GNUNET_BLOCK_TYPE_ANY:
2882     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2883     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2884       break;
2885     default:
2886       GNUNET_break (0);
2887       break;
2888     }
2889 }
2890
2891
2892 /**
2893  * Transmit the given message by copying it to the target buffer
2894  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2895  * for writing in the meantime.  In that case, do nothing
2896  * (the disconnect or shutdown handler will take care of the rest).
2897  * If we were able to transmit messages and there are still more
2898  * pending, ask core again for further calls to this function.
2899  *
2900  * @param cls closure, pointer to the 'struct ClientList*'
2901  * @param size number of bytes available in buf
2902  * @param buf where the callee should write the message
2903  * @return number of bytes written to buf
2904  */
2905 static size_t
2906 transmit_to_client (void *cls,
2907                   size_t size, void *buf)
2908 {
2909   struct ClientList *cl = cls;
2910   char *cbuf = buf;
2911   struct ClientResponseMessage *creply;
2912   size_t msize;
2913   
2914   cl->th = NULL;
2915   if (NULL == buf)
2916     {
2917 #if DEBUG_FS
2918       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2919                   "Not sending reply, client communication problem.\n");
2920 #endif
2921       return 0;
2922     }
2923   msize = 0;
2924   while ( (NULL != (creply = cl->res_head) ) &&
2925           (creply->msize <= size) )
2926     {
2927       memcpy (&cbuf[msize], &creply[1], creply->msize);
2928       msize += creply->msize;
2929       size -= creply->msize;
2930       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2931                                    cl->res_tail,
2932                                    creply);
2933       GNUNET_free (creply);
2934     }
2935   if (NULL != creply)
2936     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2937                                                   creply->msize,
2938                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2939                                                   &transmit_to_client,
2940                                                   cl);
2941 #if DEBUG_FS
2942   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2943               "Transmitted %u bytes to client\n",
2944               (unsigned int) msize);
2945 #endif
2946   return msize;
2947 }
2948
2949
2950 /**
2951  * Closure for "process_reply" function.
2952  */
2953 struct ProcessReplyClosure
2954 {
2955   /**
2956    * The data for the reply.
2957    */
2958   const void *data;
2959
2960   /**
2961    * Who gave us this reply? NULL for local host (or DHT)
2962    */
2963   struct ConnectedPeer *sender;
2964
2965   /**
2966    * When the reply expires.
2967    */
2968   struct GNUNET_TIME_Absolute expiration;
2969
2970   /**
2971    * Size of data.
2972    */
2973   size_t size;
2974
2975   /**
2976    * Type of the block.
2977    */
2978   enum GNUNET_BLOCK_Type type;
2979
2980   /**
2981    * How much was this reply worth to us?
2982    */
2983   uint32_t priority;
2984
2985   /**
2986    * Evaluation result (returned).
2987    */
2988   enum GNUNET_BLOCK_EvaluationResult eval;
2989
2990   /**
2991    * Did we finish processing the associated request?
2992    */ 
2993   int finished;
2994
2995   /**
2996    * Did we find a matching request?
2997    */
2998   int request_found;
2999 };
3000
3001
3002 /**
3003  * We have received a reply; handle it!
3004  *
3005  * @param cls response (struct ProcessReplyClosure)
3006  * @param key our query
3007  * @param value value in the hash map (info about the query)
3008  * @return GNUNET_YES (we should continue to iterate)
3009  */
3010 static int
3011 process_reply (void *cls,
3012                const GNUNET_HashCode * key,
3013                void *value)
3014 {
3015   struct ProcessReplyClosure *prq = cls;
3016   struct PendingRequest *pr = value;
3017   struct PendingMessage *reply;
3018   struct ClientResponseMessage *creply;
3019   struct ClientList *cl;
3020   struct PutMessage *pm;
3021   struct ConnectedPeer *cp;
3022   struct GNUNET_TIME_Relative cur_delay;
3023   size_t msize;
3024
3025 #if DEBUG_FS
3026   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3027               "Matched result (type %u) for query `%s' with pending request\n",
3028               (unsigned int) prq->type,
3029               GNUNET_h2s (key));
3030 #endif  
3031   GNUNET_STATISTICS_update (stats,
3032                             gettext_noop ("# replies received and matched"),
3033                             1,
3034                             GNUNET_NO);
3035   if (prq->sender != NULL)
3036     {
3037       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
3038       prq->sender->avg_delay.value
3039         = (prq->sender->avg_delay.value * 
3040            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
3041       prq->sender->avg_priority
3042         = (prq->sender->avg_priority * 
3043            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3044       if (pr->cp != NULL)
3045         {
3046           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3047                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3048                                  -1);
3049           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3050           prq->sender->last_p2p_replies
3051             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3052             = pr->cp->pid;
3053         }
3054       else
3055         {
3056           if (NULL != prq->sender->last_client_replies
3057               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3058             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3059                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3060           prq->sender->last_client_replies
3061             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3062             = pr->client_request_list->client_list->client;
3063           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3064         }
3065     }
3066   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3067                                      prq->type,
3068                                      key,
3069                                      &pr->bf,
3070                                      pr->mingle,
3071                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3072                                      prq->data,
3073                                      prq->size);
3074   switch (prq->eval)
3075     {
3076     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3077       break;
3078     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3079       while (NULL != pr->pending_head)
3080         destroy_pending_message_list_entry (pr->pending_head);
3081       if (pr->qe != NULL)
3082         {
3083           if (pr->client_request_list != NULL)
3084             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3085                                         GNUNET_YES);
3086           GNUNET_DATASTORE_cancel (pr->qe);
3087           pr->qe = NULL;
3088         }
3089       pr->do_remove = GNUNET_YES;
3090       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3091         {
3092           GNUNET_SCHEDULER_cancel (sched,
3093                                    pr->task);
3094           pr->task = GNUNET_SCHEDULER_NO_TASK;
3095         }
3096       GNUNET_break (GNUNET_YES ==
3097                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3098                                                           key,
3099                                                           pr));
3100       GNUNET_LOAD_update (rt_entry_lifetime,
3101                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3102       break;
3103     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3104       GNUNET_STATISTICS_update (stats,
3105                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3106                                 1,
3107                                 GNUNET_NO);
3108 #if DEBUG_FS
3109       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3110                   "Duplicate response `%s', discarding.\n",
3111                   GNUNET_h2s (&mhash));
3112 #endif
3113       return GNUNET_YES; /* duplicate */
3114     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3115       return GNUNET_YES; /* wrong namespace */  
3116     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3117       GNUNET_break (0);
3118       return GNUNET_YES;
3119     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3120       GNUNET_break (0);
3121       return GNUNET_YES;
3122     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3123       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3124                   _("Unsupported block type %u\n"),
3125                   prq->type);
3126       return GNUNET_NO;
3127     }
3128   if (pr->client_request_list != NULL)
3129     {
3130       if (pr->replies_seen_size == pr->replies_seen_off)
3131         GNUNET_array_grow (pr->replies_seen,
3132                            pr->replies_seen_size,
3133                            pr->replies_seen_size * 2 + 4);      
3134       GNUNET_CRYPTO_hash (prq->data,
3135                           prq->size,
3136                           &pr->replies_seen[pr->replies_seen_off++]);         
3137       refresh_bloomfilter (pr);
3138     }
3139   if (NULL == prq->sender)
3140     {
3141 #if DEBUG_FS
3142       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3143                   "Found result for query `%s' in local datastore\n",
3144                   GNUNET_h2s (key));
3145 #endif
3146       GNUNET_STATISTICS_update (stats,
3147                                 gettext_noop ("# results found locally"),
3148                                 1,
3149                                 GNUNET_NO);      
3150     }
3151   prq->priority += pr->remaining_priority;
3152   pr->remaining_priority = 0;
3153   pr->results_found++;
3154   prq->request_found = GNUNET_YES;
3155   if (NULL != pr->client_request_list)
3156     {
3157       GNUNET_STATISTICS_update (stats,
3158                                 gettext_noop ("# replies received for local clients"),
3159                                 1,
3160                                 GNUNET_NO);
3161       cl = pr->client_request_list->client_list;
3162       msize = sizeof (struct PutMessage) + prq->size;
3163       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3164       creply->msize = msize;
3165       creply->client_list = cl;
3166       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3167                                          cl->res_tail,
3168                                          cl->res_tail,
3169                                          creply);      
3170       pm = (struct PutMessage*) &creply[1];
3171       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3172       pm->header.size = htons (msize);
3173       pm->type = htonl (prq->type);
3174       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3175       memcpy (&pm[1], prq->data, prq->size);      
3176       if (NULL == cl->th)
3177         {
3178 #if DEBUG_FS
3179           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3180                       "Transmitting result for query `%s' to client\n",
3181                       GNUNET_h2s (key));
3182 #endif  
3183           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3184                                                         msize,
3185                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3186                                                         &transmit_to_client,
3187                                                         cl);
3188         }
3189       GNUNET_break (cl->th != NULL);
3190       if (pr->do_remove)                
3191         {
3192           prq->finished = GNUNET_YES;
3193           destroy_pending_request (pr);         
3194         }
3195     }
3196   else
3197     {
3198       cp = pr->cp;
3199 #if DEBUG_FS
3200       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3201                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3202                   GNUNET_h2s (key),
3203                   (unsigned int) cp->pid);
3204 #endif  
3205       GNUNET_STATISTICS_update (stats,
3206                                 gettext_noop ("# replies received for other peers"),
3207                                 1,
3208                                 GNUNET_NO);
3209       msize = sizeof (struct PutMessage) + prq->size;
3210       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3211       reply->cont = &transmit_reply_continuation;
3212       reply->cont_cls = pr;
3213 #if SUPPORT_DELAYS
3214       reply->delay_until 
3215         = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3216                                                                            GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3217                                                                                                      TTL_DECREMENT)));
3218 #endif
3219       reply->msize = msize;
3220       reply->priority = UINT32_MAX; /* send replies first! */
3221       pm = (struct PutMessage*) &reply[1];
3222       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3223       pm->header.size = htons (msize);
3224       pm->type = htonl (prq->type);
3225       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3226       memcpy (&pm[1], prq->data, prq->size);
3227       add_to_pending_messages_for_peer (cp, reply, pr);
3228     }
3229   return GNUNET_YES;
3230 }
3231
3232
3233 /**
3234  * Iterator called on each result obtained for a DHT
3235  * operation that expects a reply
3236  *
3237  * @param cls closure
3238  * @param exp when will this value expire
3239  * @param key key of the result
3240  * @param get_path NULL-terminated array of pointers
3241  *                 to the peers on reverse GET path (or NULL if not recorded)
3242  * @param put_path NULL-terminated array of pointers
3243  *                 to the peers on the PUT path (or NULL if not recorded)
3244  * @param type type of the result
3245  * @param size number of bytes in data
3246  * @param data pointer to the result data
3247  */
3248 static void
3249 process_dht_reply (void *cls,
3250                    struct GNUNET_TIME_Absolute exp,
3251                    const GNUNET_HashCode * key,
3252                    const struct GNUNET_PeerIdentity * const *get_path,
3253                    const struct GNUNET_PeerIdentity * const *put_path,
3254                    enum GNUNET_BLOCK_Type type,
3255                    size_t size,
3256                    const void *data)
3257 {
3258   struct PendingRequest *pr = cls;
3259   struct ProcessReplyClosure prq;
3260
3261   memset (&prq, 0, sizeof (prq));
3262   prq.data = data;
3263   prq.expiration = exp;
3264   prq.size = size;  
3265   prq.type = type;
3266   process_reply (&prq, key, pr);
3267 }
3268
3269
3270
3271 /**
3272  * Continuation called to notify client about result of the
3273  * operation.
3274  *
3275  * @param cls closure
3276  * @param success GNUNET_SYSERR on failure
3277  * @param msg NULL on success, otherwise an error message
3278  */
3279 static void 
3280 put_migration_continuation (void *cls,
3281                             int success,
3282                             const char *msg)
3283 {
3284   struct GNUNET_TIME_Absolute *start = cls;
3285   struct GNUNET_TIME_Relative delay;
3286   
3287   delay = GNUNET_TIME_absolute_get_duration (*start);
3288   GNUNET_free (start);
3289   GNUNET_LOAD_update (datastore_put_load,
3290                       delay.value);
3291   if (GNUNET_OK == success)
3292     return;
3293   GNUNET_STATISTICS_update (stats,
3294                             gettext_noop ("# datastore 'put' failures"),
3295                             1,
3296                             GNUNET_NO);
3297 }
3298
3299
3300 /**
3301  * Handle P2P "PUT" message.
3302  *
3303  * @param cls closure, always NULL
3304  * @param other the other peer involved (sender or receiver, NULL
3305  *        for loopback messages where we are both sender and receiver)
3306  * @param message the actual message
3307  * @param latency reported latency of the connection with 'other'
3308  * @param distance reported distance (DV) to 'other' 
3309  * @return GNUNET_OK to keep the connection open,
3310  *         GNUNET_SYSERR to close it (signal serious error)
3311  */
3312 static int
3313 handle_p2p_put (void *cls,
3314                 const struct GNUNET_PeerIdentity *other,
3315                 const struct GNUNET_MessageHeader *message,
3316                 struct GNUNET_TIME_Relative latency,
3317                 uint32_t distance)
3318 {
3319   const struct PutMessage *put;
3320   uint16_t msize;
3321   size_t dsize;
3322   enum GNUNET_BLOCK_Type type;
3323   struct GNUNET_TIME_Absolute expiration;
3324   GNUNET_HashCode query;
3325   struct ProcessReplyClosure prq;
3326   struct GNUNET_TIME_Absolute *start;
3327   struct GNUNET_TIME_Relative block_time;  
3328   double putl;
3329   struct ConnectedPeer *cp; 
3330   struct PendingMessage *pm;
3331   struct MigrationStopMessage *msm;
3332
3333   msize = ntohs (message->size);
3334   if (msize < sizeof (struct PutMessage))
3335     {
3336       GNUNET_break_op(0);
3337       return GNUNET_SYSERR;
3338     }
3339   put = (const struct PutMessage*) message;
3340   dsize = msize - sizeof (struct PutMessage);
3341   type = ntohl (put->type);
3342   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3343
3344   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3345     return GNUNET_SYSERR;
3346   if (GNUNET_OK !=
3347       GNUNET_BLOCK_get_key (block_ctx,
3348                             type,
3349                             &put[1],
3350                             dsize,
3351                             &query))
3352     {
3353       GNUNET_break_op (0);
3354       return GNUNET_SYSERR;
3355     }
3356 #if DEBUG_FS
3357   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3358               "Received result for query `%s' from peer `%4s'\n",
3359               GNUNET_h2s (&query),
3360               GNUNET_i2s (other));
3361 #endif
3362   GNUNET_STATISTICS_update (stats,
3363                             gettext_noop ("# replies received (overall)"),
3364                             1,
3365                             GNUNET_NO);
3366   /* now, lookup 'query' */
3367   prq.data = (const void*) &put[1];
3368   if (other != NULL)
3369     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3370                                                     &other->hashPubKey);
3371   else
3372     prq.sender = NULL;
3373   prq.size = dsize;
3374   prq.type = type;
3375   prq.expiration = expiration;
3376   prq.priority = 0;
3377   prq.finished = GNUNET_NO;
3378   prq.request_found = GNUNET_NO;
3379   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3380                                               &query,
3381                                               &process_reply,
3382                                               &prq);
3383   if (prq.sender != NULL)
3384     {
3385       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3386       prq.sender->trust += prq.priority;
3387     }
3388   if ( (GNUNET_YES == active_migration) &&
3389        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3390     {      
3391 #if DEBUG_FS
3392       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3393                   "Replicating result for query `%s' with priority %u\n",
3394                   GNUNET_h2s (&query),
3395                   prq.priority);
3396 #endif
3397       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3398       *start = GNUNET_TIME_absolute_get ();
3399       GNUNET_DATASTORE_put (dsh,
3400                             0, &query, dsize, &put[1],
3401                             type, prq.priority, 1 /* anonymity */, 
3402                             expiration, 
3403                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3404                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3405                             &put_migration_continuation, 
3406                             start);
3407     }
3408   putl = GNUNET_LOAD_get_load (datastore_put_load);
3409   if ( (GNUNET_NO == prq.request_found) &&
3410        ( (GNUNET_YES != active_migration) ||
3411          (putl > 2.0) ) )
3412     {
3413       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3414                                               &other->hashPubKey);
3415       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3416         return GNUNET_OK; /* already blocked */
3417       /* We're too busy; send MigrationStop message! */
3418       if (GNUNET_YES != active_migration) 
3419         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3420       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3421                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3422                                                                                    (unsigned int) (60000 * putl * putl)));
3423       
3424       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3425       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3426                           sizeof (struct MigrationStopMessage));
3427       pm->msize = sizeof (struct MigrationStopMessage);
3428       pm->priority = UINT32_MAX;
3429       msm = (struct MigrationStopMessage*) &pm[1];
3430       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3431       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3432       msm->duration = GNUNET_TIME_relative_hton (block_time);
3433       add_to_pending_messages_for_peer (cp,
3434                                         pm,
3435                                         NULL);
3436     }
3437   return GNUNET_OK;
3438 }
3439
3440
3441 /**
3442  * Handle P2P "MIGRATION_STOP" message.
3443  *
3444  * @param cls closure, always NULL
3445  * @param other the other peer involved (sender or receiver, NULL
3446  *        for loopback messages where we are both sender and receiver)
3447  * @param message the actual message
3448  * @param latency reported latency of the connection with 'other'
3449  * @param distance reported distance (DV) to 'other' 
3450  * @return GNUNET_OK to keep the connection open,
3451  *         GNUNET_SYSERR to close it (signal serious error)
3452  */
3453 static int
3454 handle_p2p_migration_stop (void *cls,
3455                            const struct GNUNET_PeerIdentity *other,
3456                            const struct GNUNET_MessageHeader *message,
3457                            struct GNUNET_TIME_Relative latency,
3458                            uint32_t distance)
3459 {
3460   struct ConnectedPeer *cp; 
3461   const struct MigrationStopMessage *msm;
3462
3463   msm = (const struct MigrationStopMessage*) message;
3464   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3465                                           &other->hashPubKey);
3466   if (cp == NULL)
3467     {
3468       GNUNET_break (0);
3469       return GNUNET_OK;
3470     }
3471   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3472   return GNUNET_OK;
3473 }
3474
3475
3476
3477 /* **************************** P2P GET Handling ************************ */
3478
3479
3480 /**
3481  * Closure for 'check_duplicate_request_{peer,client}'.
3482  */
3483 struct CheckDuplicateRequestClosure
3484 {
3485   /**
3486    * The new request we should check if it already exists.
3487    */
3488   const struct PendingRequest *pr;
3489
3490   /**
3491    * Existing request found by the checker, NULL if none.
3492    */
3493   struct PendingRequest *have;
3494 };
3495
3496
3497 /**
3498  * Iterator over entries in the 'query_request_map' that
3499  * tries to see if we have the same request pending from
3500  * the same client already.
3501  *
3502  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3503  * @param key current key code (query, ignored, must match)
3504  * @param value value in the hash map (a 'struct PendingRequest' 
3505  *              that already exists)
3506  * @return GNUNET_YES if we should continue to
3507  *         iterate (no match yet)
3508  *         GNUNET_NO if not (match found).
3509  */
3510 static int
3511 check_duplicate_request_client (void *cls,
3512                                 const GNUNET_HashCode * key,
3513                                 void *value)
3514 {
3515   struct CheckDuplicateRequestClosure *cdc = cls;
3516   struct PendingRequest *have = value;
3517
3518   if (have->client_request_list == NULL)
3519     return GNUNET_YES;
3520   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3521        (cdc->pr != have) )
3522     {
3523       cdc->have = have;
3524       return GNUNET_NO;
3525     }
3526   return GNUNET_YES;
3527 }
3528
3529
3530 /**
3531  * We're processing (local) results for a search request
3532  * from another peer.  Pass applicable results to the
3533  * peer and if we are done either clean up (operation
3534  * complete) or forward to other peers (more results possible).
3535  *
3536  * @param cls our closure (struct LocalGetContext)
3537  * @param key key for the content
3538  * @param size number of bytes in data
3539  * @param data content stored
3540  * @param type type of the content
3541  * @param priority priority of the content
3542  * @param anonymity anonymity-level for the content
3543  * @param expiration expiration time for the content
3544  * @param uid unique identifier for the datum;
3545  *        maybe 0 if no unique identifier is available
3546  */
3547 static void
3548 process_local_reply (void *cls,
3549                      const GNUNET_HashCode * key,
3550                      size_t size,
3551                      const void *data,
3552                      enum GNUNET_BLOCK_Type type,
3553                      uint32_t priority,
3554                      uint32_t anonymity,
3555                      struct GNUNET_TIME_Absolute
3556                      expiration, 
3557                      uint64_t uid)
3558 {
3559   struct PendingRequest *pr = cls;
3560   struct ProcessReplyClosure prq;
3561   struct CheckDuplicateRequestClosure cdrc;
3562   GNUNET_HashCode query;
3563   unsigned int old_rf;
3564   
3565   if (NULL == key)
3566     {
3567 #if DEBUG_FS > 1
3568       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3569                   "Done processing local replies, forwarding request to other peers.\n");
3570 #endif
3571       pr->qe = NULL;
3572       if (pr->client_request_list != NULL)
3573         {
3574           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3575                                       GNUNET_YES);
3576           /* Figure out if this is a duplicate request and possibly
3577              merge 'struct PendingRequest' entries */
3578           cdrc.have = NULL;
3579           cdrc.pr = pr;
3580           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3581                                                       &pr->query,
3582                                                       &check_duplicate_request_client,
3583                                                       &cdrc);
3584           if (cdrc.have != NULL)
3585             {
3586 #if DEBUG_FS
3587               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3588                           "Received request for block `%s' twice from client, will only request once.\n",
3589                           GNUNET_h2s (&pr->query));
3590 #endif
3591               
3592               destroy_pending_request (pr);
3593               return;
3594             }
3595         }
3596
3597       /* no more results */
3598       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3599         pr->task = GNUNET_SCHEDULER_add_now (sched,
3600                                              &forward_request_task,
3601                                              pr);      
3602       return;
3603     }
3604 #if DEBUG_FS
3605   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3606               "New local response to `%s' of type %u.\n",
3607               GNUNET_h2s (key),
3608               type);
3609 #endif
3610   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3611     {
3612 #if DEBUG_FS
3613       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3614                   "Found ONDEMAND block, performing on-demand encoding\n");
3615 #endif
3616       GNUNET_STATISTICS_update (stats,
3617                                 gettext_noop ("# on-demand blocks matched requests"),
3618                                 1,
3619                                 GNUNET_NO);
3620       if (GNUNET_OK != 
3621           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3622                                             anonymity, expiration, uid, 
3623                                             &process_local_reply,
3624                                             pr))
3625       if (pr->qe != NULL)
3626         {
3627           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3628         }
3629       return;
3630     }
3631   old_rf = pr->results_found;
3632   memset (&prq, 0, sizeof (prq));
3633   prq.data = data;
3634   prq.expiration = expiration;
3635   prq.size = size;  
3636   if (GNUNET_OK != 
3637       GNUNET_BLOCK_get_key (block_ctx,
3638                             type,
3639                             data,
3640                             size,
3641                             &query))
3642     {
3643       GNUNET_break (0);
3644       GNUNET_DATASTORE_remove (dsh,
3645                                key,
3646                                size, data,
3647                                -1, -1, 
3648                                GNUNET_TIME_UNIT_FOREVER_REL,
3649                                NULL, NULL);
3650       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3651       return;
3652     }
3653   prq.type = type;
3654   prq.priority = priority;  
3655   prq.finished = GNUNET_NO;
3656   prq.request_found = GNUNET_NO;
3657   if ( (old_rf == 0) &&
3658        (pr->results_found == 0) )
3659     update_datastore_delays (pr->start_time);
3660   process_reply (&prq, key, pr);
3661   if (prq.finished == GNUNET_YES)
3662     return;
3663   if (pr->qe == NULL)
3664     return; /* done here */
3665   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3666     {
3667       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3668       return;
3669     }
3670   if ( (pr->client_request_list == NULL) &&
3671        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3672          (pr->results_found > 5 + 2 * pr->priority) ) )
3673     {
3674 #if DEBUG_FS > 2
3675       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3676                   "Load too high, done with request\n");
3677 #endif
3678       GNUNET_STATISTICS_update (stats,
3679                                 gettext_noop ("# processing result set cut short due to load"),
3680                                 1,
3681                                 GNUNET_NO);
3682       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3683       return;
3684     }
3685   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3686 }
3687
3688
3689 /**
3690  * We've received a request with the specified priority.  Bound it
3691  * according to how much we trust the given peer.
3692  * 
3693  * @param prio_in requested priority
3694  * @param cp the peer making the request
3695  * @return effective priority
3696  */
3697 static int32_t
3698 bound_priority (uint32_t prio_in,
3699                 struct ConnectedPeer *cp)
3700 {
3701 #define N ((double)128.0)
3702   uint32_t ret;
3703   double rret;
3704   int ld;
3705
3706   ld = test_get_load_too_high (0);
3707   if (ld == GNUNET_SYSERR)
3708     return 0; /* excess resources */
3709   ret = change_host_trust (cp, prio_in);
3710   if (ret > 0)
3711     {
3712       if (ret > current_priorities + N)
3713         rret = current_priorities + N;
3714       else
3715         rret = ret;
3716       current_priorities 
3717         = (current_priorities * (N-1) + rret)/N;
3718     }
3719   if ( (ld == GNUNET_YES) && (ret > 0) )
3720     {
3721       /* try with charging */
3722       ld = test_get_load_too_high (ret);
3723     }
3724   if (ld == GNUNET_YES)
3725     {
3726       /* undo charge */
3727       if (ret != 0)
3728         change_host_trust (cp, -ret);
3729       return -1; /* not enough resources */
3730     }
3731 #undef N
3732   return ret;
3733 }
3734
3735
3736 /**
3737  * Iterator over entries in the 'query_request_map' that
3738  * tries to see if we have the same request pending from
3739  * the same peer already.
3740  *
3741  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3742  * @param key current key code (query, ignored, must match)
3743  * @param value value in the hash map (a 'struct PendingRequest' 
3744  *              that already exists)
3745  * @return GNUNET_YES if we should continue to
3746  *         iterate (no match yet)
3747  *         GNUNET_NO if not (match found).
3748  */
3749 static int
3750 check_duplicate_request_peer (void *cls,
3751                               const GNUNET_HashCode * key,
3752                               void *value)
3753 {
3754   struct CheckDuplicateRequestClosure *cdc = cls;
3755   struct PendingRequest *have = value;
3756
3757   if (cdc->pr->target_pid == have->target_pid)
3758     {
3759       cdc->have = have;
3760       return GNUNET_NO;
3761     }
3762   return GNUNET_YES;
3763 }
3764
3765
3766 /**
3767  * Handle P2P "GET" request.
3768  *
3769  * @param cls closure, always NULL
3770  * @param other the other peer involved (sender or receiver, NULL
3771  *        for loopback messages where we are both sender and receiver)
3772  * @param message the actual message
3773  * @param latency reported latency of the connection with 'other'
3774  * @param distance reported distance (DV) to 'other' 
3775  * @return GNUNET_OK to keep the connection open,
3776  *         GNUNET_SYSERR to close it (signal serious error)
3777  */
3778 static int
3779 handle_p2p_get (void *cls,
3780                 const struct GNUNET_PeerIdentity *other,
3781                 const struct GNUNET_MessageHeader *message,
3782                 struct GNUNET_TIME_Relative latency,
3783                 uint32_t distance)
3784 {
3785   struct PendingRequest *pr;
3786   struct ConnectedPeer *cp;
3787   struct ConnectedPeer *cps;
3788   struct CheckDuplicateRequestClosure cdc;
3789   struct GNUNET_TIME_Relative timeout;
3790   uint16_t msize;
3791   const struct GetMessage *gm;
3792   unsigned int bits;
3793   const GNUNET_HashCode *opt;
3794   uint32_t bm;
3795   size_t bfsize;
3796   uint32_t ttl_decrement;
3797   int32_t priority;
3798   enum GNUNET_BLOCK_Type type;
3799   int have_ns;
3800
3801   msize = ntohs(message->size);
3802   if (msize < sizeof (struct GetMessage))
3803     {
3804       GNUNET_break_op (0);
3805       return GNUNET_SYSERR;
3806     }
3807   gm = (const struct GetMessage*) message;
3808   type = ntohl (gm->type);
3809   bm = ntohl (gm->hash_bitmap);
3810   bits = 0;
3811   while (bm > 0)
3812     {
3813       if (1 == (bm & 1))
3814         bits++;
3815       bm >>= 1;
3816     }
3817   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3818     {
3819       GNUNET_break_op (0);
3820       return GNUNET_SYSERR;
3821     }  
3822   opt = (const GNUNET_HashCode*) &gm[1];
3823   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3824   bm = ntohl (gm->hash_bitmap);
3825   bits = 0;
3826   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3827                                            &other->hashPubKey);
3828   if (NULL == cps)
3829     {
3830       /* peer must have just disconnected */
3831       GNUNET_STATISTICS_update (stats,
3832                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3833                                 1,
3834                                 GNUNET_NO);
3835       return GNUNET_SYSERR;
3836     }
3837   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3838     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3839                                             &opt[bits++]);
3840   else
3841     cp = cps;
3842   if (cp == NULL)
3843     {
3844 #if DEBUG_FS
3845       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3846         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3847                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3848                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3849       
3850       else
3851         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3852                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3853                     GNUNET_i2s (other));
3854 #endif
3855       GNUNET_STATISTICS_update (stats,
3856                                 gettext_noop ("# requests dropped due to missing reverse route"),
3857                                 1,
3858                                 GNUNET_NO);
3859      /* FIXME: try connect? */
3860       return GNUNET_OK;
3861     }
3862   /* note that we can really only check load here since otherwise
3863      peers could find out that we are overloaded by not being
3864      disconnected after sending us a malformed query... */
3865   priority = bound_priority (ntohl (gm->priority), cps);
3866   if (priority < 0)
3867     {
3868 #if DEBUG_FS
3869       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3870                   "Dropping query from `%s', this peer is too busy.\n",
3871                   GNUNET_i2s (other));
3872 #endif
3873       GNUNET_STATISTICS_update (stats,
3874                                 gettext_noop ("# requests dropped due to high load"),
3875                                 1,
3876                                 GNUNET_NO);
3877       return GNUNET_OK;
3878     }
3879 #if DEBUG_FS 
3880   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3881               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3882               GNUNET_h2s (&gm->query),
3883               (unsigned int) type,
3884               GNUNET_i2s (other),
3885               (unsigned int) bm);
3886 #endif
3887   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3888   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3889                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3890   if (have_ns)
3891     {
3892       pr->namespace = (GNUNET_HashCode*) &pr[1];
3893       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3894     }
3895   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3) ||
3896        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
3897         GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
3898     {
3899       /* don't have BW to send to peer, or would likely take longer than we have for it,
3900          so at best indirect the query */
3901       priority = 0;
3902       pr->forward_only = GNUNET_YES;
3903     }
3904   pr->type = type;
3905   pr->mingle = ntohl (gm->filter_mutator);
3906   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3907     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3908   pr->anonymity_level = 1;
3909   pr->priority = (uint32_t) priority;
3910   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3911   pr->query = gm->query;
3912   /* decrement ttl (always) */
3913   ttl_decrement = 2 * TTL_DECREMENT +
3914     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3915                               TTL_DECREMENT);
3916   if ( (pr->ttl < 0) &&
3917        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3918     {
3919 #if DEBUG_FS
3920       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3921                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3922                   GNUNET_i2s (other),
3923                   pr->ttl,
3924                   ttl_decrement);
3925 #endif
3926       GNUNET_STATISTICS_update (stats,
3927                                 gettext_noop ("# requests dropped due TTL underflow"),
3928                                 1,
3929                                 GNUNET_NO);
3930       /* integer underflow => drop (should be very rare)! */      
3931       GNUNET_free (pr);
3932       return GNUNET_OK;
3933     } 
3934   pr->ttl -= ttl_decrement;
3935   pr->start_time = GNUNET_TIME_absolute_get ();
3936
3937   /* get bloom filter */
3938   if (bfsize > 0)
3939     {
3940       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3941                                                   bfsize,
3942                                                   BLOOMFILTER_K);
3943       pr->bf_size = bfsize;
3944     }
3945
3946   cdc.have = NULL;
3947   cdc.pr = pr;
3948   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3949                                               &gm->query,
3950                                               &check_duplicate_request_peer,
3951                                               &cdc);
3952   if (cdc.have != NULL)
3953     {
3954       if (cdc.have->start_time.value + cdc.have->ttl >=
3955           pr->start_time.value + pr->ttl)
3956         {
3957           /* existing request has higher TTL, drop new one! */
3958           cdc.have->priority += pr->priority;
3959           destroy_pending_request (pr);
3960 #if DEBUG_FS
3961           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3962                       "Have existing request with higher TTL, dropping new request.\n",
3963                       GNUNET_i2s (other));
3964 #endif
3965           GNUNET_STATISTICS_update (stats,
3966                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3967                                     1,
3968                                     GNUNET_NO);
3969           return GNUNET_OK;
3970         }
3971       else
3972         {
3973           /* existing request has lower TTL, drop old one! */
3974           pr->priority += cdc.have->priority;
3975           /* Possible optimization: if we have applicable pending
3976              replies in 'cdc.have', we might want to move those over
3977              (this is a really rare special-case, so it is not clear
3978              that this would be worth it) */
3979           destroy_pending_request (cdc.have);
3980           /* keep processing 'pr'! */
3981         }
3982     }
3983
3984   pr->cp = cp;
3985   GNUNET_break (GNUNET_OK ==
3986                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3987                                                    &gm->query,
3988                                                    pr,
3989                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3990   GNUNET_break (GNUNET_OK ==
3991                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3992                                                    &other->hashPubKey,
3993                                                    pr,
3994                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3995   
3996   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3997                                             pr,
3998                                             pr->start_time.value + pr->ttl);
3999
4000   GNUNET_STATISTICS_update (stats,
4001                             gettext_noop ("# P2P searches received"),
4002                             1,
4003                             GNUNET_NO);
4004   GNUNET_STATISTICS_update (stats,
4005                             gettext_noop ("# P2P searches active"),
4006                             1,
4007                             GNUNET_NO);
4008
4009   /* calculate change in traffic preference */
4010   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4011   /* process locally */
4012   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4013     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4014   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4015                                            (pr->priority + 1)); 
4016   if (GNUNET_YES != pr->forward_only)
4017     pr->qe = GNUNET_DATASTORE_get (dsh,
4018                                    &gm->query,
4019                                    type,                               
4020                                    pr->priority + 1,
4021                                    MAX_DATASTORE_QUEUE,                          
4022                                    timeout,
4023                                    &process_local_reply,
4024                                    pr);
4025   else
4026     GNUNET_STATISTICS_update (stats,
4027                               gettext_noop ("# requests forwarded due to high load"),
4028                               1,
4029                               GNUNET_NO);
4030
4031   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4032   switch (pr->type)
4033     {
4034     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4035     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4036       /* only one result, wait for datastore */
4037       if (GNUNET_YES != pr->forward_only)
4038         break;
4039     default:
4040       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4041         pr->task = GNUNET_SCHEDULER_add_now (sched,
4042                                              &forward_request_task,
4043                                              pr);
4044     }
4045
4046   /* make sure we don't track too many requests */
4047   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4048     {
4049       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4050       GNUNET_assert (pr != NULL);
4051       destroy_pending_request (pr);
4052     }
4053   return GNUNET_OK;
4054 }
4055
4056
4057 /* **************************** CS GET Handling ************************ */
4058
4059
4060 /**
4061  * Handle START_SEARCH-message (search request from client).
4062  *
4063  * @param cls closure
4064  * @param client identification of the client
4065  * @param message the actual message
4066  */
4067 static void
4068 handle_start_search (void *cls,
4069                      struct GNUNET_SERVER_Client *client,
4070                      const struct GNUNET_MessageHeader *message)
4071 {
4072   static GNUNET_HashCode all_zeros;
4073   const struct SearchMessage *sm;
4074   struct ClientList *cl;
4075   struct ClientRequestList *crl;
4076   struct PendingRequest *pr;
4077   uint16_t msize;
4078   unsigned int sc;
4079   enum GNUNET_BLOCK_Type type;
4080
4081   msize = ntohs (message->size);
4082   if ( (msize < sizeof (struct SearchMessage)) ||
4083        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4084     {
4085       GNUNET_break (0);
4086       GNUNET_SERVER_receive_done (client,
4087                                   GNUNET_SYSERR);
4088       return;
4089     }
4090   GNUNET_STATISTICS_update (stats,
4091                             gettext_noop ("# client searches received"),
4092                             1,
4093                             GNUNET_NO);
4094   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4095   sm = (const struct SearchMessage*) message;
4096   type = ntohl (sm->type);
4097 #if DEBUG_FS
4098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4099               "Received request for `%s' of type %u from local client\n",
4100               GNUNET_h2s (&sm->query),
4101               (unsigned int) type);
4102 #endif
4103   cl = client_list;
4104   while ( (cl != NULL) &&
4105           (cl->client != client) )
4106     cl = cl->next;
4107   if (cl == NULL)
4108     {
4109       cl = GNUNET_malloc (sizeof (struct ClientList));
4110       cl->client = client;
4111       GNUNET_SERVER_client_keep (client);
4112       cl->next = client_list;
4113       client_list = cl;
4114     }
4115   /* detect duplicate KBLOCK requests */
4116   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4117        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4118        (type == GNUNET_BLOCK_TYPE_ANY) )
4119     {
4120       crl = cl->rl_head;
4121       while ( (crl != NULL) &&
4122               ( (0 != memcmp (&crl->req->query,
4123                               &sm->query,
4124                               sizeof (GNUNET_HashCode))) ||
4125                 (crl->req->type != type) ) )
4126         crl = crl->next;
4127       if (crl != NULL)  
4128         { 
4129 #if DEBUG_FS
4130           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4131                       "Have existing request, merging content-seen lists.\n");
4132 #endif
4133           pr = crl->req;
4134           /* Duplicate request (used to send long list of
4135              known/blocked results); merge 'pr->replies_seen'
4136              and update bloom filter */
4137           GNUNET_array_grow (pr->replies_seen,
4138                              pr->replies_seen_size,
4139                              pr->replies_seen_off + sc);
4140           memcpy (&pr->replies_seen[pr->replies_seen_off],
4141                   &sm[1],
4142                   sc * sizeof (GNUNET_HashCode));
4143           pr->replies_seen_off += sc;
4144           refresh_bloomfilter (pr);
4145           GNUNET_STATISTICS_update (stats,
4146                                     gettext_noop ("# client searches updated (merged content seen list)"),
4147                                     1,
4148                                     GNUNET_NO);
4149           GNUNET_SERVER_receive_done (client,
4150                                       GNUNET_OK);
4151           return;
4152         }
4153     }
4154   GNUNET_STATISTICS_update (stats,
4155                             gettext_noop ("# client searches active"),
4156                             1,
4157                             GNUNET_NO);
4158   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4159                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4160   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4161   memset (crl, 0, sizeof (struct ClientRequestList));
4162   crl->client_list = cl;
4163   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4164                                cl->rl_tail,
4165                                crl);  
4166   crl->req = pr;
4167   pr->type = type;
4168   pr->client_request_list = crl;
4169   GNUNET_array_grow (pr->replies_seen,
4170                      pr->replies_seen_size,
4171                      sc);
4172   memcpy (pr->replies_seen,
4173           &sm[1],
4174           sc * sizeof (GNUNET_HashCode));
4175   pr->replies_seen_off = sc;
4176   pr->anonymity_level = ntohl (sm->anonymity_level); 
4177   pr->start_time = GNUNET_TIME_absolute_get ();
4178   refresh_bloomfilter (pr);
4179   pr->query = sm->query;
4180   if (0 == (1 & ntohl (sm->options)))
4181     pr->local_only = GNUNET_NO;
4182   else
4183     pr->local_only = GNUNET_YES;
4184   switch (type)
4185     {
4186     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4187     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4188       if (0 != memcmp (&sm->target,
4189                        &all_zeros,
4190                        sizeof (GNUNET_HashCode)))
4191         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4192       break;
4193     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4194       pr->namespace = (GNUNET_HashCode*) &pr[1];
4195       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4196       break;
4197     default:
4198       break;
4199     }
4200   GNUNET_break (GNUNET_OK ==
4201                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4202                                                    &sm->query,
4203                                                    pr,
4204                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4205   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4206     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4207   pr->qe = GNUNET_DATASTORE_get (dsh,
4208                                  &sm->query,
4209                                  type,
4210                                  -3, -1,
4211                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4212                                  &process_local_reply,
4213                                  pr);
4214 }
4215
4216
4217 /* **************************** Startup ************************ */
4218
4219 /**
4220  * Process fs requests.
4221  *
4222  * @param s scheduler to use
4223  * @param server the initialized server
4224  * @param c configuration to use
4225  */
4226 static int
4227 main_init (struct GNUNET_SCHEDULER_Handle *s,
4228            struct GNUNET_SERVER_Handle *server,
4229            const struct GNUNET_CONFIGURATION_Handle *c)
4230 {
4231   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4232     {
4233       { &handle_p2p_get, 
4234         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4235       { &handle_p2p_put, 
4236         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4237       { &handle_p2p_migration_stop, 
4238         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4239         sizeof (struct MigrationStopMessage) },
4240       { NULL, 0, 0 }
4241     };
4242   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4243     {&GNUNET_FS_handle_index_start, NULL, 
4244      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4245     {&GNUNET_FS_handle_index_list_get, NULL, 
4246      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4247     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4248      sizeof (struct UnindexMessage) },
4249     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4250      0 },
4251     {NULL, NULL, 0, 0}
4252   };
4253   unsigned long long enc = 128;
4254
4255   sched = s;
4256   cfg = c;
4257   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4258   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4259   if ( (GNUNET_OK !=
4260         GNUNET_CONFIGURATION_get_value_number (cfg,
4261                                                "fs",
4262                                                "MAX_PENDING_REQUESTS",
4263                                                &max_pending_requests)) ||
4264        (GNUNET_OK !=
4265         GNUNET_CONFIGURATION_get_value_number (cfg,
4266                                                "fs",
4267                                                "EXPECTED_NEIGHBOUR_COUNT",
4268                                                &enc)) ||
4269        (GNUNET_OK != 
4270         GNUNET_CONFIGURATION_get_value_time (cfg,
4271                                              "fs",
4272                                              "MIN_MIGRATION_DELAY",
4273                                              &min_migration_delay)) )
4274     {
4275       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4276                   _("Configuration fails to specify certain parameters, assuming default values."));
4277     }
4278   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4279   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4280   rt_entry_lifetime = GNUNET_LOAD_value_init ();
4281   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4282   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4283   core = GNUNET_CORE_connect (sched,
4284                               cfg,
4285                               GNUNET_TIME_UNIT_FOREVER_REL,
4286                               NULL,
4287                               NULL,
4288                               &peer_connect_handler,
4289                               &peer_disconnect_handler,
4290                               NULL,
4291                               NULL, GNUNET_NO,
4292                               NULL, GNUNET_NO,
4293                               p2p_handlers);
4294   if (NULL == core)
4295     {
4296       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4297                   _("Failed to connect to `%s' service.\n"),
4298                   "core");
4299       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4300       connected_peers = NULL;
4301       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4302       query_request_map = NULL;
4303       GNUNET_LOAD_value_free (rt_entry_lifetime);
4304       rt_entry_lifetime = NULL;
4305       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4306       requests_by_expiration_heap = NULL;
4307       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4308       peer_request_map = NULL;
4309       if (dsh != NULL)
4310         {
4311           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4312           dsh = NULL;
4313         }
4314       return GNUNET_SYSERR;
4315     }
4316   /* FIXME: distinguish between sending and storing in options? */
4317   if (active_migration) 
4318     {
4319       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4320                   _("Content migration is enabled, will start to gather data\n"));
4321       consider_migration_gathering ();
4322     }
4323   consider_dht_put_gathering (NULL);
4324   GNUNET_SERVER_disconnect_notify (server, 
4325                                    &handle_client_disconnect,
4326                                    NULL);
4327   GNUNET_assert (GNUNET_OK ==
4328                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4329                                                           "fs",
4330                                                           "TRUST",
4331                                                           &trustDirectory));
4332   GNUNET_DISK_directory_create (trustDirectory);
4333   GNUNET_SCHEDULER_add_with_priority (sched,
4334                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4335                                       &cron_flush_trust, NULL);
4336
4337
4338   GNUNET_SERVER_add_handlers (server, handlers);
4339   GNUNET_SCHEDULER_add_delayed (sched,
4340                                 GNUNET_TIME_UNIT_FOREVER_REL,
4341                                 &shutdown_task,
4342                                 NULL);
4343   return GNUNET_OK;
4344 }
4345
4346
4347 /**
4348  * Process fs requests.
4349  *
4350  * @param cls closure
4351  * @param sched scheduler to use
4352  * @param server the initialized server
4353  * @param cfg configuration to use
4354  */
4355 static void
4356 run (void *cls,
4357      struct GNUNET_SCHEDULER_Handle *sched,
4358      struct GNUNET_SERVER_Handle *server,
4359      const struct GNUNET_CONFIGURATION_Handle *cfg)
4360 {
4361   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4362                                                            "FS",
4363                                                            "ACTIVEMIGRATION");
4364   dsh = GNUNET_DATASTORE_connect (cfg,
4365                                   sched);
4366   if (dsh == NULL)
4367     {
4368       GNUNET_SCHEDULER_shutdown (sched);
4369       return;
4370     }
4371   datastore_get_load = GNUNET_LOAD_value_init ();
4372   datastore_put_load = GNUNET_LOAD_value_init ();
4373   block_cfg = GNUNET_CONFIGURATION_create ();
4374   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4375                                          "block",
4376                                          "PLUGINS",
4377                                          "fs");
4378   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4379   GNUNET_assert (NULL != block_ctx);
4380   dht_handle = GNUNET_DHT_connect (sched,
4381                                    cfg,
4382                                    FS_DHT_HT_SIZE);
4383   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4384        (GNUNET_OK != main_init (sched, server, cfg)) )
4385     {    
4386       GNUNET_SCHEDULER_shutdown (sched);
4387       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4388       dsh = NULL;
4389       GNUNET_DHT_disconnect (dht_handle);
4390       dht_handle = NULL;
4391       GNUNET_BLOCK_context_destroy (block_ctx);
4392       block_ctx = NULL;
4393       GNUNET_CONFIGURATION_destroy (block_cfg);
4394       block_cfg = NULL;
4395       GNUNET_LOAD_value_free (datastore_get_load);
4396       datastore_get_load = NULL;
4397       GNUNET_LOAD_value_free (datastore_put_load);
4398       datastore_put_load = NULL;
4399       return;   
4400     }
4401 }
4402
4403
4404 /**
4405  * The main function for the fs service.
4406  *
4407  * @param argc number of arguments from the command line
4408  * @param argv command line arguments
4409  * @return 0 ok, 1 on error
4410  */
4411 int
4412 main (int argc, char *const *argv)
4413 {
4414   return (GNUNET_OK ==
4415           GNUNET_SERVICE_run (argc,
4416                               argv,
4417                               "fs",
4418                               GNUNET_SERVICE_OPTION_NONE,
4419                               &run, NULL)) ? 0 : 1;
4420 }
4421
4422 /* end of gnunet-service-fs.c */