minor fixmes
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
28  * - consider more precise latency estimation (per-peer & request)
29  * - introduce random latency in processing
30  * - tell other peers to stop migration if our PUTs fail (or if
31  *   we don't support migration per configuration?)
32  * - more statistics
33  */
34 #include "platform.h"
35 #include <float.h>
36 #include "gnunet_constants.h"
37 #include "gnunet_core_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_peer_lib.h"
40 #include "gnunet_protocols.h"
41 #include "gnunet_signatures.h"
42 #include "gnunet_statistics_service.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet-service-fs_indexing.h"
45 #include "fs.h"
46
47 #define DEBUG_FS GNUNET_NO
48
49 /**
50  * Maximum number of outgoing messages we queue per peer.
51  */
52 #define MAX_QUEUE_PER_PEER 16
53
54 /**
55  * How often do we flush trust values to disk?
56  */
57 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
58
59 /**
60  * Inverse of the probability that we will submit the same query
61  * to the same peer again.  If the same peer already got the query
62  * repeatedly recently, the probability is multiplied by the inverse
63  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
64  * plus MAX_CORK_DELAY (so roughly every 3.5s).
65  */
66 #define RETRY_PROBABILITY_INV 3
67
68 /**
69  * What is the maximum delay for a P2P FS message (in our interaction
70  * with core)?  FS-internal delays are another story.  The value is
71  * chosen based on the 32k block size.  Given that peers typcially
72  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
73  * transmit one message even to the lowest-bandwidth peers.
74  */
75 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
76
77 /**
78  * Maximum number of requests (from other peers) that we're
79  * willing to have pending at any given point in time.
80  */
81 static uint64_t max_pending_requests = (32 * 1024);
82
83
84 /**
85  * Information we keep for each pending reply.  The
86  * actual message follows at the end of this struct.
87  */
88 struct PendingMessage;
89
90 /**
91  * Function called upon completion of a transmission.
92  *
93  * @param cls closure
94  * @param pid ID of receiving peer, 0 on transmission error
95  */
96 typedef void (*TransmissionContinuation)(void * cls, 
97                                          GNUNET_PEER_Id tpid);
98
99
100 /**
101  * Information we keep for each pending message (GET/PUT).  The
102  * actual message follows at the end of this struct.
103  */
104 struct PendingMessage
105 {
106   /**
107    * This is a doubly-linked list of messages to the same peer.
108    */
109   struct PendingMessage *next;
110
111   /**
112    * This is a doubly-linked list of messages to the same peer.
113    */
114   struct PendingMessage *prev;
115
116   /**
117    * Entry in pending message list for this pending message.
118    */ 
119   struct PendingMessageList *pml;  
120
121   /**
122    * Function to call immediately once we have transmitted this
123    * message.
124    */
125   TransmissionContinuation cont;
126
127   /**
128    * Closure for cont.
129    */
130   void *cont_cls;
131
132   /**
133    * Size of the reply; actual reply message follows
134    * at the end of this struct.
135    */
136   size_t msize;
137   
138   /**
139    * How important is this message for us?
140    */
141   uint32_t priority;
142  
143 };
144
145
146 /**
147  * Information about a peer that we are connected to.
148  * We track data that is useful for determining which
149  * peers should receive our requests.  We also keep
150  * a list of messages to transmit to this peer.
151  */
152 struct ConnectedPeer
153 {
154
155   /**
156    * List of the last clients for which this peer successfully
157    * answered a query.
158    */
159   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
160
161   /**
162    * List of the last PIDs for which
163    * this peer successfully answered a query;
164    * We use 0 to indicate no successful reply.
165    */
166   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
167
168   /**
169    * Average delay between sending the peer a request and
170    * getting a reply (only calculated over the requests for
171    * which we actually got a reply).   Calculated
172    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
173    */ 
174   struct GNUNET_TIME_Relative avg_delay;
175
176   /**
177    * Handle for an active request for transmission to this
178    * peer, or NULL.
179    */
180   struct GNUNET_CORE_TransmitHandle *cth;
181
182   /**
183    * Messages (replies, queries, content migration) we would like to
184    * send to this peer in the near future.  Sorted by priority, head.
185    */
186   struct PendingMessage *pending_messages_head;
187
188   /**
189    * Messages (replies, queries, content migration) we would like to
190    * send to this peer in the near future.  Sorted by priority, tail.
191    */
192   struct PendingMessage *pending_messages_tail;
193
194   /**
195    * Average priority of successful replies.  Calculated
196    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
197    */
198   double avg_priority;
199
200   /**
201    * Increase in traffic preference still to be submitted
202    * to the core service for this peer.
203    */
204   uint64_t inc_preference;
205
206   /**
207    * Trust rating for this peer
208    */
209   uint32_t trust;
210
211   /**
212    * Trust rating for this peer on disk.
213    */
214   uint32_t disk_trust;
215
216   /**
217    * The peer's identity.
218    */
219   GNUNET_PEER_Id pid;  
220
221   /**
222    * Size of the linked list of 'pending_messages'.
223    */
224   unsigned int pending_requests;
225
226   /**
227    * Which offset in "last_p2p_replies" will be updated next?
228    * (we go round-robin).
229    */
230   unsigned int last_p2p_replies_woff;
231
232   /**
233    * Which offset in "last_client_replies" will be updated next?
234    * (we go round-robin).
235    */
236   unsigned int last_client_replies_woff;
237
238 };
239
240
241 /**
242  * Information we keep for each pending request.  We should try to
243  * keep this struct as small as possible since its memory consumption
244  * is key to how many requests we can have pending at once.
245  */
246 struct PendingRequest;
247
248
249 /**
250  * Doubly-linked list of requests we are performing
251  * on behalf of the same client.
252  */
253 struct ClientRequestList
254 {
255
256   /**
257    * This is a doubly-linked list.
258    */
259   struct ClientRequestList *next;
260
261   /**
262    * This is a doubly-linked list.
263    */
264   struct ClientRequestList *prev;
265
266   /**
267    * Request this entry represents.
268    */
269   struct PendingRequest *req;
270
271   /**
272    * Client list this request belongs to.
273    */
274   struct ClientList *client_list;
275
276 };
277
278
279 /**
280  * Replies to be transmitted to the client.  The actual
281  * response message is allocated after this struct.
282  */
283 struct ClientResponseMessage
284 {
285   /**
286    * This is a doubly-linked list.
287    */
288   struct ClientResponseMessage *next;
289
290   /**
291    * This is a doubly-linked list.
292    */
293   struct ClientResponseMessage *prev;
294
295   /**
296    * Client list entry this response belongs to.
297    */
298   struct ClientList *client_list;
299
300   /**
301    * Number of bytes in the response.
302    */
303   size_t msize;
304 };
305
306
307 /**
308  * Linked list of clients we are performing requests
309  * for right now.
310  */
311 struct ClientList
312 {
313   /**
314    * This is a linked list.
315    */
316   struct ClientList *next;
317
318   /**
319    * ID of a client making a request, NULL if this entry is for a
320    * peer.
321    */
322   struct GNUNET_SERVER_Client *client;
323
324   /**
325    * Head of list of requests performed on behalf
326    * of this client right now.
327    */
328   struct ClientRequestList *rl_head;
329
330   /**
331    * Tail of list of requests performed on behalf
332    * of this client right now.
333    */
334   struct ClientRequestList *rl_tail;
335
336   /**
337    * Head of linked list of responses.
338    */
339   struct ClientResponseMessage *res_head;
340
341   /**
342    * Tail of linked list of responses.
343    */
344   struct ClientResponseMessage *res_tail;
345
346   /**
347    * Context for sending replies.
348    */
349   struct GNUNET_CONNECTION_TransmitHandle *th;
350
351 };
352
353
354 /**
355  * Doubly-linked list of messages we are performing
356  * due to a pending request.
357  */
358 struct PendingMessageList
359 {
360
361   /**
362    * This is a doubly-linked list of messages on behalf of the same request.
363    */
364   struct PendingMessageList *next;
365
366   /**
367    * This is a doubly-linked list of messages on behalf of the same request.
368    */
369   struct PendingMessageList *prev;
370
371   /**
372    * Message this entry represents.
373    */
374   struct PendingMessage *pm;
375
376   /**
377    * Request this entry belongs to.
378    */
379   struct PendingRequest *req;
380
381   /**
382    * Peer this message is targeted for.
383    */
384   struct ConnectedPeer *target;
385
386 };
387
388
389 /**
390  * Information we keep for each pending request.  We should try to
391  * keep this struct as small as possible since its memory consumption
392  * is key to how many requests we can have pending at once.
393  */
394 struct PendingRequest
395 {
396
397   /**
398    * If this request was made by a client, this is our entry in the
399    * client request list; otherwise NULL.
400    */
401   struct ClientRequestList *client_request_list;
402
403   /**
404    * Entry of peer responsible for this entry (if this request
405    * was made by a peer).
406    */
407   struct ConnectedPeer *cp;
408
409   /**
410    * If this is a namespace query, pointer to the hash of the public
411    * key of the namespace; otherwise NULL.  Pointer will be to the 
412    * end of this struct (so no need to free it).
413    */
414   const GNUNET_HashCode *namespace;
415
416   /**
417    * Bloomfilter we use to filter out replies that we don't care about
418    * (anymore).  NULL as long as we are interested in all replies.
419    */
420   struct GNUNET_CONTAINER_BloomFilter *bf;
421
422   /**
423    * Context of our GNUNET_CORE_peer_change_preference call.
424    */
425   struct GNUNET_CORE_InformationRequestContext *irc;
426
427   /**
428    * Hash code of all replies that we have seen so far (only valid
429    * if client is not NULL since we only track replies like this for
430    * our own clients).
431    */
432   GNUNET_HashCode *replies_seen;
433
434   /**
435    * Node in the heap representing this entry; NULL
436    * if we have no heap node.
437    */
438   struct GNUNET_CONTAINER_HeapNode *hnode;
439
440   /**
441    * Head of list of messages being performed on behalf of this
442    * request.
443    */
444   struct PendingMessageList *pending_head;
445
446   /**
447    * Tail of list of messages being performed on behalf of this
448    * request.
449    */
450   struct PendingMessageList *pending_tail;
451
452   /**
453    * When did we first see this request (form this peer), or, if our
454    * client is initiating, when did we last initiate a search?
455    */
456   struct GNUNET_TIME_Absolute start_time;
457
458   /**
459    * The query that this request is for.
460    */
461   GNUNET_HashCode query;
462
463   /**
464    * The task responsible for transmitting queries
465    * for this request.
466    */
467   GNUNET_SCHEDULER_TaskIdentifier task;
468
469   /**
470    * (Interned) Peer identifier that identifies a preferred target
471    * for requests.
472    */
473   GNUNET_PEER_Id target_pid;
474
475   /**
476    * (Interned) Peer identifiers of peers that have already
477    * received our query for this content.
478    */
479   GNUNET_PEER_Id *used_pids;
480   
481   /**
482    * Our entry in the queue (non-NULL while we wait for our
483    * turn to interact with the local database).
484    */
485   struct GNUNET_DATASTORE_QueueEntry *qe;
486
487   /**
488    * Size of the 'bf' (in bytes).
489    */
490   size_t bf_size;
491
492   /**
493    * Desired anonymity level; only valid for requests from a local client.
494    */
495   uint32_t anonymity_level;
496
497   /**
498    * How many entries in "used_pids" are actually valid?
499    */
500   unsigned int used_pids_off;
501
502   /**
503    * How long is the "used_pids" array?
504    */
505   unsigned int used_pids_size;
506
507   /**
508    * Number of results found for this request.
509    */
510   unsigned int results_found;
511
512   /**
513    * How many entries in "replies_seen" are actually valid?
514    */
515   unsigned int replies_seen_off;
516
517   /**
518    * How long is the "replies_seen" array?
519    */
520   unsigned int replies_seen_size;
521   
522   /**
523    * Priority with which this request was made.  If one of our clients
524    * made the request, then this is the current priority that we are
525    * using when initiating the request.  This value is used when
526    * we decide to reward other peers with trust for providing a reply.
527    */
528   uint32_t priority;
529
530   /**
531    * Priority points left for us to spend when forwarding this request
532    * to other peers.
533    */
534   uint32_t remaining_priority;
535
536   /**
537    * Number to mingle hashes for bloom-filter tests with.
538    */
539   int32_t mingle;
540
541   /**
542    * TTL with which we saw this request (or, if we initiated, TTL that
543    * we used for the request).
544    */
545   int32_t ttl;
546   
547   /**
548    * Type of the content that this request is for.
549    */
550   enum GNUNET_BLOCK_Type type;
551
552   /**
553    * Remove this request after transmission of the current response.
554    */
555   int16_t do_remove;
556
557   /**
558    * GNUNET_YES if we should not forward this request to other peers.
559    */
560   int16_t local_only;
561
562 };
563
564
565 /**
566  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
567  */
568 struct MigrationReadyBlock
569 {
570
571   /**
572    * This is a doubly-linked list.
573    */
574   struct MigrationReadyBlock *next;
575
576   /**
577    * This is a doubly-linked list.
578    */
579   struct MigrationReadyBlock *prev;
580
581   /**
582    * Query for the block.
583    */
584   GNUNET_HashCode query;
585
586   /**
587    * When does this block expire? 
588    */
589   struct GNUNET_TIME_Absolute expiration;
590
591   /**
592    * Peers we would consider forwarding this
593    * block to.  Zero for empty entries.
594    */
595   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
596
597   /**
598    * Size of the block.
599    */
600   size_t size;
601
602   /**
603    *  Number of targets already used.
604    */
605   unsigned int used_targets;
606
607   /**
608    * Type of the block.
609    */
610   enum GNUNET_BLOCK_Type type;
611 };
612
613
614 /**
615  * Our connection to the datastore.
616  */
617 static struct GNUNET_DATASTORE_Handle *dsh;
618
619
620 /**
621  * Our scheduler.
622  */
623 static struct GNUNET_SCHEDULER_Handle *sched;
624
625 /**
626  * Our configuration.
627  */
628 static const struct GNUNET_CONFIGURATION_Handle *cfg;
629
630 /**
631  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
632  */
633 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
634
635 /**
636  * Map of peer identifiers to "struct PendingRequest" (for that peer).
637  */
638 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
639
640 /**
641  * Map of query identifiers to "struct PendingRequest" (for that query).
642  */
643 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
644
645 /**
646  * Heap with the request that will expire next at the top.  Contains
647  * pointers of type "struct PendingRequest*"; these will *also* be
648  * aliased from the "requests_by_peer" data structures and the
649  * "requests_by_query" table.  Note that requests from our clients
650  * don't expire and are thus NOT in the "requests_by_expiration"
651  * (or the "requests_by_peer" tables).
652  */
653 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
654
655 /**
656  * Handle for reporting statistics.
657  */
658 static struct GNUNET_STATISTICS_Handle *stats;
659
660 /**
661  * Linked list of clients we are currently processing requests for.
662  */
663 static struct ClientList *client_list;
664
665 /**
666  * Pointer to handle to the core service (points to NULL until we've
667  * connected to it).
668  */
669 static struct GNUNET_CORE_Handle *core;
670
671 /**
672  * Head of linked list of blocks that can be migrated.
673  */
674 static struct MigrationReadyBlock *mig_head;
675
676 /**
677  * Tail of linked list of blocks that can be migrated.
678  */
679 static struct MigrationReadyBlock *mig_tail;
680
681 /**
682  * Request to datastore for migration (or NULL).
683  */
684 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
685
686 /**
687  * Where do we store trust information?
688  */
689 static char *trustDirectory;
690
691 /**
692  * ID of task that collects blocks for migration.
693  */
694 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
695
696 /**
697  * What is the maximum frequency at which we are allowed to
698  * poll the datastore for migration content?
699  */
700 static struct GNUNET_TIME_Relative min_migration_delay;
701
702 /**
703  * Size of the doubly-linked list of migration blocks.
704  */
705 static unsigned int mig_size;
706
707 /**
708  * Are we allowed to migrate content to this peer.
709  */
710 static int active_migration;
711
712 /**
713  * Typical priorities we're seeing from other peers right now.  Since
714  * most priorities will be zero, this value is the weighted average of
715  * non-zero priorities seen "recently".  In order to ensure that new
716  * values do not dramatically change the ratio, values are first
717  * "capped" to a reasonable range (+N of the current value) and then
718  * averaged into the existing value by a ratio of 1:N.  Hence
719  * receiving the largest possible priority can still only raise our
720  * "current_priorities" by at most 1.
721  */
722 static double current_priorities;
723
724 /**
725  * Get the filename under which we would store the GNUNET_HELLO_Message
726  * for the given host and protocol.
727  * @return filename of the form DIRECTORY/HOSTID
728  */
729 static char *
730 get_trust_filename (const struct GNUNET_PeerIdentity *id)
731 {
732   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
733   char *fn;
734
735   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
736   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
737   return fn;
738 }
739
740
741
742 /**
743  * Transmit messages by copying it to the target buffer
744  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
745  * for writing in the meantime.  In that case, do nothing
746  * (the disconnect or shutdown handler will take care of the rest).
747  * If we were able to transmit messages and there are still more
748  * pending, ask core again for further calls to this function.
749  *
750  * @param cls closure, pointer to the 'struct ConnectedPeer*'
751  * @param size number of bytes available in buf
752  * @param buf where the callee should write the message
753  * @return number of bytes written to buf
754  */
755 static size_t
756 transmit_to_peer (void *cls,
757                   size_t size, void *buf);
758
759
760 /* ******************* clean up functions ************************ */
761
762
763 /**
764  * Delete the given migration block.
765  *
766  * @param mb block to delete
767  */
768 static void
769 delete_migration_block (struct MigrationReadyBlock *mb)
770 {
771   GNUNET_CONTAINER_DLL_remove (mig_head,
772                                mig_tail,
773                                mb);
774   GNUNET_PEER_decrement_rcs (mb->target_list,
775                              MIGRATION_LIST_SIZE);
776   mig_size--;
777   GNUNET_free (mb);
778 }
779
780
781 /**
782  * Compare the distance of two peers to a key.
783  *
784  * @param key key
785  * @param p1 first peer
786  * @param p2 second peer
787  * @return GNUNET_YES if P1 is closer to key than P2
788  */
789 static int
790 is_closer (const GNUNET_HashCode *key,
791            const struct GNUNET_PeerIdentity *p1,
792            const struct GNUNET_PeerIdentity *p2)
793 {
794   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
795                                     &p2->hashPubKey,
796                                     key);
797 }
798
799
800 /**
801  * Consider migrating content to a given peer.
802  *
803  * @param cls 'struct MigrationReadyBlock*' to select
804  *            targets for (or NULL for none)
805  * @param key ID of the peer 
806  * @param value 'struct ConnectedPeer' of the peer
807  * @return GNUNET_YES (always continue iteration)
808  */
809 static int
810 consider_migration (void *cls,
811                     const GNUNET_HashCode *key,
812                     void *value)
813 {
814   struct MigrationReadyBlock *mb = cls;
815   struct ConnectedPeer *cp = value;
816   struct MigrationReadyBlock *pos;
817   struct GNUNET_PeerIdentity cppid;
818   struct GNUNET_PeerIdentity otherpid;
819   struct GNUNET_PeerIdentity worstpid;
820   size_t msize;
821   unsigned int i;
822   unsigned int repl;
823   
824   /* consider 'cp' as a migration target for mb */
825   if (mb != NULL)
826     {
827       GNUNET_PEER_resolve (cp->pid,
828                            &cppid);
829       repl = MIGRATION_LIST_SIZE;
830       for (i=0;i<MIGRATION_LIST_SIZE;i++)
831         {
832           if (mb->target_list[i] == 0)
833             {
834               mb->target_list[i] = cp->pid;
835               GNUNET_PEER_change_rc (mb->target_list[i], 1);
836               repl = MIGRATION_LIST_SIZE;
837               break;
838             }
839           GNUNET_PEER_resolve (mb->target_list[i],
840                                &otherpid);
841           if ( (repl == MIGRATION_LIST_SIZE) &&
842                is_closer (&mb->query,
843                           &cppid,
844                           &otherpid)) 
845             {
846               repl = i;
847               worstpid = otherpid;
848             }
849           else if ( (repl != MIGRATION_LIST_SIZE) &&
850                     (is_closer (&mb->query,
851                                 &worstpid,
852                                 &otherpid) ) )
853             {
854               repl = i;
855               worstpid = otherpid;
856             }       
857         }
858       if (repl != MIGRATION_LIST_SIZE) 
859         {
860           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
861           mb->target_list[repl] = cp->pid;
862           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
863         }
864     }
865
866   /* consider scheduling transmission to cp for content migration */
867   if (cp->cth != NULL)
868     return GNUNET_YES; 
869   msize = 0;
870   pos = mig_head;
871   while (pos != NULL)
872     {
873       for (i=0;i<MIGRATION_LIST_SIZE;i++)
874         {
875           if (cp->pid == pos->target_list[i])
876             {
877               if (msize == 0)
878                 msize = pos->size;
879               else
880                 msize = GNUNET_MIN (msize,
881                                     pos->size);
882               break;
883             }
884         }
885       pos = pos->next;
886     }
887   if (msize == 0)
888     return GNUNET_YES; /* no content available */
889 #if DEBUG_FS
890   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
891               "Trying to migrate at least %u bytes to peer `%s'\n",
892               msize,
893               GNUNET_h2s (key));
894 #endif
895   cp->cth 
896     = GNUNET_CORE_notify_transmit_ready (core,
897                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
898                                          (const struct GNUNET_PeerIdentity*) key,
899                                          msize + sizeof (struct PutMessage),
900                                          &transmit_to_peer,
901                                          cp);
902   return GNUNET_YES;
903 }
904
905
906 /**
907  * Task that is run periodically to obtain blocks for content
908  * migration
909  * 
910  * @param cls unused
911  * @param tc scheduler context (also unused)
912  */
913 static void
914 gather_migration_blocks (void *cls,
915                          const struct GNUNET_SCHEDULER_TaskContext *tc);
916
917
918 /**
919  * If the migration task is not currently running, consider
920  * (re)scheduling it with the appropriate delay.
921  */
922 static void
923 consider_migration_gathering ()
924 {
925   struct GNUNET_TIME_Relative delay;
926
927   if (mig_qe != NULL)
928     return;
929   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
930     return;
931   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
932                                          mig_size);
933   delay = GNUNET_TIME_relative_divide (delay,
934                                        MAX_MIGRATION_QUEUE);
935   delay = GNUNET_TIME_relative_max (delay,
936                                     min_migration_delay);
937   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
938                                            delay,
939                                            &gather_migration_blocks,
940                                            NULL);
941 }
942
943
944 /**
945  * Process content offered for migration.
946  *
947  * @param cls closure
948  * @param key key for the content
949  * @param size number of bytes in data
950  * @param data content stored
951  * @param type type of the content
952  * @param priority priority of the content
953  * @param anonymity anonymity-level for the content
954  * @param expiration expiration time for the content
955  * @param uid unique identifier for the datum;
956  *        maybe 0 if no unique identifier is available
957  */
958 static void
959 process_migration_content (void *cls,
960                            const GNUNET_HashCode * key,
961                            uint32_t size,
962                            const void *data,
963                            enum GNUNET_BLOCK_Type type,
964                            uint32_t priority,
965                            uint32_t anonymity,
966                            struct GNUNET_TIME_Absolute
967                            expiration, uint64_t uid)
968 {
969   struct MigrationReadyBlock *mb;
970   
971   if (key == NULL)
972     {
973       mig_qe = NULL;
974       if (mig_size < MAX_MIGRATION_QUEUE)  
975         consider_migration_gathering ();
976       return;
977     }
978   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
979     {
980       if (GNUNET_OK !=
981           GNUNET_FS_handle_on_demand_block (key, size, data,
982                                             type, priority, anonymity,
983                                             expiration, uid, 
984                                             &process_migration_content,
985                                             NULL))
986         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
987       return;
988     }
989 #if DEBUG_FS
990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
991               "Retrieved block `%s' of type %u for migration\n",
992               GNUNET_h2s (key),
993               type);
994 #endif
995   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
996   mb->query = *key;
997   mb->expiration = expiration;
998   mb->size = size;
999   mb->type = type;
1000   memcpy (&mb[1], data, size);
1001   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1002                                      mig_tail,
1003                                      mig_tail,
1004                                      mb);
1005   mig_size++;
1006   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1007                                          &consider_migration,
1008                                          mb);
1009   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1010 }
1011
1012
1013 /**
1014  * Task that is run periodically to obtain blocks for content
1015  * migration
1016  * 
1017  * @param cls unused
1018  * @param tc scheduler context (also unused)
1019  */
1020 static void
1021 gather_migration_blocks (void *cls,
1022                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1023 {
1024   mig_task = GNUNET_SCHEDULER_NO_TASK;
1025   mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
1026                                         GNUNET_TIME_UNIT_FOREVER_REL,
1027                                         &process_migration_content, NULL);
1028   GNUNET_assert (mig_qe != NULL);
1029 }
1030
1031
1032 /**
1033  * We're done with a particular message list entry.
1034  * Free all associated resources.
1035  * 
1036  * @param pml entry to destroy
1037  */
1038 static void
1039 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1040 {
1041   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1042                                pml->req->pending_tail,
1043                                pml);
1044   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1045                                pml->target->pending_messages_tail,
1046                                pml->pm);
1047   pml->target->pending_requests--;
1048   GNUNET_free (pml->pm);
1049   GNUNET_free (pml);
1050 }
1051
1052
1053 /**
1054  * Destroy the given pending message (and call the respective
1055  * continuation).
1056  *
1057  * @param pm message to destroy
1058  * @param tpid id of peer that the message was delivered to, or 0 for none
1059  */
1060 static void
1061 destroy_pending_message (struct PendingMessage *pm,
1062                          GNUNET_PEER_Id tpid)
1063 {
1064   struct PendingMessageList *pml = pm->pml;
1065   TransmissionContinuation cont;
1066   void *cont_cls;
1067
1068   GNUNET_assert (pml->pm == pm);
1069   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1070   cont = pm->cont;
1071   cont_cls = pm->cont_cls;
1072   destroy_pending_message_list_entry (pml);
1073   cont (cont_cls, tpid);  
1074 }
1075
1076
1077 /**
1078  * We're done processing a particular request.
1079  * Free all associated resources.
1080  *
1081  * @param pr request to destroy
1082  */
1083 static void
1084 destroy_pending_request (struct PendingRequest *pr)
1085 {
1086   struct GNUNET_PeerIdentity pid;
1087
1088   if (pr->hnode != NULL)
1089     {
1090       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1091                                          pr->hnode);
1092       pr->hnode = NULL;
1093     }
1094   if (NULL == pr->client_request_list)
1095     {
1096       GNUNET_STATISTICS_update (stats,
1097                                 gettext_noop ("# P2P searches active"),
1098                                 -1,
1099                                 GNUNET_NO);
1100     }
1101   else
1102     {
1103       GNUNET_STATISTICS_update (stats,
1104                                 gettext_noop ("# client searches active"),
1105                                 -1,
1106                                 GNUNET_NO);
1107     }
1108   /* might have already been removed from map in 'process_reply' (if
1109      there was a unique reply) or never inserted if it was a
1110      duplicate; hence ignore the return value here */
1111   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1112                                                &pr->query,
1113                                                pr);
1114   if (pr->qe != NULL)
1115      {
1116       GNUNET_DATASTORE_cancel (pr->qe);
1117       pr->qe = NULL;
1118     }
1119   if (pr->client_request_list != NULL)
1120     {
1121       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1122                                    pr->client_request_list->client_list->rl_tail,
1123                                    pr->client_request_list);
1124       GNUNET_free (pr->client_request_list);
1125       pr->client_request_list = NULL;
1126     }
1127   if (pr->cp != NULL)
1128     {
1129       GNUNET_PEER_resolve (pr->cp->pid,
1130                            &pid);
1131       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1132                                                    &pid.hashPubKey,
1133                                                    pr);
1134       pr->cp = NULL;
1135     }
1136   if (pr->bf != NULL)
1137     {
1138       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1139       pr->bf = NULL;
1140     }
1141   if (pr->irc != NULL)
1142     {
1143       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1144       pr->irc = NULL;
1145     }
1146   if (pr->replies_seen != NULL)
1147     {
1148       GNUNET_free (pr->replies_seen);
1149       pr->replies_seen = NULL;
1150     }
1151   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1152     {
1153       GNUNET_SCHEDULER_cancel (sched,
1154                                pr->task);
1155       pr->task = GNUNET_SCHEDULER_NO_TASK;
1156     }
1157   while (NULL != pr->pending_head)    
1158     destroy_pending_message_list_entry (pr->pending_head);
1159   GNUNET_PEER_change_rc (pr->target_pid, -1);
1160   if (pr->used_pids != NULL)
1161     {
1162       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1163       GNUNET_free (pr->used_pids);
1164       pr->used_pids_off = 0;
1165       pr->used_pids_size = 0;
1166       pr->used_pids = NULL;
1167     }
1168   GNUNET_free (pr);
1169 }
1170
1171
1172 /**
1173  * Method called whenever a given peer connects.
1174  *
1175  * @param cls closure, not used
1176  * @param peer peer identity this notification is about
1177  * @param latency reported latency of the connection with 'other'
1178  * @param distance reported distance (DV) to 'other' 
1179  */
1180 static void 
1181 peer_connect_handler (void *cls,
1182                       const struct
1183                       GNUNET_PeerIdentity * peer,
1184                       struct GNUNET_TIME_Relative latency,
1185                       uint32_t distance)
1186 {
1187   struct ConnectedPeer *cp;
1188   struct MigrationReadyBlock *pos;
1189   char *fn;
1190   uint32_t trust;
1191   
1192   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1193   cp->pid = GNUNET_PEER_intern (peer);
1194
1195   fn = get_trust_filename (peer);
1196   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1197       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1198     cp->disk_trust = cp->trust = ntohl (trust);
1199   GNUNET_free (fn);
1200
1201   GNUNET_break (GNUNET_OK ==
1202                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1203                                                    &peer->hashPubKey,
1204                                                    cp,
1205                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1206
1207   pos = mig_head;
1208   while (NULL != pos)
1209     {
1210       (void) consider_migration (pos, &peer->hashPubKey, cp);
1211       pos = pos->next;
1212     }
1213 }
1214
1215
1216 /**
1217  * Increase the host credit by a value.
1218  *
1219  * @param host which peer to change the trust value on
1220  * @param value is the int value by which the
1221  *  host credit is to be increased or decreased
1222  * @returns the actual change in trust (positive or negative)
1223  */
1224 static int
1225 change_host_trust (struct ConnectedPeer *host, int value)
1226 {
1227   unsigned int old_trust;
1228
1229   if (value == 0)
1230     return 0;
1231   GNUNET_assert (host != NULL);
1232   old_trust = host->trust;
1233   if (value > 0)
1234     {
1235       if (host->trust + value < host->trust)
1236         {
1237           value = UINT32_MAX - host->trust;
1238           host->trust = UINT32_MAX;
1239         }
1240       else
1241         host->trust += value;
1242     }
1243   else
1244     {
1245       if (host->trust < -value)
1246         {
1247           value = -host->trust;
1248           host->trust = 0;
1249         }
1250       else
1251         host->trust += value;
1252     }
1253   return value;
1254 }
1255
1256
1257 /**
1258  * Write host-trust information to a file - flush the buffer entry!
1259  */
1260 static int
1261 flush_trust (void *cls,
1262              const GNUNET_HashCode *key,
1263              void *value)
1264 {
1265   struct ConnectedPeer *host = value;
1266   char *fn;
1267   uint32_t trust;
1268   struct GNUNET_PeerIdentity pid;
1269
1270   if (host->trust == host->disk_trust)
1271     return GNUNET_OK;                     /* unchanged */
1272   GNUNET_PEER_resolve (host->pid,
1273                        &pid);
1274   fn = get_trust_filename (&pid);
1275   if (host->trust == 0)
1276     {
1277       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1278         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1279                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1280     }
1281   else
1282     {
1283       trust = htonl (host->trust);
1284       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1285                                                     sizeof(uint32_t),
1286                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1287                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1288         host->disk_trust = host->trust;
1289     }
1290   GNUNET_free (fn);
1291   return GNUNET_OK;
1292 }
1293
1294 /**
1295  * Call this method periodically to scan data/hosts for new hosts.
1296  */
1297 static void
1298 cron_flush_trust (void *cls,
1299                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1300 {
1301
1302   if (NULL == connected_peers)
1303     return;
1304   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1305                                          &flush_trust,
1306                                          NULL);
1307   if (NULL == tc)
1308     return;
1309   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1310     return;
1311   GNUNET_SCHEDULER_add_delayed (tc->sched,
1312                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1313 }
1314
1315
1316 /**
1317  * Free (each) request made by the peer.
1318  *
1319  * @param cls closure, points to peer that the request belongs to
1320  * @param key current key code
1321  * @param value value in the hash map
1322  * @return GNUNET_YES (we should continue to iterate)
1323  */
1324 static int
1325 destroy_request (void *cls,
1326                  const GNUNET_HashCode * key,
1327                  void *value)
1328 {
1329   const struct GNUNET_PeerIdentity * peer = cls;
1330   struct PendingRequest *pr = value;
1331   
1332   GNUNET_break (GNUNET_YES ==
1333                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1334                                                       &peer->hashPubKey,
1335                                                       pr));
1336   destroy_pending_request (pr);
1337   return GNUNET_YES;
1338 }
1339
1340
1341 /**
1342  * Method called whenever a peer disconnects.
1343  *
1344  * @param cls closure, not used
1345  * @param peer peer identity this notification is about
1346  */
1347 static void
1348 peer_disconnect_handler (void *cls,
1349                          const struct
1350                          GNUNET_PeerIdentity * peer)
1351 {
1352   struct ConnectedPeer *cp;
1353   struct PendingMessage *pm;
1354   unsigned int i;
1355   struct MigrationReadyBlock *pos;
1356   struct MigrationReadyBlock *next;
1357
1358   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1359                                               &peer->hashPubKey,
1360                                               &destroy_request,
1361                                               (void*) peer);
1362   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1363                                           &peer->hashPubKey);
1364   if (cp == NULL)
1365     return;
1366   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1367     {
1368       if (NULL != cp->last_client_replies[i])
1369         {
1370           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1371           cp->last_client_replies[i] = NULL;
1372         }
1373     }
1374   GNUNET_break (GNUNET_YES ==
1375                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1376                                                       &peer->hashPubKey,
1377                                                       cp));
1378   /* remove this peer from migration considerations; schedule
1379      alternatives */
1380   next = mig_head;
1381   while (NULL != (pos = next))
1382     {
1383       next = pos->next;
1384       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1385         {
1386           if (pos->target_list[i] == cp->pid)
1387             {
1388               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1389               pos->target_list[i] = 0;
1390             }
1391          }
1392       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1393         {
1394           delete_migration_block (pos);
1395           consider_migration_gathering ();
1396           continue;
1397         }
1398       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1399                                              &consider_migration,
1400                                              pos);
1401     }
1402   GNUNET_PEER_change_rc (cp->pid, -1);
1403   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1404   if (NULL != cp->cth)
1405     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1406   while (NULL != (pm = cp->pending_messages_head))
1407     destroy_pending_message (pm, 0 /* delivery failed */);
1408   GNUNET_break (0 == cp->pending_requests);
1409   GNUNET_free (cp);
1410 }
1411
1412
1413 /**
1414  * Iterator over hash map entries that removes all occurences
1415  * of the given 'client' from the 'last_client_replies' of the
1416  * given connected peer.
1417  *
1418  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1419  * @param key current key code (unused)
1420  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1421  * @return GNUNET_YES (we should continue to iterate)
1422  */
1423 static int
1424 remove_client_from_last_client_replies (void *cls,
1425                                         const GNUNET_HashCode * key,
1426                                         void *value)
1427 {
1428   struct GNUNET_SERVER_Client *client = cls;
1429   struct ConnectedPeer *cp = value;
1430   unsigned int i;
1431
1432   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1433     {
1434       if (cp->last_client_replies[i] == client)
1435         {
1436           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1437           cp->last_client_replies[i] = NULL;
1438         }
1439     }  
1440   return GNUNET_YES;
1441 }
1442
1443
1444 /**
1445  * A client disconnected.  Remove all of its pending queries.
1446  *
1447  * @param cls closure, NULL
1448  * @param client identification of the client
1449  */
1450 static void
1451 handle_client_disconnect (void *cls,
1452                           struct GNUNET_SERVER_Client
1453                           * client)
1454 {
1455   struct ClientList *pos;
1456   struct ClientList *prev;
1457   struct ClientRequestList *rcl;
1458   struct ClientResponseMessage *creply;
1459
1460   if (client == NULL)
1461     return;
1462   prev = NULL;
1463   pos = client_list;
1464   while ( (NULL != pos) &&
1465           (pos->client != client) )
1466     {
1467       prev = pos;
1468       pos = pos->next;
1469     }
1470   if (pos == NULL)
1471     return; /* no requests pending for this client */
1472   while (NULL != (rcl = pos->rl_head))
1473     {
1474       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1475                   "Destroying pending request `%s' on disconnect\n",
1476                   GNUNET_h2s (&rcl->req->query));
1477       destroy_pending_request (rcl->req);
1478     }
1479   if (prev == NULL)
1480     client_list = pos->next;
1481   else
1482     prev->next = pos->next;
1483   if (pos->th != NULL)
1484     {
1485       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1486       pos->th = NULL;
1487     }
1488   while (NULL != (creply = pos->res_head))
1489     {
1490       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1491                                    pos->res_tail,
1492                                    creply);
1493       GNUNET_free (creply);
1494     }    
1495   GNUNET_SERVER_client_drop (pos->client);
1496   GNUNET_free (pos);
1497   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1498                                          &remove_client_from_last_client_replies,
1499                                          client);
1500 }
1501
1502
1503 /**
1504  * Iterator to free peer entries.
1505  *
1506  * @param cls closure, unused
1507  * @param key current key code
1508  * @param value value in the hash map (peer entry)
1509  * @return GNUNET_YES (we should continue to iterate)
1510  */
1511 static int 
1512 clean_peer (void *cls,
1513             const GNUNET_HashCode * key,
1514             void *value)
1515 {
1516   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1517   return GNUNET_YES;
1518 }
1519
1520
1521 /**
1522  * Task run during shutdown.
1523  *
1524  * @param cls unused
1525  * @param tc unused
1526  */
1527 static void
1528 shutdown_task (void *cls,
1529                const struct GNUNET_SCHEDULER_TaskContext *tc)
1530 {
1531   if (mig_qe != NULL)
1532     {
1533       GNUNET_DATASTORE_cancel (mig_qe);
1534       mig_qe = NULL;
1535     }
1536   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1537     {
1538       GNUNET_SCHEDULER_cancel (sched, mig_task);
1539       mig_task = GNUNET_SCHEDULER_NO_TASK;
1540     }
1541   while (client_list != NULL)
1542     handle_client_disconnect (NULL,
1543                               client_list->client);
1544   cron_flush_trust (NULL, NULL);
1545   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1546                                          &clean_peer,
1547                                          NULL);
1548   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1549   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1550   requests_by_expiration_heap = 0;
1551   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1552   connected_peers = NULL;
1553   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1554   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1555   query_request_map = NULL;
1556   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1557   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1558   peer_request_map = NULL;
1559   GNUNET_assert (NULL != core);
1560   GNUNET_CORE_disconnect (core);
1561   core = NULL;
1562   if (stats != NULL)
1563     {
1564       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1565       stats = NULL;
1566     }
1567   GNUNET_DATASTORE_disconnect (dsh,
1568                                GNUNET_NO);
1569   while (mig_head != NULL)
1570     delete_migration_block (mig_head);
1571   GNUNET_assert (0 == mig_size);
1572   dsh = NULL;
1573   sched = NULL;
1574   cfg = NULL;  
1575   GNUNET_free_non_null (trustDirectory);
1576   trustDirectory = NULL;
1577 }
1578
1579
1580 /* ******************* Utility functions  ******************** */
1581
1582
1583 /**
1584  * Transmit messages by copying it to the target buffer
1585  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1586  * for writing in the meantime.  In that case, do nothing
1587  * (the disconnect or shutdown handler will take care of the rest).
1588  * If we were able to transmit messages and there are still more
1589  * pending, ask core again for further calls to this function.
1590  *
1591  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1592  * @param size number of bytes available in buf
1593  * @param buf where the callee should write the message
1594  * @return number of bytes written to buf
1595  */
1596 static size_t
1597 transmit_to_peer (void *cls,
1598                   size_t size, void *buf)
1599 {
1600   struct ConnectedPeer *cp = cls;
1601   char *cbuf = buf;
1602   struct GNUNET_PeerIdentity pid;
1603   struct PendingMessage *pm;
1604   struct MigrationReadyBlock *mb;
1605   struct MigrationReadyBlock *next;
1606   struct PutMessage migm;
1607   size_t msize;
1608   unsigned int i;
1609  
1610   cp->cth = NULL;
1611   if (NULL == buf)
1612     {
1613 #if DEBUG_FS
1614       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1615                   "Dropping message, core too busy.\n");
1616 #endif
1617       return 0;
1618     }
1619   msize = 0;
1620   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1621           (pm->msize <= size) )
1622     {
1623       memcpy (&cbuf[msize], &pm[1], pm->msize);
1624       msize += pm->msize;
1625       size -= pm->msize;
1626       destroy_pending_message (pm, cp->pid);
1627     }
1628   if (NULL != pm)
1629     {
1630       GNUNET_PEER_resolve (cp->pid,
1631                            &pid);
1632       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1633                                                    pm->priority,
1634                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1635                                                    &pid,
1636                                                    pm->msize,
1637                                                    &transmit_to_peer,
1638                                                    cp);
1639     }
1640   else
1641     {      
1642       next = mig_head;
1643       while (NULL != (mb = next))
1644         {
1645           next = mb->next;
1646           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1647             {
1648               if ( (cp->pid == mb->target_list[i]) &&
1649                    (mb->size + sizeof (migm) <= size) )
1650                 {
1651                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1652                   mb->target_list[i] = 0;
1653                   mb->used_targets++;
1654                   migm.header.size = htons (sizeof (migm) + mb->size);
1655                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1656                   migm.type = htonl (mb->type);
1657                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1658                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1659                   msize += sizeof (migm);
1660                   size -= sizeof (migm);
1661                   memcpy (&cbuf[msize], &mb[1], mb->size);
1662                   msize += mb->size;
1663                   size -= mb->size;
1664 #if DEBUG_FS
1665                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1666                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1667                               GNUNET_h2s (&mb->query),
1668                               mb->size,
1669                               GNUNET_i2s (&pid));
1670 #endif    
1671                   break;
1672                 }
1673               else
1674                 {
1675 #if DEBUG_FS
1676                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1677                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1678                               GNUNET_h2s (&mb->query),
1679                               mb->size,
1680                               GNUNET_i2s (&pid));
1681 #endif    
1682                 }
1683             }
1684           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1685                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1686             {
1687               delete_migration_block (mb);
1688               consider_migration_gathering ();
1689             }
1690         }
1691       consider_migration (NULL, 
1692                           &pid.hashPubKey,
1693                           cp);
1694     }
1695 #if DEBUG_FS
1696   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1697               "Transmitting %u bytes to peer %u\n",
1698               msize,
1699               cp->pid);
1700 #endif
1701   return msize;
1702 }
1703
1704
1705 /**
1706  * Add a message to the set of pending messages for the given peer.
1707  *
1708  * @param cp peer to send message to
1709  * @param pm message to queue
1710  * @param pr request on which behalf this message is being queued
1711  */
1712 static void
1713 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1714                                   struct PendingMessage *pm,
1715                                   struct PendingRequest *pr)
1716 {
1717   struct PendingMessage *pos;
1718   struct PendingMessageList *pml;
1719   struct GNUNET_PeerIdentity pid;
1720
1721   GNUNET_assert (pm->next == NULL);
1722   GNUNET_assert (pm->pml == NULL);    
1723   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1724   pml->req = pr;
1725   pml->target = cp;
1726   pml->pm = pm;
1727   pm->pml = pml;  
1728   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1729                                pr->pending_tail,
1730                                pml);
1731   pos = cp->pending_messages_head;
1732   while ( (pos != NULL) &&
1733           (pm->priority < pos->priority) )
1734     pos = pos->next;    
1735   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1736                                      cp->pending_messages_tail,
1737                                      pos,
1738                                      pm);
1739   cp->pending_requests++;
1740   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1741     destroy_pending_message (cp->pending_messages_tail, 0);  
1742   GNUNET_PEER_resolve (cp->pid, &pid);
1743   if (NULL != cp->cth)
1744     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1745   /* need to schedule transmission */
1746   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1747                                                cp->pending_messages_head->priority,
1748                                                MAX_TRANSMIT_DELAY,
1749                                                &pid,
1750                                                cp->pending_messages_head->msize,
1751                                                &transmit_to_peer,
1752                                                cp);
1753   if (cp->cth == NULL)
1754     {
1755 #if DEBUG_FS
1756       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1757                   "Failed to schedule transmission with core!\n");
1758 #endif
1759       GNUNET_STATISTICS_update (stats,
1760                                 gettext_noop ("# CORE transmission failures"),
1761                                 1,
1762                                 GNUNET_NO);
1763     }
1764 }
1765
1766
1767 /**
1768  * Mingle hash with the mingle_number to produce different bits.
1769  */
1770 static void
1771 mingle_hash (const GNUNET_HashCode * in,
1772              int32_t mingle_number, 
1773              GNUNET_HashCode * hc)
1774 {
1775   GNUNET_HashCode m;
1776
1777   GNUNET_CRYPTO_hash (&mingle_number, 
1778                       sizeof (int32_t), 
1779                       &m);
1780   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1781 }
1782
1783
1784 /**
1785  * Test if the load on this peer is too high
1786  * to even consider processing the query at
1787  * all.
1788  * 
1789  * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
1790  */
1791 static int
1792 test_load_too_high ()
1793 {
1794   return GNUNET_SYSERR; // FIXME
1795 }
1796
1797
1798 /* ******************* Pending Request Refresh Task ******************** */
1799
1800
1801
1802 /**
1803  * We use a random delay to make the timing of requests less
1804  * predictable.  This function returns such a random delay.  We add a base
1805  * delay of MAX_CORK_DELAY (1s).
1806  *
1807  * FIXME: make schedule dependent on the specifics of the request?
1808  * Or bandwidth and number of connected peers and load?
1809  *
1810  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1811  */
1812 static struct GNUNET_TIME_Relative
1813 get_processing_delay ()
1814 {
1815   return 
1816     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1817                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1818                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1819                                                                                        TTL_DECREMENT)));
1820 }
1821
1822
1823 /**
1824  * We're processing a GET request from another peer and have decided
1825  * to forward it to other peers.  This function is called periodically
1826  * and should forward the request to other peers until we have all
1827  * possible replies.  If we have transmitted the *only* reply to
1828  * the initiator we should destroy the pending request.  If we have
1829  * many replies in the queue to the initiator, we should delay sending
1830  * out more queries until the reply queue has shrunk some.
1831  *
1832  * @param cls our "struct ProcessGetContext *"
1833  * @param tc unused
1834  */
1835 static void
1836 forward_request_task (void *cls,
1837                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1838
1839
1840 /**
1841  * Function called after we either failed or succeeded
1842  * at transmitting a query to a peer.  
1843  *
1844  * @param cls the requests "struct PendingRequest*"
1845  * @param tpid ID of receiving peer, 0 on transmission error
1846  */
1847 static void
1848 transmit_query_continuation (void *cls,
1849                              GNUNET_PEER_Id tpid)
1850 {
1851   struct PendingRequest *pr = cls;
1852
1853   GNUNET_STATISTICS_update (stats,
1854                             gettext_noop ("# queries scheduled for forwarding"),
1855                             -1,
1856                             GNUNET_NO);
1857   if (tpid == 0)   
1858     {
1859 #if DEBUG_FS
1860       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1861                   "Transmission of request failed, will try again later.\n");
1862 #endif
1863       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1864         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1865                                                  get_processing_delay (),
1866                                                  &forward_request_task,
1867                                                  pr); 
1868       return;    
1869     }
1870   GNUNET_STATISTICS_update (stats,
1871                             gettext_noop ("# queries forwarded"),
1872                             1,
1873                             GNUNET_NO);
1874   GNUNET_PEER_change_rc (tpid, 1);
1875   if (pr->used_pids_off == pr->used_pids_size)
1876     GNUNET_array_grow (pr->used_pids,
1877                        pr->used_pids_size,
1878                        pr->used_pids_size * 2 + 2);
1879   pr->used_pids[pr->used_pids_off++] = tpid;
1880   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1881     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1882                                              get_processing_delay (),
1883                                              &forward_request_task,
1884                                              pr);
1885 }
1886
1887
1888 /**
1889  * How many bytes should a bloomfilter be if we have already seen
1890  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1891  * of bits set per entry.  Furthermore, we should not re-size the
1892  * filter too often (to keep it cheap).
1893  *
1894  * Since other peers will also add entries but not resize the filter,
1895  * we should generally pick a slightly larger size than what the
1896  * strict math would suggest.
1897  *
1898  * @return must be a power of two and smaller or equal to 2^15.
1899  */
1900 static size_t
1901 compute_bloomfilter_size (unsigned int entry_count)
1902 {
1903   size_t size;
1904   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1905   uint16_t max = 1 << 15;
1906
1907   if (entry_count > max)
1908     return max;
1909   size = 8;
1910   while ((size < max) && (size < ideal))
1911     size *= 2;
1912   if (size > max)
1913     return max;
1914   return size;
1915 }
1916
1917
1918 /**
1919  * Recalculate our bloom filter for filtering replies.  This function
1920  * will create a new bloom filter from scratch, so it should only be
1921  * called if we have no bloomfilter at all (and hence can create a
1922  * fresh one of minimal size without problems) OR if our peer is the
1923  * initiator (in which case we may resize to larger than mimimum size).
1924  *
1925  * @param pr request for which the BF is to be recomputed
1926  */
1927 static void
1928 refresh_bloomfilter (struct PendingRequest *pr)
1929 {
1930   unsigned int i;
1931   size_t nsize;
1932   GNUNET_HashCode mhash;
1933
1934   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1935   if (nsize == pr->bf_size)
1936     return; /* size not changed */
1937   if (pr->bf != NULL)
1938     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1939   pr->bf_size = nsize;
1940   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1941   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1942                                               pr->bf_size,
1943                                               BLOOMFILTER_K);
1944   for (i=0;i<pr->replies_seen_off;i++)
1945     {
1946       mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
1947       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
1948     }
1949 }
1950
1951
1952 /**
1953  * Function called after we've tried to reserve a certain amount of
1954  * bandwidth for a reply.  Check if we succeeded and if so send our
1955  * query.
1956  *
1957  * @param cls the requests "struct PendingRequest*"
1958  * @param peer identifies the peer
1959  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1960  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1961  * @param amount set to the amount that was actually reserved or unreserved
1962  * @param preference current traffic preference for the given peer
1963  */
1964 static void
1965 target_reservation_cb (void *cls,
1966                        const struct
1967                        GNUNET_PeerIdentity * peer,
1968                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1969                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1970                        int amount,
1971                        uint64_t preference)
1972 {
1973   struct PendingRequest *pr = cls;
1974   struct ConnectedPeer *cp;
1975   struct PendingMessage *pm;
1976   struct GetMessage *gm;
1977   GNUNET_HashCode *ext;
1978   char *bfdata;
1979   size_t msize;
1980   unsigned int k;
1981   int no_route;
1982   uint32_t bm;
1983
1984   pr->irc = NULL;
1985   if (peer == NULL)
1986     {
1987       /* error in communication with core, try again later */
1988       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1989         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1990                                                  get_processing_delay (),
1991                                                  &forward_request_task,
1992                                                  pr);
1993       return;
1994     }
1995   // (3) transmit, update ttl/priority
1996   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1997                                           &peer->hashPubKey);
1998   if (cp == NULL)
1999     {
2000       /* Peer must have just left */
2001 #if DEBUG_FS
2002       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2003                   "Selected peer disconnected!\n");
2004 #endif
2005       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2006         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2007                                                  get_processing_delay (),
2008                                                  &forward_request_task,
2009                                                  pr);
2010       return;
2011     }
2012   no_route = GNUNET_NO;
2013   if (amount == 0)
2014     {
2015       if (pr->cp == NULL)
2016         {
2017 #if DEBUG_FS > 1
2018           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2019                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2020                       amount,
2021                       DBLOCK_SIZE);
2022 #endif
2023           GNUNET_STATISTICS_update (stats,
2024                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2025                                     1,
2026                                     GNUNET_NO);
2027           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2028             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2029                                                      get_processing_delay (),
2030                                                      &forward_request_task,
2031                                                      pr);
2032           return;  /* this target round failed */
2033         }
2034       /* FIXME: if we are "quite" busy, we may still want to skip
2035          this round; need more load detection code! */
2036       no_route = GNUNET_YES;
2037     }
2038   
2039   GNUNET_STATISTICS_update (stats,
2040                             gettext_noop ("# queries scheduled for forwarding"),
2041                             1,
2042                             GNUNET_NO);
2043   /* build message and insert message into priority queue */
2044 #if DEBUG_FS
2045   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2046               "Forwarding request `%s' to `%4s'!\n",
2047               GNUNET_h2s (&pr->query),
2048               GNUNET_i2s (peer));
2049 #endif
2050   k = 0;
2051   bm = 0;
2052   if (GNUNET_YES == no_route)
2053     {
2054       bm |= GET_MESSAGE_BIT_RETURN_TO;
2055       k++;      
2056     }
2057   if (pr->namespace != NULL)
2058     {
2059       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2060       k++;
2061     }
2062   if (pr->target_pid != 0)
2063     {
2064       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2065       k++;
2066     }
2067   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2068   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2069   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2070   pm->msize = msize;
2071   gm = (struct GetMessage*) &pm[1];
2072   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2073   gm->header.size = htons (msize);
2074   gm->type = htonl (pr->type);
2075   pr->remaining_priority /= 2;
2076   gm->priority = htonl (pr->remaining_priority);
2077   gm->ttl = htonl (pr->ttl);
2078   gm->filter_mutator = htonl(pr->mingle); 
2079   gm->hash_bitmap = htonl (bm);
2080   gm->query = pr->query;
2081   ext = (GNUNET_HashCode*) &gm[1];
2082   k = 0;
2083   if (GNUNET_YES == no_route)
2084     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2085   if (pr->namespace != NULL)
2086     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2087   if (pr->target_pid != 0)
2088     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2089   bfdata = (char *) &ext[k];
2090   if (pr->bf != NULL)
2091     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2092                                                bfdata,
2093                                                pr->bf_size);
2094   pm->cont = &transmit_query_continuation;
2095   pm->cont_cls = pr;
2096   add_to_pending_messages_for_peer (cp, pm, pr);
2097 }
2098
2099
2100 /**
2101  * Closure used for "target_peer_select_cb".
2102  */
2103 struct PeerSelectionContext 
2104 {
2105   /**
2106    * The request for which we are selecting
2107    * peers.
2108    */
2109   struct PendingRequest *pr;
2110
2111   /**
2112    * Current "prime" target.
2113    */
2114   struct GNUNET_PeerIdentity target;
2115
2116   /**
2117    * How much do we like this target?
2118    */
2119   double target_score;
2120
2121 };
2122
2123
2124 /**
2125  * Function called for each connected peer to determine
2126  * which one(s) would make good targets for forwarding.
2127  *
2128  * @param cls closure (struct PeerSelectionContext)
2129  * @param key current key code (peer identity)
2130  * @param value value in the hash map (struct ConnectedPeer)
2131  * @return GNUNET_YES if we should continue to
2132  *         iterate,
2133  *         GNUNET_NO if not.
2134  */
2135 static int
2136 target_peer_select_cb (void *cls,
2137                        const GNUNET_HashCode * key,
2138                        void *value)
2139 {
2140   struct PeerSelectionContext *psc = cls;
2141   struct ConnectedPeer *cp = value;
2142   struct PendingRequest *pr = psc->pr;
2143   double score;
2144   unsigned int i;
2145   unsigned int pc;
2146
2147   /* 1) check that this peer is not the initiator */
2148   if (cp == pr->cp)
2149     {
2150 #if DEBUG_FS
2151       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2152                   "Skipping initiator in forwarding selection\n");
2153 #endif
2154       return GNUNET_YES; /* skip */        
2155     }
2156
2157   /* 2) check if we have already (recently) forwarded to this peer */
2158   pc = 0;
2159   for (i=0;i<pr->used_pids_off;i++)
2160     if (pr->used_pids[i] == cp->pid) 
2161       {
2162         pc++;
2163         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2164                                            RETRY_PROBABILITY_INV))
2165           {
2166 #if DEBUG_FS
2167             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2168                         "NOT re-trying query that was previously transmitted %u times\n",
2169                         (unsigned int) pr->used_pids_off);
2170 #endif
2171             return GNUNET_YES; /* skip */
2172           }
2173       }
2174 #if DEBUG_FS
2175   if (0 < pc)
2176     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2177                 "Re-trying query that was previously transmitted %u times to this peer\n",
2178                 (unsigned int) pc);
2179 #endif
2180   /* 3) calculate how much we'd like to forward to this peer,
2181      starting with a random value that is strong enough
2182      to at least give any peer a chance sometimes 
2183      (compared to the other factors that come later) */
2184   /* 3a) count successful (recent) routes from cp for same source */
2185   if (pr->cp != NULL)
2186     {
2187       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2188                                         P2P_SUCCESS_LIST_SIZE);
2189       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2190         if (cp->last_p2p_replies[i] == pr->cp->pid)
2191           score += 1; /* likely successful based on hot path */
2192     }
2193   else
2194     {
2195       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2196                                         CS2P_SUCCESS_LIST_SIZE);
2197       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2198         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2199           score += 1; /* likely successful based on hot path */
2200     }
2201   /* 3b) include latency */
2202   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2203     score += 1; /* likely fast based on latency */
2204   /* 3c) include priorities */
2205   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2206     score += 1; /* likely successful based on priorities */
2207   /* 3d) penalize for queue size */  
2208   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2209   /* 3e) include peer proximity */
2210   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2211                                                     &pr->query)) / (double) UINT32_MAX);
2212   /* 4) super-bonus for being the known target */
2213   if (pr->target_pid == cp->pid)
2214     score += 100.0;
2215   /* store best-fit in closure */
2216 #if DEBUG_FS
2217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2218               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2219               GNUNET_h2s (key),
2220               score,
2221               psc->target_score);
2222 #endif  
2223   score++; /* avoid zero */
2224   if (score > psc->target_score)
2225     {
2226       psc->target_score = score;
2227       psc->target.hashPubKey = *key; 
2228     }
2229   return GNUNET_YES;
2230 }
2231   
2232
2233 /**
2234  * The priority level imposes a bound on the maximum
2235  * value for the ttl that can be requested.
2236  *
2237  * @param ttl_in requested ttl
2238  * @param prio given priority
2239  * @return ttl_in if ttl_in is below the limit,
2240  *         otherwise the ttl-limit for the given priority
2241  */
2242 static int32_t
2243 bound_ttl (int32_t ttl_in, uint32_t prio)
2244 {
2245   unsigned long long allowed;
2246
2247   if (ttl_in <= 0)
2248     return ttl_in;
2249   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2250   if (ttl_in > allowed)      
2251     {
2252       if (allowed >= (1 << 30))
2253         return 1 << 30;
2254       return allowed;
2255     }
2256   return ttl_in;
2257 }
2258
2259
2260 /**
2261  * We're processing a GET request and have decided
2262  * to forward it to other peers.  This function is called periodically
2263  * and should forward the request to other peers until we have all
2264  * possible replies.  If we have transmitted the *only* reply to
2265  * the initiator we should destroy the pending request.  If we have
2266  * many replies in the queue to the initiator, we should delay sending
2267  * out more queries until the reply queue has shrunk some.
2268  *
2269  * @param cls our "struct ProcessGetContext *"
2270  * @param tc unused
2271  */
2272 static void
2273 forward_request_task (void *cls,
2274                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2275 {
2276   struct PendingRequest *pr = cls;
2277   struct PeerSelectionContext psc;
2278   struct ConnectedPeer *cp; 
2279   struct GNUNET_TIME_Relative delay;
2280
2281   pr->task = GNUNET_SCHEDULER_NO_TASK;
2282   if (pr->irc != NULL)
2283     {
2284 #if DEBUG_FS
2285       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2286                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2287                   GNUNET_h2s (&pr->query));
2288 #endif
2289       return; /* already pending */
2290     }
2291   if (GNUNET_YES == pr->local_only)
2292     return; /* configured to not do P2P search */
2293   /* (1) select target */
2294   psc.pr = pr;
2295   psc.target_score = -DBL_MAX;
2296   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2297                                          &target_peer_select_cb,
2298                                          &psc);  
2299   if (psc.target_score == -DBL_MAX)
2300     {
2301       delay = get_processing_delay ();
2302 #if DEBUG_FS 
2303       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2304                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2305                   GNUNET_h2s (&pr->query),
2306                   delay.value);
2307 #endif
2308       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2309                                                delay,
2310                                                &forward_request_task,
2311                                                pr);
2312       return; /* nobody selected */
2313     }
2314   /* (3) update TTL/priority */
2315   if (pr->client_request_list != NULL)
2316     {
2317       /* FIXME: use better algorithm!? */
2318       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2319                                          4))
2320         pr->priority++;
2321       /* bound priority we use by priorities we see from other peers
2322          rounded up (must round up so that we can see non-zero
2323          priorities, but round up as little as possible to make it
2324          plausible that we forwarded another peers request) */
2325       if (pr->priority > current_priorities + 1.0)
2326         pr->priority = (uint32_t) current_priorities + 1.0;
2327       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2328                            pr->priority);
2329 #if DEBUG_FS
2330       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2331                   "Trying query `%s' with priority %u and TTL %d.\n",
2332                   GNUNET_h2s (&pr->query),
2333                   pr->priority,
2334                   pr->ttl);
2335 #endif
2336     }
2337
2338   /* (3) reserve reply bandwidth */
2339   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2340                                           &psc.target.hashPubKey);
2341   GNUNET_assert (NULL != cp);
2342   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2343                                                 &psc.target,
2344                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2345                                                 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2346                                                 DBLOCK_SIZE * 2, 
2347                                                 cp->inc_preference,
2348                                                 &target_reservation_cb,
2349                                                 pr);
2350   cp->inc_preference = 0;
2351 }
2352
2353
2354 /* **************************** P2P PUT Handling ************************ */
2355
2356
2357 /**
2358  * Function called after we either failed or succeeded
2359  * at transmitting a reply to a peer.  
2360  *
2361  * @param cls the requests "struct PendingRequest*"
2362  * @param tpid ID of receiving peer, 0 on transmission error
2363  */
2364 static void
2365 transmit_reply_continuation (void *cls,
2366                              GNUNET_PEER_Id tpid)
2367 {
2368   struct PendingRequest *pr = cls;
2369   
2370   switch (pr->type)
2371     {
2372     case GNUNET_BLOCK_TYPE_DBLOCK:
2373     case GNUNET_BLOCK_TYPE_IBLOCK:
2374       /* only one reply expected, done with the request! */
2375       destroy_pending_request (pr);
2376       break;
2377     case GNUNET_BLOCK_TYPE_ANY:
2378     case GNUNET_BLOCK_TYPE_KBLOCK:
2379     case GNUNET_BLOCK_TYPE_SBLOCK:
2380       break;
2381     default:
2382       GNUNET_break (0);
2383       break;
2384     }
2385 }
2386
2387
2388 /**
2389  * Transmit the given message by copying it to the target buffer
2390  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2391  * for writing in the meantime.  In that case, do nothing
2392  * (the disconnect or shutdown handler will take care of the rest).
2393  * If we were able to transmit messages and there are still more
2394  * pending, ask core again for further calls to this function.
2395  *
2396  * @param cls closure, pointer to the 'struct ClientList*'
2397  * @param size number of bytes available in buf
2398  * @param buf where the callee should write the message
2399  * @return number of bytes written to buf
2400  */
2401 static size_t
2402 transmit_to_client (void *cls,
2403                   size_t size, void *buf)
2404 {
2405   struct ClientList *cl = cls;
2406   char *cbuf = buf;
2407   struct ClientResponseMessage *creply;
2408   size_t msize;
2409   
2410   cl->th = NULL;
2411   if (NULL == buf)
2412     {
2413 #if DEBUG_FS
2414       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2415                   "Not sending reply, client communication problem.\n");
2416 #endif
2417       return 0;
2418     }
2419   msize = 0;
2420   while ( (NULL != (creply = cl->res_head) ) &&
2421           (creply->msize <= size) )
2422     {
2423       memcpy (&cbuf[msize], &creply[1], creply->msize);
2424       msize += creply->msize;
2425       size -= creply->msize;
2426       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2427                                    cl->res_tail,
2428                                    creply);
2429       GNUNET_free (creply);
2430     }
2431   if (NULL != creply)
2432     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2433                                                   creply->msize,
2434                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2435                                                   &transmit_to_client,
2436                                                   cl);
2437 #if DEBUG_FS
2438   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2439               "Transmitted %u bytes to client\n",
2440               (unsigned int) msize);
2441 #endif
2442   return msize;
2443 }
2444
2445
2446 /**
2447  * Closure for "process_reply" function.
2448  */
2449 struct ProcessReplyClosure
2450 {
2451   /**
2452    * The data for the reply.
2453    */
2454   const void *data;
2455
2456   /**
2457    * Who gave us this reply? NULL for local host.
2458    */
2459   struct ConnectedPeer *sender;
2460
2461   /**
2462    * When the reply expires.
2463    */
2464   struct GNUNET_TIME_Absolute expiration;
2465
2466   /**
2467    * Size of data.
2468    */
2469   size_t size;
2470
2471   /**
2472    * Namespace that this reply belongs to
2473    * (if it is of type SBLOCK).
2474    */
2475   GNUNET_HashCode namespace;
2476
2477   /**
2478    * Type of the block.
2479    */
2480   enum GNUNET_BLOCK_Type type;
2481
2482   /**
2483    * How much was this reply worth to us?
2484    */
2485   uint32_t priority;
2486
2487   /**
2488    * Did we finish processing the associated request?
2489    */ 
2490   int finished;
2491 };
2492
2493
2494 /**
2495  * We have received a reply; handle it!
2496  *
2497  * @param cls response (struct ProcessReplyClosure)
2498  * @param key our query
2499  * @param value value in the hash map (info about the query)
2500  * @return GNUNET_YES (we should continue to iterate)
2501  */
2502 static int
2503 process_reply (void *cls,
2504                const GNUNET_HashCode * key,
2505                void *value)
2506 {
2507   struct ProcessReplyClosure *prq = cls;
2508   struct PendingRequest *pr = value;
2509   struct PendingMessage *reply;
2510   struct ClientResponseMessage *creply;
2511   struct ClientList *cl;
2512   struct PutMessage *pm;
2513   struct ConnectedPeer *cp;
2514   struct GNUNET_TIME_Relative cur_delay;
2515   GNUNET_HashCode chash;
2516   GNUNET_HashCode mhash;
2517   size_t msize;
2518
2519 #if DEBUG_FS
2520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2521               "Matched result (type %u) for query `%s' with pending request\n",
2522               (unsigned int) prq->type,
2523               GNUNET_h2s (key));
2524 #endif  
2525   GNUNET_STATISTICS_update (stats,
2526                             gettext_noop ("# replies received and matched"),
2527                             1,
2528                             GNUNET_NO);
2529   if (prq->sender != NULL)
2530     {
2531       /* FIXME: should we be more precise here and not use
2532          "start_time" but a peer-specific time stamp? */
2533       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2534       prq->sender->avg_delay.value
2535         = (prq->sender->avg_delay.value * 
2536            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2537       prq->sender->avg_priority
2538         = (prq->sender->avg_priority * 
2539            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2540       if (pr->cp != NULL)
2541         {
2542           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2543                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2544                                  -1);
2545           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2546           prq->sender->last_p2p_replies
2547             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2548             = pr->cp->pid;
2549         }
2550       else
2551         {
2552           if (NULL != prq->sender->last_client_replies
2553               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2554             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2555                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2556           prq->sender->last_client_replies
2557             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2558             = pr->client_request_list->client_list->client;
2559           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2560         }
2561     }
2562   GNUNET_CRYPTO_hash (prq->data,
2563                       prq->size,
2564                       &chash);
2565   switch (prq->type)
2566     {
2567     case GNUNET_BLOCK_TYPE_DBLOCK:
2568     case GNUNET_BLOCK_TYPE_IBLOCK:
2569       /* only possible reply, stop requesting! */
2570       while (NULL != pr->pending_head)
2571         destroy_pending_message_list_entry (pr->pending_head);
2572       if (pr->qe != NULL)
2573         {
2574           if (pr->client_request_list != NULL)
2575             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2576                                         GNUNET_YES);
2577           GNUNET_DATASTORE_cancel (pr->qe);
2578           pr->qe = NULL;
2579         }
2580       pr->do_remove = GNUNET_YES;
2581       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2582         {
2583           GNUNET_SCHEDULER_cancel (sched,
2584                                    pr->task);
2585           pr->task = GNUNET_SCHEDULER_NO_TASK;
2586         }
2587       GNUNET_break (GNUNET_YES ==
2588                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2589                                                           key,
2590                                                           pr));
2591       break;
2592     case GNUNET_BLOCK_TYPE_SBLOCK:
2593       if (pr->namespace == NULL)
2594         {
2595           GNUNET_break (0);
2596           return GNUNET_YES;
2597         }
2598       if (0 != memcmp (pr->namespace,
2599                        &prq->namespace,
2600                        sizeof (GNUNET_HashCode)))
2601         {
2602           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2603                       _("Reply mismatched in terms of namespace.  Discarded.\n"));
2604           return GNUNET_YES; /* wrong namespace */      
2605         }
2606       /* then: fall-through! */
2607     case GNUNET_BLOCK_TYPE_KBLOCK:
2608     case GNUNET_BLOCK_TYPE_NBLOCK:
2609       if (pr->bf != NULL) 
2610         {
2611           mingle_hash (&chash, pr->mingle, &mhash);
2612           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2613                                                                &mhash))
2614             {
2615               GNUNET_STATISTICS_update (stats,
2616                                         gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2617                                         1,
2618                                         GNUNET_NO);
2619 #if DEBUG_FS
2620               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2621                           "Duplicate response `%s', discarding.\n",
2622                           GNUNET_h2s (&mhash));
2623 #endif
2624               return GNUNET_YES; /* duplicate */
2625             }
2626 #if DEBUG_FS
2627           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2628                       "New response `%s', adding to filter.\n",
2629                       GNUNET_h2s (&mhash));
2630 #endif
2631         }
2632       if (pr->client_request_list != NULL)
2633         {
2634           if (pr->replies_seen_size == pr->replies_seen_off)
2635             GNUNET_array_grow (pr->replies_seen,
2636                                pr->replies_seen_size,
2637                                pr->replies_seen_size * 2 + 4);  
2638             pr->replies_seen[pr->replies_seen_off++] = chash;         
2639         }
2640       if ( (pr->bf == NULL) ||
2641            (pr->client_request_list != NULL) )
2642         refresh_bloomfilter (pr);
2643       GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2644                                         &mhash);
2645       break;
2646     default:
2647       GNUNET_break (0);
2648       return GNUNET_YES;
2649     }
2650   prq->priority += pr->remaining_priority;
2651   pr->remaining_priority = 0;
2652   if (NULL != pr->client_request_list)
2653     {
2654       GNUNET_STATISTICS_update (stats,
2655                                 gettext_noop ("# replies received for local clients"),
2656                                 1,
2657                                 GNUNET_NO);
2658       cl = pr->client_request_list->client_list;
2659       msize = sizeof (struct PutMessage) + prq->size;
2660       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2661       creply->msize = msize;
2662       creply->client_list = cl;
2663       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2664                                          cl->res_tail,
2665                                          cl->res_tail,
2666                                          creply);      
2667       pm = (struct PutMessage*) &creply[1];
2668       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2669       pm->header.size = htons (msize);
2670       pm->type = htonl (prq->type);
2671       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2672       memcpy (&pm[1], prq->data, prq->size);      
2673       if (NULL == cl->th)
2674         {
2675 #if DEBUG_FS
2676           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2677                       "Transmitting result for query `%s' to client\n",
2678                       GNUNET_h2s (key));
2679 #endif  
2680           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2681                                                         msize,
2682                                                         GNUNET_TIME_UNIT_FOREVER_REL,
2683                                                         &transmit_to_client,
2684                                                         cl);
2685         }
2686       GNUNET_break (cl->th != NULL);
2687       if (pr->do_remove)                
2688         {
2689           prq->finished = GNUNET_YES;
2690           destroy_pending_request (pr);         
2691         }
2692     }
2693   else
2694     {
2695       cp = pr->cp;
2696 #if DEBUG_FS
2697       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2698                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
2699                   GNUNET_h2s (key),
2700                   (unsigned int) cp->pid);
2701 #endif  
2702       GNUNET_STATISTICS_update (stats,
2703                                 gettext_noop ("# replies received for other peers"),
2704                                 1,
2705                                 GNUNET_NO);
2706       msize = sizeof (struct PutMessage) + prq->size;
2707       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2708       reply->cont = &transmit_reply_continuation;
2709       reply->cont_cls = pr;
2710       reply->msize = msize;
2711       reply->priority = UINT32_MAX; /* send replies first! */
2712       pm = (struct PutMessage*) &reply[1];
2713       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2714       pm->header.size = htons (msize);
2715       pm->type = htonl (prq->type);
2716       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2717       memcpy (&pm[1], prq->data, prq->size);
2718       add_to_pending_messages_for_peer (cp, reply, pr);
2719     }
2720   return GNUNET_YES;
2721 }
2722
2723
2724 /**
2725  * Continuation called to notify client about result of the
2726  * operation.
2727  *
2728  * @param cls closure
2729  * @param success GNUNET_SYSERR on failure
2730  * @param msg NULL on success, otherwise an error message
2731  */
2732 static void 
2733 put_migration_continuation (void *cls,
2734                             int success,
2735                             const char *msg)
2736 {
2737   /* FIXME */
2738 }
2739
2740
2741 /**
2742  * Handle P2P "PUT" message.
2743  *
2744  * @param cls closure, always NULL
2745  * @param other the other peer involved (sender or receiver, NULL
2746  *        for loopback messages where we are both sender and receiver)
2747  * @param message the actual message
2748  * @param latency reported latency of the connection with 'other'
2749  * @param distance reported distance (DV) to 'other' 
2750  * @return GNUNET_OK to keep the connection open,
2751  *         GNUNET_SYSERR to close it (signal serious error)
2752  */
2753 static int
2754 handle_p2p_put (void *cls,
2755                 const struct GNUNET_PeerIdentity *other,
2756                 const struct GNUNET_MessageHeader *message,
2757                 struct GNUNET_TIME_Relative latency,
2758                 uint32_t distance)
2759 {
2760   const struct PutMessage *put;
2761   uint16_t msize;
2762   size_t dsize;
2763   enum GNUNET_BLOCK_Type type;
2764   struct GNUNET_TIME_Absolute expiration;
2765   GNUNET_HashCode query;
2766   struct ProcessReplyClosure prq;
2767   const struct SBlock *sb;
2768
2769   msize = ntohs (message->size);
2770   if (msize < sizeof (struct PutMessage))
2771     {
2772       GNUNET_break_op(0);
2773       return GNUNET_SYSERR;
2774     }
2775   put = (const struct PutMessage*) message;
2776   dsize = msize - sizeof (struct PutMessage);
2777   type = ntohl (put->type);
2778   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2779
2780   if (GNUNET_OK !=
2781       GNUNET_BLOCK_check_block (type,
2782                                 &put[1],
2783                                 dsize,
2784                                 &query))
2785     {
2786       GNUNET_break_op (0);
2787       return GNUNET_SYSERR;
2788     }
2789   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2790     return GNUNET_SYSERR;
2791   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2792     { 
2793       sb = (const struct SBlock*) &put[1];
2794       GNUNET_CRYPTO_hash (&sb->subspace,
2795                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2796                           &prq.namespace);
2797     }
2798
2799 #if DEBUG_FS
2800   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2801               "Received result for query `%s' from peer `%4s'\n",
2802               GNUNET_h2s (&query),
2803               GNUNET_i2s (other));
2804 #endif
2805   GNUNET_STATISTICS_update (stats,
2806                             gettext_noop ("# replies received (overall)"),
2807                             1,
2808                             GNUNET_NO);
2809   /* now, lookup 'query' */
2810   prq.data = (const void*) &put[1];
2811   if (other != NULL)
2812     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2813                                                     &other->hashPubKey);
2814   else
2815     prq.sender = NULL;
2816   prq.size = dsize;
2817   prq.type = type;
2818   prq.expiration = expiration;
2819   prq.priority = 0;
2820   prq.finished = GNUNET_NO;
2821   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2822                                               &query,
2823                                               &process_reply,
2824                                               &prq);
2825   if (prq.sender != NULL)
2826     {
2827       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
2828       prq.sender->trust += prq.priority;
2829     }
2830   if (GNUNET_YES == active_migration)
2831     {
2832 #if DEBUG_FS
2833       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2834                   "Replicating result for query `%s' with priority %u\n",
2835                   GNUNET_h2s (&query),
2836                   prq.priority);
2837 #endif
2838       GNUNET_DATASTORE_put (dsh,
2839                             0, &query, dsize, &put[1],
2840                             type, prq.priority, 1 /* anonymity */, 
2841                             expiration, 
2842                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2843                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2844                             &put_migration_continuation, 
2845                             NULL);
2846     }
2847   return GNUNET_OK;
2848 }
2849
2850
2851 /* **************************** P2P GET Handling ************************ */
2852
2853
2854 /**
2855  * Closure for 'check_duplicate_request_{peer,client}'.
2856  */
2857 struct CheckDuplicateRequestClosure
2858 {
2859   /**
2860    * The new request we should check if it already exists.
2861    */
2862   const struct PendingRequest *pr;
2863
2864   /**
2865    * Existing request found by the checker, NULL if none.
2866    */
2867   struct PendingRequest *have;
2868 };
2869
2870
2871 /**
2872  * Iterator over entries in the 'query_request_map' that
2873  * tries to see if we have the same request pending from
2874  * the same client already.
2875  *
2876  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2877  * @param key current key code (query, ignored, must match)
2878  * @param value value in the hash map (a 'struct PendingRequest' 
2879  *              that already exists)
2880  * @return GNUNET_YES if we should continue to
2881  *         iterate (no match yet)
2882  *         GNUNET_NO if not (match found).
2883  */
2884 static int
2885 check_duplicate_request_client (void *cls,
2886                                 const GNUNET_HashCode * key,
2887                                 void *value)
2888 {
2889   struct CheckDuplicateRequestClosure *cdc = cls;
2890   struct PendingRequest *have = value;
2891
2892   if (have->client_request_list == NULL)
2893     return GNUNET_YES;
2894   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2895        (cdc->pr != have) )
2896     {
2897       cdc->have = have;
2898       return GNUNET_NO;
2899     }
2900   return GNUNET_YES;
2901 }
2902
2903
2904 /**
2905  * We're processing (local) results for a search request
2906  * from another peer.  Pass applicable results to the
2907  * peer and if we are done either clean up (operation
2908  * complete) or forward to other peers (more results possible).
2909  *
2910  * @param cls our closure (struct LocalGetContext)
2911  * @param key key for the content
2912  * @param size number of bytes in data
2913  * @param data content stored
2914  * @param type type of the content
2915  * @param priority priority of the content
2916  * @param anonymity anonymity-level for the content
2917  * @param expiration expiration time for the content
2918  * @param uid unique identifier for the datum;
2919  *        maybe 0 if no unique identifier is available
2920  */
2921 static void
2922 process_local_reply (void *cls,
2923                      const GNUNET_HashCode * key,
2924                      uint32_t size,
2925                      const void *data,
2926                      enum GNUNET_BLOCK_Type type,
2927                      uint32_t priority,
2928                      uint32_t anonymity,
2929                      struct GNUNET_TIME_Absolute
2930                      expiration, 
2931                      uint64_t uid)
2932 {
2933   struct PendingRequest *pr = cls;
2934   struct ProcessReplyClosure prq;
2935   struct CheckDuplicateRequestClosure cdrc;
2936   const struct SBlock *sb;
2937   GNUNET_HashCode dhash;
2938   GNUNET_HashCode mhash;
2939   GNUNET_HashCode query;
2940   
2941   if (NULL == key)
2942     {
2943 #if DEBUG_FS > 1
2944       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2945                   "Done processing local replies, forwarding request to other peers.\n");
2946 #endif
2947       pr->qe = NULL;
2948       if (pr->client_request_list != NULL)
2949         {
2950           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2951                                       GNUNET_YES);
2952           /* Figure out if this is a duplicate request and possibly
2953              merge 'struct PendingRequest' entries */
2954           cdrc.have = NULL;
2955           cdrc.pr = pr;
2956           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2957                                                       &pr->query,
2958                                                       &check_duplicate_request_client,
2959                                                       &cdrc);
2960           if (cdrc.have != NULL)
2961             {
2962 #if DEBUG_FS
2963               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2964                           "Received request for block `%s' twice from client, will only request once.\n",
2965                           GNUNET_h2s (&pr->query));
2966 #endif
2967               
2968               destroy_pending_request (pr);
2969               return;
2970             }
2971         }
2972
2973       /* no more results */
2974       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2975         pr->task = GNUNET_SCHEDULER_add_now (sched,
2976                                              &forward_request_task,
2977                                              pr);      
2978       return;
2979     }
2980 #if DEBUG_FS
2981   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2982               "New local response to `%s' of type %u.\n",
2983               GNUNET_h2s (key),
2984               type);
2985 #endif
2986   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2987     {
2988 #if DEBUG_FS
2989       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2990                   "Found ONDEMAND block, performing on-demand encoding\n");
2991 #endif
2992       GNUNET_STATISTICS_update (stats,
2993                                 gettext_noop ("# on-demand blocks matched requests"),
2994                                 1,
2995                                 GNUNET_NO);
2996       if (GNUNET_OK != 
2997           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2998                                             anonymity, expiration, uid, 
2999                                             &process_local_reply,
3000                                             pr))
3001       if (pr->qe != NULL)
3002         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3003       return;
3004     }
3005   /* check for duplicates */
3006   GNUNET_CRYPTO_hash (data, size, &dhash);
3007   mingle_hash (&dhash, 
3008                pr->mingle,
3009                &mhash);
3010   if ( (pr->bf != NULL) &&
3011        (GNUNET_YES ==
3012         GNUNET_CONTAINER_bloomfilter_test (pr->bf,
3013                                            &mhash)) )
3014     {      
3015 #if DEBUG_FS
3016       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3017                   "Result from datastore filtered by bloomfilter (duplicate).\n");
3018 #endif
3019       GNUNET_STATISTICS_update (stats,
3020                                 gettext_noop ("# results filtered by query bloomfilter"),
3021                                 1,
3022                                 GNUNET_NO);
3023       if (pr->qe != NULL)
3024         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3025       return;
3026     }
3027 #if DEBUG_FS
3028   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3029               "Found result for query `%s' in local datastore\n",
3030               GNUNET_h2s (key));
3031 #endif
3032   GNUNET_STATISTICS_update (stats,
3033                             gettext_noop ("# results found locally"),
3034                             1,
3035                             GNUNET_NO);
3036   pr->results_found++;
3037   memset (&prq, 0, sizeof (prq));
3038   prq.data = data;
3039   prq.expiration = expiration;
3040   prq.size = size;  
3041   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
3042     { 
3043       sb = (const struct SBlock*) data;
3044       GNUNET_CRYPTO_hash (&sb->subspace,
3045                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3046                           &prq.namespace);
3047     }
3048   if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
3049                                              data,
3050                                              size,
3051                                              &query))
3052     {
3053       GNUNET_break (0);
3054       GNUNET_DATASTORE_remove (dsh,
3055                                key,
3056                                size, data,
3057                                -1, -1, 
3058                                GNUNET_TIME_UNIT_FOREVER_REL,
3059                                NULL, NULL);
3060       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3061       return;
3062     }
3063   prq.type = type;
3064   prq.priority = priority;  
3065   prq.finished = GNUNET_NO;
3066   process_reply (&prq, key, pr);
3067   if (prq.finished == GNUNET_YES)
3068     return;
3069   if (pr->qe == NULL)
3070     return; /* done here */
3071   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
3072        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
3073     {
3074       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3075       return;
3076     }
3077   if ( (pr->client_request_list == NULL) &&
3078        ( (GNUNET_YES == test_load_too_high()) ||
3079          (pr->results_found > 5 + 2 * pr->priority) ) )
3080     {
3081 #if DEBUG_FS > 2
3082       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3083                   "Load too high, done with request\n");
3084 #endif
3085       GNUNET_STATISTICS_update (stats,
3086                                 gettext_noop ("# processing result set cut short due to load"),
3087                                 1,
3088                                 GNUNET_NO);
3089       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3090       return;
3091     }
3092   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3093 }
3094
3095
3096 /**
3097  * We've received a request with the specified priority.  Bound it
3098  * according to how much we trust the given peer.
3099  * 
3100  * @param prio_in requested priority
3101  * @param cp the peer making the request
3102  * @return effective priority
3103  */
3104 static uint32_t
3105 bound_priority (uint32_t prio_in,
3106                 struct ConnectedPeer *cp)
3107 {
3108 #define N ((double)128.0)
3109   uint32_t ret;
3110   double rret;
3111   int ld;
3112
3113   ld = test_load_too_high ();
3114   if (ld == GNUNET_SYSERR)
3115     return 0; /* excess resources */
3116   ret = change_host_trust (cp, prio_in);
3117   if (ret > 0)
3118     {
3119       if (ret > current_priorities + N)
3120         rret = current_priorities + N;
3121       else
3122         rret = ret;
3123       current_priorities 
3124         = (current_priorities * (N-1) + rret)/N;
3125     }
3126 #undef N
3127   return ret;
3128 }
3129
3130
3131 /**
3132  * Iterator over entries in the 'query_request_map' that
3133  * tries to see if we have the same request pending from
3134  * the same peer already.
3135  *
3136  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3137  * @param key current key code (query, ignored, must match)
3138  * @param value value in the hash map (a 'struct PendingRequest' 
3139  *              that already exists)
3140  * @return GNUNET_YES if we should continue to
3141  *         iterate (no match yet)
3142  *         GNUNET_NO if not (match found).
3143  */
3144 static int
3145 check_duplicate_request_peer (void *cls,
3146                               const GNUNET_HashCode * key,
3147                               void *value)
3148 {
3149   struct CheckDuplicateRequestClosure *cdc = cls;
3150   struct PendingRequest *have = value;
3151
3152   if (cdc->pr->target_pid == have->target_pid)
3153     {
3154       cdc->have = have;
3155       return GNUNET_NO;
3156     }
3157   return GNUNET_YES;
3158 }
3159
3160
3161 /**
3162  * Handle P2P "GET" request.
3163  *
3164  * @param cls closure, always NULL
3165  * @param other the other peer involved (sender or receiver, NULL
3166  *        for loopback messages where we are both sender and receiver)
3167  * @param message the actual message
3168  * @param latency reported latency of the connection with 'other'
3169  * @param distance reported distance (DV) to 'other' 
3170  * @return GNUNET_OK to keep the connection open,
3171  *         GNUNET_SYSERR to close it (signal serious error)
3172  */
3173 static int
3174 handle_p2p_get (void *cls,
3175                 const struct GNUNET_PeerIdentity *other,
3176                 const struct GNUNET_MessageHeader *message,
3177                 struct GNUNET_TIME_Relative latency,
3178                 uint32_t distance)
3179 {
3180   struct PendingRequest *pr;
3181   struct ConnectedPeer *cp;
3182   struct ConnectedPeer *cps;
3183   struct CheckDuplicateRequestClosure cdc;
3184   struct GNUNET_TIME_Relative timeout;
3185   uint16_t msize;
3186   const struct GetMessage *gm;
3187   unsigned int bits;
3188   const GNUNET_HashCode *opt;
3189   uint32_t bm;
3190   size_t bfsize;
3191   uint32_t ttl_decrement;
3192   enum GNUNET_BLOCK_Type type;
3193   int have_ns;
3194   int ld;
3195
3196   msize = ntohs(message->size);
3197   if (msize < sizeof (struct GetMessage))
3198     {
3199       GNUNET_break_op (0);
3200       return GNUNET_SYSERR;
3201     }
3202   gm = (const struct GetMessage*) message;
3203   type = ntohl (gm->type);
3204   switch (type)
3205     {
3206     case GNUNET_BLOCK_TYPE_ANY:
3207     case GNUNET_BLOCK_TYPE_DBLOCK:
3208     case GNUNET_BLOCK_TYPE_IBLOCK:
3209     case GNUNET_BLOCK_TYPE_KBLOCK:
3210     case GNUNET_BLOCK_TYPE_SBLOCK:
3211       break;
3212     default:
3213       GNUNET_break_op (0);
3214       return GNUNET_SYSERR;
3215     }
3216   bm = ntohl (gm->hash_bitmap);
3217   bits = 0;
3218   while (bm > 0)
3219     {
3220       if (1 == (bm & 1))
3221         bits++;
3222       bm >>= 1;
3223     }
3224   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3225     {
3226       GNUNET_break_op (0);
3227       return GNUNET_SYSERR;
3228     }  
3229   opt = (const GNUNET_HashCode*) &gm[1];
3230   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3231   bm = ntohl (gm->hash_bitmap);
3232   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
3233        (type != GNUNET_BLOCK_TYPE_SBLOCK) )
3234     {
3235       GNUNET_break_op (0);
3236       return GNUNET_SYSERR;      
3237     }
3238   bits = 0;
3239   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3240                                            &other->hashPubKey);
3241   if (NULL == cps)
3242     {
3243       /* peer must have just disconnected */
3244       GNUNET_STATISTICS_update (stats,
3245                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3246                                 1,
3247                                 GNUNET_NO);
3248       return GNUNET_SYSERR;
3249     }
3250   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3251     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3252                                             &opt[bits++]);
3253   else
3254     cp = cps;
3255   if (cp == NULL)
3256     {
3257 #if DEBUG_FS
3258       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3259         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3260                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3261                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3262       
3263       else
3264         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3265                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3266                     GNUNET_i2s (other));
3267 #endif
3268       GNUNET_STATISTICS_update (stats,
3269                                 gettext_noop ("# requests dropped due to missing reverse route"),
3270                                 1,
3271                                 GNUNET_NO);
3272      /* FIXME: try connect? */
3273       return GNUNET_OK;
3274     }
3275   /* note that we can really only check load here since otherwise
3276      peers could find out that we are overloaded by not being
3277      disconnected after sending us a malformed query... */
3278
3279   /* FIXME: query priority should play
3280      a major role here! */
3281   ld = test_load_too_high ();
3282   if (GNUNET_YES == ld)
3283     {
3284 #if DEBUG_FS
3285       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3286                   "Dropping query from `%s', this peer is too busy.\n",
3287                   GNUNET_i2s (other));
3288 #endif
3289       GNUNET_STATISTICS_update (stats,
3290                                 gettext_noop ("# requests dropped due to high load"),
3291                                 1,
3292                                 GNUNET_NO);
3293       return GNUNET_OK;
3294     }
3295   /* FIXME: if ld == GNUNET_NO, forward
3296      instead of indirecting! */
3297
3298 #if DEBUG_FS 
3299   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3300               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3301               GNUNET_h2s (&gm->query),
3302               (unsigned int) type,
3303               GNUNET_i2s (other),
3304               (unsigned int) bm);
3305 #endif
3306   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3307   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3308                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3309   if (have_ns)
3310     {
3311       pr->namespace = (GNUNET_HashCode*) &pr[1];
3312       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3313     }
3314   pr->type = type;
3315   pr->mingle = ntohl (gm->filter_mutator);
3316   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3317     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3318
3319   pr->anonymity_level = 1;
3320   pr->priority = bound_priority (ntohl (gm->priority), cps);
3321   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3322   pr->query = gm->query;
3323   /* decrement ttl (always) */
3324   ttl_decrement = 2 * TTL_DECREMENT +
3325     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3326                               TTL_DECREMENT);
3327   if ( (pr->ttl < 0) &&
3328        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3329     {
3330 #if DEBUG_FS
3331       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3332                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3333                   GNUNET_i2s (other),
3334                   pr->ttl,
3335                   ttl_decrement);
3336 #endif
3337       GNUNET_STATISTICS_update (stats,
3338                                 gettext_noop ("# requests dropped due TTL underflow"),
3339                                 1,
3340                                 GNUNET_NO);
3341       /* integer underflow => drop (should be very rare)! */      
3342       GNUNET_free (pr);
3343       return GNUNET_OK;
3344     } 
3345   pr->ttl -= ttl_decrement;
3346   pr->start_time = GNUNET_TIME_absolute_get ();
3347
3348   /* get bloom filter */
3349   if (bfsize > 0)
3350     {
3351       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3352                                                   bfsize,
3353                                                   BLOOMFILTER_K);
3354       pr->bf_size = bfsize;
3355     }
3356
3357   cdc.have = NULL;
3358   cdc.pr = pr;
3359   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3360                                               &gm->query,
3361                                               &check_duplicate_request_peer,
3362                                               &cdc);
3363   if (cdc.have != NULL)
3364     {
3365       if (cdc.have->start_time.value + cdc.have->ttl >=
3366           pr->start_time.value + pr->ttl)
3367         {
3368           /* existing request has higher TTL, drop new one! */
3369           cdc.have->priority += pr->priority;
3370           destroy_pending_request (pr);
3371 #if DEBUG_FS
3372           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3373                       "Have existing request with higher TTL, dropping new request.\n",
3374                       GNUNET_i2s (other));
3375 #endif
3376           GNUNET_STATISTICS_update (stats,
3377                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3378                                     1,
3379                                     GNUNET_NO);
3380           return GNUNET_OK;
3381         }
3382       else
3383         {
3384           /* existing request has lower TTL, drop old one! */
3385           pr->priority += cdc.have->priority;
3386           /* Possible optimization: if we have applicable pending
3387              replies in 'cdc.have', we might want to move those over
3388              (this is a really rare special-case, so it is not clear
3389              that this would be worth it) */
3390           destroy_pending_request (cdc.have);
3391           /* keep processing 'pr'! */
3392         }
3393     }
3394
3395   pr->cp = cp;
3396   GNUNET_break (GNUNET_OK ==
3397                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3398                                                    &gm->query,
3399                                                    pr,
3400                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3401   GNUNET_break (GNUNET_OK ==
3402                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3403                                                    &other->hashPubKey,
3404                                                    pr,
3405                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3406   
3407   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3408                                             pr,
3409                                             pr->start_time.value + pr->ttl);
3410
3411   GNUNET_STATISTICS_update (stats,
3412                             gettext_noop ("# P2P searches received"),
3413                             1,
3414                             GNUNET_NO);
3415   GNUNET_STATISTICS_update (stats,
3416                             gettext_noop ("# P2P searches active"),
3417                             1,
3418                             GNUNET_NO);
3419
3420   /* calculate change in traffic preference */
3421   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3422   /* process locally */
3423   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3424     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3425   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3426                                            (pr->priority + 1)); 
3427   pr->qe = GNUNET_DATASTORE_get (dsh,
3428                                  &gm->query,
3429                                  type,                         
3430                                  pr->priority + 1,
3431                                  MAX_DATASTORE_QUEUE,                            
3432                                  timeout,
3433                                  &process_local_reply,
3434                                  pr);
3435
3436   /* Are multiple results possible?  If so, start processing remotely now! */
3437   switch (pr->type)
3438     {
3439     case GNUNET_BLOCK_TYPE_DBLOCK:
3440     case GNUNET_BLOCK_TYPE_IBLOCK:
3441       /* only one result, wait for datastore */
3442       break;
3443     default:
3444       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3445         pr->task = GNUNET_SCHEDULER_add_now (sched,
3446                                              &forward_request_task,
3447                                              pr);
3448     }
3449
3450   /* make sure we don't track too many requests */
3451   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3452     {
3453       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3454       destroy_pending_request (pr);
3455     }
3456   return GNUNET_OK;
3457 }
3458
3459
3460 /* **************************** CS GET Handling ************************ */
3461
3462
3463 /**
3464  * Handle START_SEARCH-message (search request from client).
3465  *
3466  * @param cls closure
3467  * @param client identification of the client
3468  * @param message the actual message
3469  */
3470 static void
3471 handle_start_search (void *cls,
3472                      struct GNUNET_SERVER_Client *client,
3473                      const struct GNUNET_MessageHeader *message)
3474 {
3475   static GNUNET_HashCode all_zeros;
3476   const struct SearchMessage *sm;
3477   struct ClientList *cl;
3478   struct ClientRequestList *crl;
3479   struct PendingRequest *pr;
3480   uint16_t msize;
3481   unsigned int sc;
3482   enum GNUNET_BLOCK_Type type;
3483
3484   msize = ntohs (message->size);
3485   if ( (msize < sizeof (struct SearchMessage)) ||
3486        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3487     {
3488       GNUNET_break (0);
3489       GNUNET_SERVER_receive_done (client,
3490                                   GNUNET_SYSERR);
3491       return;
3492     }
3493   GNUNET_STATISTICS_update (stats,
3494                             gettext_noop ("# client searches received"),
3495                             1,
3496                             GNUNET_NO);
3497   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3498   sm = (const struct SearchMessage*) message;
3499   type = ntohl (sm->type);
3500 #if DEBUG_FS
3501   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3502               "Received request for `%s' of type %u from local client\n",
3503               GNUNET_h2s (&sm->query),
3504               (unsigned int) type);
3505 #endif
3506   switch (type)
3507     {
3508     case GNUNET_BLOCK_TYPE_ANY:
3509     case GNUNET_BLOCK_TYPE_DBLOCK:
3510     case GNUNET_BLOCK_TYPE_IBLOCK:
3511     case GNUNET_BLOCK_TYPE_KBLOCK:
3512     case GNUNET_BLOCK_TYPE_SBLOCK:
3513     case GNUNET_BLOCK_TYPE_NBLOCK:
3514       break;
3515     default:
3516       GNUNET_break (0);
3517       GNUNET_SERVER_receive_done (client,
3518                                   GNUNET_SYSERR);
3519       return;
3520     }  
3521
3522   cl = client_list;
3523   while ( (cl != NULL) &&
3524           (cl->client != client) )
3525     cl = cl->next;
3526   if (cl == NULL)
3527     {
3528       cl = GNUNET_malloc (sizeof (struct ClientList));
3529       cl->client = client;
3530       GNUNET_SERVER_client_keep (client);
3531       cl->next = client_list;
3532       client_list = cl;
3533     }
3534   /* detect duplicate KBLOCK requests */
3535   if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
3536        (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
3537        (type == GNUNET_BLOCK_TYPE_ANY) )
3538     {
3539       crl = cl->rl_head;
3540       while ( (crl != NULL) &&
3541               ( (0 != memcmp (&crl->req->query,
3542                               &sm->query,
3543                               sizeof (GNUNET_HashCode))) ||
3544                 (crl->req->type != type) ) )
3545         crl = crl->next;
3546       if (crl != NULL)  
3547         { 
3548 #if DEBUG_FS
3549           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3550                       "Have existing request, merging content-seen lists.\n");
3551 #endif
3552           pr = crl->req;
3553           /* Duplicate request (used to send long list of
3554              known/blocked results); merge 'pr->replies_seen'
3555              and update bloom filter */
3556           GNUNET_array_grow (pr->replies_seen,
3557                              pr->replies_seen_size,
3558                              pr->replies_seen_off + sc);
3559           memcpy (&pr->replies_seen[pr->replies_seen_off],
3560                   &sm[1],
3561                   sc * sizeof (GNUNET_HashCode));
3562           pr->replies_seen_off += sc;
3563           refresh_bloomfilter (pr);
3564           GNUNET_STATISTICS_update (stats,
3565                                     gettext_noop ("# client searches updated (merged content seen list)"),
3566                                     1,
3567                                     GNUNET_NO);
3568           GNUNET_SERVER_receive_done (client,
3569                                       GNUNET_OK);
3570           return;
3571         }
3572     }
3573   GNUNET_STATISTICS_update (stats,
3574                             gettext_noop ("# client searches active"),
3575                             1,
3576                             GNUNET_NO);
3577   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3578                       ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3579   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3580   memset (crl, 0, sizeof (struct ClientRequestList));
3581   crl->client_list = cl;
3582   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3583                                cl->rl_tail,
3584                                crl);  
3585   crl->req = pr;
3586   pr->type = type;
3587   pr->client_request_list = crl;
3588   GNUNET_array_grow (pr->replies_seen,
3589                      pr->replies_seen_size,
3590                      sc);
3591   memcpy (pr->replies_seen,
3592           &sm[1],
3593           sc * sizeof (GNUNET_HashCode));
3594   pr->replies_seen_off = sc;
3595   pr->anonymity_level = ntohl (sm->anonymity_level); 
3596   refresh_bloomfilter (pr);
3597   pr->query = sm->query;
3598   if (0 == (1 & ntohl (sm->options)))
3599     pr->local_only = GNUNET_NO;
3600   else
3601     pr->local_only = GNUNET_YES;
3602   switch (type)
3603     {
3604     case GNUNET_BLOCK_TYPE_DBLOCK:
3605     case GNUNET_BLOCK_TYPE_IBLOCK:
3606       if (0 != memcmp (&sm->target,
3607                        &all_zeros,
3608                        sizeof (GNUNET_HashCode)))
3609         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3610       break;
3611     case GNUNET_BLOCK_TYPE_SBLOCK:
3612       pr->namespace = (GNUNET_HashCode*) &pr[1];
3613       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3614       break;
3615     default:
3616       break;
3617     }
3618   GNUNET_break (GNUNET_OK ==
3619                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3620                                                    &sm->query,
3621                                                    pr,
3622                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3623   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3624     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3625   pr->qe = GNUNET_DATASTORE_get (dsh,
3626                                  &sm->query,
3627                                  type,
3628                                  -3, -1,
3629                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
3630                                  &process_local_reply,
3631                                  pr);
3632 }
3633
3634
3635 /* **************************** Startup ************************ */
3636
3637 /**
3638  * Process fs requests.
3639  *
3640  * @param s scheduler to use
3641  * @param server the initialized server
3642  * @param c configuration to use
3643  */
3644 static int
3645 main_init (struct GNUNET_SCHEDULER_Handle *s,
3646            struct GNUNET_SERVER_Handle *server,
3647            const struct GNUNET_CONFIGURATION_Handle *c)
3648 {
3649   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3650     {
3651       { &handle_p2p_get, 
3652         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3653       { &handle_p2p_put, 
3654         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3655       { NULL, 0, 0 }
3656     };
3657   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
3658     {&GNUNET_FS_handle_index_start, NULL, 
3659      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
3660     {&GNUNET_FS_handle_index_list_get, NULL, 
3661      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
3662     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
3663      sizeof (struct UnindexMessage) },
3664     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
3665      0 },
3666     {NULL, NULL, 0, 0}
3667   };
3668   unsigned long long enc = 128;
3669
3670   sched = s;
3671   cfg = c;
3672   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
3673   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
3674   if ( (GNUNET_OK !=
3675         GNUNET_CONFIGURATION_get_value_number (cfg,
3676                                                "fs",
3677                                                "MAX_PENDING_REQUESTS",
3678                                                &max_pending_requests)) ||
3679        (GNUNET_OK !=
3680         GNUNET_CONFIGURATION_get_value_number (cfg,
3681                                                "fs",
3682                                                "EXPECTED_NEIGHBOUR_COUNT",
3683                                                &enc)) ||
3684        (GNUNET_OK != 
3685         GNUNET_CONFIGURATION_get_value_time (cfg,
3686                                              "fs",
3687                                              "MIN_MIGRATION_DELAY",
3688                                              &min_migration_delay)) )
3689     {
3690       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3691                   _("Configuration fails to specify certain parameters, assuming default values."));
3692     }
3693   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
3694   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
3695   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
3696   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3697   core = GNUNET_CORE_connect (sched,
3698                               cfg,
3699                               GNUNET_TIME_UNIT_FOREVER_REL,
3700                               NULL,
3701                               NULL,
3702                               &peer_connect_handler,
3703                               &peer_disconnect_handler,
3704                               NULL,
3705                               NULL, GNUNET_NO,
3706                               NULL, GNUNET_NO,
3707                               p2p_handlers);
3708   if (NULL == core)
3709     {
3710       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3711                   _("Failed to connect to `%s' service.\n"),
3712                   "core");
3713       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
3714       connected_peers = NULL;
3715       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
3716       query_request_map = NULL;
3717       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
3718       requests_by_expiration_heap = NULL;
3719       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
3720       peer_request_map = NULL;
3721       if (dsh != NULL)
3722         {
3723           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3724           dsh = NULL;
3725         }
3726       return GNUNET_SYSERR;
3727     }
3728   /* FIXME: distinguish between sending and storing in options? */
3729   if (active_migration) 
3730     {
3731       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3732                   _("Content migration is enabled, will start to gather data\n"));
3733       consider_migration_gathering ();
3734     }
3735   GNUNET_SERVER_disconnect_notify (server, 
3736                                    &handle_client_disconnect,
3737                                    NULL);
3738   GNUNET_assert (GNUNET_OK ==
3739                  GNUNET_CONFIGURATION_get_value_filename (cfg,
3740                                                           "fs",
3741                                                           "TRUST",
3742                                                           &trustDirectory));
3743   GNUNET_DISK_directory_create (trustDirectory);
3744   GNUNET_SCHEDULER_add_with_priority (sched,
3745                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
3746                                       &cron_flush_trust, NULL);
3747
3748
3749   GNUNET_SERVER_add_handlers (server, handlers);
3750   GNUNET_SCHEDULER_add_delayed (sched,
3751                                 GNUNET_TIME_UNIT_FOREVER_REL,
3752                                 &shutdown_task,
3753                                 NULL);
3754   return GNUNET_OK;
3755 }
3756
3757
3758 /**
3759  * Process fs requests.
3760  *
3761  * @param cls closure
3762  * @param sched scheduler to use
3763  * @param server the initialized server
3764  * @param cfg configuration to use
3765  */
3766 static void
3767 run (void *cls,
3768      struct GNUNET_SCHEDULER_Handle *sched,
3769      struct GNUNET_SERVER_Handle *server,
3770      const struct GNUNET_CONFIGURATION_Handle *cfg)
3771 {
3772   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
3773                                                            "FS",
3774                                                            "ACTIVEMIGRATION");
3775   dsh = GNUNET_DATASTORE_connect (cfg,
3776                                   sched);
3777   if (dsh == NULL)
3778     {
3779       GNUNET_SCHEDULER_shutdown (sched);
3780       return;
3781     }
3782   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3783        (GNUNET_OK != main_init (sched, server, cfg)) )
3784     {    
3785       GNUNET_SCHEDULER_shutdown (sched);
3786       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3787       dsh = NULL;
3788       return;   
3789     }
3790 }
3791
3792
3793 /**
3794  * The main function for the fs service.
3795  *
3796  * @param argc number of arguments from the command line
3797  * @param argv command line arguments
3798  * @return 0 ok, 1 on error
3799  */
3800 int
3801 main (int argc, char *const *argv)
3802 {
3803   return (GNUNET_OK ==
3804           GNUNET_SERVICE_run (argc,
3805                               argv,
3806                               "fs",
3807                               GNUNET_SERVICE_OPTION_NONE,
3808                               &run, NULL)) ? 0 : 1;
3809 }
3810
3811 /* end of gnunet-service-fs.c */