stats
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - more statistics
28  */
29 #include "platform.h"
30 #include <float.h>
31 #include "gnunet_constants.h"
32 #include "gnunet_core_service.h"
33 #include "gnunet_dht_service.h"
34 #include "gnunet_datastore_service.h"
35 #include "gnunet_load_lib.h"
36 #include "gnunet_peer_lib.h"
37 #include "gnunet_protocols.h"
38 #include "gnunet_signatures.h"
39 #include "gnunet_statistics_service.h"
40 #include "gnunet_util_lib.h"
41 #include "gnunet-service-fs_indexing.h"
42 #include "fs.h"
43
44 #define DEBUG_FS GNUNET_NO
45
46 /**
47  * Should we introduce random latency in processing?  Required for proper
48  * implementation of GAP, but can be disabled for performance evaluation of
49  * the basic routing algorithm.
50  */
51 #define SUPPORT_DELAYS GNUNET_NO
52
53 /**
54  * Maximum number of outgoing messages we queue per peer.
55  */
56 #define MAX_QUEUE_PER_PEER 16
57
58 /**
59  * Size for the hash map for DHT requests from the FS
60  * service.  Should be about the number of concurrent
61  * DHT requests we plan to make.
62  */
63 #define FS_DHT_HT_SIZE 1024
64
65 /**
66  * How often do we flush trust values to disk?
67  */
68 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
69
70 /**
71  * How often do we at most PUT content into the DHT?
72  */
73 #define MAX_DHT_PUT_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
74
75 /**
76  * Inverse of the probability that we will submit the same query
77  * to the same peer again.  If the same peer already got the query
78  * repeatedly recently, the probability is multiplied by the inverse
79  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
80  * plus MAX_CORK_DELAY (so roughly every 3.5s).
81  */
82 #define RETRY_PROBABILITY_INV 3
83
84 /**
85  * What is the maximum delay for a P2P FS message (in our interaction
86  * with core)?  FS-internal delays are another story.  The value is
87  * chosen based on the 32k block size.  Given that peers typcially
88  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
89  * transmit one message even to the lowest-bandwidth peers.
90  */
91 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
92
93 /**
94  * Maximum number of requests (from other peers) that we're
95  * willing to have pending at any given point in time.
96  */
97 static unsigned long long max_pending_requests = (32 * 1024);
98
99
100 /**
101  * Information we keep for each pending reply.  The
102  * actual message follows at the end of this struct.
103  */
104 struct PendingMessage;
105
106 /**
107  * Function called upon completion of a transmission.
108  *
109  * @param cls closure
110  * @param pid ID of receiving peer, 0 on transmission error
111  */
112 typedef void (*TransmissionContinuation)(void * cls, 
113                                          GNUNET_PEER_Id tpid);
114
115
116 /**
117  * Information we keep for each pending message (GET/PUT).  The
118  * actual message follows at the end of this struct.
119  */
120 struct PendingMessage
121 {
122   /**
123    * This is a doubly-linked list of messages to the same peer.
124    */
125   struct PendingMessage *next;
126
127   /**
128    * This is a doubly-linked list of messages to the same peer.
129    */
130   struct PendingMessage *prev;
131
132   /**
133    * Entry in pending message list for this pending message.
134    */ 
135   struct PendingMessageList *pml;  
136
137   /**
138    * Function to call immediately once we have transmitted this
139    * message.
140    */
141   TransmissionContinuation cont;
142
143   /**
144    * Closure for cont.
145    */
146   void *cont_cls;
147
148   /**
149    * Do not transmit this pending message until this deadline.
150    */
151   struct GNUNET_TIME_Absolute delay_until;
152
153   /**
154    * Size of the reply; actual reply message follows
155    * at the end of this struct.
156    */
157   size_t msize;
158   
159   /**
160    * How important is this message for us?
161    */
162   uint32_t priority;
163  
164 };
165
166
167 /**
168  * Information about a peer that we are connected to.
169  * We track data that is useful for determining which
170  * peers should receive our requests.  We also keep
171  * a list of messages to transmit to this peer.
172  */
173 struct ConnectedPeer
174 {
175
176   /**
177    * List of the last clients for which this peer successfully
178    * answered a query.
179    */
180   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
181
182   /**
183    * List of the last PIDs for which
184    * this peer successfully answered a query;
185    * We use 0 to indicate no successful reply.
186    */
187   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
188
189   /**
190    * Average delay between sending the peer a request and
191    * getting a reply (only calculated over the requests for
192    * which we actually got a reply).   Calculated
193    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
194    */ 
195   struct GNUNET_TIME_Relative avg_delay;
196
197   /**
198    * Point in time until which this peer does not want us to migrate content
199    * to it.
200    */
201   struct GNUNET_TIME_Absolute migration_blocked;
202
203   /**
204    * Time until when we blocked this peer from migrating
205    * data to us.
206    */
207   struct GNUNET_TIME_Absolute last_migration_block;
208
209   /**
210    * Handle for an active request for transmission to this
211    * peer, or NULL.
212    */
213   struct GNUNET_CORE_TransmitHandle *cth;
214
215   /**
216    * Messages (replies, queries, content migration) we would like to
217    * send to this peer in the near future.  Sorted by priority, head.
218    */
219   struct PendingMessage *pending_messages_head;
220
221   /**
222    * Messages (replies, queries, content migration) we would like to
223    * send to this peer in the near future.  Sorted by priority, tail.
224    */
225   struct PendingMessage *pending_messages_tail;
226
227   /**
228    * How long does it typically take for us to transmit a message
229    * to this peer?  (delay between the request being issued and
230    * the callback being invoked).
231    */
232   struct GNUNET_LOAD_Value *transmission_delay;
233
234   /**
235    * Time when the last transmission request was issued.
236    */
237   struct GNUNET_TIME_Absolute last_transmission_request_start;
238
239   /**
240    * ID of delay task for scheduling transmission.
241    */
242   GNUNET_SCHEDULER_TaskIdentifier delayed_transmission_request_task;
243
244   /**
245    * Average priority of successful replies.  Calculated
246    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
247    */
248   double avg_priority;
249
250   /**
251    * Increase in traffic preference still to be submitted
252    * to the core service for this peer.
253    */
254   uint64_t inc_preference;
255
256   /**
257    * Trust rating for this peer
258    */
259   uint32_t trust;
260
261   /**
262    * Trust rating for this peer on disk.
263    */
264   uint32_t disk_trust;
265
266   /**
267    * The peer's identity.
268    */
269   GNUNET_PEER_Id pid;  
270
271   /**
272    * Size of the linked list of 'pending_messages'.
273    */
274   unsigned int pending_requests;
275
276   /**
277    * Which offset in "last_p2p_replies" will be updated next?
278    * (we go round-robin).
279    */
280   unsigned int last_p2p_replies_woff;
281
282   /**
283    * Which offset in "last_client_replies" will be updated next?
284    * (we go round-robin).
285    */
286   unsigned int last_client_replies_woff;
287
288 };
289
290
291 /**
292  * Information we keep for each pending request.  We should try to
293  * keep this struct as small as possible since its memory consumption
294  * is key to how many requests we can have pending at once.
295  */
296 struct PendingRequest;
297
298
299 /**
300  * Doubly-linked list of requests we are performing
301  * on behalf of the same client.
302  */
303 struct ClientRequestList
304 {
305
306   /**
307    * This is a doubly-linked list.
308    */
309   struct ClientRequestList *next;
310
311   /**
312    * This is a doubly-linked list.
313    */
314   struct ClientRequestList *prev;
315
316   /**
317    * Request this entry represents.
318    */
319   struct PendingRequest *req;
320
321   /**
322    * Client list this request belongs to.
323    */
324   struct ClientList *client_list;
325
326 };
327
328
329 /**
330  * Replies to be transmitted to the client.  The actual
331  * response message is allocated after this struct.
332  */
333 struct ClientResponseMessage
334 {
335   /**
336    * This is a doubly-linked list.
337    */
338   struct ClientResponseMessage *next;
339
340   /**
341    * This is a doubly-linked list.
342    */
343   struct ClientResponseMessage *prev;
344
345   /**
346    * Client list entry this response belongs to.
347    */
348   struct ClientList *client_list;
349
350   /**
351    * Number of bytes in the response.
352    */
353   size_t msize;
354 };
355
356
357 /**
358  * Linked list of clients we are performing requests
359  * for right now.
360  */
361 struct ClientList
362 {
363   /**
364    * This is a linked list.
365    */
366   struct ClientList *next;
367
368   /**
369    * ID of a client making a request, NULL if this entry is for a
370    * peer.
371    */
372   struct GNUNET_SERVER_Client *client;
373
374   /**
375    * Head of list of requests performed on behalf
376    * of this client right now.
377    */
378   struct ClientRequestList *rl_head;
379
380   /**
381    * Tail of list of requests performed on behalf
382    * of this client right now.
383    */
384   struct ClientRequestList *rl_tail;
385
386   /**
387    * Head of linked list of responses.
388    */
389   struct ClientResponseMessage *res_head;
390
391   /**
392    * Tail of linked list of responses.
393    */
394   struct ClientResponseMessage *res_tail;
395
396   /**
397    * Context for sending replies.
398    */
399   struct GNUNET_CONNECTION_TransmitHandle *th;
400
401 };
402
403
404 /**
405  * Doubly-linked list of messages we are performing
406  * due to a pending request.
407  */
408 struct PendingMessageList
409 {
410
411   /**
412    * This is a doubly-linked list of messages on behalf of the same request.
413    */
414   struct PendingMessageList *next;
415
416   /**
417    * This is a doubly-linked list of messages on behalf of the same request.
418    */
419   struct PendingMessageList *prev;
420
421   /**
422    * Message this entry represents.
423    */
424   struct PendingMessage *pm;
425
426   /**
427    * Request this entry belongs to.
428    */
429   struct PendingRequest *req;
430
431   /**
432    * Peer this message is targeted for.
433    */
434   struct ConnectedPeer *target;
435
436 };
437
438
439 /**
440  * Information we keep for each pending request.  We should try to
441  * keep this struct as small as possible since its memory consumption
442  * is key to how many requests we can have pending at once.
443  */
444 struct PendingRequest
445 {
446
447   /**
448    * If this request was made by a client, this is our entry in the
449    * client request list; otherwise NULL.
450    */
451   struct ClientRequestList *client_request_list;
452
453   /**
454    * Entry of peer responsible for this entry (if this request
455    * was made by a peer).
456    */
457   struct ConnectedPeer *cp;
458
459   /**
460    * If this is a namespace query, pointer to the hash of the public
461    * key of the namespace; otherwise NULL.  Pointer will be to the 
462    * end of this struct (so no need to free it).
463    */
464   const GNUNET_HashCode *namespace;
465
466   /**
467    * Bloomfilter we use to filter out replies that we don't care about
468    * (anymore).  NULL as long as we are interested in all replies.
469    */
470   struct GNUNET_CONTAINER_BloomFilter *bf;
471
472   /**
473    * Context of our GNUNET_CORE_peer_change_preference call.
474    */
475   struct GNUNET_CORE_InformationRequestContext *irc;
476
477   /**
478    * Reference to DHT get operation for this request (or NULL).
479    */
480   struct GNUNET_DHT_GetHandle *dht_get;
481
482   /**
483    * Hash code of all replies that we have seen so far (only valid
484    * if client is not NULL since we only track replies like this for
485    * our own clients).
486    */
487   GNUNET_HashCode *replies_seen;
488
489   /**
490    * Node in the heap representing this entry; NULL
491    * if we have no heap node.
492    */
493   struct GNUNET_CONTAINER_HeapNode *hnode;
494
495   /**
496    * Head of list of messages being performed on behalf of this
497    * request.
498    */
499   struct PendingMessageList *pending_head;
500
501   /**
502    * Tail of list of messages being performed on behalf of this
503    * request.
504    */
505   struct PendingMessageList *pending_tail;
506
507   /**
508    * When did we first see this request (form this peer), or, if our
509    * client is initiating, when did we last initiate a search?
510    */
511   struct GNUNET_TIME_Absolute start_time;
512
513   /**
514    * The query that this request is for.
515    */
516   GNUNET_HashCode query;
517
518   /**
519    * The task responsible for transmitting queries
520    * for this request.
521    */
522   GNUNET_SCHEDULER_TaskIdentifier task;
523
524   /**
525    * (Interned) Peer identifier that identifies a preferred target
526    * for requests.
527    */
528   GNUNET_PEER_Id target_pid;
529
530   /**
531    * (Interned) Peer identifiers of peers that have already
532    * received our query for this content.
533    */
534   GNUNET_PEER_Id *used_pids;
535   
536   /**
537    * Our entry in the queue (non-NULL while we wait for our
538    * turn to interact with the local database).
539    */
540   struct GNUNET_DATASTORE_QueueEntry *qe;
541
542   /**
543    * Size of the 'bf' (in bytes).
544    */
545   size_t bf_size;
546
547   /**
548    * Desired anonymity level; only valid for requests from a local client.
549    */
550   uint32_t anonymity_level;
551
552   /**
553    * How many entries in "used_pids" are actually valid?
554    */
555   unsigned int used_pids_off;
556
557   /**
558    * How long is the "used_pids" array?
559    */
560   unsigned int used_pids_size;
561
562   /**
563    * Number of results found for this request.
564    */
565   unsigned int results_found;
566
567   /**
568    * How many entries in "replies_seen" are actually valid?
569    */
570   unsigned int replies_seen_off;
571
572   /**
573    * How long is the "replies_seen" array?
574    */
575   unsigned int replies_seen_size;
576   
577   /**
578    * Priority with which this request was made.  If one of our clients
579    * made the request, then this is the current priority that we are
580    * using when initiating the request.  This value is used when
581    * we decide to reward other peers with trust for providing a reply.
582    */
583   uint32_t priority;
584
585   /**
586    * Priority points left for us to spend when forwarding this request
587    * to other peers.
588    */
589   uint32_t remaining_priority;
590
591   /**
592    * Number to mingle hashes for bloom-filter tests with.
593    */
594   int32_t mingle;
595
596   /**
597    * TTL with which we saw this request (or, if we initiated, TTL that
598    * we used for the request).
599    */
600   int32_t ttl;
601   
602   /**
603    * Type of the content that this request is for.
604    */
605   enum GNUNET_BLOCK_Type type;
606
607   /**
608    * Remove this request after transmission of the current response.
609    */
610   int8_t do_remove;
611
612   /**
613    * GNUNET_YES if we should not forward this request to other peers.
614    */
615   int8_t local_only;
616
617   /**
618    * GNUNET_YES if we should not forward this request to other peers.
619    */
620   int8_t forward_only;
621
622 };
623
624
625 /**
626  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
627  */
628 struct MigrationReadyBlock
629 {
630
631   /**
632    * This is a doubly-linked list.
633    */
634   struct MigrationReadyBlock *next;
635
636   /**
637    * This is a doubly-linked list.
638    */
639   struct MigrationReadyBlock *prev;
640
641   /**
642    * Query for the block.
643    */
644   GNUNET_HashCode query;
645
646   /**
647    * When does this block expire? 
648    */
649   struct GNUNET_TIME_Absolute expiration;
650
651   /**
652    * Peers we would consider forwarding this
653    * block to.  Zero for empty entries.
654    */
655   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
656
657   /**
658    * Size of the block.
659    */
660   size_t size;
661
662   /**
663    *  Number of targets already used.
664    */
665   unsigned int used_targets;
666
667   /**
668    * Type of the block.
669    */
670   enum GNUNET_BLOCK_Type type;
671 };
672
673
674 /**
675  * Our connection to the datastore.
676  */
677 static struct GNUNET_DATASTORE_Handle *dsh;
678
679 /**
680  * Our block context.
681  */
682 static struct GNUNET_BLOCK_Context *block_ctx;
683
684 /**
685  * Our block configuration.
686  */
687 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
688
689 /**
690  * Our scheduler.
691  */
692 static struct GNUNET_SCHEDULER_Handle *sched;
693
694 /**
695  * Our configuration.
696  */
697 static const struct GNUNET_CONFIGURATION_Handle *cfg;
698
699 /**
700  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
701  */
702 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
703
704 /**
705  * Map of peer identifiers to "struct PendingRequest" (for that peer).
706  */
707 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
708
709 /**
710  * Map of query identifiers to "struct PendingRequest" (for that query).
711  */
712 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
713
714 /**
715  * Heap with the request that will expire next at the top.  Contains
716  * pointers of type "struct PendingRequest*"; these will *also* be
717  * aliased from the "requests_by_peer" data structures and the
718  * "requests_by_query" table.  Note that requests from our clients
719  * don't expire and are thus NOT in the "requests_by_expiration"
720  * (or the "requests_by_peer" tables).
721  */
722 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
723
724 /**
725  * Handle for reporting statistics.
726  */
727 static struct GNUNET_STATISTICS_Handle *stats;
728
729 /**
730  * Linked list of clients we are currently processing requests for.
731  */
732 static struct ClientList *client_list;
733
734 /**
735  * Pointer to handle to the core service (points to NULL until we've
736  * connected to it).
737  */
738 static struct GNUNET_CORE_Handle *core;
739
740 /**
741  * Head of linked list of blocks that can be migrated.
742  */
743 static struct MigrationReadyBlock *mig_head;
744
745 /**
746  * Tail of linked list of blocks that can be migrated.
747  */
748 static struct MigrationReadyBlock *mig_tail;
749
750 /**
751  * Request to datastore for migration (or NULL).
752  */
753 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
754
755 /**
756  * Request to datastore for DHT PUTs (or NULL).
757  */
758 static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
759
760 /**
761  * Type we will request for the next DHT PUT round from the datastore.
762  */
763 static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
764
765 /**
766  * Where do we store trust information?
767  */
768 static char *trustDirectory;
769
770 /**
771  * ID of task that collects blocks for migration.
772  */
773 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
774
775 /**
776  * ID of task that collects blocks for DHT PUTs.
777  */
778 static GNUNET_SCHEDULER_TaskIdentifier dht_task;
779
780 /**
781  * What is the maximum frequency at which we are allowed to
782  * poll the datastore for migration content?
783  */
784 static struct GNUNET_TIME_Relative min_migration_delay;
785
786 /**
787  * Handle for DHT operations.
788  */
789 static struct GNUNET_DHT_Handle *dht_handle;
790
791 /**
792  * Size of the doubly-linked list of migration blocks.
793  */
794 static unsigned int mig_size;
795
796 /**
797  * Are we allowed to migrate content to this peer.
798  */
799 static int active_migration;
800
801 /**
802  * How many entires with zero anonymity do we currently estimate
803  * to have in the database?
804  */
805 static unsigned int zero_anonymity_count_estimate;
806
807 /**
808  * Typical priorities we're seeing from other peers right now.  Since
809  * most priorities will be zero, this value is the weighted average of
810  * non-zero priorities seen "recently".  In order to ensure that new
811  * values do not dramatically change the ratio, values are first
812  * "capped" to a reasonable range (+N of the current value) and then
813  * averaged into the existing value by a ratio of 1:N.  Hence
814  * receiving the largest possible priority can still only raise our
815  * "current_priorities" by at most 1.
816  */
817 static double current_priorities;
818
819 /**
820  * Datastore 'GET' load tracking.
821  */
822 static struct GNUNET_LOAD_Value *datastore_get_load;
823
824 /**
825  * Datastore 'PUT' load tracking.
826  */
827 static struct GNUNET_LOAD_Value *datastore_put_load;
828
829 /**
830  * How long do requests typically stay in the routing table?
831  */
832 static struct GNUNET_LOAD_Value *rt_entry_lifetime;
833
834 /**
835  * We've just now completed a datastore request.  Update our
836  * datastore load calculations.
837  *
838  * @param start time when the datastore request was issued
839  */
840 static void
841 update_datastore_delays (struct GNUNET_TIME_Absolute start)
842 {
843   struct GNUNET_TIME_Relative delay;
844
845   delay = GNUNET_TIME_absolute_get_duration (start);
846   GNUNET_LOAD_update (datastore_get_load,
847                       delay.value);
848 }
849
850
851 /**
852  * Get the filename under which we would store the GNUNET_HELLO_Message
853  * for the given host and protocol.
854  * @return filename of the form DIRECTORY/HOSTID
855  */
856 static char *
857 get_trust_filename (const struct GNUNET_PeerIdentity *id)
858 {
859   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
860   char *fn;
861
862   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
863   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
864   return fn;
865 }
866
867
868
869 /**
870  * Transmit messages by copying it to the target buffer
871  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
872  * for writing in the meantime.  In that case, do nothing
873  * (the disconnect or shutdown handler will take care of the rest).
874  * If we were able to transmit messages and there are still more
875  * pending, ask core again for further calls to this function.
876  *
877  * @param cls closure, pointer to the 'struct ConnectedPeer*'
878  * @param size number of bytes available in buf
879  * @param buf where the callee should write the message
880  * @return number of bytes written to buf
881  */
882 static size_t
883 transmit_to_peer (void *cls,
884                   size_t size, void *buf);
885
886
887 /* ******************* clean up functions ************************ */
888
889 /**
890  * Delete the given migration block.
891  *
892  * @param mb block to delete
893  */
894 static void
895 delete_migration_block (struct MigrationReadyBlock *mb)
896 {
897   GNUNET_CONTAINER_DLL_remove (mig_head,
898                                mig_tail,
899                                mb);
900   GNUNET_PEER_decrement_rcs (mb->target_list,
901                              MIGRATION_LIST_SIZE);
902   mig_size--;
903   GNUNET_free (mb);
904 }
905
906
907 /**
908  * Compare the distance of two peers to a key.
909  *
910  * @param key key
911  * @param p1 first peer
912  * @param p2 second peer
913  * @return GNUNET_YES if P1 is closer to key than P2
914  */
915 static int
916 is_closer (const GNUNET_HashCode *key,
917            const struct GNUNET_PeerIdentity *p1,
918            const struct GNUNET_PeerIdentity *p2)
919 {
920   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
921                                     &p2->hashPubKey,
922                                     key);
923 }
924
925
926 /**
927  * Consider migrating content to a given peer.
928  *
929  * @param cls 'struct MigrationReadyBlock*' to select
930  *            targets for (or NULL for none)
931  * @param key ID of the peer 
932  * @param value 'struct ConnectedPeer' of the peer
933  * @return GNUNET_YES (always continue iteration)
934  */
935 static int
936 consider_migration (void *cls,
937                     const GNUNET_HashCode *key,
938                     void *value)
939 {
940   struct MigrationReadyBlock *mb = cls;
941   struct ConnectedPeer *cp = value;
942   struct MigrationReadyBlock *pos;
943   struct GNUNET_PeerIdentity cppid;
944   struct GNUNET_PeerIdentity otherpid;
945   struct GNUNET_PeerIdentity worstpid;
946   size_t msize;
947   unsigned int i;
948   unsigned int repl;
949   
950   /* consider 'cp' as a migration target for mb */
951   if (GNUNET_TIME_absolute_get_remaining (cp->migration_blocked).value > 0)
952     return GNUNET_YES; /* peer has requested no migration! */
953   if (mb != NULL)
954     {
955       GNUNET_PEER_resolve (cp->pid,
956                            &cppid);
957       repl = MIGRATION_LIST_SIZE;
958       for (i=0;i<MIGRATION_LIST_SIZE;i++)
959         {
960           if (mb->target_list[i] == 0)
961             {
962               mb->target_list[i] = cp->pid;
963               GNUNET_PEER_change_rc (mb->target_list[i], 1);
964               repl = MIGRATION_LIST_SIZE;
965               break;
966             }
967           GNUNET_PEER_resolve (mb->target_list[i],
968                                &otherpid);
969           if ( (repl == MIGRATION_LIST_SIZE) &&
970                is_closer (&mb->query,
971                           &cppid,
972                           &otherpid)) 
973             {
974               repl = i;
975               worstpid = otherpid;
976             }
977           else if ( (repl != MIGRATION_LIST_SIZE) &&
978                     (is_closer (&mb->query,
979                                 &worstpid,
980                                 &otherpid) ) )
981             {
982               repl = i;
983               worstpid = otherpid;
984             }       
985         }
986       if (repl != MIGRATION_LIST_SIZE) 
987         {
988           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
989           mb->target_list[repl] = cp->pid;
990           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
991         }
992     }
993
994   /* consider scheduling transmission to cp for content migration */
995   if (cp->cth != NULL)        
996     return GNUNET_YES; 
997   msize = 0;
998   pos = mig_head;
999   while (pos != NULL)
1000     {
1001       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1002         {
1003           if (cp->pid == pos->target_list[i])
1004             {
1005               if (msize == 0)
1006                 msize = pos->size;
1007               else
1008                 msize = GNUNET_MIN (msize,
1009                                     pos->size);
1010               break;
1011             }
1012         }
1013       pos = pos->next;
1014     }
1015   if (msize == 0)
1016     return GNUNET_YES; /* no content available */
1017 #if DEBUG_FS
1018   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1019               "Trying to migrate at least %u bytes to peer `%s'\n",
1020               msize,
1021               GNUNET_h2s (key));
1022 #endif
1023   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1024     {
1025       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1026       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1027     }
1028   cp->cth 
1029     = GNUNET_CORE_notify_transmit_ready (core,
1030                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
1031                                          (const struct GNUNET_PeerIdentity*) key,
1032                                          msize + sizeof (struct PutMessage),
1033                                          &transmit_to_peer,
1034                                          cp);
1035   return GNUNET_YES;
1036 }
1037
1038
1039 /**
1040  * Task that is run periodically to obtain blocks for content
1041  * migration
1042  * 
1043  * @param cls unused
1044  * @param tc scheduler context (also unused)
1045  */
1046 static void
1047 gather_migration_blocks (void *cls,
1048                          const struct GNUNET_SCHEDULER_TaskContext *tc);
1049
1050
1051
1052
1053 /**
1054  * Task that is run periodically to obtain blocks for DHT PUTs.
1055  * 
1056  * @param cls type of blocks to gather
1057  * @param tc scheduler context (unused)
1058  */
1059 static void
1060 gather_dht_put_blocks (void *cls,
1061                        const struct GNUNET_SCHEDULER_TaskContext *tc);
1062
1063
1064 /**
1065  * If the migration task is not currently running, consider
1066  * (re)scheduling it with the appropriate delay.
1067  */
1068 static void
1069 consider_migration_gathering ()
1070 {
1071   struct GNUNET_TIME_Relative delay;
1072
1073   if (dsh == NULL)
1074     return;
1075   if (mig_qe != NULL)
1076     return;
1077   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
1078     return;
1079   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
1080                                          mig_size);
1081   delay = GNUNET_TIME_relative_divide (delay,
1082                                        MAX_MIGRATION_QUEUE);
1083   delay = GNUNET_TIME_relative_max (delay,
1084                                     min_migration_delay);
1085   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
1086                                            delay,
1087                                            &gather_migration_blocks,
1088                                            NULL);
1089 }
1090
1091
1092 /**
1093  * If the DHT PUT gathering task is not currently running, consider
1094  * (re)scheduling it with the appropriate delay.
1095  */
1096 static void
1097 consider_dht_put_gathering (void *cls)
1098 {
1099   struct GNUNET_TIME_Relative delay;
1100
1101   if (dsh == NULL)
1102     return;
1103   if (dht_qe != NULL)
1104     return;
1105   if (dht_task != GNUNET_SCHEDULER_NO_TASK)
1106     return;
1107   if (zero_anonymity_count_estimate > 0)
1108     {
1109       delay = GNUNET_TIME_relative_divide (GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
1110                                            zero_anonymity_count_estimate);
1111       delay = GNUNET_TIME_relative_min (delay,
1112                                         MAX_DHT_PUT_FREQ);
1113     }
1114   else
1115     {
1116       /* if we have NO zero-anonymity content yet, wait 5 minutes for some to
1117          (hopefully) appear */
1118       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
1119     }
1120   dht_task = GNUNET_SCHEDULER_add_delayed (sched,
1121                                            delay,
1122                                            &gather_dht_put_blocks,
1123                                            cls);
1124 }
1125
1126
1127 /**
1128  * Process content offered for migration.
1129  *
1130  * @param cls closure
1131  * @param key key for the content
1132  * @param size number of bytes in data
1133  * @param data content stored
1134  * @param type type of the content
1135  * @param priority priority of the content
1136  * @param anonymity anonymity-level for the content
1137  * @param expiration expiration time for the content
1138  * @param uid unique identifier for the datum;
1139  *        maybe 0 if no unique identifier is available
1140  */
1141 static void
1142 process_migration_content (void *cls,
1143                            const GNUNET_HashCode * key,
1144                            size_t size,
1145                            const void *data,
1146                            enum GNUNET_BLOCK_Type type,
1147                            uint32_t priority,
1148                            uint32_t anonymity,
1149                            struct GNUNET_TIME_Absolute
1150                            expiration, uint64_t uid)
1151 {
1152   struct MigrationReadyBlock *mb;
1153   
1154   if (key == NULL)
1155     {
1156       mig_qe = NULL;
1157       if (mig_size < MAX_MIGRATION_QUEUE)  
1158         consider_migration_gathering ();
1159       return;
1160     }
1161   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1162     {
1163       if (GNUNET_OK !=
1164           GNUNET_FS_handle_on_demand_block (key, size, data,
1165                                             type, priority, anonymity,
1166                                             expiration, uid, 
1167                                             &process_migration_content,
1168                                             NULL))
1169         {
1170           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1171         }
1172       return;
1173     }
1174 #if DEBUG_FS
1175   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1176               "Retrieved block `%s' of type %u for migration\n",
1177               GNUNET_h2s (key),
1178               type);
1179 #endif
1180   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1181   mb->query = *key;
1182   mb->expiration = expiration;
1183   mb->size = size;
1184   mb->type = type;
1185   memcpy (&mb[1], data, size);
1186   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1187                                      mig_tail,
1188                                      mig_tail,
1189                                      mb);
1190   mig_size++;
1191   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1192                                          &consider_migration,
1193                                          mb);
1194   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1195 }
1196
1197
1198 /**
1199  * Function called upon completion of the DHT PUT operation.
1200  */
1201 static void
1202 dht_put_continuation (void *cls,
1203                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1204 {
1205   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1206 }
1207
1208
1209 /**
1210  * Store content in DHT.
1211  *
1212  * @param cls closure
1213  * @param key key for the content
1214  * @param size number of bytes in data
1215  * @param data content stored
1216  * @param type type of the content
1217  * @param priority priority of the content
1218  * @param anonymity anonymity-level for the content
1219  * @param expiration expiration time for the content
1220  * @param uid unique identifier for the datum;
1221  *        maybe 0 if no unique identifier is available
1222  */
1223 static void
1224 process_dht_put_content (void *cls,
1225                          const GNUNET_HashCode * key,
1226                          size_t size,
1227                          const void *data,
1228                          enum GNUNET_BLOCK_Type type,
1229                          uint32_t priority,
1230                          uint32_t anonymity,
1231                          struct GNUNET_TIME_Absolute
1232                          expiration, uint64_t uid)
1233
1234   static unsigned int counter;
1235   static GNUNET_HashCode last_vhash;
1236   static GNUNET_HashCode vhash;
1237
1238   if (key == NULL)
1239     {
1240       dht_qe = NULL;
1241       consider_dht_put_gathering (cls);
1242       return;
1243     }
1244   /* slightly funky code to estimate the total number of values with zero
1245      anonymity from the maximum observed length of a monotonically increasing 
1246      sequence of hashes over the contents */
1247   GNUNET_CRYPTO_hash (data, size, &vhash);
1248   if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
1249     {
1250       if (zero_anonymity_count_estimate > 0)
1251         zero_anonymity_count_estimate /= 2;
1252       counter = 0;
1253     }
1254   last_vhash = vhash;
1255   if (counter < 31)
1256     counter++;
1257   if (zero_anonymity_count_estimate < (1 << counter))
1258     zero_anonymity_count_estimate = (1 << counter);
1259 #if DEBUG_FS
1260   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1261               "Retrieved block `%s' of type %u for DHT PUT\n",
1262               GNUNET_h2s (key),
1263               type);
1264 #endif
1265   GNUNET_DHT_put (dht_handle,
1266                   key,
1267                   GNUNET_DHT_RO_NONE,
1268                   type,
1269                   size,
1270                   data,
1271                   expiration,
1272                   GNUNET_TIME_UNIT_FOREVER_REL,
1273                   &dht_put_continuation,
1274                   cls);
1275 }
1276
1277
1278 /**
1279  * Task that is run periodically to obtain blocks for content
1280  * migration
1281  * 
1282  * @param cls unused
1283  * @param tc scheduler context (also unused)
1284  */
1285 static void
1286 gather_migration_blocks (void *cls,
1287                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1288 {
1289   mig_task = GNUNET_SCHEDULER_NO_TASK;
1290   if (dsh != NULL)
1291     {
1292       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, UINT_MAX,
1293                                             GNUNET_TIME_UNIT_FOREVER_REL,
1294                                             &process_migration_content, NULL);
1295       GNUNET_assert (mig_qe != NULL);
1296     }
1297 }
1298
1299
1300 /**
1301  * Task that is run periodically to obtain blocks for DHT PUTs.
1302  * 
1303  * @param cls type of blocks to gather
1304  * @param tc scheduler context (unused)
1305  */
1306 static void
1307 gather_dht_put_blocks (void *cls,
1308                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1309 {
1310   dht_task = GNUNET_SCHEDULER_NO_TASK;
1311   if (dsh != NULL)
1312     {
1313       if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1314         dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
1315       dht_qe = GNUNET_DATASTORE_get_zero_anonymity (dsh, 0, UINT_MAX,
1316                                                     GNUNET_TIME_UNIT_FOREVER_REL,
1317                                                     dht_put_type++,
1318                                                     &process_dht_put_content, NULL);
1319       GNUNET_assert (dht_qe != NULL);
1320     }
1321 }
1322
1323
1324 /**
1325  * We're done with a particular message list entry.
1326  * Free all associated resources.
1327  * 
1328  * @param pml entry to destroy
1329  */
1330 static void
1331 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1332 {
1333   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1334                                pml->req->pending_tail,
1335                                pml);
1336   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1337                                pml->target->pending_messages_tail,
1338                                pml->pm);
1339   pml->target->pending_requests--;
1340   GNUNET_free (pml->pm);
1341   GNUNET_free (pml);
1342 }
1343
1344
1345 /**
1346  * Destroy the given pending message (and call the respective
1347  * continuation).
1348  *
1349  * @param pm message to destroy
1350  * @param tpid id of peer that the message was delivered to, or 0 for none
1351  */
1352 static void
1353 destroy_pending_message (struct PendingMessage *pm,
1354                          GNUNET_PEER_Id tpid)
1355 {
1356   struct PendingMessageList *pml = pm->pml;
1357   TransmissionContinuation cont;
1358   void *cont_cls;
1359
1360   cont = pm->cont;
1361   cont_cls = pm->cont_cls;
1362   if (pml != NULL)
1363     {
1364       GNUNET_assert (pml->pm == pm);
1365       GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1366       destroy_pending_message_list_entry (pml);
1367     }
1368   else
1369     {
1370       GNUNET_free (pm);
1371     }
1372   if (cont != NULL)
1373     cont (cont_cls, tpid);  
1374 }
1375
1376
1377 /**
1378  * We're done processing a particular request.
1379  * Free all associated resources.
1380  *
1381  * @param pr request to destroy
1382  */
1383 static void
1384 destroy_pending_request (struct PendingRequest *pr)
1385 {
1386   struct GNUNET_PeerIdentity pid;
1387
1388   if (pr->hnode != NULL)
1389     {
1390       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1391                                          pr->hnode);
1392       pr->hnode = NULL;
1393     }
1394   if (NULL == pr->client_request_list)
1395     {
1396       GNUNET_STATISTICS_update (stats,
1397                                 gettext_noop ("# P2P searches active"),
1398                                 -1,
1399                                 GNUNET_NO);
1400     }
1401   else
1402     {
1403       GNUNET_STATISTICS_update (stats,
1404                                 gettext_noop ("# client searches active"),
1405                                 -1,
1406                                 GNUNET_NO);
1407     }
1408   if (GNUNET_YES == 
1409       GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1410                                             &pr->query,
1411                                             pr))
1412     {
1413       GNUNET_LOAD_update (rt_entry_lifetime,
1414                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
1415     }
1416   if (pr->qe != NULL)
1417      {
1418       GNUNET_DATASTORE_cancel (pr->qe);
1419       pr->qe = NULL;
1420     }
1421   if (pr->dht_get != NULL)
1422     {
1423       GNUNET_DHT_get_stop (pr->dht_get);
1424       pr->dht_get = NULL;
1425     }
1426   if (pr->client_request_list != NULL)
1427     {
1428       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1429                                    pr->client_request_list->client_list->rl_tail,
1430                                    pr->client_request_list);
1431       GNUNET_free (pr->client_request_list);
1432       pr->client_request_list = NULL;
1433     }
1434   if (pr->cp != NULL)
1435     {
1436       GNUNET_PEER_resolve (pr->cp->pid,
1437                            &pid);
1438       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1439                                                    &pid.hashPubKey,
1440                                                    pr);
1441       pr->cp = NULL;
1442     }
1443   if (pr->bf != NULL)
1444     {
1445       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1446       pr->bf = NULL;
1447     }
1448   if (pr->irc != NULL)
1449     {
1450       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1451       pr->irc = NULL;
1452     }
1453   if (pr->replies_seen != NULL)
1454     {
1455       GNUNET_free (pr->replies_seen);
1456       pr->replies_seen = NULL;
1457     }
1458   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1459     {
1460       GNUNET_SCHEDULER_cancel (sched,
1461                                pr->task);
1462       pr->task = GNUNET_SCHEDULER_NO_TASK;
1463     }
1464   while (NULL != pr->pending_head)    
1465     destroy_pending_message_list_entry (pr->pending_head);
1466   GNUNET_PEER_change_rc (pr->target_pid, -1);
1467   if (pr->used_pids != NULL)
1468     {
1469       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1470       GNUNET_free (pr->used_pids);
1471       pr->used_pids_off = 0;
1472       pr->used_pids_size = 0;
1473       pr->used_pids = NULL;
1474     }
1475   GNUNET_free (pr);
1476 }
1477
1478
1479 /**
1480  * Method called whenever a given peer connects.
1481  *
1482  * @param cls closure, not used
1483  * @param peer peer identity this notification is about
1484  * @param latency reported latency of the connection with 'other'
1485  * @param distance reported distance (DV) to 'other' 
1486  */
1487 static void 
1488 peer_connect_handler (void *cls,
1489                       const struct
1490                       GNUNET_PeerIdentity * peer,
1491                       struct GNUNET_TIME_Relative latency,
1492                       uint32_t distance)
1493 {
1494   struct ConnectedPeer *cp;
1495   struct MigrationReadyBlock *pos;
1496   char *fn;
1497   uint32_t trust;
1498   
1499   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1500   cp->transmission_delay = GNUNET_LOAD_value_init ();
1501   cp->pid = GNUNET_PEER_intern (peer);
1502
1503   fn = get_trust_filename (peer);
1504   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1505       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1506     cp->disk_trust = cp->trust = ntohl (trust);
1507   GNUNET_free (fn);
1508
1509   GNUNET_break (GNUNET_OK ==
1510                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1511                                                    &peer->hashPubKey,
1512                                                    cp,
1513                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1514
1515   pos = mig_head;
1516   while (NULL != pos)
1517     {
1518       (void) consider_migration (pos, &peer->hashPubKey, cp);
1519       pos = pos->next;
1520     }
1521 }
1522
1523
1524 /**
1525  * Increase the host credit by a value.
1526  *
1527  * @param host which peer to change the trust value on
1528  * @param value is the int value by which the
1529  *  host credit is to be increased or decreased
1530  * @returns the actual change in trust (positive or negative)
1531  */
1532 static int
1533 change_host_trust (struct ConnectedPeer *host, int value)
1534 {
1535   unsigned int old_trust;
1536
1537   if (value == 0)
1538     return 0;
1539   GNUNET_assert (host != NULL);
1540   old_trust = host->trust;
1541   if (value > 0)
1542     {
1543       if (host->trust + value < host->trust)
1544         {
1545           value = UINT32_MAX - host->trust;
1546           host->trust = UINT32_MAX;
1547         }
1548       else
1549         host->trust += value;
1550     }
1551   else
1552     {
1553       if (host->trust < -value)
1554         {
1555           value = -host->trust;
1556           host->trust = 0;
1557         }
1558       else
1559         host->trust += value;
1560     }
1561   return value;
1562 }
1563
1564
1565 /**
1566  * Write host-trust information to a file - flush the buffer entry!
1567  */
1568 static int
1569 flush_trust (void *cls,
1570              const GNUNET_HashCode *key,
1571              void *value)
1572 {
1573   struct ConnectedPeer *host = value;
1574   char *fn;
1575   uint32_t trust;
1576   struct GNUNET_PeerIdentity pid;
1577
1578   if (host->trust == host->disk_trust)
1579     return GNUNET_OK;                     /* unchanged */
1580   GNUNET_PEER_resolve (host->pid,
1581                        &pid);
1582   fn = get_trust_filename (&pid);
1583   if (host->trust == 0)
1584     {
1585       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1586         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1587                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1588     }
1589   else
1590     {
1591       trust = htonl (host->trust);
1592       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1593                                                     sizeof(uint32_t),
1594                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1595                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1596         host->disk_trust = host->trust;
1597     }
1598   GNUNET_free (fn);
1599   return GNUNET_OK;
1600 }
1601
1602 /**
1603  * Call this method periodically to scan data/hosts for new hosts.
1604  */
1605 static void
1606 cron_flush_trust (void *cls,
1607                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1608 {
1609
1610   if (NULL == connected_peers)
1611     return;
1612   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1613                                          &flush_trust,
1614                                          NULL);
1615   if (NULL == tc)
1616     return;
1617   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1618     return;
1619   GNUNET_SCHEDULER_add_delayed (tc->sched,
1620                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1621 }
1622
1623
1624 /**
1625  * Free (each) request made by the peer.
1626  *
1627  * @param cls closure, points to peer that the request belongs to
1628  * @param key current key code
1629  * @param value value in the hash map
1630  * @return GNUNET_YES (we should continue to iterate)
1631  */
1632 static int
1633 destroy_request (void *cls,
1634                  const GNUNET_HashCode * key,
1635                  void *value)
1636 {
1637   const struct GNUNET_PeerIdentity * peer = cls;
1638   struct PendingRequest *pr = value;
1639   
1640   GNUNET_break (GNUNET_YES ==
1641                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1642                                                       &peer->hashPubKey,
1643                                                       pr));
1644   destroy_pending_request (pr);
1645   return GNUNET_YES;
1646 }
1647
1648
1649 /**
1650  * Method called whenever a peer disconnects.
1651  *
1652  * @param cls closure, not used
1653  * @param peer peer identity this notification is about
1654  */
1655 static void
1656 peer_disconnect_handler (void *cls,
1657                          const struct
1658                          GNUNET_PeerIdentity * peer)
1659 {
1660   struct ConnectedPeer *cp;
1661   struct PendingMessage *pm;
1662   unsigned int i;
1663   struct MigrationReadyBlock *pos;
1664   struct MigrationReadyBlock *next;
1665
1666   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1667                                               &peer->hashPubKey,
1668                                               &destroy_request,
1669                                               (void*) peer);
1670   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1671                                           &peer->hashPubKey);
1672   if (cp == NULL)
1673     return;
1674   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1675     {
1676       if (NULL != cp->last_client_replies[i])
1677         {
1678           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1679           cp->last_client_replies[i] = NULL;
1680         }
1681     }
1682   GNUNET_break (GNUNET_YES ==
1683                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1684                                                       &peer->hashPubKey,
1685                                                       cp));
1686   /* remove this peer from migration considerations; schedule
1687      alternatives */
1688   next = mig_head;
1689   while (NULL != (pos = next))
1690     {
1691       next = pos->next;
1692       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1693         {
1694           if (pos->target_list[i] == cp->pid)
1695             {
1696               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1697               pos->target_list[i] = 0;
1698             }
1699          }
1700       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1701         {
1702           delete_migration_block (pos);
1703           consider_migration_gathering ();
1704           continue;
1705         }
1706       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1707                                              &consider_migration,
1708                                              pos);
1709     }
1710   GNUNET_PEER_change_rc (cp->pid, -1);
1711   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1712   if (NULL != cp->cth)
1713     {
1714       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1715       cp->cth = NULL;
1716     }
1717   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
1718     {
1719       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
1720       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1721     }
1722   while (NULL != (pm = cp->pending_messages_head))
1723     destroy_pending_message (pm, 0 /* delivery failed */);
1724   GNUNET_LOAD_value_free (cp->transmission_delay);
1725   GNUNET_break (0 == cp->pending_requests);
1726   GNUNET_free (cp);
1727 }
1728
1729
1730 /**
1731  * Iterator over hash map entries that removes all occurences
1732  * of the given 'client' from the 'last_client_replies' of the
1733  * given connected peer.
1734  *
1735  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1736  * @param key current key code (unused)
1737  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1738  * @return GNUNET_YES (we should continue to iterate)
1739  */
1740 static int
1741 remove_client_from_last_client_replies (void *cls,
1742                                         const GNUNET_HashCode * key,
1743                                         void *value)
1744 {
1745   struct GNUNET_SERVER_Client *client = cls;
1746   struct ConnectedPeer *cp = value;
1747   unsigned int i;
1748
1749   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1750     {
1751       if (cp->last_client_replies[i] == client)
1752         {
1753           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1754           cp->last_client_replies[i] = NULL;
1755         }
1756     }  
1757   return GNUNET_YES;
1758 }
1759
1760
1761 /**
1762  * A client disconnected.  Remove all of its pending queries.
1763  *
1764  * @param cls closure, NULL
1765  * @param client identification of the client
1766  */
1767 static void
1768 handle_client_disconnect (void *cls,
1769                           struct GNUNET_SERVER_Client
1770                           * client)
1771 {
1772   struct ClientList *pos;
1773   struct ClientList *prev;
1774   struct ClientRequestList *rcl;
1775   struct ClientResponseMessage *creply;
1776
1777   if (client == NULL)
1778     return;
1779   prev = NULL;
1780   pos = client_list;
1781   while ( (NULL != pos) &&
1782           (pos->client != client) )
1783     {
1784       prev = pos;
1785       pos = pos->next;
1786     }
1787   if (pos == NULL)
1788     return; /* no requests pending for this client */
1789   while (NULL != (rcl = pos->rl_head))
1790     {
1791       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1792                   "Destroying pending request `%s' on disconnect\n",
1793                   GNUNET_h2s (&rcl->req->query));
1794       destroy_pending_request (rcl->req);
1795     }
1796   if (prev == NULL)
1797     client_list = pos->next;
1798   else
1799     prev->next = pos->next;
1800   if (pos->th != NULL)
1801     {
1802       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1803       pos->th = NULL;
1804     }
1805   while (NULL != (creply = pos->res_head))
1806     {
1807       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1808                                    pos->res_tail,
1809                                    creply);
1810       GNUNET_free (creply);
1811     }    
1812   GNUNET_SERVER_client_drop (pos->client);
1813   GNUNET_free (pos);
1814   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1815                                          &remove_client_from_last_client_replies,
1816                                          client);
1817 }
1818
1819
1820 /**
1821  * Iterator to free peer entries.
1822  *
1823  * @param cls closure, unused
1824  * @param key current key code
1825  * @param value value in the hash map (peer entry)
1826  * @return GNUNET_YES (we should continue to iterate)
1827  */
1828 static int 
1829 clean_peer (void *cls,
1830             const GNUNET_HashCode * key,
1831             void *value)
1832 {
1833   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1834   return GNUNET_YES;
1835 }
1836
1837
1838 /**
1839  * Task run during shutdown.
1840  *
1841  * @param cls unused
1842  * @param tc unused
1843  */
1844 static void
1845 shutdown_task (void *cls,
1846                const struct GNUNET_SCHEDULER_TaskContext *tc)
1847 {
1848   if (mig_qe != NULL)
1849     {
1850       GNUNET_DATASTORE_cancel (mig_qe);
1851       mig_qe = NULL;
1852     }
1853   if (dht_qe != NULL)
1854     {
1855       GNUNET_DATASTORE_cancel (dht_qe);
1856       dht_qe = NULL;
1857     }
1858   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1859     {
1860       GNUNET_SCHEDULER_cancel (sched, mig_task);
1861       mig_task = GNUNET_SCHEDULER_NO_TASK;
1862     }
1863   if (GNUNET_SCHEDULER_NO_TASK != dht_task)
1864     {
1865       GNUNET_SCHEDULER_cancel (sched, dht_task);
1866       dht_task = GNUNET_SCHEDULER_NO_TASK;
1867     }
1868   while (client_list != NULL)
1869     handle_client_disconnect (NULL,
1870                               client_list->client);
1871   cron_flush_trust (NULL, NULL);
1872   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1873                                          &clean_peer,
1874                                          NULL);
1875   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1876   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1877   requests_by_expiration_heap = 0;
1878   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1879   connected_peers = NULL;
1880   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1881   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1882   query_request_map = NULL;
1883   GNUNET_LOAD_value_free (rt_entry_lifetime);
1884   rt_entry_lifetime = NULL;
1885   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1886   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1887   peer_request_map = NULL;
1888   GNUNET_assert (NULL != core);
1889   GNUNET_CORE_disconnect (core);
1890   core = NULL;
1891   if (stats != NULL)
1892     {
1893       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1894       stats = NULL;
1895     }
1896   if (dsh != NULL)
1897     {
1898       GNUNET_DATASTORE_disconnect (dsh,
1899                                    GNUNET_NO);
1900       dsh = NULL;
1901     }
1902   while (mig_head != NULL)
1903     delete_migration_block (mig_head);
1904   GNUNET_assert (0 == mig_size);
1905   GNUNET_DHT_disconnect (dht_handle);
1906   dht_handle = NULL;
1907   GNUNET_LOAD_value_free (datastore_get_load);
1908   datastore_get_load = NULL;
1909   GNUNET_LOAD_value_free (datastore_put_load);
1910   datastore_put_load = NULL;
1911   GNUNET_BLOCK_context_destroy (block_ctx);
1912   block_ctx = NULL;
1913   GNUNET_CONFIGURATION_destroy (block_cfg);
1914   block_cfg = NULL;
1915   sched = NULL;
1916   cfg = NULL;  
1917   GNUNET_free_non_null (trustDirectory);
1918   trustDirectory = NULL;
1919 }
1920
1921
1922 /* ******************* Utility functions  ******************** */
1923
1924
1925 /**
1926  * We've had to delay a request for transmission to core, but now
1927  * we should be ready.  Run it.
1928  *
1929  * @param cls the 'struct ConnectedPeer' for which a request was delayed
1930  * @param tc task context (unused)
1931  */
1932 static void
1933 delayed_transmission_request (void *cls,
1934                               const struct GNUNET_SCHEDULER_TaskContext *tc)
1935 {
1936   struct ConnectedPeer *cp = cls;
1937   struct GNUNET_PeerIdentity pid;
1938   struct PendingMessage *pm;
1939
1940   pm = cp->pending_messages_head;
1941   cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
1942   GNUNET_assert (cp->cth == NULL);
1943   if (pm == NULL)
1944     return;
1945   GNUNET_PEER_resolve (cp->pid,
1946                        &pid);
1947   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
1948   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1949                                                pm->priority,
1950                                                GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1951                                                &pid,
1952                                                pm->msize,
1953                                                &transmit_to_peer,
1954                                                cp);
1955 }
1956
1957
1958 /**
1959  * Transmit messages by copying it to the target buffer
1960  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1961  * for writing in the meantime.  In that case, do nothing
1962  * (the disconnect or shutdown handler will take care of the rest).
1963  * If we were able to transmit messages and there are still more
1964  * pending, ask core again for further calls to this function.
1965  *
1966  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1967  * @param size number of bytes available in buf
1968  * @param buf where the callee should write the message
1969  * @return number of bytes written to buf
1970  */
1971 static size_t
1972 transmit_to_peer (void *cls,
1973                   size_t size, void *buf)
1974 {
1975   struct ConnectedPeer *cp = cls;
1976   char *cbuf = buf;
1977   struct PendingMessage *pm;
1978   struct PendingMessage *next_pm;
1979   struct GNUNET_TIME_Absolute now;
1980   struct GNUNET_TIME_Relative min_delay;
1981   struct MigrationReadyBlock *mb;
1982   struct MigrationReadyBlock *next;
1983   struct PutMessage migm;
1984   size_t msize;
1985   unsigned int i;
1986   struct GNUNET_PeerIdentity pid;
1987  
1988   cp->cth = NULL;
1989   if (NULL == buf)
1990     {
1991 #if DEBUG_FS
1992       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1993                   "Dropping message, core too busy.\n");
1994 #endif
1995       GNUNET_LOAD_update (cp->transmission_delay,
1996                           UINT64_MAX);
1997       return 0;
1998     }  
1999   GNUNET_LOAD_update (cp->transmission_delay,
2000                       GNUNET_TIME_absolute_get_duration (cp->last_transmission_request_start).value);
2001   now = GNUNET_TIME_absolute_get ();
2002   msize = 0;
2003   min_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2004   next_pm = cp->pending_messages_head;
2005   while ( (NULL != (pm = next_pm) ) &&
2006           (pm->msize <= size) )
2007     {
2008       next_pm = pm->next;
2009       if (pm->delay_until.value > now.value)
2010         {
2011           min_delay = GNUNET_TIME_relative_min (min_delay,
2012                                                 GNUNET_TIME_absolute_get_remaining (pm->delay_until));
2013           continue;
2014         }
2015       memcpy (&cbuf[msize], &pm[1], pm->msize);
2016       msize += pm->msize;
2017       size -= pm->msize;
2018       if (NULL == pm->pml)
2019         {
2020           GNUNET_CONTAINER_DLL_remove (cp->pending_messages_head,
2021                                        cp->pending_messages_tail,
2022                                        pm);
2023           cp->pending_requests--;
2024         }
2025       destroy_pending_message (pm, cp->pid);
2026     }
2027   if (pm != NULL)
2028     min_delay = GNUNET_TIME_UNIT_ZERO;
2029   if (NULL != cp->pending_messages_head)
2030     {     
2031       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == cp->delayed_transmission_request_task);
2032       cp->delayed_transmission_request_task
2033         = GNUNET_SCHEDULER_add_delayed (sched,
2034                                         min_delay,
2035                                         &delayed_transmission_request,
2036                                         cp);
2037     }
2038   if (pm == NULL)
2039     {      
2040       GNUNET_PEER_resolve (cp->pid,
2041                            &pid);
2042       next = mig_head;
2043       while (NULL != (mb = next))
2044         {
2045           next = mb->next;
2046           for (i=0;i<MIGRATION_LIST_SIZE;i++)
2047             {
2048               if ( (cp->pid == mb->target_list[i]) &&
2049                    (mb->size + sizeof (migm) <= size) )
2050                 {
2051                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
2052                   mb->target_list[i] = 0;
2053                   mb->used_targets++;
2054                   memset (&migm, 0, sizeof (migm));
2055                   migm.header.size = htons (sizeof (migm) + mb->size);
2056                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2057                   migm.type = htonl (mb->type);
2058                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
2059                   memcpy (&cbuf[msize], &migm, sizeof (migm));
2060                   msize += sizeof (migm);
2061                   size -= sizeof (migm);
2062                   memcpy (&cbuf[msize], &mb[1], mb->size);
2063                   msize += mb->size;
2064                   size -= mb->size;
2065 #if DEBUG_FS
2066                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2067                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
2068                               GNUNET_h2s (&mb->query),
2069                               (unsigned int) mb->size,
2070                               GNUNET_i2s (&pid));
2071 #endif    
2072                   break;
2073                 }
2074               else
2075                 {
2076 #if DEBUG_FS
2077                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2078                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
2079                               GNUNET_h2s (&mb->query),
2080                               (unsigned int) mb->size,
2081                               GNUNET_i2s (&pid));
2082 #endif    
2083                 }
2084             }
2085           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
2086                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
2087             {
2088               delete_migration_block (mb);
2089               consider_migration_gathering ();
2090             }
2091         }
2092       consider_migration (NULL, 
2093                           &pid.hashPubKey,
2094                           cp);
2095     }
2096 #if DEBUG_FS
2097   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2098               "Transmitting %u bytes to peer with PID %u\n",
2099               (unsigned int) msize,
2100               (unsigned int) cp->pid);
2101 #endif
2102   return msize;
2103 }
2104
2105
2106 /**
2107  * Add a message to the set of pending messages for the given peer.
2108  *
2109  * @param cp peer to send message to
2110  * @param pm message to queue
2111  * @param pr request on which behalf this message is being queued
2112  */
2113 static void
2114 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
2115                                   struct PendingMessage *pm,
2116                                   struct PendingRequest *pr)
2117 {
2118   struct PendingMessage *pos;
2119   struct PendingMessageList *pml;
2120   struct GNUNET_PeerIdentity pid;
2121
2122   GNUNET_assert (pm->next == NULL);
2123   GNUNET_assert (pm->pml == NULL);    
2124   if (pr != NULL)
2125     {
2126       pml = GNUNET_malloc (sizeof (struct PendingMessageList));
2127       pml->req = pr;
2128       pml->target = cp;
2129       pml->pm = pm;
2130       pm->pml = pml;  
2131       GNUNET_CONTAINER_DLL_insert (pr->pending_head,
2132                                    pr->pending_tail,
2133                                    pml);
2134     }
2135   pos = cp->pending_messages_head;
2136   while ( (pos != NULL) &&
2137           (pm->priority < pos->priority) )
2138     pos = pos->next;    
2139   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
2140                                      cp->pending_messages_tail,
2141                                      pos,
2142                                      pm);
2143   cp->pending_requests++;
2144   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
2145     destroy_pending_message (cp->pending_messages_tail, 0);  
2146   GNUNET_PEER_resolve (cp->pid, &pid);
2147   if (NULL != cp->cth)
2148     {
2149       GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
2150       cp->cth = NULL;
2151     }
2152   if (cp->delayed_transmission_request_task != GNUNET_SCHEDULER_NO_TASK)
2153     {
2154       GNUNET_SCHEDULER_cancel (sched, cp->delayed_transmission_request_task);
2155       cp->delayed_transmission_request_task = GNUNET_SCHEDULER_NO_TASK;
2156     }
2157   /* need to schedule transmission */
2158   cp->last_transmission_request_start = GNUNET_TIME_absolute_get ();
2159   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2160                                                cp->pending_messages_head->priority,
2161                                                MAX_TRANSMIT_DELAY,
2162                                                &pid,
2163                                                cp->pending_messages_head->msize,
2164                                                &transmit_to_peer,
2165                                                cp);
2166   if (cp->cth == NULL)
2167     {
2168 #if DEBUG_FS
2169       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2170                   "Failed to schedule transmission with core!\n");
2171 #endif
2172       GNUNET_STATISTICS_update (stats,
2173                                 gettext_noop ("# CORE transmission failures"),
2174                                 1,
2175                                 GNUNET_NO);
2176     }
2177 }
2178
2179
2180 /**
2181  * Test if the DATABASE (GET) load on this peer is too high
2182  * to even consider processing the query at
2183  * all.  
2184  * 
2185  * @return GNUNET_YES if the load is too high to do anything (load high)
2186  *         GNUNET_NO to process normally (load normal)
2187  *         GNUNET_SYSERR to process for free (load low)
2188  */
2189 static int
2190 test_get_load_too_high (uint32_t priority)
2191 {
2192   double ld;
2193
2194   ld = GNUNET_LOAD_get_load (datastore_get_load);
2195   if (ld < 1)
2196     {
2197       GNUNET_STATISTICS_update (stats,
2198                                 gettext_noop ("# requests done for free (low load)"),
2199                                 1,
2200                                 GNUNET_NO);
2201       return GNUNET_SYSERR;
2202     }
2203   if (ld <= priority)
2204     {
2205       GNUNET_STATISTICS_update (stats,
2206                                 gettext_noop ("# requests done for a price (normal load)"),
2207                                 1,
2208                                 GNUNET_NO);
2209       return GNUNET_NO;
2210     }
2211   GNUNET_STATISTICS_update (stats,
2212                             gettext_noop ("# requests dropped due to high load"),
2213                             1,
2214                             GNUNET_NO);
2215   return GNUNET_YES;
2216 }
2217
2218
2219
2220
2221 /**
2222  * Test if the DATABASE (PUT) load on this peer is too high
2223  * to even consider processing the query at
2224  * all.  
2225  * 
2226  * @return GNUNET_YES if the load is too high to do anything (load high)
2227  *         GNUNET_NO to process normally (load normal or low)
2228  */
2229 static int
2230 test_put_load_too_high (uint32_t priority)
2231 {
2232   double ld;
2233
2234   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
2235     return GNUNET_NO; /* very fast */
2236   ld = GNUNET_LOAD_get_load (datastore_put_load);
2237   if (ld < 2.0 * (1 + priority))
2238     return GNUNET_NO;
2239   GNUNET_STATISTICS_update (stats,
2240                             gettext_noop ("# storage requests dropped due to high load"),
2241                             1,
2242                             GNUNET_NO);
2243   return GNUNET_YES;
2244 }
2245
2246
2247 /* ******************* Pending Request Refresh Task ******************** */
2248
2249
2250
2251 /**
2252  * We use a random delay to make the timing of requests less
2253  * predictable.  This function returns such a random delay.  We add a base
2254  * delay of MAX_CORK_DELAY (1s).
2255  *
2256  * FIXME: make schedule dependent on the specifics of the request?
2257  * Or bandwidth and number of connected peers and load?
2258  *
2259  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
2260  */
2261 static struct GNUNET_TIME_Relative
2262 get_processing_delay ()
2263 {
2264   return 
2265     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
2266                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
2267                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2268                                                                                        TTL_DECREMENT)));
2269 }
2270
2271
2272 /**
2273  * We're processing a GET request from another peer and have decided
2274  * to forward it to other peers.  This function is called periodically
2275  * and should forward the request to other peers until we have all
2276  * possible replies.  If we have transmitted the *only* reply to
2277  * the initiator we should destroy the pending request.  If we have
2278  * many replies in the queue to the initiator, we should delay sending
2279  * out more queries until the reply queue has shrunk some.
2280  *
2281  * @param cls our "struct ProcessGetContext *"
2282  * @param tc unused
2283  */
2284 static void
2285 forward_request_task (void *cls,
2286                       const struct GNUNET_SCHEDULER_TaskContext *tc);
2287
2288
2289 /**
2290  * Function called after we either failed or succeeded
2291  * at transmitting a query to a peer.  
2292  *
2293  * @param cls the requests "struct PendingRequest*"
2294  * @param tpid ID of receiving peer, 0 on transmission error
2295  */
2296 static void
2297 transmit_query_continuation (void *cls,
2298                              GNUNET_PEER_Id tpid)
2299 {
2300   struct PendingRequest *pr = cls;
2301
2302   GNUNET_STATISTICS_update (stats,
2303                             gettext_noop ("# queries scheduled for forwarding"),
2304                             -1,
2305                             GNUNET_NO);
2306   if (tpid == 0)   
2307     {
2308 #if DEBUG_FS
2309       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2310                   "Transmission of request failed, will try again later.\n");
2311 #endif
2312       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2313         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2314                                                  get_processing_delay (),
2315                                                  &forward_request_task,
2316                                                  pr); 
2317       return;    
2318     }
2319   GNUNET_STATISTICS_update (stats,
2320                             gettext_noop ("# queries forwarded"),
2321                             1,
2322                             GNUNET_NO);
2323   GNUNET_PEER_change_rc (tpid, 1);
2324   if (pr->used_pids_off == pr->used_pids_size)
2325     GNUNET_array_grow (pr->used_pids,
2326                        pr->used_pids_size,
2327                        pr->used_pids_size * 2 + 2);
2328   pr->used_pids[pr->used_pids_off++] = tpid;
2329   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2330     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2331                                              get_processing_delay (),
2332                                              &forward_request_task,
2333                                              pr);
2334 }
2335
2336
2337 /**
2338  * How many bytes should a bloomfilter be if we have already seen
2339  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
2340  * of bits set per entry.  Furthermore, we should not re-size the
2341  * filter too often (to keep it cheap).
2342  *
2343  * Since other peers will also add entries but not resize the filter,
2344  * we should generally pick a slightly larger size than what the
2345  * strict math would suggest.
2346  *
2347  * @return must be a power of two and smaller or equal to 2^15.
2348  */
2349 static size_t
2350 compute_bloomfilter_size (unsigned int entry_count)
2351 {
2352   size_t size;
2353   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
2354   uint16_t max = 1 << 15;
2355
2356   if (entry_count > max)
2357     return max;
2358   size = 8;
2359   while ((size < max) && (size < ideal))
2360     size *= 2;
2361   if (size > max)
2362     return max;
2363   return size;
2364 }
2365
2366
2367 /**
2368  * Recalculate our bloom filter for filtering replies.  This function
2369  * will create a new bloom filter from scratch, so it should only be
2370  * called if we have no bloomfilter at all (and hence can create a
2371  * fresh one of minimal size without problems) OR if our peer is the
2372  * initiator (in which case we may resize to larger than mimimum size).
2373  *
2374  * @param pr request for which the BF is to be recomputed
2375  */
2376 static void
2377 refresh_bloomfilter (struct PendingRequest *pr)
2378 {
2379   unsigned int i;
2380   size_t nsize;
2381   GNUNET_HashCode mhash;
2382
2383   nsize = compute_bloomfilter_size (pr->replies_seen_off);
2384   if (nsize == pr->bf_size)
2385     return; /* size not changed */
2386   if (pr->bf != NULL)
2387     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2388   pr->bf_size = nsize;
2389   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
2390   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
2391                                               pr->bf_size,
2392                                               BLOOMFILTER_K);
2393   for (i=0;i<pr->replies_seen_off;i++)
2394     {
2395       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
2396                                 pr->mingle,
2397                                 &mhash);
2398       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
2399     }
2400 }
2401
2402
2403 /**
2404  * Function called after we've tried to reserve a certain amount of
2405  * bandwidth for a reply.  Check if we succeeded and if so send our
2406  * query.
2407  *
2408  * @param cls the requests "struct PendingRequest*"
2409  * @param peer identifies the peer
2410  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
2411  * @param bpm_out set to the current bandwidth limit (sending) for this peer
2412  * @param amount set to the amount that was actually reserved or unreserved
2413  * @param preference current traffic preference for the given peer
2414  */
2415 static void
2416 target_reservation_cb (void *cls,
2417                        const struct
2418                        GNUNET_PeerIdentity * peer,
2419                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
2420                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
2421                        int amount,
2422                        uint64_t preference)
2423 {
2424   struct PendingRequest *pr = cls;
2425   struct ConnectedPeer *cp;
2426   struct PendingMessage *pm;
2427   struct GetMessage *gm;
2428   GNUNET_HashCode *ext;
2429   char *bfdata;
2430   size_t msize;
2431   unsigned int k;
2432   int no_route;
2433   uint32_t bm;
2434
2435   pr->irc = NULL;
2436   if (peer == NULL)
2437     {
2438       /* error in communication with core, try again later */
2439       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2440         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2441                                                  get_processing_delay (),
2442                                                  &forward_request_task,
2443                                                  pr);
2444       return;
2445     }
2446   // (3) transmit, update ttl/priority
2447   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2448                                           &peer->hashPubKey);
2449   if (cp == NULL)
2450     {
2451       /* Peer must have just left */
2452 #if DEBUG_FS
2453       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2454                   "Selected peer disconnected!\n");
2455 #endif
2456       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2457         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2458                                                  get_processing_delay (),
2459                                                  &forward_request_task,
2460                                                  pr);
2461       return;
2462     }
2463   no_route = GNUNET_NO;
2464   if (amount == 0)
2465     {
2466       if (pr->cp == NULL)
2467         {
2468 #if DEBUG_FS > 1
2469           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2470                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2471                       amount,
2472                       DBLOCK_SIZE);
2473 #endif
2474           GNUNET_STATISTICS_update (stats,
2475                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2476                                     1,
2477                                     GNUNET_NO);
2478           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2479             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2480                                                      get_processing_delay (),
2481                                                      &forward_request_task,
2482                                                      pr);
2483           return;  /* this target round failed */
2484         }
2485       no_route = GNUNET_YES;
2486     }
2487   
2488   GNUNET_STATISTICS_update (stats,
2489                             gettext_noop ("# queries scheduled for forwarding"),
2490                             1,
2491                             GNUNET_NO);
2492   /* build message and insert message into priority queue */
2493 #if DEBUG_FS
2494   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2495               "Forwarding request `%s' to `%4s'!\n",
2496               GNUNET_h2s (&pr->query),
2497               GNUNET_i2s (peer));
2498 #endif
2499   k = 0;
2500   bm = 0;
2501   if (GNUNET_YES == no_route)
2502     {
2503       bm |= GET_MESSAGE_BIT_RETURN_TO;
2504       k++;      
2505     }
2506   if (pr->namespace != NULL)
2507     {
2508       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2509       k++;
2510     }
2511   if (pr->target_pid != 0)
2512     {
2513       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2514       k++;
2515     }
2516   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2517   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2518   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2519   pm->msize = msize;
2520   gm = (struct GetMessage*) &pm[1];
2521   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2522   gm->header.size = htons (msize);
2523   gm->type = htonl (pr->type);
2524   pr->remaining_priority /= 2;
2525   gm->priority = htonl (pr->remaining_priority);
2526   gm->ttl = htonl (pr->ttl);
2527   gm->filter_mutator = htonl(pr->mingle); 
2528   gm->hash_bitmap = htonl (bm);
2529   gm->query = pr->query;
2530   ext = (GNUNET_HashCode*) &gm[1];
2531   k = 0;
2532   if (GNUNET_YES == no_route)
2533     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2534   if (pr->namespace != NULL)
2535     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2536   if (pr->target_pid != 0)
2537     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2538   bfdata = (char *) &ext[k];
2539   if (pr->bf != NULL)
2540     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2541                                                bfdata,
2542                                                pr->bf_size);
2543   pm->cont = &transmit_query_continuation;
2544   pm->cont_cls = pr;
2545   add_to_pending_messages_for_peer (cp, pm, pr);
2546 }
2547
2548
2549 /**
2550  * Closure used for "target_peer_select_cb".
2551  */
2552 struct PeerSelectionContext 
2553 {
2554   /**
2555    * The request for which we are selecting
2556    * peers.
2557    */
2558   struct PendingRequest *pr;
2559
2560   /**
2561    * Current "prime" target.
2562    */
2563   struct GNUNET_PeerIdentity target;
2564
2565   /**
2566    * How much do we like this target?
2567    */
2568   double target_score;
2569
2570 };
2571
2572
2573 /**
2574  * Function called for each connected peer to determine
2575  * which one(s) would make good targets for forwarding.
2576  *
2577  * @param cls closure (struct PeerSelectionContext)
2578  * @param key current key code (peer identity)
2579  * @param value value in the hash map (struct ConnectedPeer)
2580  * @return GNUNET_YES if we should continue to
2581  *         iterate,
2582  *         GNUNET_NO if not.
2583  */
2584 static int
2585 target_peer_select_cb (void *cls,
2586                        const GNUNET_HashCode * key,
2587                        void *value)
2588 {
2589   struct PeerSelectionContext *psc = cls;
2590   struct ConnectedPeer *cp = value;
2591   struct PendingRequest *pr = psc->pr;
2592   double score;
2593   unsigned int i;
2594   unsigned int pc;
2595
2596   /* 1) check that this peer is not the initiator */
2597   if (cp == pr->cp)
2598     {
2599 #if DEBUG_FS
2600       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2601                   "Skipping initiator in forwarding selection\n");
2602 #endif
2603       return GNUNET_YES; /* skip */        
2604     }
2605
2606   /* 2) check if we have already (recently) forwarded to this peer */
2607   pc = 0;
2608   for (i=0;i<pr->used_pids_off;i++)
2609     if (pr->used_pids[i] == cp->pid) 
2610       {
2611         pc++;
2612         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2613                                            RETRY_PROBABILITY_INV))
2614           {
2615 #if DEBUG_FS
2616             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2617                         "NOT re-trying query that was previously transmitted %u times\n",
2618                         (unsigned int) pr->used_pids_off);
2619 #endif
2620             return GNUNET_YES; /* skip */
2621           }
2622       }
2623 #if DEBUG_FS
2624   if (0 < pc)
2625     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2626                 "Re-trying query that was previously transmitted %u times to this peer\n",
2627                 (unsigned int) pc);
2628 #endif
2629   /* 3) calculate how much we'd like to forward to this peer,
2630      starting with a random value that is strong enough
2631      to at least give any peer a chance sometimes 
2632      (compared to the other factors that come later) */
2633   /* 3a) count successful (recent) routes from cp for same source */
2634   if (pr->cp != NULL)
2635     {
2636       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2637                                         P2P_SUCCESS_LIST_SIZE);
2638       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2639         if (cp->last_p2p_replies[i] == pr->cp->pid)
2640           score += 1.0; /* likely successful based on hot path */
2641     }
2642   else
2643     {
2644       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2645                                         CS2P_SUCCESS_LIST_SIZE);
2646       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2647         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2648           score += 1.0; /* likely successful based on hot path */
2649     }
2650   /* 3b) include latency */
2651   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2652     score += 1.0; /* likely fast based on latency */
2653   /* 3c) include priorities */
2654   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2655     score += 1.0; /* likely successful based on priorities */
2656   /* 3d) penalize for queue size */  
2657   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2658   /* 3e) include peer proximity */
2659   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2660                                                     &pr->query)) / (double) UINT32_MAX);
2661   /* 4) super-bonus for being the known target */
2662   if (pr->target_pid == cp->pid)
2663     score += 100.0;
2664   /* store best-fit in closure */
2665 #if DEBUG_FS
2666   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2667               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2668               GNUNET_h2s (key),
2669               score,
2670               psc->target_score);
2671 #endif  
2672   score++; /* avoid zero */
2673   if (score > psc->target_score)
2674     {
2675       psc->target_score = score;
2676       psc->target.hashPubKey = *key; 
2677     }
2678   return GNUNET_YES;
2679 }
2680   
2681
2682 /**
2683  * The priority level imposes a bound on the maximum
2684  * value for the ttl that can be requested.
2685  *
2686  * @param ttl_in requested ttl
2687  * @param prio given priority
2688  * @return ttl_in if ttl_in is below the limit,
2689  *         otherwise the ttl-limit for the given priority
2690  */
2691 static int32_t
2692 bound_ttl (int32_t ttl_in, uint32_t prio)
2693 {
2694   unsigned long long allowed;
2695
2696   if (ttl_in <= 0)
2697     return ttl_in;
2698   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2699   if (ttl_in > allowed)      
2700     {
2701       if (allowed >= (1 << 30))
2702         return 1 << 30;
2703       return allowed;
2704     }
2705   return ttl_in;
2706 }
2707
2708
2709 /**
2710  * Iterator called on each result obtained for a DHT
2711  * operation that expects a reply
2712  *
2713  * @param cls closure
2714  * @param exp when will this value expire
2715  * @param key key of the result
2716  * @param get_path NULL-terminated array of pointers
2717  *                 to the peers on reverse GET path (or NULL if not recorded)
2718  * @param put_path NULL-terminated array of pointers
2719  *                 to the peers on the PUT path (or NULL if not recorded)
2720  * @param type type of the result
2721  * @param size number of bytes in data
2722  * @param data pointer to the result data
2723  */
2724 static void
2725 process_dht_reply (void *cls,
2726                    struct GNUNET_TIME_Absolute exp,
2727                    const GNUNET_HashCode * key,
2728                    const struct GNUNET_PeerIdentity * const *get_path,
2729                    const struct GNUNET_PeerIdentity * const *put_path,
2730                    enum GNUNET_BLOCK_Type type,
2731                    size_t size,
2732                    const void *data);
2733
2734
2735 /**
2736  * We're processing a GET request and have decided
2737  * to forward it to other peers.  This function is called periodically
2738  * and should forward the request to other peers until we have all
2739  * possible replies.  If we have transmitted the *only* reply to
2740  * the initiator we should destroy the pending request.  If we have
2741  * many replies in the queue to the initiator, we should delay sending
2742  * out more queries until the reply queue has shrunk some.
2743  *
2744  * @param cls our "struct ProcessGetContext *"
2745  * @param tc unused
2746  */
2747 static void
2748 forward_request_task (void *cls,
2749                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2750 {
2751   struct PendingRequest *pr = cls;
2752   struct PeerSelectionContext psc;
2753   struct ConnectedPeer *cp; 
2754   struct GNUNET_TIME_Relative delay;
2755
2756   pr->task = GNUNET_SCHEDULER_NO_TASK;
2757   if (pr->irc != NULL)
2758     {
2759 #if DEBUG_FS
2760       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2761                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2762                   GNUNET_h2s (&pr->query));
2763 #endif
2764       return; /* already pending */
2765     }
2766   if (GNUNET_YES == pr->local_only)
2767     return; /* configured to not do P2P search */
2768   /* (0) try DHT */
2769   if ( (0 == pr->anonymity_level) &&
2770        (GNUNET_YES != pr->forward_only) &&
2771        (pr->type != GNUNET_BLOCK_TYPE_FS_DBLOCK) &&
2772        (pr->type != GNUNET_BLOCK_TYPE_FS_IBLOCK) )
2773     {
2774       pr->dht_get = GNUNET_DHT_get_start (dht_handle,
2775                                           GNUNET_TIME_UNIT_FOREVER_REL,
2776                                           pr->type,
2777                                           &pr->query,
2778                                           GNUNET_DHT_RO_NONE,
2779                                           pr->bf,
2780                                           pr->mingle,
2781                                           pr->namespace,
2782                                           (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2783                                           &process_dht_reply,
2784                                           pr);
2785     }
2786   /* (1) select target */
2787   psc.pr = pr;
2788   psc.target_score = -DBL_MAX;
2789   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2790                                          &target_peer_select_cb,
2791                                          &psc);  
2792   if (psc.target_score == -DBL_MAX)
2793     {
2794       delay = get_processing_delay ();
2795 #if DEBUG_FS 
2796       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2797                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2798                   GNUNET_h2s (&pr->query),
2799                   delay.value);
2800 #endif
2801       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2802                                                delay,
2803                                                &forward_request_task,
2804                                                pr);
2805       return; /* nobody selected */
2806     }
2807   /* (3) update TTL/priority */
2808   if (pr->client_request_list != NULL)
2809     {
2810       /* FIXME: use better algorithm!? */
2811       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2812                                          4))
2813         pr->priority++;
2814       /* bound priority we use by priorities we see from other peers
2815          rounded up (must round up so that we can see non-zero
2816          priorities, but round up as little as possible to make it
2817          plausible that we forwarded another peers request) */
2818       if (pr->priority > current_priorities + 1.0)
2819         pr->priority = (uint32_t) current_priorities + 1.0;
2820       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2821                            pr->priority);
2822 #if DEBUG_FS
2823       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2824                   "Trying query `%s' with priority %u and TTL %d.\n",
2825                   GNUNET_h2s (&pr->query),
2826                   pr->priority,
2827                   pr->ttl);
2828 #endif
2829     }
2830
2831   /* (3) reserve reply bandwidth */
2832   if (GNUNET_NO == pr->forward_only)
2833     {
2834       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2835                                               &psc.target.hashPubKey);
2836       GNUNET_assert (NULL != cp);
2837       pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2838                                                     &psc.target,
2839                                                     GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2840                                                     GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2841                                                     DBLOCK_SIZE * 2, 
2842                                                     cp->inc_preference,
2843                                                     &target_reservation_cb,
2844                                                     pr);
2845       cp->inc_preference = 0;
2846     }
2847   else
2848     {
2849       /* force forwarding */
2850       static struct GNUNET_BANDWIDTH_Value32NBO zerobw;
2851       target_reservation_cb (pr, &psc.target,
2852                              zerobw, zerobw, 0, 0.0);
2853     }
2854 }
2855
2856
2857 /* **************************** P2P PUT Handling ************************ */
2858
2859
2860 /**
2861  * Function called after we either failed or succeeded
2862  * at transmitting a reply to a peer.  
2863  *
2864  * @param cls the requests "struct PendingRequest*"
2865  * @param tpid ID of receiving peer, 0 on transmission error
2866  */
2867 static void
2868 transmit_reply_continuation (void *cls,
2869                              GNUNET_PEER_Id tpid)
2870 {
2871   struct PendingRequest *pr = cls;
2872   
2873   switch (pr->type)
2874     {
2875     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
2876     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
2877       /* only one reply expected, done with the request! */
2878       destroy_pending_request (pr);
2879       break;
2880     case GNUNET_BLOCK_TYPE_ANY:
2881     case GNUNET_BLOCK_TYPE_FS_KBLOCK:
2882     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
2883       break;
2884     default:
2885       GNUNET_break (0);
2886       break;
2887     }
2888 }
2889
2890
2891 /**
2892  * Transmit the given message by copying it to the target buffer
2893  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2894  * for writing in the meantime.  In that case, do nothing
2895  * (the disconnect or shutdown handler will take care of the rest).
2896  * If we were able to transmit messages and there are still more
2897  * pending, ask core again for further calls to this function.
2898  *
2899  * @param cls closure, pointer to the 'struct ClientList*'
2900  * @param size number of bytes available in buf
2901  * @param buf where the callee should write the message
2902  * @return number of bytes written to buf
2903  */
2904 static size_t
2905 transmit_to_client (void *cls,
2906                   size_t size, void *buf)
2907 {
2908   struct ClientList *cl = cls;
2909   char *cbuf = buf;
2910   struct ClientResponseMessage *creply;
2911   size_t msize;
2912   
2913   cl->th = NULL;
2914   if (NULL == buf)
2915     {
2916 #if DEBUG_FS
2917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2918                   "Not sending reply, client communication problem.\n");
2919 #endif
2920       return 0;
2921     }
2922   msize = 0;
2923   while ( (NULL != (creply = cl->res_head) ) &&
2924           (creply->msize <= size) )
2925     {
2926       memcpy (&cbuf[msize], &creply[1], creply->msize);
2927       msize += creply->msize;
2928       size -= creply->msize;
2929       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2930                                    cl->res_tail,
2931                                    creply);
2932       GNUNET_free (creply);
2933     }
2934   if (NULL != creply)
2935     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2936                                                   creply->msize,
2937                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2938                                                   &transmit_to_client,
2939                                                   cl);
2940 #if DEBUG_FS
2941   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2942               "Transmitted %u bytes to client\n",
2943               (unsigned int) msize);
2944 #endif
2945   return msize;
2946 }
2947
2948
2949 /**
2950  * Closure for "process_reply" function.
2951  */
2952 struct ProcessReplyClosure
2953 {
2954   /**
2955    * The data for the reply.
2956    */
2957   const void *data;
2958
2959   /**
2960    * Who gave us this reply? NULL for local host (or DHT)
2961    */
2962   struct ConnectedPeer *sender;
2963
2964   /**
2965    * When the reply expires.
2966    */
2967   struct GNUNET_TIME_Absolute expiration;
2968
2969   /**
2970    * Size of data.
2971    */
2972   size_t size;
2973
2974   /**
2975    * Type of the block.
2976    */
2977   enum GNUNET_BLOCK_Type type;
2978
2979   /**
2980    * How much was this reply worth to us?
2981    */
2982   uint32_t priority;
2983
2984   /**
2985    * Evaluation result (returned).
2986    */
2987   enum GNUNET_BLOCK_EvaluationResult eval;
2988
2989   /**
2990    * Did we finish processing the associated request?
2991    */ 
2992   int finished;
2993
2994   /**
2995    * Did we find a matching request?
2996    */
2997   int request_found;
2998 };
2999
3000
3001 /**
3002  * We have received a reply; handle it!
3003  *
3004  * @param cls response (struct ProcessReplyClosure)
3005  * @param key our query
3006  * @param value value in the hash map (info about the query)
3007  * @return GNUNET_YES (we should continue to iterate)
3008  */
3009 static int
3010 process_reply (void *cls,
3011                const GNUNET_HashCode * key,
3012                void *value)
3013 {
3014   struct ProcessReplyClosure *prq = cls;
3015   struct PendingRequest *pr = value;
3016   struct PendingMessage *reply;
3017   struct ClientResponseMessage *creply;
3018   struct ClientList *cl;
3019   struct PutMessage *pm;
3020   struct ConnectedPeer *cp;
3021   struct GNUNET_TIME_Relative cur_delay;
3022 #if SUPPORT_DELAYS  
3023 struct GNUNET_TIME_Relative art_delay;
3024 #endif
3025   size_t msize;
3026
3027 #if DEBUG_FS
3028   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3029               "Matched result (type %u) for query `%s' with pending request\n",
3030               (unsigned int) prq->type,
3031               GNUNET_h2s (key));
3032 #endif  
3033   GNUNET_STATISTICS_update (stats,
3034                             gettext_noop ("# replies received and matched"),
3035                             1,
3036                             GNUNET_NO);
3037   if (prq->sender != NULL)
3038     {
3039       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
3040       prq->sender->avg_delay.value
3041         = (prq->sender->avg_delay.value * 
3042            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
3043       prq->sender->avg_priority
3044         = (prq->sender->avg_priority * 
3045            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
3046       if (pr->cp != NULL)
3047         {
3048           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
3049                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
3050                                  -1);
3051           GNUNET_PEER_change_rc (pr->cp->pid, 1);
3052           prq->sender->last_p2p_replies
3053             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
3054             = pr->cp->pid;
3055         }
3056       else
3057         {
3058           if (NULL != prq->sender->last_client_replies
3059               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
3060             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
3061                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
3062           prq->sender->last_client_replies
3063             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
3064             = pr->client_request_list->client_list->client;
3065           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
3066         }
3067     }
3068   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
3069                                      prq->type,
3070                                      key,
3071                                      &pr->bf,
3072                                      pr->mingle,
3073                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
3074                                      prq->data,
3075                                      prq->size);
3076   switch (prq->eval)
3077     {
3078     case GNUNET_BLOCK_EVALUATION_OK_MORE:
3079       break;
3080     case GNUNET_BLOCK_EVALUATION_OK_LAST:
3081       while (NULL != pr->pending_head)
3082         destroy_pending_message_list_entry (pr->pending_head);
3083       if (pr->qe != NULL)
3084         {
3085           if (pr->client_request_list != NULL)
3086             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3087                                         GNUNET_YES);
3088           GNUNET_DATASTORE_cancel (pr->qe);
3089           pr->qe = NULL;
3090         }
3091       pr->do_remove = GNUNET_YES;
3092       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
3093         {
3094           GNUNET_SCHEDULER_cancel (sched,
3095                                    pr->task);
3096           pr->task = GNUNET_SCHEDULER_NO_TASK;
3097         }
3098       GNUNET_break (GNUNET_YES ==
3099                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
3100                                                           key,
3101                                                           pr));
3102       GNUNET_LOAD_update (rt_entry_lifetime,
3103                           GNUNET_TIME_absolute_get_duration (pr->start_time).value);
3104       break;
3105     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
3106       GNUNET_STATISTICS_update (stats,
3107                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
3108                                 1,
3109                                 GNUNET_NO);
3110 #if DEBUG_FS
3111       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3112                   "Duplicate response `%s', discarding.\n",
3113                   GNUNET_h2s (&mhash));
3114 #endif
3115       return GNUNET_YES; /* duplicate */
3116     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
3117       return GNUNET_YES; /* wrong namespace */  
3118     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
3119       GNUNET_break (0);
3120       return GNUNET_YES;
3121     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
3122       GNUNET_break (0);
3123       return GNUNET_YES;
3124     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
3125       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3126                   _("Unsupported block type %u\n"),
3127                   prq->type);
3128       return GNUNET_NO;
3129     }
3130   if (pr->client_request_list != NULL)
3131     {
3132       if (pr->replies_seen_size == pr->replies_seen_off)
3133         GNUNET_array_grow (pr->replies_seen,
3134                            pr->replies_seen_size,
3135                            pr->replies_seen_size * 2 + 4);      
3136       GNUNET_CRYPTO_hash (prq->data,
3137                           prq->size,
3138                           &pr->replies_seen[pr->replies_seen_off++]);         
3139       refresh_bloomfilter (pr);
3140     }
3141   if (NULL == prq->sender)
3142     {
3143 #if DEBUG_FS
3144       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3145                   "Found result for query `%s' in local datastore\n",
3146                   GNUNET_h2s (key));
3147 #endif
3148       GNUNET_STATISTICS_update (stats,
3149                                 gettext_noop ("# results found locally"),
3150                                 1,
3151                                 GNUNET_NO);      
3152     }
3153   prq->priority += pr->remaining_priority;
3154   pr->remaining_priority = 0;
3155   pr->results_found++;
3156   prq->request_found = GNUNET_YES;
3157   if (NULL != pr->client_request_list)
3158     {
3159       GNUNET_STATISTICS_update (stats,
3160                                 gettext_noop ("# replies received for local clients"),
3161                                 1,
3162                                 GNUNET_NO);
3163       cl = pr->client_request_list->client_list;
3164       msize = sizeof (struct PutMessage) + prq->size;
3165       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
3166       creply->msize = msize;
3167       creply->client_list = cl;
3168       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
3169                                          cl->res_tail,
3170                                          cl->res_tail,
3171                                          creply);      
3172       pm = (struct PutMessage*) &creply[1];
3173       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3174       pm->header.size = htons (msize);
3175       pm->type = htonl (prq->type);
3176       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3177       memcpy (&pm[1], prq->data, prq->size);      
3178       if (NULL == cl->th)
3179         {
3180 #if DEBUG_FS
3181           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3182                       "Transmitting result for query `%s' to client\n",
3183                       GNUNET_h2s (key));
3184 #endif  
3185           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
3186                                                         msize,
3187                                                         GNUNET_TIME_UNIT_FOREVER_REL,
3188                                                         &transmit_to_client,
3189                                                         cl);
3190         }
3191       GNUNET_break (cl->th != NULL);
3192       if (pr->do_remove)                
3193         {
3194           prq->finished = GNUNET_YES;
3195           destroy_pending_request (pr);         
3196         }
3197     }
3198   else
3199     {
3200       cp = pr->cp;
3201 #if DEBUG_FS
3202       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3203                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
3204                   GNUNET_h2s (key),
3205                   (unsigned int) cp->pid);
3206 #endif  
3207       GNUNET_STATISTICS_update (stats,
3208                                 gettext_noop ("# replies received for other peers"),
3209                                 1,
3210                                 GNUNET_NO);
3211       msize = sizeof (struct PutMessage) + prq->size;
3212       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
3213       reply->cont = &transmit_reply_continuation;
3214       reply->cont_cls = pr;
3215 #if SUPPORT_DELAYS
3216       art_delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3217                                                  GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3218                                                                            TTL_DECREMENT));
3219       reply->delay_until 
3220         = GNUNET_TIME_relative_to_absolute (art_delay);
3221       GNUNET_STATISTICS_update (stats,
3222                                 gettext_noop ("cummulative artificial delay introduced (ms)"),
3223                                 art_delay.value,
3224                                 GNUNET_NO);
3225 #endif
3226       reply->msize = msize;
3227       reply->priority = UINT32_MAX; /* send replies first! */
3228       pm = (struct PutMessage*) &reply[1];
3229       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3230       pm->header.size = htons (msize);
3231       pm->type = htonl (prq->type);
3232       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3233       memcpy (&pm[1], prq->data, prq->size);
3234       add_to_pending_messages_for_peer (cp, reply, pr);
3235     }
3236   return GNUNET_YES;
3237 }
3238
3239
3240 /**
3241  * Iterator called on each result obtained for a DHT
3242  * operation that expects a reply
3243  *
3244  * @param cls closure
3245  * @param exp when will this value expire
3246  * @param key key of the result
3247  * @param get_path NULL-terminated array of pointers
3248  *                 to the peers on reverse GET path (or NULL if not recorded)
3249  * @param put_path NULL-terminated array of pointers
3250  *                 to the peers on the PUT path (or NULL if not recorded)
3251  * @param type type of the result
3252  * @param size number of bytes in data
3253  * @param data pointer to the result data
3254  */
3255 static void
3256 process_dht_reply (void *cls,
3257                    struct GNUNET_TIME_Absolute exp,
3258                    const GNUNET_HashCode * key,
3259                    const struct GNUNET_PeerIdentity * const *get_path,
3260                    const struct GNUNET_PeerIdentity * const *put_path,
3261                    enum GNUNET_BLOCK_Type type,
3262                    size_t size,
3263                    const void *data)
3264 {
3265   struct PendingRequest *pr = cls;
3266   struct ProcessReplyClosure prq;
3267
3268   memset (&prq, 0, sizeof (prq));
3269   prq.data = data;
3270   prq.expiration = exp;
3271   prq.size = size;  
3272   prq.type = type;
3273   process_reply (&prq, key, pr);
3274 }
3275
3276
3277
3278 /**
3279  * Continuation called to notify client about result of the
3280  * operation.
3281  *
3282  * @param cls closure
3283  * @param success GNUNET_SYSERR on failure
3284  * @param msg NULL on success, otherwise an error message
3285  */
3286 static void 
3287 put_migration_continuation (void *cls,
3288                             int success,
3289                             const char *msg)
3290 {
3291   struct GNUNET_TIME_Absolute *start = cls;
3292   struct GNUNET_TIME_Relative delay;
3293   
3294   delay = GNUNET_TIME_absolute_get_duration (*start);
3295   GNUNET_free (start);
3296   GNUNET_LOAD_update (datastore_put_load,
3297                       delay.value);
3298   if (GNUNET_OK == success)
3299     return;
3300   GNUNET_STATISTICS_update (stats,
3301                             gettext_noop ("# datastore 'put' failures"),
3302                             1,
3303                             GNUNET_NO);
3304 }
3305
3306
3307 /**
3308  * Handle P2P "PUT" message.
3309  *
3310  * @param cls closure, always NULL
3311  * @param other the other peer involved (sender or receiver, NULL
3312  *        for loopback messages where we are both sender and receiver)
3313  * @param message the actual message
3314  * @param latency reported latency of the connection with 'other'
3315  * @param distance reported distance (DV) to 'other' 
3316  * @return GNUNET_OK to keep the connection open,
3317  *         GNUNET_SYSERR to close it (signal serious error)
3318  */
3319 static int
3320 handle_p2p_put (void *cls,
3321                 const struct GNUNET_PeerIdentity *other,
3322                 const struct GNUNET_MessageHeader *message,
3323                 struct GNUNET_TIME_Relative latency,
3324                 uint32_t distance)
3325 {
3326   const struct PutMessage *put;
3327   uint16_t msize;
3328   size_t dsize;
3329   enum GNUNET_BLOCK_Type type;
3330   struct GNUNET_TIME_Absolute expiration;
3331   GNUNET_HashCode query;
3332   struct ProcessReplyClosure prq;
3333   struct GNUNET_TIME_Absolute *start;
3334   struct GNUNET_TIME_Relative block_time;  
3335   double putl;
3336   struct ConnectedPeer *cp; 
3337   struct PendingMessage *pm;
3338   struct MigrationStopMessage *msm;
3339
3340   msize = ntohs (message->size);
3341   if (msize < sizeof (struct PutMessage))
3342     {
3343       GNUNET_break_op(0);
3344       return GNUNET_SYSERR;
3345     }
3346   put = (const struct PutMessage*) message;
3347   dsize = msize - sizeof (struct PutMessage);
3348   type = ntohl (put->type);
3349   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
3350
3351   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3352     return GNUNET_SYSERR;
3353   if (GNUNET_OK !=
3354       GNUNET_BLOCK_get_key (block_ctx,
3355                             type,
3356                             &put[1],
3357                             dsize,
3358                             &query))
3359     {
3360       GNUNET_break_op (0);
3361       return GNUNET_SYSERR;
3362     }
3363 #if DEBUG_FS
3364   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3365               "Received result for query `%s' from peer `%4s'\n",
3366               GNUNET_h2s (&query),
3367               GNUNET_i2s (other));
3368 #endif
3369   GNUNET_STATISTICS_update (stats,
3370                             gettext_noop ("# replies received (overall)"),
3371                             1,
3372                             GNUNET_NO);
3373   /* now, lookup 'query' */
3374   prq.data = (const void*) &put[1];
3375   if (other != NULL)
3376     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3377                                                     &other->hashPubKey);
3378   else
3379     prq.sender = NULL;
3380   prq.size = dsize;
3381   prq.type = type;
3382   prq.expiration = expiration;
3383   prq.priority = 0;
3384   prq.finished = GNUNET_NO;
3385   prq.request_found = GNUNET_NO;
3386   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3387                                               &query,
3388                                               &process_reply,
3389                                               &prq);
3390   if (prq.sender != NULL)
3391     {
3392       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
3393       prq.sender->trust += prq.priority;
3394     }
3395   if ( (GNUNET_YES == active_migration) &&
3396        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
3397     {      
3398 #if DEBUG_FS
3399       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3400                   "Replicating result for query `%s' with priority %u\n",
3401                   GNUNET_h2s (&query),
3402                   prq.priority);
3403 #endif
3404       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
3405       *start = GNUNET_TIME_absolute_get ();
3406       GNUNET_DATASTORE_put (dsh,
3407                             0, &query, dsize, &put[1],
3408                             type, prq.priority, 1 /* anonymity */, 
3409                             expiration, 
3410                             1 + prq.priority, MAX_DATASTORE_QUEUE,
3411                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
3412                             &put_migration_continuation, 
3413                             start);
3414     }
3415   putl = GNUNET_LOAD_get_load (datastore_put_load);
3416   if ( (GNUNET_NO == prq.request_found) &&
3417        ( (GNUNET_YES != active_migration) ||
3418          (putl > 2.5 * (1 + prq.priority)) ) )
3419     {
3420       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3421                                               &other->hashPubKey);
3422       if (GNUNET_TIME_absolute_get_duration (cp->last_migration_block).value < 5000)
3423         return GNUNET_OK; /* already blocked */
3424       /* We're too busy; send MigrationStop message! */
3425       if (GNUNET_YES != active_migration) 
3426         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
3427       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
3428                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3429                                                                                    (unsigned int) (60000 * putl * putl)));
3430       
3431       cp->last_migration_block = GNUNET_TIME_relative_to_absolute (block_time);
3432       pm = GNUNET_malloc (sizeof (struct PendingMessage) + 
3433                           sizeof (struct MigrationStopMessage));
3434       pm->msize = sizeof (struct MigrationStopMessage);
3435       pm->priority = UINT32_MAX;
3436       msm = (struct MigrationStopMessage*) &pm[1];
3437       msm->header.size = htons (sizeof (struct MigrationStopMessage));
3438       msm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
3439       msm->duration = GNUNET_TIME_relative_hton (block_time);
3440       add_to_pending_messages_for_peer (cp,
3441                                         pm,
3442                                         NULL);
3443     }
3444   return GNUNET_OK;
3445 }
3446
3447
3448 /**
3449  * Handle P2P "MIGRATION_STOP" message.
3450  *
3451  * @param cls closure, always NULL
3452  * @param other the other peer involved (sender or receiver, NULL
3453  *        for loopback messages where we are both sender and receiver)
3454  * @param message the actual message
3455  * @param latency reported latency of the connection with 'other'
3456  * @param distance reported distance (DV) to 'other' 
3457  * @return GNUNET_OK to keep the connection open,
3458  *         GNUNET_SYSERR to close it (signal serious error)
3459  */
3460 static int
3461 handle_p2p_migration_stop (void *cls,
3462                            const struct GNUNET_PeerIdentity *other,
3463                            const struct GNUNET_MessageHeader *message,
3464                            struct GNUNET_TIME_Relative latency,
3465                            uint32_t distance)
3466 {
3467   struct ConnectedPeer *cp; 
3468   const struct MigrationStopMessage *msm;
3469
3470   msm = (const struct MigrationStopMessage*) message;
3471   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3472                                           &other->hashPubKey);
3473   if (cp == NULL)
3474     {
3475       GNUNET_break (0);
3476       return GNUNET_OK;
3477     }
3478   cp->migration_blocked = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (msm->duration));
3479   return GNUNET_OK;
3480 }
3481
3482
3483
3484 /* **************************** P2P GET Handling ************************ */
3485
3486
3487 /**
3488  * Closure for 'check_duplicate_request_{peer,client}'.
3489  */
3490 struct CheckDuplicateRequestClosure
3491 {
3492   /**
3493    * The new request we should check if it already exists.
3494    */
3495   const struct PendingRequest *pr;
3496
3497   /**
3498    * Existing request found by the checker, NULL if none.
3499    */
3500   struct PendingRequest *have;
3501 };
3502
3503
3504 /**
3505  * Iterator over entries in the 'query_request_map' that
3506  * tries to see if we have the same request pending from
3507  * the same client already.
3508  *
3509  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3510  * @param key current key code (query, ignored, must match)
3511  * @param value value in the hash map (a 'struct PendingRequest' 
3512  *              that already exists)
3513  * @return GNUNET_YES if we should continue to
3514  *         iterate (no match yet)
3515  *         GNUNET_NO if not (match found).
3516  */
3517 static int
3518 check_duplicate_request_client (void *cls,
3519                                 const GNUNET_HashCode * key,
3520                                 void *value)
3521 {
3522   struct CheckDuplicateRequestClosure *cdc = cls;
3523   struct PendingRequest *have = value;
3524
3525   if (have->client_request_list == NULL)
3526     return GNUNET_YES;
3527   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
3528        (cdc->pr != have) )
3529     {
3530       cdc->have = have;
3531       return GNUNET_NO;
3532     }
3533   return GNUNET_YES;
3534 }
3535
3536
3537 /**
3538  * We're processing (local) results for a search request
3539  * from another peer.  Pass applicable results to the
3540  * peer and if we are done either clean up (operation
3541  * complete) or forward to other peers (more results possible).
3542  *
3543  * @param cls our closure (struct LocalGetContext)
3544  * @param key key for the content
3545  * @param size number of bytes in data
3546  * @param data content stored
3547  * @param type type of the content
3548  * @param priority priority of the content
3549  * @param anonymity anonymity-level for the content
3550  * @param expiration expiration time for the content
3551  * @param uid unique identifier for the datum;
3552  *        maybe 0 if no unique identifier is available
3553  */
3554 static void
3555 process_local_reply (void *cls,
3556                      const GNUNET_HashCode * key,
3557                      size_t size,
3558                      const void *data,
3559                      enum GNUNET_BLOCK_Type type,
3560                      uint32_t priority,
3561                      uint32_t anonymity,
3562                      struct GNUNET_TIME_Absolute
3563                      expiration, 
3564                      uint64_t uid)
3565 {
3566   struct PendingRequest *pr = cls;
3567   struct ProcessReplyClosure prq;
3568   struct CheckDuplicateRequestClosure cdrc;
3569   GNUNET_HashCode query;
3570   unsigned int old_rf;
3571   
3572   if (NULL == key)
3573     {
3574 #if DEBUG_FS > 1
3575       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3576                   "Done processing local replies, forwarding request to other peers.\n");
3577 #endif
3578       pr->qe = NULL;
3579       if (pr->client_request_list != NULL)
3580         {
3581           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
3582                                       GNUNET_YES);
3583           /* Figure out if this is a duplicate request and possibly
3584              merge 'struct PendingRequest' entries */
3585           cdrc.have = NULL;
3586           cdrc.pr = pr;
3587           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3588                                                       &pr->query,
3589                                                       &check_duplicate_request_client,
3590                                                       &cdrc);
3591           if (cdrc.have != NULL)
3592             {
3593 #if DEBUG_FS
3594               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3595                           "Received request for block `%s' twice from client, will only request once.\n",
3596                           GNUNET_h2s (&pr->query));
3597 #endif
3598               
3599               destroy_pending_request (pr);
3600               return;
3601             }
3602         }
3603
3604       /* no more results */
3605       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3606         pr->task = GNUNET_SCHEDULER_add_now (sched,
3607                                              &forward_request_task,
3608                                              pr);      
3609       return;
3610     }
3611 #if DEBUG_FS
3612   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3613               "New local response to `%s' of type %u.\n",
3614               GNUNET_h2s (key),
3615               type);
3616 #endif
3617   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
3618     {
3619 #if DEBUG_FS
3620       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3621                   "Found ONDEMAND block, performing on-demand encoding\n");
3622 #endif
3623       GNUNET_STATISTICS_update (stats,
3624                                 gettext_noop ("# on-demand blocks matched requests"),
3625                                 1,
3626                                 GNUNET_NO);
3627       if (GNUNET_OK != 
3628           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3629                                             anonymity, expiration, uid, 
3630                                             &process_local_reply,
3631                                             pr))
3632       if (pr->qe != NULL)
3633         {
3634           GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3635         }
3636       return;
3637     }
3638   old_rf = pr->results_found;
3639   memset (&prq, 0, sizeof (prq));
3640   prq.data = data;
3641   prq.expiration = expiration;
3642   prq.size = size;  
3643   if (GNUNET_OK != 
3644       GNUNET_BLOCK_get_key (block_ctx,
3645                             type,
3646                             data,
3647                             size,
3648                             &query))
3649     {
3650       GNUNET_break (0);
3651       GNUNET_DATASTORE_remove (dsh,
3652                                key,
3653                                size, data,
3654                                -1, -1, 
3655                                GNUNET_TIME_UNIT_FOREVER_REL,
3656                                NULL, NULL);
3657       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3658       return;
3659     }
3660   prq.type = type;
3661   prq.priority = priority;  
3662   prq.finished = GNUNET_NO;
3663   prq.request_found = GNUNET_NO;
3664   if ( (old_rf == 0) &&
3665        (pr->results_found == 0) )
3666     update_datastore_delays (pr->start_time);
3667   process_reply (&prq, key, pr);
3668   if (prq.finished == GNUNET_YES)
3669     return;
3670   if (pr->qe == NULL)
3671     return; /* done here */
3672   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
3673     {
3674       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3675       return;
3676     }
3677   if ( (pr->client_request_list == NULL) &&
3678        ( (GNUNET_YES == test_get_load_too_high (0)) ||
3679          (pr->results_found > 5 + 2 * pr->priority) ) )
3680     {
3681 #if DEBUG_FS > 2
3682       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3683                   "Load too high, done with request\n");
3684 #endif
3685       GNUNET_STATISTICS_update (stats,
3686                                 gettext_noop ("# processing result set cut short due to load"),
3687                                 1,
3688                                 GNUNET_NO);
3689       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3690       return;
3691     }
3692   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3693 }
3694
3695
3696 /**
3697  * We've received a request with the specified priority.  Bound it
3698  * according to how much we trust the given peer.
3699  * 
3700  * @param prio_in requested priority
3701  * @param cp the peer making the request
3702  * @return effective priority
3703  */
3704 static int32_t
3705 bound_priority (uint32_t prio_in,
3706                 struct ConnectedPeer *cp)
3707 {
3708 #define N ((double)128.0)
3709   uint32_t ret;
3710   double rret;
3711   int ld;
3712
3713   ld = test_get_load_too_high (0);
3714   if (ld == GNUNET_SYSERR)
3715     return 0; /* excess resources */
3716   ret = change_host_trust (cp, prio_in);
3717   if (ret > 0)
3718     {
3719       if (ret > current_priorities + N)
3720         rret = current_priorities + N;
3721       else
3722         rret = ret;
3723       current_priorities 
3724         = (current_priorities * (N-1) + rret)/N;
3725     }
3726   if ( (ld == GNUNET_YES) && (ret > 0) )
3727     {
3728       /* try with charging */
3729       ld = test_get_load_too_high (ret);
3730     }
3731   if (ld == GNUNET_YES)
3732     {
3733       /* undo charge */
3734       if (ret != 0)
3735         change_host_trust (cp, -ret);
3736       return -1; /* not enough resources */
3737     }
3738 #undef N
3739   return ret;
3740 }
3741
3742
3743 /**
3744  * Iterator over entries in the 'query_request_map' that
3745  * tries to see if we have the same request pending from
3746  * the same peer already.
3747  *
3748  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3749  * @param key current key code (query, ignored, must match)
3750  * @param value value in the hash map (a 'struct PendingRequest' 
3751  *              that already exists)
3752  * @return GNUNET_YES if we should continue to
3753  *         iterate (no match yet)
3754  *         GNUNET_NO if not (match found).
3755  */
3756 static int
3757 check_duplicate_request_peer (void *cls,
3758                               const GNUNET_HashCode * key,
3759                               void *value)
3760 {
3761   struct CheckDuplicateRequestClosure *cdc = cls;
3762   struct PendingRequest *have = value;
3763
3764   if (cdc->pr->target_pid == have->target_pid)
3765     {
3766       cdc->have = have;
3767       return GNUNET_NO;
3768     }
3769   return GNUNET_YES;
3770 }
3771
3772
3773 /**
3774  * Handle P2P "GET" request.
3775  *
3776  * @param cls closure, always NULL
3777  * @param other the other peer involved (sender or receiver, NULL
3778  *        for loopback messages where we are both sender and receiver)
3779  * @param message the actual message
3780  * @param latency reported latency of the connection with 'other'
3781  * @param distance reported distance (DV) to 'other' 
3782  * @return GNUNET_OK to keep the connection open,
3783  *         GNUNET_SYSERR to close it (signal serious error)
3784  */
3785 static int
3786 handle_p2p_get (void *cls,
3787                 const struct GNUNET_PeerIdentity *other,
3788                 const struct GNUNET_MessageHeader *message,
3789                 struct GNUNET_TIME_Relative latency,
3790                 uint32_t distance)
3791 {
3792   struct PendingRequest *pr;
3793   struct ConnectedPeer *cp;
3794   struct ConnectedPeer *cps;
3795   struct CheckDuplicateRequestClosure cdc;
3796   struct GNUNET_TIME_Relative timeout;
3797   uint16_t msize;
3798   const struct GetMessage *gm;
3799   unsigned int bits;
3800   const GNUNET_HashCode *opt;
3801   uint32_t bm;
3802   size_t bfsize;
3803   uint32_t ttl_decrement;
3804   int32_t priority;
3805   enum GNUNET_BLOCK_Type type;
3806   int have_ns;
3807
3808   msize = ntohs(message->size);
3809   if (msize < sizeof (struct GetMessage))
3810     {
3811       GNUNET_break_op (0);
3812       return GNUNET_SYSERR;
3813     }
3814   gm = (const struct GetMessage*) message;
3815   type = ntohl (gm->type);
3816   bm = ntohl (gm->hash_bitmap);
3817   bits = 0;
3818   while (bm > 0)
3819     {
3820       if (1 == (bm & 1))
3821         bits++;
3822       bm >>= 1;
3823     }
3824   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3825     {
3826       GNUNET_break_op (0);
3827       return GNUNET_SYSERR;
3828     }  
3829   opt = (const GNUNET_HashCode*) &gm[1];
3830   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3831   bm = ntohl (gm->hash_bitmap);
3832   bits = 0;
3833   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3834                                            &other->hashPubKey);
3835   if (NULL == cps)
3836     {
3837       /* peer must have just disconnected */
3838       GNUNET_STATISTICS_update (stats,
3839                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3840                                 1,
3841                                 GNUNET_NO);
3842       return GNUNET_SYSERR;
3843     }
3844   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3845     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3846                                             &opt[bits++]);
3847   else
3848     cp = cps;
3849   if (cp == NULL)
3850     {
3851 #if DEBUG_FS
3852       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3853         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3854                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3855                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3856       
3857       else
3858         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3859                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3860                     GNUNET_i2s (other));
3861 #endif
3862       GNUNET_STATISTICS_update (stats,
3863                                 gettext_noop ("# requests dropped due to missing reverse route"),
3864                                 1,
3865                                 GNUNET_NO);
3866      /* FIXME: try connect? */
3867       return GNUNET_OK;
3868     }
3869   /* note that we can really only check load here since otherwise
3870      peers could find out that we are overloaded by not being
3871      disconnected after sending us a malformed query... */
3872   priority = bound_priority (ntohl (gm->priority), cps);
3873   if (priority < 0)
3874     {
3875 #if DEBUG_FS
3876       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3877                   "Dropping query from `%s', this peer is too busy.\n",
3878                   GNUNET_i2s (other));
3879 #endif
3880       GNUNET_STATISTICS_update (stats,
3881                                 gettext_noop ("# requests dropped due to high load"),
3882                                 1,
3883                                 GNUNET_NO);
3884       return GNUNET_OK;
3885     }
3886 #if DEBUG_FS 
3887   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3888               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3889               GNUNET_h2s (&gm->query),
3890               (unsigned int) type,
3891               GNUNET_i2s (other),
3892               (unsigned int) bm);
3893 #endif
3894   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3895   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3896                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3897   if (have_ns)
3898     {
3899       pr->namespace = (GNUNET_HashCode*) &pr[1];
3900       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3901     }
3902   if ( (GNUNET_LOAD_get_load (cp->transmission_delay) > 3 * (1 + priority)) ||
3903        (GNUNET_LOAD_get_average (cp->transmission_delay) > 
3904         GNUNET_CONSTANTS_MAX_CORK_DELAY.value * 2 + GNUNET_LOAD_get_average (rt_entry_lifetime)) )
3905     {
3906       /* don't have BW to send to peer, or would likely take longer than we have for it,
3907          so at best indirect the query */
3908       priority = 0;
3909       pr->forward_only = GNUNET_YES;
3910     }
3911   pr->type = type;
3912   pr->mingle = ntohl (gm->filter_mutator);
3913   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3914     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3915   pr->anonymity_level = 1;
3916   pr->priority = (uint32_t) priority;
3917   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3918   pr->query = gm->query;
3919   /* decrement ttl (always) */
3920   ttl_decrement = 2 * TTL_DECREMENT +
3921     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3922                               TTL_DECREMENT);
3923   if ( (pr->ttl < 0) &&
3924        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3925     {
3926 #if DEBUG_FS
3927       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3928                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3929                   GNUNET_i2s (other),
3930                   pr->ttl,
3931                   ttl_decrement);
3932 #endif
3933       GNUNET_STATISTICS_update (stats,
3934                                 gettext_noop ("# requests dropped due TTL underflow"),
3935                                 1,
3936                                 GNUNET_NO);
3937       /* integer underflow => drop (should be very rare)! */      
3938       GNUNET_free (pr);
3939       return GNUNET_OK;
3940     } 
3941   pr->ttl -= ttl_decrement;
3942   pr->start_time = GNUNET_TIME_absolute_get ();
3943
3944   /* get bloom filter */
3945   if (bfsize > 0)
3946     {
3947       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3948                                                   bfsize,
3949                                                   BLOOMFILTER_K);
3950       pr->bf_size = bfsize;
3951     }
3952
3953   cdc.have = NULL;
3954   cdc.pr = pr;
3955   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3956                                               &gm->query,
3957                                               &check_duplicate_request_peer,
3958                                               &cdc);
3959   if (cdc.have != NULL)
3960     {
3961       if (cdc.have->start_time.value + cdc.have->ttl >=
3962           pr->start_time.value + pr->ttl)
3963         {
3964           /* existing request has higher TTL, drop new one! */
3965           cdc.have->priority += pr->priority;
3966           destroy_pending_request (pr);
3967 #if DEBUG_FS
3968           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3969                       "Have existing request with higher TTL, dropping new request.\n",
3970                       GNUNET_i2s (other));
3971 #endif
3972           GNUNET_STATISTICS_update (stats,
3973                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3974                                     1,
3975                                     GNUNET_NO);
3976           return GNUNET_OK;
3977         }
3978       else
3979         {
3980           /* existing request has lower TTL, drop old one! */
3981           pr->priority += cdc.have->priority;
3982           /* Possible optimization: if we have applicable pending
3983              replies in 'cdc.have', we might want to move those over
3984              (this is a really rare special-case, so it is not clear
3985              that this would be worth it) */
3986           destroy_pending_request (cdc.have);
3987           /* keep processing 'pr'! */
3988         }
3989     }
3990
3991   pr->cp = cp;
3992   GNUNET_break (GNUNET_OK ==
3993                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3994                                                    &gm->query,
3995                                                    pr,
3996                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3997   GNUNET_break (GNUNET_OK ==
3998                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3999                                                    &other->hashPubKey,
4000                                                    pr,
4001                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4002   
4003   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
4004                                             pr,
4005                                             pr->start_time.value + pr->ttl);
4006
4007   GNUNET_STATISTICS_update (stats,
4008                             gettext_noop ("# P2P searches received"),
4009                             1,
4010                             GNUNET_NO);
4011   GNUNET_STATISTICS_update (stats,
4012                             gettext_noop ("# P2P searches active"),
4013                             1,
4014                             GNUNET_NO);
4015
4016   /* calculate change in traffic preference */
4017   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
4018   /* process locally */
4019   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4020     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
4021   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
4022                                            (pr->priority + 1)); 
4023   if (GNUNET_YES != pr->forward_only)
4024     pr->qe = GNUNET_DATASTORE_get (dsh,
4025                                    &gm->query,
4026                                    type,                               
4027                                    pr->priority + 1,
4028                                    MAX_DATASTORE_QUEUE,                          
4029                                    timeout,
4030                                    &process_local_reply,
4031                                    pr);
4032   else
4033     GNUNET_STATISTICS_update (stats,
4034                               gettext_noop ("# requests forwarded due to high load"),
4035                               1,
4036                               GNUNET_NO);
4037
4038   /* Are multiple results possible (and did we look locally)?  If so, start processing remotely now! */
4039   switch (pr->type)
4040     {
4041     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4042     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4043       /* only one result, wait for datastore */
4044       if (GNUNET_YES != pr->forward_only)
4045         break;
4046     default:
4047       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
4048         pr->task = GNUNET_SCHEDULER_add_now (sched,
4049                                              &forward_request_task,
4050                                              pr);
4051     }
4052
4053   /* make sure we don't track too many requests */
4054   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
4055     {
4056       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
4057       GNUNET_assert (pr != NULL);
4058       destroy_pending_request (pr);
4059     }
4060   return GNUNET_OK;
4061 }
4062
4063
4064 /* **************************** CS GET Handling ************************ */
4065
4066
4067 /**
4068  * Handle START_SEARCH-message (search request from client).
4069  *
4070  * @param cls closure
4071  * @param client identification of the client
4072  * @param message the actual message
4073  */
4074 static void
4075 handle_start_search (void *cls,
4076                      struct GNUNET_SERVER_Client *client,
4077                      const struct GNUNET_MessageHeader *message)
4078 {
4079   static GNUNET_HashCode all_zeros;
4080   const struct SearchMessage *sm;
4081   struct ClientList *cl;
4082   struct ClientRequestList *crl;
4083   struct PendingRequest *pr;
4084   uint16_t msize;
4085   unsigned int sc;
4086   enum GNUNET_BLOCK_Type type;
4087
4088   msize = ntohs (message->size);
4089   if ( (msize < sizeof (struct SearchMessage)) ||
4090        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
4091     {
4092       GNUNET_break (0);
4093       GNUNET_SERVER_receive_done (client,
4094                                   GNUNET_SYSERR);
4095       return;
4096     }
4097   GNUNET_STATISTICS_update (stats,
4098                             gettext_noop ("# client searches received"),
4099                             1,
4100                             GNUNET_NO);
4101   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
4102   sm = (const struct SearchMessage*) message;
4103   type = ntohl (sm->type);
4104 #if DEBUG_FS
4105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4106               "Received request for `%s' of type %u from local client\n",
4107               GNUNET_h2s (&sm->query),
4108               (unsigned int) type);
4109 #endif
4110   cl = client_list;
4111   while ( (cl != NULL) &&
4112           (cl->client != client) )
4113     cl = cl->next;
4114   if (cl == NULL)
4115     {
4116       cl = GNUNET_malloc (sizeof (struct ClientList));
4117       cl->client = client;
4118       GNUNET_SERVER_client_keep (client);
4119       cl->next = client_list;
4120       client_list = cl;
4121     }
4122   /* detect duplicate KBLOCK requests */
4123   if ( (type == GNUNET_BLOCK_TYPE_FS_KBLOCK) ||
4124        (type == GNUNET_BLOCK_TYPE_FS_NBLOCK) ||
4125        (type == GNUNET_BLOCK_TYPE_ANY) )
4126     {
4127       crl = cl->rl_head;
4128       while ( (crl != NULL) &&
4129               ( (0 != memcmp (&crl->req->query,
4130                               &sm->query,
4131                               sizeof (GNUNET_HashCode))) ||
4132                 (crl->req->type != type) ) )
4133         crl = crl->next;
4134       if (crl != NULL)  
4135         { 
4136 #if DEBUG_FS
4137           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
4138                       "Have existing request, merging content-seen lists.\n");
4139 #endif
4140           pr = crl->req;
4141           /* Duplicate request (used to send long list of
4142              known/blocked results); merge 'pr->replies_seen'
4143              and update bloom filter */
4144           GNUNET_array_grow (pr->replies_seen,
4145                              pr->replies_seen_size,
4146                              pr->replies_seen_off + sc);
4147           memcpy (&pr->replies_seen[pr->replies_seen_off],
4148                   &sm[1],
4149                   sc * sizeof (GNUNET_HashCode));
4150           pr->replies_seen_off += sc;
4151           refresh_bloomfilter (pr);
4152           GNUNET_STATISTICS_update (stats,
4153                                     gettext_noop ("# client searches updated (merged content seen list)"),
4154                                     1,
4155                                     GNUNET_NO);
4156           GNUNET_SERVER_receive_done (client,
4157                                       GNUNET_OK);
4158           return;
4159         }
4160     }
4161   GNUNET_STATISTICS_update (stats,
4162                             gettext_noop ("# client searches active"),
4163                             1,
4164                             GNUNET_NO);
4165   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
4166                       ((type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
4167   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
4168   memset (crl, 0, sizeof (struct ClientRequestList));
4169   crl->client_list = cl;
4170   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
4171                                cl->rl_tail,
4172                                crl);  
4173   crl->req = pr;
4174   pr->type = type;
4175   pr->client_request_list = crl;
4176   GNUNET_array_grow (pr->replies_seen,
4177                      pr->replies_seen_size,
4178                      sc);
4179   memcpy (pr->replies_seen,
4180           &sm[1],
4181           sc * sizeof (GNUNET_HashCode));
4182   pr->replies_seen_off = sc;
4183   pr->anonymity_level = ntohl (sm->anonymity_level); 
4184   pr->start_time = GNUNET_TIME_absolute_get ();
4185   refresh_bloomfilter (pr);
4186   pr->query = sm->query;
4187   if (0 == (1 & ntohl (sm->options)))
4188     pr->local_only = GNUNET_NO;
4189   else
4190     pr->local_only = GNUNET_YES;
4191   switch (type)
4192     {
4193     case GNUNET_BLOCK_TYPE_FS_DBLOCK:
4194     case GNUNET_BLOCK_TYPE_FS_IBLOCK:
4195       if (0 != memcmp (&sm->target,
4196                        &all_zeros,
4197                        sizeof (GNUNET_HashCode)))
4198         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
4199       break;
4200     case GNUNET_BLOCK_TYPE_FS_SBLOCK:
4201       pr->namespace = (GNUNET_HashCode*) &pr[1];
4202       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
4203       break;
4204     default:
4205       break;
4206     }
4207   GNUNET_break (GNUNET_OK ==
4208                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
4209                                                    &sm->query,
4210                                                    pr,
4211                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
4212   if (type == GNUNET_BLOCK_TYPE_FS_DBLOCK)
4213     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
4214   pr->qe = GNUNET_DATASTORE_get (dsh,
4215                                  &sm->query,
4216                                  type,
4217                                  -3, -1,
4218                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
4219                                  &process_local_reply,
4220                                  pr);
4221 }
4222
4223
4224 /* **************************** Startup ************************ */
4225
4226 /**
4227  * Process fs requests.
4228  *
4229  * @param s scheduler to use
4230  * @param server the initialized server
4231  * @param c configuration to use
4232  */
4233 static int
4234 main_init (struct GNUNET_SCHEDULER_Handle *s,
4235            struct GNUNET_SERVER_Handle *server,
4236            const struct GNUNET_CONFIGURATION_Handle *c)
4237 {
4238   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
4239     {
4240       { &handle_p2p_get, 
4241         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
4242       { &handle_p2p_put, 
4243         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
4244       { &handle_p2p_migration_stop, 
4245         GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP,
4246         sizeof (struct MigrationStopMessage) },
4247       { NULL, 0, 0 }
4248     };
4249   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
4250     {&GNUNET_FS_handle_index_start, NULL, 
4251      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
4252     {&GNUNET_FS_handle_index_list_get, NULL, 
4253      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
4254     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
4255      sizeof (struct UnindexMessage) },
4256     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
4257      0 },
4258     {NULL, NULL, 0, 0}
4259   };
4260   unsigned long long enc = 128;
4261
4262   sched = s;
4263   cfg = c;
4264   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
4265   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
4266   if ( (GNUNET_OK !=
4267         GNUNET_CONFIGURATION_get_value_number (cfg,
4268                                                "fs",
4269                                                "MAX_PENDING_REQUESTS",
4270                                                &max_pending_requests)) ||
4271        (GNUNET_OK !=
4272         GNUNET_CONFIGURATION_get_value_number (cfg,
4273                                                "fs",
4274                                                "EXPECTED_NEIGHBOUR_COUNT",
4275                                                &enc)) ||
4276        (GNUNET_OK != 
4277         GNUNET_CONFIGURATION_get_value_time (cfg,
4278                                              "fs",
4279                                              "MIN_MIGRATION_DELAY",
4280                                              &min_migration_delay)) )
4281     {
4282       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4283                   _("Configuration fails to specify certain parameters, assuming default values."));
4284     }
4285   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
4286   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
4287   rt_entry_lifetime = GNUNET_LOAD_value_init ();
4288   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
4289   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
4290   core = GNUNET_CORE_connect (sched,
4291                               cfg,
4292                               GNUNET_TIME_UNIT_FOREVER_REL,
4293                               NULL,
4294                               NULL,
4295                               &peer_connect_handler,
4296                               &peer_disconnect_handler,
4297                               NULL,
4298                               NULL, GNUNET_NO,
4299                               NULL, GNUNET_NO,
4300                               p2p_handlers);
4301   if (NULL == core)
4302     {
4303       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
4304                   _("Failed to connect to `%s' service.\n"),
4305                   "core");
4306       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
4307       connected_peers = NULL;
4308       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
4309       query_request_map = NULL;
4310       GNUNET_LOAD_value_free (rt_entry_lifetime);
4311       rt_entry_lifetime = NULL;
4312       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
4313       requests_by_expiration_heap = NULL;
4314       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
4315       peer_request_map = NULL;
4316       if (dsh != NULL)
4317         {
4318           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4319           dsh = NULL;
4320         }
4321       return GNUNET_SYSERR;
4322     }
4323   /* FIXME: distinguish between sending and storing in options? */
4324   if (active_migration) 
4325     {
4326       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
4327                   _("Content migration is enabled, will start to gather data\n"));
4328       consider_migration_gathering ();
4329     }
4330   consider_dht_put_gathering (NULL);
4331   GNUNET_SERVER_disconnect_notify (server, 
4332                                    &handle_client_disconnect,
4333                                    NULL);
4334   GNUNET_assert (GNUNET_OK ==
4335                  GNUNET_CONFIGURATION_get_value_filename (cfg,
4336                                                           "fs",
4337                                                           "TRUST",
4338                                                           &trustDirectory));
4339   GNUNET_DISK_directory_create (trustDirectory);
4340   GNUNET_SCHEDULER_add_with_priority (sched,
4341                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
4342                                       &cron_flush_trust, NULL);
4343
4344
4345   GNUNET_SERVER_add_handlers (server, handlers);
4346   GNUNET_SCHEDULER_add_delayed (sched,
4347                                 GNUNET_TIME_UNIT_FOREVER_REL,
4348                                 &shutdown_task,
4349                                 NULL);
4350   return GNUNET_OK;
4351 }
4352
4353
4354 /**
4355  * Process fs requests.
4356  *
4357  * @param cls closure
4358  * @param sched scheduler to use
4359  * @param server the initialized server
4360  * @param cfg configuration to use
4361  */
4362 static void
4363 run (void *cls,
4364      struct GNUNET_SCHEDULER_Handle *sched,
4365      struct GNUNET_SERVER_Handle *server,
4366      const struct GNUNET_CONFIGURATION_Handle *cfg)
4367 {
4368   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
4369                                                            "FS",
4370                                                            "ACTIVEMIGRATION");
4371   dsh = GNUNET_DATASTORE_connect (cfg,
4372                                   sched);
4373   if (dsh == NULL)
4374     {
4375       GNUNET_SCHEDULER_shutdown (sched);
4376       return;
4377     }
4378   datastore_get_load = GNUNET_LOAD_value_init ();
4379   datastore_put_load = GNUNET_LOAD_value_init ();
4380   block_cfg = GNUNET_CONFIGURATION_create ();
4381   GNUNET_CONFIGURATION_set_value_string (block_cfg,
4382                                          "block",
4383                                          "PLUGINS",
4384                                          "fs");
4385   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
4386   GNUNET_assert (NULL != block_ctx);
4387   dht_handle = GNUNET_DHT_connect (sched,
4388                                    cfg,
4389                                    FS_DHT_HT_SIZE);
4390   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
4391        (GNUNET_OK != main_init (sched, server, cfg)) )
4392     {    
4393       GNUNET_SCHEDULER_shutdown (sched);
4394       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
4395       dsh = NULL;
4396       GNUNET_DHT_disconnect (dht_handle);
4397       dht_handle = NULL;
4398       GNUNET_BLOCK_context_destroy (block_ctx);
4399       block_ctx = NULL;
4400       GNUNET_CONFIGURATION_destroy (block_cfg);
4401       block_cfg = NULL;
4402       GNUNET_LOAD_value_free (datastore_get_load);
4403       datastore_get_load = NULL;
4404       GNUNET_LOAD_value_free (datastore_put_load);
4405       datastore_put_load = NULL;
4406       return;   
4407     }
4408 }
4409
4410
4411 /**
4412  * The main function for the fs service.
4413  *
4414  * @param argc number of arguments from the command line
4415  * @param argv command line arguments
4416  * @return 0 ok, 1 on error
4417  */
4418 int
4419 main (int argc, char *const *argv)
4420 {
4421   return (GNUNET_OK ==
4422           GNUNET_SERVICE_run (argc,
4423                               argv,
4424                               "fs",
4425                               GNUNET_SERVICE_OPTION_NONE,
4426                               &run, NULL)) ? 0 : 1;
4427 }
4428
4429 /* end of gnunet-service-fs.c */