a0e275f2de6c35df8e5b2503f8afc929ec0dfddd
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
28  * - consider more precise latency estimation (per-peer & request)
29  * - introduce random latency in processing
30  * - tell other peers to stop migration if our PUTs fail (or if
31  *   we don't support migration per configuration?)
32  * - more statistics
33  */
34 #include "platform.h"
35 #include <float.h>
36 #include "gnunet_constants.h"
37 #include "gnunet_core_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_peer_lib.h"
40 #include "gnunet_protocols.h"
41 #include "gnunet_signatures.h"
42 #include "gnunet_statistics_service.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet-service-fs_indexing.h"
45 #include "fs.h"
46
47 #define DEBUG_FS GNUNET_NO
48
49 /**
50  * Maximum number of outgoing messages we queue per peer.
51  */
52 #define MAX_QUEUE_PER_PEER 16
53
54 /**
55  * How often do we flush trust values to disk?
56  */
57 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
58
59 /**
60  * Inverse of the probability that we will submit the same query
61  * to the same peer again.  If the same peer already got the query
62  * repeatedly recently, the probability is multiplied by the inverse
63  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
64  * plus MAX_CORK_DELAY (so roughly every 3.5s).
65  */
66 #define RETRY_PROBABILITY_INV 3
67
68 /**
69  * What is the maximum delay for a P2P FS message (in our interaction
70  * with core)?  FS-internal delays are another story.  The value is
71  * chosen based on the 32k block size.  Given that peers typcially
72  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
73  * transmit one message even to the lowest-bandwidth peers.
74  */
75 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
76
77 /**
78  * Maximum number of requests (from other peers) that we're
79  * willing to have pending at any given point in time.
80  */
81 static unsigned long long max_pending_requests = (32 * 1024);
82
83
84 /**
85  * Information we keep for each pending reply.  The
86  * actual message follows at the end of this struct.
87  */
88 struct PendingMessage;
89
90 /**
91  * Function called upon completion of a transmission.
92  *
93  * @param cls closure
94  * @param pid ID of receiving peer, 0 on transmission error
95  */
96 typedef void (*TransmissionContinuation)(void * cls, 
97                                          GNUNET_PEER_Id tpid);
98
99
100 /**
101  * Information we keep for each pending message (GET/PUT).  The
102  * actual message follows at the end of this struct.
103  */
104 struct PendingMessage
105 {
106   /**
107    * This is a doubly-linked list of messages to the same peer.
108    */
109   struct PendingMessage *next;
110
111   /**
112    * This is a doubly-linked list of messages to the same peer.
113    */
114   struct PendingMessage *prev;
115
116   /**
117    * Entry in pending message list for this pending message.
118    */ 
119   struct PendingMessageList *pml;  
120
121   /**
122    * Function to call immediately once we have transmitted this
123    * message.
124    */
125   TransmissionContinuation cont;
126
127   /**
128    * Closure for cont.
129    */
130   void *cont_cls;
131
132   /**
133    * Size of the reply; actual reply message follows
134    * at the end of this struct.
135    */
136   size_t msize;
137   
138   /**
139    * How important is this message for us?
140    */
141   uint32_t priority;
142  
143 };
144
145
146 /**
147  * Information about a peer that we are connected to.
148  * We track data that is useful for determining which
149  * peers should receive our requests.  We also keep
150  * a list of messages to transmit to this peer.
151  */
152 struct ConnectedPeer
153 {
154
155   /**
156    * List of the last clients for which this peer successfully
157    * answered a query.
158    */
159   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
160
161   /**
162    * List of the last PIDs for which
163    * this peer successfully answered a query;
164    * We use 0 to indicate no successful reply.
165    */
166   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
167
168   /**
169    * Average delay between sending the peer a request and
170    * getting a reply (only calculated over the requests for
171    * which we actually got a reply).   Calculated
172    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
173    */ 
174   struct GNUNET_TIME_Relative avg_delay;
175
176   /**
177    * Handle for an active request for transmission to this
178    * peer, or NULL.
179    */
180   struct GNUNET_CORE_TransmitHandle *cth;
181
182   /**
183    * Messages (replies, queries, content migration) we would like to
184    * send to this peer in the near future.  Sorted by priority, head.
185    */
186   struct PendingMessage *pending_messages_head;
187
188   /**
189    * Messages (replies, queries, content migration) we would like to
190    * send to this peer in the near future.  Sorted by priority, tail.
191    */
192   struct PendingMessage *pending_messages_tail;
193
194   /**
195    * Average priority of successful replies.  Calculated
196    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
197    */
198   double avg_priority;
199
200   /**
201    * Increase in traffic preference still to be submitted
202    * to the core service for this peer.
203    */
204   uint64_t inc_preference;
205
206   /**
207    * Trust rating for this peer
208    */
209   uint32_t trust;
210
211   /**
212    * Trust rating for this peer on disk.
213    */
214   uint32_t disk_trust;
215
216   /**
217    * The peer's identity.
218    */
219   GNUNET_PEER_Id pid;  
220
221   /**
222    * Size of the linked list of 'pending_messages'.
223    */
224   unsigned int pending_requests;
225
226   /**
227    * Which offset in "last_p2p_replies" will be updated next?
228    * (we go round-robin).
229    */
230   unsigned int last_p2p_replies_woff;
231
232   /**
233    * Which offset in "last_client_replies" will be updated next?
234    * (we go round-robin).
235    */
236   unsigned int last_client_replies_woff;
237
238 };
239
240
241 /**
242  * Information we keep for each pending request.  We should try to
243  * keep this struct as small as possible since its memory consumption
244  * is key to how many requests we can have pending at once.
245  */
246 struct PendingRequest;
247
248
249 /**
250  * Doubly-linked list of requests we are performing
251  * on behalf of the same client.
252  */
253 struct ClientRequestList
254 {
255
256   /**
257    * This is a doubly-linked list.
258    */
259   struct ClientRequestList *next;
260
261   /**
262    * This is a doubly-linked list.
263    */
264   struct ClientRequestList *prev;
265
266   /**
267    * Request this entry represents.
268    */
269   struct PendingRequest *req;
270
271   /**
272    * Client list this request belongs to.
273    */
274   struct ClientList *client_list;
275
276 };
277
278
279 /**
280  * Replies to be transmitted to the client.  The actual
281  * response message is allocated after this struct.
282  */
283 struct ClientResponseMessage
284 {
285   /**
286    * This is a doubly-linked list.
287    */
288   struct ClientResponseMessage *next;
289
290   /**
291    * This is a doubly-linked list.
292    */
293   struct ClientResponseMessage *prev;
294
295   /**
296    * Client list entry this response belongs to.
297    */
298   struct ClientList *client_list;
299
300   /**
301    * Number of bytes in the response.
302    */
303   size_t msize;
304 };
305
306
307 /**
308  * Linked list of clients we are performing requests
309  * for right now.
310  */
311 struct ClientList
312 {
313   /**
314    * This is a linked list.
315    */
316   struct ClientList *next;
317
318   /**
319    * ID of a client making a request, NULL if this entry is for a
320    * peer.
321    */
322   struct GNUNET_SERVER_Client *client;
323
324   /**
325    * Head of list of requests performed on behalf
326    * of this client right now.
327    */
328   struct ClientRequestList *rl_head;
329
330   /**
331    * Tail of list of requests performed on behalf
332    * of this client right now.
333    */
334   struct ClientRequestList *rl_tail;
335
336   /**
337    * Head of linked list of responses.
338    */
339   struct ClientResponseMessage *res_head;
340
341   /**
342    * Tail of linked list of responses.
343    */
344   struct ClientResponseMessage *res_tail;
345
346   /**
347    * Context for sending replies.
348    */
349   struct GNUNET_CONNECTION_TransmitHandle *th;
350
351 };
352
353
354 /**
355  * Doubly-linked list of messages we are performing
356  * due to a pending request.
357  */
358 struct PendingMessageList
359 {
360
361   /**
362    * This is a doubly-linked list of messages on behalf of the same request.
363    */
364   struct PendingMessageList *next;
365
366   /**
367    * This is a doubly-linked list of messages on behalf of the same request.
368    */
369   struct PendingMessageList *prev;
370
371   /**
372    * Message this entry represents.
373    */
374   struct PendingMessage *pm;
375
376   /**
377    * Request this entry belongs to.
378    */
379   struct PendingRequest *req;
380
381   /**
382    * Peer this message is targeted for.
383    */
384   struct ConnectedPeer *target;
385
386 };
387
388
389 /**
390  * Information we keep for each pending request.  We should try to
391  * keep this struct as small as possible since its memory consumption
392  * is key to how many requests we can have pending at once.
393  */
394 struct PendingRequest
395 {
396
397   /**
398    * If this request was made by a client, this is our entry in the
399    * client request list; otherwise NULL.
400    */
401   struct ClientRequestList *client_request_list;
402
403   /**
404    * Entry of peer responsible for this entry (if this request
405    * was made by a peer).
406    */
407   struct ConnectedPeer *cp;
408
409   /**
410    * If this is a namespace query, pointer to the hash of the public
411    * key of the namespace; otherwise NULL.  Pointer will be to the 
412    * end of this struct (so no need to free it).
413    */
414   const GNUNET_HashCode *namespace;
415
416   /**
417    * Bloomfilter we use to filter out replies that we don't care about
418    * (anymore).  NULL as long as we are interested in all replies.
419    */
420   struct GNUNET_CONTAINER_BloomFilter *bf;
421
422   /**
423    * Context of our GNUNET_CORE_peer_change_preference call.
424    */
425   struct GNUNET_CORE_InformationRequestContext *irc;
426
427   /**
428    * Hash code of all replies that we have seen so far (only valid
429    * if client is not NULL since we only track replies like this for
430    * our own clients).
431    */
432   GNUNET_HashCode *replies_seen;
433
434   /**
435    * Node in the heap representing this entry; NULL
436    * if we have no heap node.
437    */
438   struct GNUNET_CONTAINER_HeapNode *hnode;
439
440   /**
441    * Head of list of messages being performed on behalf of this
442    * request.
443    */
444   struct PendingMessageList *pending_head;
445
446   /**
447    * Tail of list of messages being performed on behalf of this
448    * request.
449    */
450   struct PendingMessageList *pending_tail;
451
452   /**
453    * When did we first see this request (form this peer), or, if our
454    * client is initiating, when did we last initiate a search?
455    */
456   struct GNUNET_TIME_Absolute start_time;
457
458   /**
459    * The query that this request is for.
460    */
461   GNUNET_HashCode query;
462
463   /**
464    * The task responsible for transmitting queries
465    * for this request.
466    */
467   GNUNET_SCHEDULER_TaskIdentifier task;
468
469   /**
470    * (Interned) Peer identifier that identifies a preferred target
471    * for requests.
472    */
473   GNUNET_PEER_Id target_pid;
474
475   /**
476    * (Interned) Peer identifiers of peers that have already
477    * received our query for this content.
478    */
479   GNUNET_PEER_Id *used_pids;
480   
481   /**
482    * Our entry in the queue (non-NULL while we wait for our
483    * turn to interact with the local database).
484    */
485   struct GNUNET_DATASTORE_QueueEntry *qe;
486
487   /**
488    * Size of the 'bf' (in bytes).
489    */
490   size_t bf_size;
491
492   /**
493    * Desired anonymity level; only valid for requests from a local client.
494    */
495   uint32_t anonymity_level;
496
497   /**
498    * How many entries in "used_pids" are actually valid?
499    */
500   unsigned int used_pids_off;
501
502   /**
503    * How long is the "used_pids" array?
504    */
505   unsigned int used_pids_size;
506
507   /**
508    * Number of results found for this request.
509    */
510   unsigned int results_found;
511
512   /**
513    * How many entries in "replies_seen" are actually valid?
514    */
515   unsigned int replies_seen_off;
516
517   /**
518    * How long is the "replies_seen" array?
519    */
520   unsigned int replies_seen_size;
521   
522   /**
523    * Priority with which this request was made.  If one of our clients
524    * made the request, then this is the current priority that we are
525    * using when initiating the request.  This value is used when
526    * we decide to reward other peers with trust for providing a reply.
527    */
528   uint32_t priority;
529
530   /**
531    * Priority points left for us to spend when forwarding this request
532    * to other peers.
533    */
534   uint32_t remaining_priority;
535
536   /**
537    * Number to mingle hashes for bloom-filter tests with.
538    */
539   int32_t mingle;
540
541   /**
542    * TTL with which we saw this request (or, if we initiated, TTL that
543    * we used for the request).
544    */
545   int32_t ttl;
546   
547   /**
548    * Type of the content that this request is for.
549    */
550   enum GNUNET_BLOCK_Type type;
551
552   /**
553    * Remove this request after transmission of the current response.
554    */
555   int16_t do_remove;
556
557   /**
558    * GNUNET_YES if we should not forward this request to other peers.
559    */
560   int16_t local_only;
561
562 };
563
564
565 /**
566  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
567  */
568 struct MigrationReadyBlock
569 {
570
571   /**
572    * This is a doubly-linked list.
573    */
574   struct MigrationReadyBlock *next;
575
576   /**
577    * This is a doubly-linked list.
578    */
579   struct MigrationReadyBlock *prev;
580
581   /**
582    * Query for the block.
583    */
584   GNUNET_HashCode query;
585
586   /**
587    * When does this block expire? 
588    */
589   struct GNUNET_TIME_Absolute expiration;
590
591   /**
592    * Peers we would consider forwarding this
593    * block to.  Zero for empty entries.
594    */
595   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
596
597   /**
598    * Size of the block.
599    */
600   size_t size;
601
602   /**
603    *  Number of targets already used.
604    */
605   unsigned int used_targets;
606
607   /**
608    * Type of the block.
609    */
610   enum GNUNET_BLOCK_Type type;
611 };
612
613
614 /**
615  * Our connection to the datastore.
616  */
617 static struct GNUNET_DATASTORE_Handle *dsh;
618
619 /**
620  * Our block context.
621  */
622 static struct GNUNET_BLOCK_Context *block_ctx;
623
624 /**
625  * Our block configuration.
626  */
627 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
628
629 /**
630  * Our scheduler.
631  */
632 static struct GNUNET_SCHEDULER_Handle *sched;
633
634 /**
635  * Our configuration.
636  */
637 static const struct GNUNET_CONFIGURATION_Handle *cfg;
638
639 /**
640  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
641  */
642 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
643
644 /**
645  * Map of peer identifiers to "struct PendingRequest" (for that peer).
646  */
647 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
648
649 /**
650  * Map of query identifiers to "struct PendingRequest" (for that query).
651  */
652 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
653
654 /**
655  * Heap with the request that will expire next at the top.  Contains
656  * pointers of type "struct PendingRequest*"; these will *also* be
657  * aliased from the "requests_by_peer" data structures and the
658  * "requests_by_query" table.  Note that requests from our clients
659  * don't expire and are thus NOT in the "requests_by_expiration"
660  * (or the "requests_by_peer" tables).
661  */
662 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
663
664 /**
665  * Handle for reporting statistics.
666  */
667 static struct GNUNET_STATISTICS_Handle *stats;
668
669 /**
670  * Linked list of clients we are currently processing requests for.
671  */
672 static struct ClientList *client_list;
673
674 /**
675  * Pointer to handle to the core service (points to NULL until we've
676  * connected to it).
677  */
678 static struct GNUNET_CORE_Handle *core;
679
680 /**
681  * Head of linked list of blocks that can be migrated.
682  */
683 static struct MigrationReadyBlock *mig_head;
684
685 /**
686  * Tail of linked list of blocks that can be migrated.
687  */
688 static struct MigrationReadyBlock *mig_tail;
689
690 /**
691  * Request to datastore for migration (or NULL).
692  */
693 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
694
695 /**
696  * Where do we store trust information?
697  */
698 static char *trustDirectory;
699
700 /**
701  * ID of task that collects blocks for migration.
702  */
703 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
704
705 /**
706  * What is the maximum frequency at which we are allowed to
707  * poll the datastore for migration content?
708  */
709 static struct GNUNET_TIME_Relative min_migration_delay;
710
711 /**
712  * Size of the doubly-linked list of migration blocks.
713  */
714 static unsigned int mig_size;
715
716 /**
717  * Are we allowed to migrate content to this peer.
718  */
719 static int active_migration;
720
721 /**
722  * Typical priorities we're seeing from other peers right now.  Since
723  * most priorities will be zero, this value is the weighted average of
724  * non-zero priorities seen "recently".  In order to ensure that new
725  * values do not dramatically change the ratio, values are first
726  * "capped" to a reasonable range (+N of the current value) and then
727  * averaged into the existing value by a ratio of 1:N.  Hence
728  * receiving the largest possible priority can still only raise our
729  * "current_priorities" by at most 1.
730  */
731 static double current_priorities;
732
733 /**
734  * Get the filename under which we would store the GNUNET_HELLO_Message
735  * for the given host and protocol.
736  * @return filename of the form DIRECTORY/HOSTID
737  */
738 static char *
739 get_trust_filename (const struct GNUNET_PeerIdentity *id)
740 {
741   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
742   char *fn;
743
744   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
745   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
746   return fn;
747 }
748
749
750
751 /**
752  * Transmit messages by copying it to the target buffer
753  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
754  * for writing in the meantime.  In that case, do nothing
755  * (the disconnect or shutdown handler will take care of the rest).
756  * If we were able to transmit messages and there are still more
757  * pending, ask core again for further calls to this function.
758  *
759  * @param cls closure, pointer to the 'struct ConnectedPeer*'
760  * @param size number of bytes available in buf
761  * @param buf where the callee should write the message
762  * @return number of bytes written to buf
763  */
764 static size_t
765 transmit_to_peer (void *cls,
766                   size_t size, void *buf);
767
768
769 /* ******************* clean up functions ************************ */
770
771
772 /**
773  * Delete the given migration block.
774  *
775  * @param mb block to delete
776  */
777 static void
778 delete_migration_block (struct MigrationReadyBlock *mb)
779 {
780   GNUNET_CONTAINER_DLL_remove (mig_head,
781                                mig_tail,
782                                mb);
783   GNUNET_PEER_decrement_rcs (mb->target_list,
784                              MIGRATION_LIST_SIZE);
785   mig_size--;
786   GNUNET_free (mb);
787 }
788
789
790 /**
791  * Compare the distance of two peers to a key.
792  *
793  * @param key key
794  * @param p1 first peer
795  * @param p2 second peer
796  * @return GNUNET_YES if P1 is closer to key than P2
797  */
798 static int
799 is_closer (const GNUNET_HashCode *key,
800            const struct GNUNET_PeerIdentity *p1,
801            const struct GNUNET_PeerIdentity *p2)
802 {
803   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
804                                     &p2->hashPubKey,
805                                     key);
806 }
807
808
809 /**
810  * Consider migrating content to a given peer.
811  *
812  * @param cls 'struct MigrationReadyBlock*' to select
813  *            targets for (or NULL for none)
814  * @param key ID of the peer 
815  * @param value 'struct ConnectedPeer' of the peer
816  * @return GNUNET_YES (always continue iteration)
817  */
818 static int
819 consider_migration (void *cls,
820                     const GNUNET_HashCode *key,
821                     void *value)
822 {
823   struct MigrationReadyBlock *mb = cls;
824   struct ConnectedPeer *cp = value;
825   struct MigrationReadyBlock *pos;
826   struct GNUNET_PeerIdentity cppid;
827   struct GNUNET_PeerIdentity otherpid;
828   struct GNUNET_PeerIdentity worstpid;
829   size_t msize;
830   unsigned int i;
831   unsigned int repl;
832   
833   /* consider 'cp' as a migration target for mb */
834   if (mb != NULL)
835     {
836       GNUNET_PEER_resolve (cp->pid,
837                            &cppid);
838       repl = MIGRATION_LIST_SIZE;
839       for (i=0;i<MIGRATION_LIST_SIZE;i++)
840         {
841           if (mb->target_list[i] == 0)
842             {
843               mb->target_list[i] = cp->pid;
844               GNUNET_PEER_change_rc (mb->target_list[i], 1);
845               repl = MIGRATION_LIST_SIZE;
846               break;
847             }
848           GNUNET_PEER_resolve (mb->target_list[i],
849                                &otherpid);
850           if ( (repl == MIGRATION_LIST_SIZE) &&
851                is_closer (&mb->query,
852                           &cppid,
853                           &otherpid)) 
854             {
855               repl = i;
856               worstpid = otherpid;
857             }
858           else if ( (repl != MIGRATION_LIST_SIZE) &&
859                     (is_closer (&mb->query,
860                                 &worstpid,
861                                 &otherpid) ) )
862             {
863               repl = i;
864               worstpid = otherpid;
865             }       
866         }
867       if (repl != MIGRATION_LIST_SIZE) 
868         {
869           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
870           mb->target_list[repl] = cp->pid;
871           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
872         }
873     }
874
875   /* consider scheduling transmission to cp for content migration */
876   if (cp->cth != NULL)
877     return GNUNET_YES; 
878   msize = 0;
879   pos = mig_head;
880   while (pos != NULL)
881     {
882       for (i=0;i<MIGRATION_LIST_SIZE;i++)
883         {
884           if (cp->pid == pos->target_list[i])
885             {
886               if (msize == 0)
887                 msize = pos->size;
888               else
889                 msize = GNUNET_MIN (msize,
890                                     pos->size);
891               break;
892             }
893         }
894       pos = pos->next;
895     }
896   if (msize == 0)
897     return GNUNET_YES; /* no content available */
898 #if DEBUG_FS
899   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900               "Trying to migrate at least %u bytes to peer `%s'\n",
901               msize,
902               GNUNET_h2s (key));
903 #endif
904   cp->cth 
905     = GNUNET_CORE_notify_transmit_ready (core,
906                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
907                                          (const struct GNUNET_PeerIdentity*) key,
908                                          msize + sizeof (struct PutMessage),
909                                          &transmit_to_peer,
910                                          cp);
911   return GNUNET_YES;
912 }
913
914
915 /**
916  * Task that is run periodically to obtain blocks for content
917  * migration
918  * 
919  * @param cls unused
920  * @param tc scheduler context (also unused)
921  */
922 static void
923 gather_migration_blocks (void *cls,
924                          const struct GNUNET_SCHEDULER_TaskContext *tc);
925
926
927 /**
928  * If the migration task is not currently running, consider
929  * (re)scheduling it with the appropriate delay.
930  */
931 static void
932 consider_migration_gathering ()
933 {
934   struct GNUNET_TIME_Relative delay;
935
936   if (dsh == NULL)
937     return;
938   if (mig_qe != NULL)
939     return;
940   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
941     return;
942   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
943                                          mig_size);
944   delay = GNUNET_TIME_relative_divide (delay,
945                                        MAX_MIGRATION_QUEUE);
946   delay = GNUNET_TIME_relative_max (delay,
947                                     min_migration_delay);
948   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
949                                            delay,
950                                            &gather_migration_blocks,
951                                            NULL);
952 }
953
954
955 /**
956  * Process content offered for migration.
957  *
958  * @param cls closure
959  * @param key key for the content
960  * @param size number of bytes in data
961  * @param data content stored
962  * @param type type of the content
963  * @param priority priority of the content
964  * @param anonymity anonymity-level for the content
965  * @param expiration expiration time for the content
966  * @param uid unique identifier for the datum;
967  *        maybe 0 if no unique identifier is available
968  */
969 static void
970 process_migration_content (void *cls,
971                            const GNUNET_HashCode * key,
972                            uint32_t size,
973                            const void *data,
974                            enum GNUNET_BLOCK_Type type,
975                            uint32_t priority,
976                            uint32_t anonymity,
977                            struct GNUNET_TIME_Absolute
978                            expiration, uint64_t uid)
979 {
980   struct MigrationReadyBlock *mb;
981   
982   if (key == NULL)
983     {
984       mig_qe = NULL;
985       if (mig_size < MAX_MIGRATION_QUEUE)  
986         consider_migration_gathering ();
987       return;
988     }
989   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
990     {
991       if (GNUNET_OK !=
992           GNUNET_FS_handle_on_demand_block (key, size, data,
993                                             type, priority, anonymity,
994                                             expiration, uid, 
995                                             &process_migration_content,
996                                             NULL))
997         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
998       return;
999     }
1000 #if DEBUG_FS
1001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002               "Retrieved block `%s' of type %u for migration\n",
1003               GNUNET_h2s (key),
1004               type);
1005 #endif
1006   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1007   mb->query = *key;
1008   mb->expiration = expiration;
1009   mb->size = size;
1010   mb->type = type;
1011   memcpy (&mb[1], data, size);
1012   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1013                                      mig_tail,
1014                                      mig_tail,
1015                                      mb);
1016   mig_size++;
1017   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1018                                          &consider_migration,
1019                                          mb);
1020   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1021 }
1022
1023
1024 /**
1025  * Task that is run periodically to obtain blocks for content
1026  * migration
1027  * 
1028  * @param cls unused
1029  * @param tc scheduler context (also unused)
1030  */
1031 static void
1032 gather_migration_blocks (void *cls,
1033                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1034 {
1035   mig_task = GNUNET_SCHEDULER_NO_TASK;
1036   if (dsh != NULL)
1037     {
1038       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
1039                                             GNUNET_TIME_UNIT_FOREVER_REL,
1040                                             &process_migration_content, NULL);
1041       GNUNET_assert (mig_qe != NULL);
1042     }
1043 }
1044
1045
1046 /**
1047  * We're done with a particular message list entry.
1048  * Free all associated resources.
1049  * 
1050  * @param pml entry to destroy
1051  */
1052 static void
1053 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1054 {
1055   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1056                                pml->req->pending_tail,
1057                                pml);
1058   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1059                                pml->target->pending_messages_tail,
1060                                pml->pm);
1061   pml->target->pending_requests--;
1062   GNUNET_free (pml->pm);
1063   GNUNET_free (pml);
1064 }
1065
1066
1067 /**
1068  * Destroy the given pending message (and call the respective
1069  * continuation).
1070  *
1071  * @param pm message to destroy
1072  * @param tpid id of peer that the message was delivered to, or 0 for none
1073  */
1074 static void
1075 destroy_pending_message (struct PendingMessage *pm,
1076                          GNUNET_PEER_Id tpid)
1077 {
1078   struct PendingMessageList *pml = pm->pml;
1079   TransmissionContinuation cont;
1080   void *cont_cls;
1081
1082   GNUNET_assert (pml->pm == pm);
1083   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1084   cont = pm->cont;
1085   cont_cls = pm->cont_cls;
1086   destroy_pending_message_list_entry (pml);
1087   cont (cont_cls, tpid);  
1088 }
1089
1090
1091 /**
1092  * We're done processing a particular request.
1093  * Free all associated resources.
1094  *
1095  * @param pr request to destroy
1096  */
1097 static void
1098 destroy_pending_request (struct PendingRequest *pr)
1099 {
1100   struct GNUNET_PeerIdentity pid;
1101
1102   if (pr->hnode != NULL)
1103     {
1104       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1105                                          pr->hnode);
1106       pr->hnode = NULL;
1107     }
1108   if (NULL == pr->client_request_list)
1109     {
1110       GNUNET_STATISTICS_update (stats,
1111                                 gettext_noop ("# P2P searches active"),
1112                                 -1,
1113                                 GNUNET_NO);
1114     }
1115   else
1116     {
1117       GNUNET_STATISTICS_update (stats,
1118                                 gettext_noop ("# client searches active"),
1119                                 -1,
1120                                 GNUNET_NO);
1121     }
1122   /* might have already been removed from map in 'process_reply' (if
1123      there was a unique reply) or never inserted if it was a
1124      duplicate; hence ignore the return value here */
1125   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1126                                                &pr->query,
1127                                                pr);
1128   if (pr->qe != NULL)
1129      {
1130       GNUNET_DATASTORE_cancel (pr->qe);
1131       pr->qe = NULL;
1132     }
1133   if (pr->client_request_list != NULL)
1134     {
1135       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1136                                    pr->client_request_list->client_list->rl_tail,
1137                                    pr->client_request_list);
1138       GNUNET_free (pr->client_request_list);
1139       pr->client_request_list = NULL;
1140     }
1141   if (pr->cp != NULL)
1142     {
1143       GNUNET_PEER_resolve (pr->cp->pid,
1144                            &pid);
1145       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1146                                                    &pid.hashPubKey,
1147                                                    pr);
1148       pr->cp = NULL;
1149     }
1150   if (pr->bf != NULL)
1151     {
1152       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1153       pr->bf = NULL;
1154     }
1155   if (pr->irc != NULL)
1156     {
1157       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1158       pr->irc = NULL;
1159     }
1160   if (pr->replies_seen != NULL)
1161     {
1162       GNUNET_free (pr->replies_seen);
1163       pr->replies_seen = NULL;
1164     }
1165   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1166     {
1167       GNUNET_SCHEDULER_cancel (sched,
1168                                pr->task);
1169       pr->task = GNUNET_SCHEDULER_NO_TASK;
1170     }
1171   while (NULL != pr->pending_head)    
1172     destroy_pending_message_list_entry (pr->pending_head);
1173   GNUNET_PEER_change_rc (pr->target_pid, -1);
1174   if (pr->used_pids != NULL)
1175     {
1176       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1177       GNUNET_free (pr->used_pids);
1178       pr->used_pids_off = 0;
1179       pr->used_pids_size = 0;
1180       pr->used_pids = NULL;
1181     }
1182   GNUNET_free (pr);
1183 }
1184
1185
1186 /**
1187  * Method called whenever a given peer connects.
1188  *
1189  * @param cls closure, not used
1190  * @param peer peer identity this notification is about
1191  * @param latency reported latency of the connection with 'other'
1192  * @param distance reported distance (DV) to 'other' 
1193  */
1194 static void 
1195 peer_connect_handler (void *cls,
1196                       const struct
1197                       GNUNET_PeerIdentity * peer,
1198                       struct GNUNET_TIME_Relative latency,
1199                       uint32_t distance)
1200 {
1201   struct ConnectedPeer *cp;
1202   struct MigrationReadyBlock *pos;
1203   char *fn;
1204   uint32_t trust;
1205   
1206   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1207   cp->pid = GNUNET_PEER_intern (peer);
1208
1209   fn = get_trust_filename (peer);
1210   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1211       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1212     cp->disk_trust = cp->trust = ntohl (trust);
1213   GNUNET_free (fn);
1214
1215   GNUNET_break (GNUNET_OK ==
1216                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1217                                                    &peer->hashPubKey,
1218                                                    cp,
1219                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1220
1221   pos = mig_head;
1222   while (NULL != pos)
1223     {
1224       (void) consider_migration (pos, &peer->hashPubKey, cp);
1225       pos = pos->next;
1226     }
1227 }
1228
1229
1230 /**
1231  * Increase the host credit by a value.
1232  *
1233  * @param host which peer to change the trust value on
1234  * @param value is the int value by which the
1235  *  host credit is to be increased or decreased
1236  * @returns the actual change in trust (positive or negative)
1237  */
1238 static int
1239 change_host_trust (struct ConnectedPeer *host, int value)
1240 {
1241   unsigned int old_trust;
1242
1243   if (value == 0)
1244     return 0;
1245   GNUNET_assert (host != NULL);
1246   old_trust = host->trust;
1247   if (value > 0)
1248     {
1249       if (host->trust + value < host->trust)
1250         {
1251           value = UINT32_MAX - host->trust;
1252           host->trust = UINT32_MAX;
1253         }
1254       else
1255         host->trust += value;
1256     }
1257   else
1258     {
1259       if (host->trust < -value)
1260         {
1261           value = -host->trust;
1262           host->trust = 0;
1263         }
1264       else
1265         host->trust += value;
1266     }
1267   return value;
1268 }
1269
1270
1271 /**
1272  * Write host-trust information to a file - flush the buffer entry!
1273  */
1274 static int
1275 flush_trust (void *cls,
1276              const GNUNET_HashCode *key,
1277              void *value)
1278 {
1279   struct ConnectedPeer *host = value;
1280   char *fn;
1281   uint32_t trust;
1282   struct GNUNET_PeerIdentity pid;
1283
1284   if (host->trust == host->disk_trust)
1285     return GNUNET_OK;                     /* unchanged */
1286   GNUNET_PEER_resolve (host->pid,
1287                        &pid);
1288   fn = get_trust_filename (&pid);
1289   if (host->trust == 0)
1290     {
1291       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1292         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1293                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1294     }
1295   else
1296     {
1297       trust = htonl (host->trust);
1298       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1299                                                     sizeof(uint32_t),
1300                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1301                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1302         host->disk_trust = host->trust;
1303     }
1304   GNUNET_free (fn);
1305   return GNUNET_OK;
1306 }
1307
1308 /**
1309  * Call this method periodically to scan data/hosts for new hosts.
1310  */
1311 static void
1312 cron_flush_trust (void *cls,
1313                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1314 {
1315
1316   if (NULL == connected_peers)
1317     return;
1318   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1319                                          &flush_trust,
1320                                          NULL);
1321   if (NULL == tc)
1322     return;
1323   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1324     return;
1325   GNUNET_SCHEDULER_add_delayed (tc->sched,
1326                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1327 }
1328
1329
1330 /**
1331  * Free (each) request made by the peer.
1332  *
1333  * @param cls closure, points to peer that the request belongs to
1334  * @param key current key code
1335  * @param value value in the hash map
1336  * @return GNUNET_YES (we should continue to iterate)
1337  */
1338 static int
1339 destroy_request (void *cls,
1340                  const GNUNET_HashCode * key,
1341                  void *value)
1342 {
1343   const struct GNUNET_PeerIdentity * peer = cls;
1344   struct PendingRequest *pr = value;
1345   
1346   GNUNET_break (GNUNET_YES ==
1347                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1348                                                       &peer->hashPubKey,
1349                                                       pr));
1350   destroy_pending_request (pr);
1351   return GNUNET_YES;
1352 }
1353
1354
1355 /**
1356  * Method called whenever a peer disconnects.
1357  *
1358  * @param cls closure, not used
1359  * @param peer peer identity this notification is about
1360  */
1361 static void
1362 peer_disconnect_handler (void *cls,
1363                          const struct
1364                          GNUNET_PeerIdentity * peer)
1365 {
1366   struct ConnectedPeer *cp;
1367   struct PendingMessage *pm;
1368   unsigned int i;
1369   struct MigrationReadyBlock *pos;
1370   struct MigrationReadyBlock *next;
1371
1372   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1373                                               &peer->hashPubKey,
1374                                               &destroy_request,
1375                                               (void*) peer);
1376   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1377                                           &peer->hashPubKey);
1378   if (cp == NULL)
1379     return;
1380   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1381     {
1382       if (NULL != cp->last_client_replies[i])
1383         {
1384           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1385           cp->last_client_replies[i] = NULL;
1386         }
1387     }
1388   GNUNET_break (GNUNET_YES ==
1389                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1390                                                       &peer->hashPubKey,
1391                                                       cp));
1392   /* remove this peer from migration considerations; schedule
1393      alternatives */
1394   next = mig_head;
1395   while (NULL != (pos = next))
1396     {
1397       next = pos->next;
1398       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1399         {
1400           if (pos->target_list[i] == cp->pid)
1401             {
1402               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1403               pos->target_list[i] = 0;
1404             }
1405          }
1406       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1407         {
1408           delete_migration_block (pos);
1409           consider_migration_gathering ();
1410           continue;
1411         }
1412       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1413                                              &consider_migration,
1414                                              pos);
1415     }
1416   GNUNET_PEER_change_rc (cp->pid, -1);
1417   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1418   if (NULL != cp->cth)
1419     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1420   while (NULL != (pm = cp->pending_messages_head))
1421     destroy_pending_message (pm, 0 /* delivery failed */);
1422   GNUNET_break (0 == cp->pending_requests);
1423   GNUNET_free (cp);
1424 }
1425
1426
1427 /**
1428  * Iterator over hash map entries that removes all occurences
1429  * of the given 'client' from the 'last_client_replies' of the
1430  * given connected peer.
1431  *
1432  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1433  * @param key current key code (unused)
1434  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1435  * @return GNUNET_YES (we should continue to iterate)
1436  */
1437 static int
1438 remove_client_from_last_client_replies (void *cls,
1439                                         const GNUNET_HashCode * key,
1440                                         void *value)
1441 {
1442   struct GNUNET_SERVER_Client *client = cls;
1443   struct ConnectedPeer *cp = value;
1444   unsigned int i;
1445
1446   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1447     {
1448       if (cp->last_client_replies[i] == client)
1449         {
1450           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1451           cp->last_client_replies[i] = NULL;
1452         }
1453     }  
1454   return GNUNET_YES;
1455 }
1456
1457
1458 /**
1459  * A client disconnected.  Remove all of its pending queries.
1460  *
1461  * @param cls closure, NULL
1462  * @param client identification of the client
1463  */
1464 static void
1465 handle_client_disconnect (void *cls,
1466                           struct GNUNET_SERVER_Client
1467                           * client)
1468 {
1469   struct ClientList *pos;
1470   struct ClientList *prev;
1471   struct ClientRequestList *rcl;
1472   struct ClientResponseMessage *creply;
1473
1474   if (client == NULL)
1475     return;
1476   prev = NULL;
1477   pos = client_list;
1478   while ( (NULL != pos) &&
1479           (pos->client != client) )
1480     {
1481       prev = pos;
1482       pos = pos->next;
1483     }
1484   if (pos == NULL)
1485     return; /* no requests pending for this client */
1486   while (NULL != (rcl = pos->rl_head))
1487     {
1488       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1489                   "Destroying pending request `%s' on disconnect\n",
1490                   GNUNET_h2s (&rcl->req->query));
1491       destroy_pending_request (rcl->req);
1492     }
1493   if (prev == NULL)
1494     client_list = pos->next;
1495   else
1496     prev->next = pos->next;
1497   if (pos->th != NULL)
1498     {
1499       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1500       pos->th = NULL;
1501     }
1502   while (NULL != (creply = pos->res_head))
1503     {
1504       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1505                                    pos->res_tail,
1506                                    creply);
1507       GNUNET_free (creply);
1508     }    
1509   GNUNET_SERVER_client_drop (pos->client);
1510   GNUNET_free (pos);
1511   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1512                                          &remove_client_from_last_client_replies,
1513                                          client);
1514 }
1515
1516
1517 /**
1518  * Iterator to free peer entries.
1519  *
1520  * @param cls closure, unused
1521  * @param key current key code
1522  * @param value value in the hash map (peer entry)
1523  * @return GNUNET_YES (we should continue to iterate)
1524  */
1525 static int 
1526 clean_peer (void *cls,
1527             const GNUNET_HashCode * key,
1528             void *value)
1529 {
1530   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1531   return GNUNET_YES;
1532 }
1533
1534
1535 /**
1536  * Task run during shutdown.
1537  *
1538  * @param cls unused
1539  * @param tc unused
1540  */
1541 static void
1542 shutdown_task (void *cls,
1543                const struct GNUNET_SCHEDULER_TaskContext *tc)
1544 {
1545   if (mig_qe != NULL)
1546     {
1547       GNUNET_DATASTORE_cancel (mig_qe);
1548       mig_qe = NULL;
1549     }
1550   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1551     {
1552       GNUNET_SCHEDULER_cancel (sched, mig_task);
1553       mig_task = GNUNET_SCHEDULER_NO_TASK;
1554     }
1555   while (client_list != NULL)
1556     handle_client_disconnect (NULL,
1557                               client_list->client);
1558   cron_flush_trust (NULL, NULL);
1559   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1560                                          &clean_peer,
1561                                          NULL);
1562   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1563   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1564   requests_by_expiration_heap = 0;
1565   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1566   connected_peers = NULL;
1567   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1568   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1569   query_request_map = NULL;
1570   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1571   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1572   peer_request_map = NULL;
1573   GNUNET_assert (NULL != core);
1574   GNUNET_CORE_disconnect (core);
1575   core = NULL;
1576   if (stats != NULL)
1577     {
1578       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1579       stats = NULL;
1580     }
1581   if (dsh != NULL)
1582     {
1583       GNUNET_DATASTORE_disconnect (dsh,
1584                                    GNUNET_NO);
1585       dsh = NULL;
1586     }
1587   while (mig_head != NULL)
1588     delete_migration_block (mig_head);
1589   GNUNET_assert (0 == mig_size);
1590   GNUNET_BLOCK_context_destroy (block_ctx);
1591   block_ctx = NULL;
1592   GNUNET_CONFIGURATION_destroy (block_cfg);
1593   block_cfg = NULL;
1594   sched = NULL;
1595   cfg = NULL;  
1596   GNUNET_free_non_null (trustDirectory);
1597   trustDirectory = NULL;
1598 }
1599
1600
1601 /* ******************* Utility functions  ******************** */
1602
1603
1604 /**
1605  * Transmit messages by copying it to the target buffer
1606  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1607  * for writing in the meantime.  In that case, do nothing
1608  * (the disconnect or shutdown handler will take care of the rest).
1609  * If we were able to transmit messages and there are still more
1610  * pending, ask core again for further calls to this function.
1611  *
1612  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1613  * @param size number of bytes available in buf
1614  * @param buf where the callee should write the message
1615  * @return number of bytes written to buf
1616  */
1617 static size_t
1618 transmit_to_peer (void *cls,
1619                   size_t size, void *buf)
1620 {
1621   struct ConnectedPeer *cp = cls;
1622   char *cbuf = buf;
1623   struct GNUNET_PeerIdentity pid;
1624   struct PendingMessage *pm;
1625   struct MigrationReadyBlock *mb;
1626   struct MigrationReadyBlock *next;
1627   struct PutMessage migm;
1628   size_t msize;
1629   unsigned int i;
1630  
1631   cp->cth = NULL;
1632   if (NULL == buf)
1633     {
1634 #if DEBUG_FS
1635       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636                   "Dropping message, core too busy.\n");
1637 #endif
1638       return 0;
1639     }
1640   msize = 0;
1641   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1642           (pm->msize <= size) )
1643     {
1644       memcpy (&cbuf[msize], &pm[1], pm->msize);
1645       msize += pm->msize;
1646       size -= pm->msize;
1647       destroy_pending_message (pm, cp->pid);
1648     }
1649   if (NULL != pm)
1650     {
1651       GNUNET_PEER_resolve (cp->pid,
1652                            &pid);
1653       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1654                                                    pm->priority,
1655                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1656                                                    &pid,
1657                                                    pm->msize,
1658                                                    &transmit_to_peer,
1659                                                    cp);
1660     }
1661   else
1662     {      
1663       next = mig_head;
1664       while (NULL != (mb = next))
1665         {
1666           next = mb->next;
1667           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1668             {
1669               if ( (cp->pid == mb->target_list[i]) &&
1670                    (mb->size + sizeof (migm) <= size) )
1671                 {
1672                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1673                   mb->target_list[i] = 0;
1674                   mb->used_targets++;
1675                   memset (&migm, 0, sizeof (migm));
1676                   migm.header.size = htons (sizeof (migm) + mb->size);
1677                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1678                   migm.type = htonl (mb->type);
1679                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1680                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1681                   msize += sizeof (migm);
1682                   size -= sizeof (migm);
1683                   memcpy (&cbuf[msize], &mb[1], mb->size);
1684                   msize += mb->size;
1685                   size -= mb->size;
1686 #if DEBUG_FS
1687                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1688                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1689                               GNUNET_h2s (&mb->query),
1690                               mb->size,
1691                               GNUNET_i2s (&pid));
1692 #endif    
1693                   break;
1694                 }
1695               else
1696                 {
1697 #if DEBUG_FS
1698                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1699                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1700                               GNUNET_h2s (&mb->query),
1701                               mb->size,
1702                               GNUNET_i2s (&pid));
1703 #endif    
1704                 }
1705             }
1706           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1707                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1708             {
1709               delete_migration_block (mb);
1710               consider_migration_gathering ();
1711             }
1712         }
1713       consider_migration (NULL, 
1714                           &pid.hashPubKey,
1715                           cp);
1716     }
1717 #if DEBUG_FS
1718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1719               "Transmitting %u bytes to peer %u\n",
1720               msize,
1721               cp->pid);
1722 #endif
1723   return msize;
1724 }
1725
1726
1727 /**
1728  * Add a message to the set of pending messages for the given peer.
1729  *
1730  * @param cp peer to send message to
1731  * @param pm message to queue
1732  * @param pr request on which behalf this message is being queued
1733  */
1734 static void
1735 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1736                                   struct PendingMessage *pm,
1737                                   struct PendingRequest *pr)
1738 {
1739   struct PendingMessage *pos;
1740   struct PendingMessageList *pml;
1741   struct GNUNET_PeerIdentity pid;
1742
1743   GNUNET_assert (pm->next == NULL);
1744   GNUNET_assert (pm->pml == NULL);    
1745   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1746   pml->req = pr;
1747   pml->target = cp;
1748   pml->pm = pm;
1749   pm->pml = pml;  
1750   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1751                                pr->pending_tail,
1752                                pml);
1753   pos = cp->pending_messages_head;
1754   while ( (pos != NULL) &&
1755           (pm->priority < pos->priority) )
1756     pos = pos->next;    
1757   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1758                                      cp->pending_messages_tail,
1759                                      pos,
1760                                      pm);
1761   cp->pending_requests++;
1762   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1763     destroy_pending_message (cp->pending_messages_tail, 0);  
1764   GNUNET_PEER_resolve (cp->pid, &pid);
1765   if (NULL != cp->cth)
1766     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1767   /* need to schedule transmission */
1768   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1769                                                cp->pending_messages_head->priority,
1770                                                MAX_TRANSMIT_DELAY,
1771                                                &pid,
1772                                                cp->pending_messages_head->msize,
1773                                                &transmit_to_peer,
1774                                                cp);
1775   if (cp->cth == NULL)
1776     {
1777 #if DEBUG_FS
1778       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1779                   "Failed to schedule transmission with core!\n");
1780 #endif
1781       GNUNET_STATISTICS_update (stats,
1782                                 gettext_noop ("# CORE transmission failures"),
1783                                 1,
1784                                 GNUNET_NO);
1785     }
1786 }
1787
1788
1789 /**
1790  * Mingle hash with the mingle_number to produce different bits.
1791  */
1792 static void
1793 mingle_hash (const GNUNET_HashCode * in,
1794              int32_t mingle_number, 
1795              GNUNET_HashCode * hc)
1796 {
1797   GNUNET_HashCode m;
1798
1799   GNUNET_CRYPTO_hash (&mingle_number, 
1800                       sizeof (int32_t), 
1801                       &m);
1802   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1803 }
1804
1805
1806 /**
1807  * Test if the load on this peer is too high
1808  * to even consider processing the query at
1809  * all.
1810  * 
1811  * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
1812  */
1813 static int
1814 test_load_too_high ()
1815 {
1816   return GNUNET_SYSERR; // FIXME
1817 }
1818
1819
1820 /* ******************* Pending Request Refresh Task ******************** */
1821
1822
1823
1824 /**
1825  * We use a random delay to make the timing of requests less
1826  * predictable.  This function returns such a random delay.  We add a base
1827  * delay of MAX_CORK_DELAY (1s).
1828  *
1829  * FIXME: make schedule dependent on the specifics of the request?
1830  * Or bandwidth and number of connected peers and load?
1831  *
1832  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1833  */
1834 static struct GNUNET_TIME_Relative
1835 get_processing_delay ()
1836 {
1837   return 
1838     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1839                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1840                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1841                                                                                        TTL_DECREMENT)));
1842 }
1843
1844
1845 /**
1846  * We're processing a GET request from another peer and have decided
1847  * to forward it to other peers.  This function is called periodically
1848  * and should forward the request to other peers until we have all
1849  * possible replies.  If we have transmitted the *only* reply to
1850  * the initiator we should destroy the pending request.  If we have
1851  * many replies in the queue to the initiator, we should delay sending
1852  * out more queries until the reply queue has shrunk some.
1853  *
1854  * @param cls our "struct ProcessGetContext *"
1855  * @param tc unused
1856  */
1857 static void
1858 forward_request_task (void *cls,
1859                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1860
1861
1862 /**
1863  * Function called after we either failed or succeeded
1864  * at transmitting a query to a peer.  
1865  *
1866  * @param cls the requests "struct PendingRequest*"
1867  * @param tpid ID of receiving peer, 0 on transmission error
1868  */
1869 static void
1870 transmit_query_continuation (void *cls,
1871                              GNUNET_PEER_Id tpid)
1872 {
1873   struct PendingRequest *pr = cls;
1874
1875   GNUNET_STATISTICS_update (stats,
1876                             gettext_noop ("# queries scheduled for forwarding"),
1877                             -1,
1878                             GNUNET_NO);
1879   if (tpid == 0)   
1880     {
1881 #if DEBUG_FS
1882       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1883                   "Transmission of request failed, will try again later.\n");
1884 #endif
1885       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1886         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1887                                                  get_processing_delay (),
1888                                                  &forward_request_task,
1889                                                  pr); 
1890       return;    
1891     }
1892   GNUNET_STATISTICS_update (stats,
1893                             gettext_noop ("# queries forwarded"),
1894                             1,
1895                             GNUNET_NO);
1896   GNUNET_PEER_change_rc (tpid, 1);
1897   if (pr->used_pids_off == pr->used_pids_size)
1898     GNUNET_array_grow (pr->used_pids,
1899                        pr->used_pids_size,
1900                        pr->used_pids_size * 2 + 2);
1901   pr->used_pids[pr->used_pids_off++] = tpid;
1902   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1903     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1904                                              get_processing_delay (),
1905                                              &forward_request_task,
1906                                              pr);
1907 }
1908
1909
1910 /**
1911  * How many bytes should a bloomfilter be if we have already seen
1912  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1913  * of bits set per entry.  Furthermore, we should not re-size the
1914  * filter too often (to keep it cheap).
1915  *
1916  * Since other peers will also add entries but not resize the filter,
1917  * we should generally pick a slightly larger size than what the
1918  * strict math would suggest.
1919  *
1920  * @return must be a power of two and smaller or equal to 2^15.
1921  */
1922 static size_t
1923 compute_bloomfilter_size (unsigned int entry_count)
1924 {
1925   size_t size;
1926   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1927   uint16_t max = 1 << 15;
1928
1929   if (entry_count > max)
1930     return max;
1931   size = 8;
1932   while ((size < max) && (size < ideal))
1933     size *= 2;
1934   if (size > max)
1935     return max;
1936   return size;
1937 }
1938
1939
1940 /**
1941  * Recalculate our bloom filter for filtering replies.  This function
1942  * will create a new bloom filter from scratch, so it should only be
1943  * called if we have no bloomfilter at all (and hence can create a
1944  * fresh one of minimal size without problems) OR if our peer is the
1945  * initiator (in which case we may resize to larger than mimimum size).
1946  *
1947  * @param pr request for which the BF is to be recomputed
1948  */
1949 static void
1950 refresh_bloomfilter (struct PendingRequest *pr)
1951 {
1952   unsigned int i;
1953   size_t nsize;
1954   GNUNET_HashCode mhash;
1955
1956   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1957   if (nsize == pr->bf_size)
1958     return; /* size not changed */
1959   if (pr->bf != NULL)
1960     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1961   pr->bf_size = nsize;
1962   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1963   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1964                                               pr->bf_size,
1965                                               BLOOMFILTER_K);
1966   for (i=0;i<pr->replies_seen_off;i++)
1967     {
1968       mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
1969       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
1970     }
1971 }
1972
1973
1974 /**
1975  * Function called after we've tried to reserve a certain amount of
1976  * bandwidth for a reply.  Check if we succeeded and if so send our
1977  * query.
1978  *
1979  * @param cls the requests "struct PendingRequest*"
1980  * @param peer identifies the peer
1981  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1982  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1983  * @param amount set to the amount that was actually reserved or unreserved
1984  * @param preference current traffic preference for the given peer
1985  */
1986 static void
1987 target_reservation_cb (void *cls,
1988                        const struct
1989                        GNUNET_PeerIdentity * peer,
1990                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1991                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1992                        int amount,
1993                        uint64_t preference)
1994 {
1995   struct PendingRequest *pr = cls;
1996   struct ConnectedPeer *cp;
1997   struct PendingMessage *pm;
1998   struct GetMessage *gm;
1999   GNUNET_HashCode *ext;
2000   char *bfdata;
2001   size_t msize;
2002   unsigned int k;
2003   int no_route;
2004   uint32_t bm;
2005
2006   pr->irc = NULL;
2007   if (peer == NULL)
2008     {
2009       /* error in communication with core, try again later */
2010       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2011         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2012                                                  get_processing_delay (),
2013                                                  &forward_request_task,
2014                                                  pr);
2015       return;
2016     }
2017   // (3) transmit, update ttl/priority
2018   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2019                                           &peer->hashPubKey);
2020   if (cp == NULL)
2021     {
2022       /* Peer must have just left */
2023 #if DEBUG_FS
2024       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2025                   "Selected peer disconnected!\n");
2026 #endif
2027       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2028         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2029                                                  get_processing_delay (),
2030                                                  &forward_request_task,
2031                                                  pr);
2032       return;
2033     }
2034   no_route = GNUNET_NO;
2035   if (amount == 0)
2036     {
2037       if (pr->cp == NULL)
2038         {
2039 #if DEBUG_FS > 1
2040           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2041                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2042                       amount,
2043                       DBLOCK_SIZE);
2044 #endif
2045           GNUNET_STATISTICS_update (stats,
2046                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2047                                     1,
2048                                     GNUNET_NO);
2049           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2050             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2051                                                      get_processing_delay (),
2052                                                      &forward_request_task,
2053                                                      pr);
2054           return;  /* this target round failed */
2055         }
2056       /* FIXME: if we are "quite" busy, we may still want to skip
2057          this round; need more load detection code! */
2058       no_route = GNUNET_YES;
2059     }
2060   
2061   GNUNET_STATISTICS_update (stats,
2062                             gettext_noop ("# queries scheduled for forwarding"),
2063                             1,
2064                             GNUNET_NO);
2065   /* build message and insert message into priority queue */
2066 #if DEBUG_FS
2067   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2068               "Forwarding request `%s' to `%4s'!\n",
2069               GNUNET_h2s (&pr->query),
2070               GNUNET_i2s (peer));
2071 #endif
2072   k = 0;
2073   bm = 0;
2074   if (GNUNET_YES == no_route)
2075     {
2076       bm |= GET_MESSAGE_BIT_RETURN_TO;
2077       k++;      
2078     }
2079   if (pr->namespace != NULL)
2080     {
2081       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2082       k++;
2083     }
2084   if (pr->target_pid != 0)
2085     {
2086       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2087       k++;
2088     }
2089   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2090   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2091   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2092   pm->msize = msize;
2093   gm = (struct GetMessage*) &pm[1];
2094   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2095   gm->header.size = htons (msize);
2096   gm->type = htonl (pr->type);
2097   pr->remaining_priority /= 2;
2098   gm->priority = htonl (pr->remaining_priority);
2099   gm->ttl = htonl (pr->ttl);
2100   gm->filter_mutator = htonl(pr->mingle); 
2101   gm->hash_bitmap = htonl (bm);
2102   gm->query = pr->query;
2103   ext = (GNUNET_HashCode*) &gm[1];
2104   k = 0;
2105   if (GNUNET_YES == no_route)
2106     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2107   if (pr->namespace != NULL)
2108     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2109   if (pr->target_pid != 0)
2110     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2111   bfdata = (char *) &ext[k];
2112   if (pr->bf != NULL)
2113     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2114                                                bfdata,
2115                                                pr->bf_size);
2116   pm->cont = &transmit_query_continuation;
2117   pm->cont_cls = pr;
2118   add_to_pending_messages_for_peer (cp, pm, pr);
2119 }
2120
2121
2122 /**
2123  * Closure used for "target_peer_select_cb".
2124  */
2125 struct PeerSelectionContext 
2126 {
2127   /**
2128    * The request for which we are selecting
2129    * peers.
2130    */
2131   struct PendingRequest *pr;
2132
2133   /**
2134    * Current "prime" target.
2135    */
2136   struct GNUNET_PeerIdentity target;
2137
2138   /**
2139    * How much do we like this target?
2140    */
2141   double target_score;
2142
2143 };
2144
2145
2146 /**
2147  * Function called for each connected peer to determine
2148  * which one(s) would make good targets for forwarding.
2149  *
2150  * @param cls closure (struct PeerSelectionContext)
2151  * @param key current key code (peer identity)
2152  * @param value value in the hash map (struct ConnectedPeer)
2153  * @return GNUNET_YES if we should continue to
2154  *         iterate,
2155  *         GNUNET_NO if not.
2156  */
2157 static int
2158 target_peer_select_cb (void *cls,
2159                        const GNUNET_HashCode * key,
2160                        void *value)
2161 {
2162   struct PeerSelectionContext *psc = cls;
2163   struct ConnectedPeer *cp = value;
2164   struct PendingRequest *pr = psc->pr;
2165   double score;
2166   unsigned int i;
2167   unsigned int pc;
2168
2169   /* 1) check that this peer is not the initiator */
2170   if (cp == pr->cp)
2171     {
2172 #if DEBUG_FS
2173       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2174                   "Skipping initiator in forwarding selection\n");
2175 #endif
2176       return GNUNET_YES; /* skip */        
2177     }
2178
2179   /* 2) check if we have already (recently) forwarded to this peer */
2180   pc = 0;
2181   for (i=0;i<pr->used_pids_off;i++)
2182     if (pr->used_pids[i] == cp->pid) 
2183       {
2184         pc++;
2185         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2186                                            RETRY_PROBABILITY_INV))
2187           {
2188 #if DEBUG_FS
2189             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2190                         "NOT re-trying query that was previously transmitted %u times\n",
2191                         (unsigned int) pr->used_pids_off);
2192 #endif
2193             return GNUNET_YES; /* skip */
2194           }
2195       }
2196 #if DEBUG_FS
2197   if (0 < pc)
2198     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2199                 "Re-trying query that was previously transmitted %u times to this peer\n",
2200                 (unsigned int) pc);
2201 #endif
2202   /* 3) calculate how much we'd like to forward to this peer,
2203      starting with a random value that is strong enough
2204      to at least give any peer a chance sometimes 
2205      (compared to the other factors that come later) */
2206   /* 3a) count successful (recent) routes from cp for same source */
2207   if (pr->cp != NULL)
2208     {
2209       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2210                                         P2P_SUCCESS_LIST_SIZE);
2211       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2212         if (cp->last_p2p_replies[i] == pr->cp->pid)
2213           score += 1; /* likely successful based on hot path */
2214     }
2215   else
2216     {
2217       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2218                                         CS2P_SUCCESS_LIST_SIZE);
2219       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2220         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2221           score += 1; /* likely successful based on hot path */
2222     }
2223   /* 3b) include latency */
2224   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2225     score += 1; /* likely fast based on latency */
2226   /* 3c) include priorities */
2227   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2228     score += 1; /* likely successful based on priorities */
2229   /* 3d) penalize for queue size */  
2230   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2231   /* 3e) include peer proximity */
2232   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2233                                                     &pr->query)) / (double) UINT32_MAX);
2234   /* 4) super-bonus for being the known target */
2235   if (pr->target_pid == cp->pid)
2236     score += 100.0;
2237   /* store best-fit in closure */
2238 #if DEBUG_FS
2239   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2240               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2241               GNUNET_h2s (key),
2242               score,
2243               psc->target_score);
2244 #endif  
2245   score++; /* avoid zero */
2246   if (score > psc->target_score)
2247     {
2248       psc->target_score = score;
2249       psc->target.hashPubKey = *key; 
2250     }
2251   return GNUNET_YES;
2252 }
2253   
2254
2255 /**
2256  * The priority level imposes a bound on the maximum
2257  * value for the ttl that can be requested.
2258  *
2259  * @param ttl_in requested ttl
2260  * @param prio given priority
2261  * @return ttl_in if ttl_in is below the limit,
2262  *         otherwise the ttl-limit for the given priority
2263  */
2264 static int32_t
2265 bound_ttl (int32_t ttl_in, uint32_t prio)
2266 {
2267   unsigned long long allowed;
2268
2269   if (ttl_in <= 0)
2270     return ttl_in;
2271   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2272   if (ttl_in > allowed)      
2273     {
2274       if (allowed >= (1 << 30))
2275         return 1 << 30;
2276       return allowed;
2277     }
2278   return ttl_in;
2279 }
2280
2281
2282 /**
2283  * We're processing a GET request and have decided
2284  * to forward it to other peers.  This function is called periodically
2285  * and should forward the request to other peers until we have all
2286  * possible replies.  If we have transmitted the *only* reply to
2287  * the initiator we should destroy the pending request.  If we have
2288  * many replies in the queue to the initiator, we should delay sending
2289  * out more queries until the reply queue has shrunk some.
2290  *
2291  * @param cls our "struct ProcessGetContext *"
2292  * @param tc unused
2293  */
2294 static void
2295 forward_request_task (void *cls,
2296                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2297 {
2298   struct PendingRequest *pr = cls;
2299   struct PeerSelectionContext psc;
2300   struct ConnectedPeer *cp; 
2301   struct GNUNET_TIME_Relative delay;
2302
2303   pr->task = GNUNET_SCHEDULER_NO_TASK;
2304   if (pr->irc != NULL)
2305     {
2306 #if DEBUG_FS
2307       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2308                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2309                   GNUNET_h2s (&pr->query));
2310 #endif
2311       return; /* already pending */
2312     }
2313   if (GNUNET_YES == pr->local_only)
2314     return; /* configured to not do P2P search */
2315   /* (1) select target */
2316   psc.pr = pr;
2317   psc.target_score = -DBL_MAX;
2318   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2319                                          &target_peer_select_cb,
2320                                          &psc);  
2321   if (psc.target_score == -DBL_MAX)
2322     {
2323       delay = get_processing_delay ();
2324 #if DEBUG_FS 
2325       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2326                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2327                   GNUNET_h2s (&pr->query),
2328                   delay.value);
2329 #endif
2330       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2331                                                delay,
2332                                                &forward_request_task,
2333                                                pr);
2334       return; /* nobody selected */
2335     }
2336   /* (3) update TTL/priority */
2337   if (pr->client_request_list != NULL)
2338     {
2339       /* FIXME: use better algorithm!? */
2340       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2341                                          4))
2342         pr->priority++;
2343       /* bound priority we use by priorities we see from other peers
2344          rounded up (must round up so that we can see non-zero
2345          priorities, but round up as little as possible to make it
2346          plausible that we forwarded another peers request) */
2347       if (pr->priority > current_priorities + 1.0)
2348         pr->priority = (uint32_t) current_priorities + 1.0;
2349       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2350                            pr->priority);
2351 #if DEBUG_FS
2352       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2353                   "Trying query `%s' with priority %u and TTL %d.\n",
2354                   GNUNET_h2s (&pr->query),
2355                   pr->priority,
2356                   pr->ttl);
2357 #endif
2358     }
2359
2360   /* (3) reserve reply bandwidth */
2361   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2362                                           &psc.target.hashPubKey);
2363   GNUNET_assert (NULL != cp);
2364   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2365                                                 &psc.target,
2366                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2367                                                 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2368                                                 DBLOCK_SIZE * 2, 
2369                                                 cp->inc_preference,
2370                                                 &target_reservation_cb,
2371                                                 pr);
2372   cp->inc_preference = 0;
2373 }
2374
2375
2376 /* **************************** P2P PUT Handling ************************ */
2377
2378
2379 /**
2380  * Function called after we either failed or succeeded
2381  * at transmitting a reply to a peer.  
2382  *
2383  * @param cls the requests "struct PendingRequest*"
2384  * @param tpid ID of receiving peer, 0 on transmission error
2385  */
2386 static void
2387 transmit_reply_continuation (void *cls,
2388                              GNUNET_PEER_Id tpid)
2389 {
2390   struct PendingRequest *pr = cls;
2391   
2392   switch (pr->type)
2393     {
2394     case GNUNET_BLOCK_TYPE_DBLOCK:
2395     case GNUNET_BLOCK_TYPE_IBLOCK:
2396       /* only one reply expected, done with the request! */
2397       destroy_pending_request (pr);
2398       break;
2399     case GNUNET_BLOCK_TYPE_ANY:
2400     case GNUNET_BLOCK_TYPE_KBLOCK:
2401     case GNUNET_BLOCK_TYPE_SBLOCK:
2402       break;
2403     default:
2404       GNUNET_break (0);
2405       break;
2406     }
2407 }
2408
2409
2410 /**
2411  * Transmit the given message by copying it to the target buffer
2412  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2413  * for writing in the meantime.  In that case, do nothing
2414  * (the disconnect or shutdown handler will take care of the rest).
2415  * If we were able to transmit messages and there are still more
2416  * pending, ask core again for further calls to this function.
2417  *
2418  * @param cls closure, pointer to the 'struct ClientList*'
2419  * @param size number of bytes available in buf
2420  * @param buf where the callee should write the message
2421  * @return number of bytes written to buf
2422  */
2423 static size_t
2424 transmit_to_client (void *cls,
2425                   size_t size, void *buf)
2426 {
2427   struct ClientList *cl = cls;
2428   char *cbuf = buf;
2429   struct ClientResponseMessage *creply;
2430   size_t msize;
2431   
2432   cl->th = NULL;
2433   if (NULL == buf)
2434     {
2435 #if DEBUG_FS
2436       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2437                   "Not sending reply, client communication problem.\n");
2438 #endif
2439       return 0;
2440     }
2441   msize = 0;
2442   while ( (NULL != (creply = cl->res_head) ) &&
2443           (creply->msize <= size) )
2444     {
2445       memcpy (&cbuf[msize], &creply[1], creply->msize);
2446       msize += creply->msize;
2447       size -= creply->msize;
2448       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2449                                    cl->res_tail,
2450                                    creply);
2451       GNUNET_free (creply);
2452     }
2453   if (NULL != creply)
2454     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2455                                                   creply->msize,
2456                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2457                                                   &transmit_to_client,
2458                                                   cl);
2459 #if DEBUG_FS
2460   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2461               "Transmitted %u bytes to client\n",
2462               (unsigned int) msize);
2463 #endif
2464   return msize;
2465 }
2466
2467
2468 /**
2469  * Closure for "process_reply" function.
2470  */
2471 struct ProcessReplyClosure
2472 {
2473   /**
2474    * The data for the reply.
2475    */
2476   const void *data;
2477
2478   /**
2479    * Who gave us this reply? NULL for local host.
2480    */
2481   struct ConnectedPeer *sender;
2482
2483   /**
2484    * When the reply expires.
2485    */
2486   struct GNUNET_TIME_Absolute expiration;
2487
2488   /**
2489    * Size of data.
2490    */
2491   size_t size;
2492
2493   /**
2494    * Namespace that this reply belongs to
2495    * (if it is of type SBLOCK).
2496    */
2497   GNUNET_HashCode namespace;
2498
2499   /**
2500    * Type of the block.
2501    */
2502   enum GNUNET_BLOCK_Type type;
2503
2504   /**
2505    * How much was this reply worth to us?
2506    */
2507   uint32_t priority;
2508
2509   /**
2510    * Did we finish processing the associated request?
2511    */ 
2512   int finished;
2513 };
2514
2515
2516 /**
2517  * We have received a reply; handle it!
2518  *
2519  * @param cls response (struct ProcessReplyClosure)
2520  * @param key our query
2521  * @param value value in the hash map (info about the query)
2522  * @return GNUNET_YES (we should continue to iterate)
2523  */
2524 static int
2525 process_reply (void *cls,
2526                const GNUNET_HashCode * key,
2527                void *value)
2528 {
2529   struct ProcessReplyClosure *prq = cls;
2530   struct PendingRequest *pr = value;
2531   struct PendingMessage *reply;
2532   struct ClientResponseMessage *creply;
2533   struct ClientList *cl;
2534   struct PutMessage *pm;
2535   struct ConnectedPeer *cp;
2536   struct GNUNET_TIME_Relative cur_delay;
2537   GNUNET_HashCode chash;
2538   GNUNET_HashCode mhash;
2539   size_t msize;
2540
2541 #if DEBUG_FS
2542   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2543               "Matched result (type %u) for query `%s' with pending request\n",
2544               (unsigned int) prq->type,
2545               GNUNET_h2s (key));
2546 #endif  
2547   GNUNET_STATISTICS_update (stats,
2548                             gettext_noop ("# replies received and matched"),
2549                             1,
2550                             GNUNET_NO);
2551   if (prq->sender != NULL)
2552     {
2553       /* FIXME: should we be more precise here and not use
2554          "start_time" but a peer-specific time stamp? */
2555       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2556       prq->sender->avg_delay.value
2557         = (prq->sender->avg_delay.value * 
2558            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2559       prq->sender->avg_priority
2560         = (prq->sender->avg_priority * 
2561            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2562       if (pr->cp != NULL)
2563         {
2564           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2565                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2566                                  -1);
2567           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2568           prq->sender->last_p2p_replies
2569             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2570             = pr->cp->pid;
2571         }
2572       else
2573         {
2574           if (NULL != prq->sender->last_client_replies
2575               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2576             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2577                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2578           prq->sender->last_client_replies
2579             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2580             = pr->client_request_list->client_list->client;
2581           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2582         }
2583     }
2584   GNUNET_CRYPTO_hash (prq->data,
2585                       prq->size,
2586                       &chash);
2587   switch (prq->type)
2588     {
2589     case GNUNET_BLOCK_TYPE_DBLOCK:
2590     case GNUNET_BLOCK_TYPE_IBLOCK:
2591       /* only possible reply, stop requesting! */
2592       while (NULL != pr->pending_head)
2593         destroy_pending_message_list_entry (pr->pending_head);
2594       if (pr->qe != NULL)
2595         {
2596           if (pr->client_request_list != NULL)
2597             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2598                                         GNUNET_YES);
2599           GNUNET_DATASTORE_cancel (pr->qe);
2600           pr->qe = NULL;
2601         }
2602       pr->do_remove = GNUNET_YES;
2603       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2604         {
2605           GNUNET_SCHEDULER_cancel (sched,
2606                                    pr->task);
2607           pr->task = GNUNET_SCHEDULER_NO_TASK;
2608         }
2609       GNUNET_break (GNUNET_YES ==
2610                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2611                                                           key,
2612                                                           pr));
2613       break;
2614     case GNUNET_BLOCK_TYPE_SBLOCK:
2615       if (pr->namespace == NULL)
2616         {
2617           GNUNET_break (0);
2618           return GNUNET_YES;
2619         }
2620       if (0 != memcmp (pr->namespace,
2621                        &prq->namespace,
2622                        sizeof (GNUNET_HashCode)))
2623         {
2624           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2625                       _("Reply mismatched in terms of namespace.  Discarded.\n"));
2626           return GNUNET_YES; /* wrong namespace */      
2627         }
2628       /* then: fall-through! */
2629     case GNUNET_BLOCK_TYPE_KBLOCK:
2630     case GNUNET_BLOCK_TYPE_NBLOCK:
2631       if (pr->bf != NULL) 
2632         {
2633           mingle_hash (&chash, pr->mingle, &mhash);
2634           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2635                                                                &mhash))
2636             {
2637               GNUNET_STATISTICS_update (stats,
2638                                         gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2639                                         1,
2640                                         GNUNET_NO);
2641 #if DEBUG_FS
2642               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2643                           "Duplicate response `%s', discarding.\n",
2644                           GNUNET_h2s (&mhash));
2645 #endif
2646               return GNUNET_YES; /* duplicate */
2647             }
2648 #if DEBUG_FS
2649           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2650                       "New response `%s', adding to filter.\n",
2651                       GNUNET_h2s (&mhash));
2652 #endif
2653         }
2654       if (pr->client_request_list != NULL)
2655         {
2656           if (pr->replies_seen_size == pr->replies_seen_off)
2657             GNUNET_array_grow (pr->replies_seen,
2658                                pr->replies_seen_size,
2659                                pr->replies_seen_size * 2 + 4);  
2660             pr->replies_seen[pr->replies_seen_off++] = chash;         
2661         }
2662       if ( (pr->bf == NULL) ||
2663            (pr->client_request_list != NULL) )
2664         refresh_bloomfilter (pr);
2665       GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2666                                         &mhash);
2667       break;
2668     default:
2669       GNUNET_break (0);
2670       return GNUNET_YES;
2671     }
2672   prq->priority += pr->remaining_priority;
2673   pr->remaining_priority = 0;
2674   if (NULL != pr->client_request_list)
2675     {
2676       GNUNET_STATISTICS_update (stats,
2677                                 gettext_noop ("# replies received for local clients"),
2678                                 1,
2679                                 GNUNET_NO);
2680       cl = pr->client_request_list->client_list;
2681       msize = sizeof (struct PutMessage) + prq->size;
2682       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2683       creply->msize = msize;
2684       creply->client_list = cl;
2685       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2686                                          cl->res_tail,
2687                                          cl->res_tail,
2688                                          creply);      
2689       pm = (struct PutMessage*) &creply[1];
2690       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2691       pm->header.size = htons (msize);
2692       pm->type = htonl (prq->type);
2693       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2694       memcpy (&pm[1], prq->data, prq->size);      
2695       if (NULL == cl->th)
2696         {
2697 #if DEBUG_FS
2698           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2699                       "Transmitting result for query `%s' to client\n",
2700                       GNUNET_h2s (key));
2701 #endif  
2702           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2703                                                         msize,
2704                                                         GNUNET_TIME_UNIT_FOREVER_REL,
2705                                                         &transmit_to_client,
2706                                                         cl);
2707         }
2708       GNUNET_break (cl->th != NULL);
2709       if (pr->do_remove)                
2710         {
2711           prq->finished = GNUNET_YES;
2712           destroy_pending_request (pr);         
2713         }
2714     }
2715   else
2716     {
2717       cp = pr->cp;
2718 #if DEBUG_FS
2719       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2720                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
2721                   GNUNET_h2s (key),
2722                   (unsigned int) cp->pid);
2723 #endif  
2724       GNUNET_STATISTICS_update (stats,
2725                                 gettext_noop ("# replies received for other peers"),
2726                                 1,
2727                                 GNUNET_NO);
2728       msize = sizeof (struct PutMessage) + prq->size;
2729       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2730       reply->cont = &transmit_reply_continuation;
2731       reply->cont_cls = pr;
2732       reply->msize = msize;
2733       reply->priority = UINT32_MAX; /* send replies first! */
2734       pm = (struct PutMessage*) &reply[1];
2735       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2736       pm->header.size = htons (msize);
2737       pm->type = htonl (prq->type);
2738       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2739       memcpy (&pm[1], prq->data, prq->size);
2740       add_to_pending_messages_for_peer (cp, reply, pr);
2741     }
2742   return GNUNET_YES;
2743 }
2744
2745
2746 /**
2747  * Continuation called to notify client about result of the
2748  * operation.
2749  *
2750  * @param cls closure
2751  * @param success GNUNET_SYSERR on failure
2752  * @param msg NULL on success, otherwise an error message
2753  */
2754 static void 
2755 put_migration_continuation (void *cls,
2756                             int success,
2757                             const char *msg)
2758 {
2759   /* FIXME */
2760 }
2761
2762
2763 /**
2764  * Handle P2P "PUT" message.
2765  *
2766  * @param cls closure, always NULL
2767  * @param other the other peer involved (sender or receiver, NULL
2768  *        for loopback messages where we are both sender and receiver)
2769  * @param message the actual message
2770  * @param latency reported latency of the connection with 'other'
2771  * @param distance reported distance (DV) to 'other' 
2772  * @return GNUNET_OK to keep the connection open,
2773  *         GNUNET_SYSERR to close it (signal serious error)
2774  */
2775 static int
2776 handle_p2p_put (void *cls,
2777                 const struct GNUNET_PeerIdentity *other,
2778                 const struct GNUNET_MessageHeader *message,
2779                 struct GNUNET_TIME_Relative latency,
2780                 uint32_t distance)
2781 {
2782   const struct PutMessage *put;
2783   uint16_t msize;
2784   size_t dsize;
2785   enum GNUNET_BLOCK_Type type;
2786   struct GNUNET_TIME_Absolute expiration;
2787   GNUNET_HashCode query;
2788   struct ProcessReplyClosure prq;
2789   const struct SBlock *sb;
2790
2791   msize = ntohs (message->size);
2792   if (msize < sizeof (struct PutMessage))
2793     {
2794       GNUNET_break_op(0);
2795       return GNUNET_SYSERR;
2796     }
2797   put = (const struct PutMessage*) message;
2798   dsize = msize - sizeof (struct PutMessage);
2799   type = ntohl (put->type);
2800   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2801
2802   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2803     return GNUNET_SYSERR;
2804   if (GNUNET_OK !=
2805       GNUNET_BLOCK_get_key (block_ctx,
2806                             type,
2807                             &put[1],
2808                             dsize,
2809                             &query))
2810     {
2811       GNUNET_break_op (0);
2812       return GNUNET_SYSERR;
2813     }
2814   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2815     { 
2816       sb = (const struct SBlock*) &put[1];
2817       GNUNET_CRYPTO_hash (&sb->subspace,
2818                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2819                           &prq.namespace);
2820     }
2821
2822 #if DEBUG_FS
2823   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2824               "Received result for query `%s' from peer `%4s'\n",
2825               GNUNET_h2s (&query),
2826               GNUNET_i2s (other));
2827 #endif
2828   GNUNET_STATISTICS_update (stats,
2829                             gettext_noop ("# replies received (overall)"),
2830                             1,
2831                             GNUNET_NO);
2832   /* now, lookup 'query' */
2833   prq.data = (const void*) &put[1];
2834   if (other != NULL)
2835     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2836                                                     &other->hashPubKey);
2837   else
2838     prq.sender = NULL;
2839   prq.size = dsize;
2840   prq.type = type;
2841   prq.expiration = expiration;
2842   prq.priority = 0;
2843   prq.finished = GNUNET_NO;
2844   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2845                                               &query,
2846                                               &process_reply,
2847                                               &prq);
2848   if (prq.sender != NULL)
2849     {
2850       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
2851       prq.sender->trust += prq.priority;
2852     }
2853   if (GNUNET_YES == active_migration)
2854     {
2855 #if DEBUG_FS
2856       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2857                   "Replicating result for query `%s' with priority %u\n",
2858                   GNUNET_h2s (&query),
2859                   prq.priority);
2860 #endif
2861       GNUNET_DATASTORE_put (dsh,
2862                             0, &query, dsize, &put[1],
2863                             type, prq.priority, 1 /* anonymity */, 
2864                             expiration, 
2865                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2866                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2867                             &put_migration_continuation, 
2868                             NULL);
2869     }
2870   return GNUNET_OK;
2871 }
2872
2873
2874 /* **************************** P2P GET Handling ************************ */
2875
2876
2877 /**
2878  * Closure for 'check_duplicate_request_{peer,client}'.
2879  */
2880 struct CheckDuplicateRequestClosure
2881 {
2882   /**
2883    * The new request we should check if it already exists.
2884    */
2885   const struct PendingRequest *pr;
2886
2887   /**
2888    * Existing request found by the checker, NULL if none.
2889    */
2890   struct PendingRequest *have;
2891 };
2892
2893
2894 /**
2895  * Iterator over entries in the 'query_request_map' that
2896  * tries to see if we have the same request pending from
2897  * the same client already.
2898  *
2899  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2900  * @param key current key code (query, ignored, must match)
2901  * @param value value in the hash map (a 'struct PendingRequest' 
2902  *              that already exists)
2903  * @return GNUNET_YES if we should continue to
2904  *         iterate (no match yet)
2905  *         GNUNET_NO if not (match found).
2906  */
2907 static int
2908 check_duplicate_request_client (void *cls,
2909                                 const GNUNET_HashCode * key,
2910                                 void *value)
2911 {
2912   struct CheckDuplicateRequestClosure *cdc = cls;
2913   struct PendingRequest *have = value;
2914
2915   if (have->client_request_list == NULL)
2916     return GNUNET_YES;
2917   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2918        (cdc->pr != have) )
2919     {
2920       cdc->have = have;
2921       return GNUNET_NO;
2922     }
2923   return GNUNET_YES;
2924 }
2925
2926
2927 /**
2928  * We're processing (local) results for a search request
2929  * from another peer.  Pass applicable results to the
2930  * peer and if we are done either clean up (operation
2931  * complete) or forward to other peers (more results possible).
2932  *
2933  * @param cls our closure (struct LocalGetContext)
2934  * @param key key for the content
2935  * @param size number of bytes in data
2936  * @param data content stored
2937  * @param type type of the content
2938  * @param priority priority of the content
2939  * @param anonymity anonymity-level for the content
2940  * @param expiration expiration time for the content
2941  * @param uid unique identifier for the datum;
2942  *        maybe 0 if no unique identifier is available
2943  */
2944 static void
2945 process_local_reply (void *cls,
2946                      const GNUNET_HashCode * key,
2947                      uint32_t size,
2948                      const void *data,
2949                      enum GNUNET_BLOCK_Type type,
2950                      uint32_t priority,
2951                      uint32_t anonymity,
2952                      struct GNUNET_TIME_Absolute
2953                      expiration, 
2954                      uint64_t uid)
2955 {
2956   struct PendingRequest *pr = cls;
2957   struct ProcessReplyClosure prq;
2958   struct CheckDuplicateRequestClosure cdrc;
2959   const struct SBlock *sb;
2960   GNUNET_HashCode dhash;
2961   GNUNET_HashCode mhash;
2962   GNUNET_HashCode query;
2963   
2964   if (NULL == key)
2965     {
2966 #if DEBUG_FS > 1
2967       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2968                   "Done processing local replies, forwarding request to other peers.\n");
2969 #endif
2970       pr->qe = NULL;
2971       if (pr->client_request_list != NULL)
2972         {
2973           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2974                                       GNUNET_YES);
2975           /* Figure out if this is a duplicate request and possibly
2976              merge 'struct PendingRequest' entries */
2977           cdrc.have = NULL;
2978           cdrc.pr = pr;
2979           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2980                                                       &pr->query,
2981                                                       &check_duplicate_request_client,
2982                                                       &cdrc);
2983           if (cdrc.have != NULL)
2984             {
2985 #if DEBUG_FS
2986               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2987                           "Received request for block `%s' twice from client, will only request once.\n",
2988                           GNUNET_h2s (&pr->query));
2989 #endif
2990               
2991               destroy_pending_request (pr);
2992               return;
2993             }
2994         }
2995
2996       /* no more results */
2997       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2998         pr->task = GNUNET_SCHEDULER_add_now (sched,
2999                                              &forward_request_task,
3000                                              pr);      
3001       return;
3002     }
3003 #if DEBUG_FS
3004   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3005               "New local response to `%s' of type %u.\n",
3006               GNUNET_h2s (key),
3007               type);
3008 #endif
3009   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
3010     {
3011 #if DEBUG_FS
3012       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3013                   "Found ONDEMAND block, performing on-demand encoding\n");
3014 #endif
3015       GNUNET_STATISTICS_update (stats,
3016                                 gettext_noop ("# on-demand blocks matched requests"),
3017                                 1,
3018                                 GNUNET_NO);
3019       if (GNUNET_OK != 
3020           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
3021                                             anonymity, expiration, uid, 
3022                                             &process_local_reply,
3023                                             pr))
3024       if (pr->qe != NULL)
3025         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3026       return;
3027     }
3028   /* check for duplicates */
3029   GNUNET_CRYPTO_hash (data, size, &dhash);
3030   mingle_hash (&dhash, 
3031                pr->mingle,
3032                &mhash);
3033   if ( (pr->bf != NULL) &&
3034        (GNUNET_YES ==
3035         GNUNET_CONTAINER_bloomfilter_test (pr->bf,
3036                                            &mhash)) )
3037     {      
3038 #if DEBUG_FS
3039       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3040                   "Result from datastore filtered by bloomfilter (duplicate).\n");
3041 #endif
3042       GNUNET_STATISTICS_update (stats,
3043                                 gettext_noop ("# results filtered by query bloomfilter"),
3044                                 1,
3045                                 GNUNET_NO);
3046       if (pr->qe != NULL)
3047         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3048       return;
3049     }
3050 #if DEBUG_FS
3051   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3052               "Found result for query `%s' in local datastore\n",
3053               GNUNET_h2s (key));
3054 #endif
3055   GNUNET_STATISTICS_update (stats,
3056                             gettext_noop ("# results found locally"),
3057                             1,
3058                             GNUNET_NO);
3059   pr->results_found++;
3060   memset (&prq, 0, sizeof (prq));
3061   prq.data = data;
3062   prq.expiration = expiration;
3063   prq.size = size;  
3064   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
3065     { 
3066       sb = (const struct SBlock*) data;
3067       GNUNET_CRYPTO_hash (&sb->subspace,
3068                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3069                           &prq.namespace);
3070     }
3071   if (GNUNET_OK != 
3072       GNUNET_BLOCK_get_key (block_ctx,
3073                             type,
3074                             data,
3075                             size,
3076                             &query))
3077     {
3078       GNUNET_break (0);
3079       GNUNET_DATASTORE_remove (dsh,
3080                                key,
3081                                size, data,
3082                                -1, -1, 
3083                                GNUNET_TIME_UNIT_FOREVER_REL,
3084                                NULL, NULL);
3085       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3086       return;
3087     }
3088   prq.type = type;
3089   prq.priority = priority;  
3090   prq.finished = GNUNET_NO;
3091   process_reply (&prq, key, pr);
3092   if (prq.finished == GNUNET_YES)
3093     return;
3094   if (pr->qe == NULL)
3095     return; /* done here */
3096   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
3097        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
3098     {
3099       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3100       return;
3101     }
3102   if ( (pr->client_request_list == NULL) &&
3103        ( (GNUNET_YES == test_load_too_high()) ||
3104          (pr->results_found > 5 + 2 * pr->priority) ) )
3105     {
3106 #if DEBUG_FS > 2
3107       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3108                   "Load too high, done with request\n");
3109 #endif
3110       GNUNET_STATISTICS_update (stats,
3111                                 gettext_noop ("# processing result set cut short due to load"),
3112                                 1,
3113                                 GNUNET_NO);
3114       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3115       return;
3116     }
3117   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3118 }
3119
3120
3121 /**
3122  * We've received a request with the specified priority.  Bound it
3123  * according to how much we trust the given peer.
3124  * 
3125  * @param prio_in requested priority
3126  * @param cp the peer making the request
3127  * @return effective priority
3128  */
3129 static uint32_t
3130 bound_priority (uint32_t prio_in,
3131                 struct ConnectedPeer *cp)
3132 {
3133 #define N ((double)128.0)
3134   uint32_t ret;
3135   double rret;
3136   int ld;
3137
3138   ld = test_load_too_high ();
3139   if (ld == GNUNET_SYSERR)
3140     return 0; /* excess resources */
3141   ret = change_host_trust (cp, prio_in);
3142   if (ret > 0)
3143     {
3144       if (ret > current_priorities + N)
3145         rret = current_priorities + N;
3146       else
3147         rret = ret;
3148       current_priorities 
3149         = (current_priorities * (N-1) + rret)/N;
3150     }
3151 #undef N
3152   return ret;
3153 }
3154
3155
3156 /**
3157  * Iterator over entries in the 'query_request_map' that
3158  * tries to see if we have the same request pending from
3159  * the same peer already.
3160  *
3161  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3162  * @param key current key code (query, ignored, must match)
3163  * @param value value in the hash map (a 'struct PendingRequest' 
3164  *              that already exists)
3165  * @return GNUNET_YES if we should continue to
3166  *         iterate (no match yet)
3167  *         GNUNET_NO if not (match found).
3168  */
3169 static int
3170 check_duplicate_request_peer (void *cls,
3171                               const GNUNET_HashCode * key,
3172                               void *value)
3173 {
3174   struct CheckDuplicateRequestClosure *cdc = cls;
3175   struct PendingRequest *have = value;
3176
3177   if (cdc->pr->target_pid == have->target_pid)
3178     {
3179       cdc->have = have;
3180       return GNUNET_NO;
3181     }
3182   return GNUNET_YES;
3183 }
3184
3185
3186 /**
3187  * Handle P2P "GET" request.
3188  *
3189  * @param cls closure, always NULL
3190  * @param other the other peer involved (sender or receiver, NULL
3191  *        for loopback messages where we are both sender and receiver)
3192  * @param message the actual message
3193  * @param latency reported latency of the connection with 'other'
3194  * @param distance reported distance (DV) to 'other' 
3195  * @return GNUNET_OK to keep the connection open,
3196  *         GNUNET_SYSERR to close it (signal serious error)
3197  */
3198 static int
3199 handle_p2p_get (void *cls,
3200                 const struct GNUNET_PeerIdentity *other,
3201                 const struct GNUNET_MessageHeader *message,
3202                 struct GNUNET_TIME_Relative latency,
3203                 uint32_t distance)
3204 {
3205   struct PendingRequest *pr;
3206   struct ConnectedPeer *cp;
3207   struct ConnectedPeer *cps;
3208   struct CheckDuplicateRequestClosure cdc;
3209   struct GNUNET_TIME_Relative timeout;
3210   uint16_t msize;
3211   const struct GetMessage *gm;
3212   unsigned int bits;
3213   const GNUNET_HashCode *opt;
3214   uint32_t bm;
3215   size_t bfsize;
3216   uint32_t ttl_decrement;
3217   enum GNUNET_BLOCK_Type type;
3218   int have_ns;
3219   int ld;
3220
3221   msize = ntohs(message->size);
3222   if (msize < sizeof (struct GetMessage))
3223     {
3224       GNUNET_break_op (0);
3225       return GNUNET_SYSERR;
3226     }
3227   gm = (const struct GetMessage*) message;
3228   type = ntohl (gm->type);
3229   switch (type)
3230     {
3231     case GNUNET_BLOCK_TYPE_ANY:
3232     case GNUNET_BLOCK_TYPE_DBLOCK:
3233     case GNUNET_BLOCK_TYPE_IBLOCK:
3234     case GNUNET_BLOCK_TYPE_KBLOCK:
3235     case GNUNET_BLOCK_TYPE_SBLOCK:
3236       break;
3237     default:
3238       GNUNET_break_op (0);
3239       return GNUNET_SYSERR;
3240     }
3241   bm = ntohl (gm->hash_bitmap);
3242   bits = 0;
3243   while (bm > 0)
3244     {
3245       if (1 == (bm & 1))
3246         bits++;
3247       bm >>= 1;
3248     }
3249   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3250     {
3251       GNUNET_break_op (0);
3252       return GNUNET_SYSERR;
3253     }  
3254   opt = (const GNUNET_HashCode*) &gm[1];
3255   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3256   bm = ntohl (gm->hash_bitmap);
3257   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
3258        (type != GNUNET_BLOCK_TYPE_SBLOCK) )
3259     {
3260       GNUNET_break_op (0);
3261       return GNUNET_SYSERR;      
3262     }
3263   bits = 0;
3264   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3265                                            &other->hashPubKey);
3266   if (NULL == cps)
3267     {
3268       /* peer must have just disconnected */
3269       GNUNET_STATISTICS_update (stats,
3270                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3271                                 1,
3272                                 GNUNET_NO);
3273       return GNUNET_SYSERR;
3274     }
3275   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3276     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3277                                             &opt[bits++]);
3278   else
3279     cp = cps;
3280   if (cp == NULL)
3281     {
3282 #if DEBUG_FS
3283       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3284         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3285                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3286                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3287       
3288       else
3289         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3290                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3291                     GNUNET_i2s (other));
3292 #endif
3293       GNUNET_STATISTICS_update (stats,
3294                                 gettext_noop ("# requests dropped due to missing reverse route"),
3295                                 1,
3296                                 GNUNET_NO);
3297      /* FIXME: try connect? */
3298       return GNUNET_OK;
3299     }
3300   /* note that we can really only check load here since otherwise
3301      peers could find out that we are overloaded by not being
3302      disconnected after sending us a malformed query... */
3303
3304   /* FIXME: query priority should play
3305      a major role here! */
3306   ld = test_load_too_high ();
3307   if (GNUNET_YES == ld)
3308     {
3309 #if DEBUG_FS
3310       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3311                   "Dropping query from `%s', this peer is too busy.\n",
3312                   GNUNET_i2s (other));
3313 #endif
3314       GNUNET_STATISTICS_update (stats,
3315                                 gettext_noop ("# requests dropped due to high load"),
3316                                 1,
3317                                 GNUNET_NO);
3318       return GNUNET_OK;
3319     }
3320   /* FIXME: if ld == GNUNET_NO, forward
3321      instead of indirecting! */
3322
3323 #if DEBUG_FS 
3324   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3325               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3326               GNUNET_h2s (&gm->query),
3327               (unsigned int) type,
3328               GNUNET_i2s (other),
3329               (unsigned int) bm);
3330 #endif
3331   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3332   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3333                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3334   if (have_ns)
3335     {
3336       pr->namespace = (GNUNET_HashCode*) &pr[1];
3337       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3338     }
3339   pr->type = type;
3340   pr->mingle = ntohl (gm->filter_mutator);
3341   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3342     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3343
3344   pr->anonymity_level = 1;
3345   pr->priority = bound_priority (ntohl (gm->priority), cps);
3346   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3347   pr->query = gm->query;
3348   /* decrement ttl (always) */
3349   ttl_decrement = 2 * TTL_DECREMENT +
3350     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3351                               TTL_DECREMENT);
3352   if ( (pr->ttl < 0) &&
3353        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3354     {
3355 #if DEBUG_FS
3356       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3357                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3358                   GNUNET_i2s (other),
3359                   pr->ttl,
3360                   ttl_decrement);
3361 #endif
3362       GNUNET_STATISTICS_update (stats,
3363                                 gettext_noop ("# requests dropped due TTL underflow"),
3364                                 1,
3365                                 GNUNET_NO);
3366       /* integer underflow => drop (should be very rare)! */      
3367       GNUNET_free (pr);
3368       return GNUNET_OK;
3369     } 
3370   pr->ttl -= ttl_decrement;
3371   pr->start_time = GNUNET_TIME_absolute_get ();
3372
3373   /* get bloom filter */
3374   if (bfsize > 0)
3375     {
3376       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3377                                                   bfsize,
3378                                                   BLOOMFILTER_K);
3379       pr->bf_size = bfsize;
3380     }
3381
3382   cdc.have = NULL;
3383   cdc.pr = pr;
3384   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3385                                               &gm->query,
3386                                               &check_duplicate_request_peer,
3387                                               &cdc);
3388   if (cdc.have != NULL)
3389     {
3390       if (cdc.have->start_time.value + cdc.have->ttl >=
3391           pr->start_time.value + pr->ttl)
3392         {
3393           /* existing request has higher TTL, drop new one! */
3394           cdc.have->priority += pr->priority;
3395           destroy_pending_request (pr);
3396 #if DEBUG_FS
3397           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3398                       "Have existing request with higher TTL, dropping new request.\n",
3399                       GNUNET_i2s (other));
3400 #endif
3401           GNUNET_STATISTICS_update (stats,
3402                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3403                                     1,
3404                                     GNUNET_NO);
3405           return GNUNET_OK;
3406         }
3407       else
3408         {
3409           /* existing request has lower TTL, drop old one! */
3410           pr->priority += cdc.have->priority;
3411           /* Possible optimization: if we have applicable pending
3412              replies in 'cdc.have', we might want to move those over
3413              (this is a really rare special-case, so it is not clear
3414              that this would be worth it) */
3415           destroy_pending_request (cdc.have);
3416           /* keep processing 'pr'! */
3417         }
3418     }
3419
3420   pr->cp = cp;
3421   GNUNET_break (GNUNET_OK ==
3422                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3423                                                    &gm->query,
3424                                                    pr,
3425                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3426   GNUNET_break (GNUNET_OK ==
3427                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3428                                                    &other->hashPubKey,
3429                                                    pr,
3430                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3431   
3432   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3433                                             pr,
3434                                             pr->start_time.value + pr->ttl);
3435
3436   GNUNET_STATISTICS_update (stats,
3437                             gettext_noop ("# P2P searches received"),
3438                             1,
3439                             GNUNET_NO);
3440   GNUNET_STATISTICS_update (stats,
3441                             gettext_noop ("# P2P searches active"),
3442                             1,
3443                             GNUNET_NO);
3444
3445   /* calculate change in traffic preference */
3446   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3447   /* process locally */
3448   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3449     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3450   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3451                                            (pr->priority + 1)); 
3452   pr->qe = GNUNET_DATASTORE_get (dsh,
3453                                  &gm->query,
3454                                  type,                         
3455                                  pr->priority + 1,
3456                                  MAX_DATASTORE_QUEUE,                            
3457                                  timeout,
3458                                  &process_local_reply,
3459                                  pr);
3460
3461   /* Are multiple results possible?  If so, start processing remotely now! */
3462   switch (pr->type)
3463     {
3464     case GNUNET_BLOCK_TYPE_DBLOCK:
3465     case GNUNET_BLOCK_TYPE_IBLOCK:
3466       /* only one result, wait for datastore */
3467       break;
3468     default:
3469       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3470         pr->task = GNUNET_SCHEDULER_add_now (sched,
3471                                              &forward_request_task,
3472                                              pr);
3473     }
3474
3475   /* make sure we don't track too many requests */
3476   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3477     {
3478       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3479       GNUNET_assert (pr != NULL);
3480       destroy_pending_request (pr);
3481     }
3482   return GNUNET_OK;
3483 }
3484
3485
3486 /* **************************** CS GET Handling ************************ */
3487
3488
3489 /**
3490  * Handle START_SEARCH-message (search request from client).
3491  *
3492  * @param cls closure
3493  * @param client identification of the client
3494  * @param message the actual message
3495  */
3496 static void
3497 handle_start_search (void *cls,
3498                      struct GNUNET_SERVER_Client *client,
3499                      const struct GNUNET_MessageHeader *message)
3500 {
3501   static GNUNET_HashCode all_zeros;
3502   const struct SearchMessage *sm;
3503   struct ClientList *cl;
3504   struct ClientRequestList *crl;
3505   struct PendingRequest *pr;
3506   uint16_t msize;
3507   unsigned int sc;
3508   enum GNUNET_BLOCK_Type type;
3509
3510   msize = ntohs (message->size);
3511   if ( (msize < sizeof (struct SearchMessage)) ||
3512        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3513     {
3514       GNUNET_break (0);
3515       GNUNET_SERVER_receive_done (client,
3516                                   GNUNET_SYSERR);
3517       return;
3518     }
3519   GNUNET_STATISTICS_update (stats,
3520                             gettext_noop ("# client searches received"),
3521                             1,
3522                             GNUNET_NO);
3523   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3524   sm = (const struct SearchMessage*) message;
3525   type = ntohl (sm->type);
3526 #if DEBUG_FS
3527   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3528               "Received request for `%s' of type %u from local client\n",
3529               GNUNET_h2s (&sm->query),
3530               (unsigned int) type);
3531 #endif
3532   switch (type)
3533     {
3534     case GNUNET_BLOCK_TYPE_ANY:
3535     case GNUNET_BLOCK_TYPE_DBLOCK:
3536     case GNUNET_BLOCK_TYPE_IBLOCK:
3537     case GNUNET_BLOCK_TYPE_KBLOCK:
3538     case GNUNET_BLOCK_TYPE_SBLOCK:
3539     case GNUNET_BLOCK_TYPE_NBLOCK:
3540       break;
3541     default:
3542       GNUNET_break (0);
3543       GNUNET_SERVER_receive_done (client,
3544                                   GNUNET_SYSERR);
3545       return;
3546     }  
3547
3548   cl = client_list;
3549   while ( (cl != NULL) &&
3550           (cl->client != client) )
3551     cl = cl->next;
3552   if (cl == NULL)
3553     {
3554       cl = GNUNET_malloc (sizeof (struct ClientList));
3555       cl->client = client;
3556       GNUNET_SERVER_client_keep (client);
3557       cl->next = client_list;
3558       client_list = cl;
3559     }
3560   /* detect duplicate KBLOCK requests */
3561   if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
3562        (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
3563        (type == GNUNET_BLOCK_TYPE_ANY) )
3564     {
3565       crl = cl->rl_head;
3566       while ( (crl != NULL) &&
3567               ( (0 != memcmp (&crl->req->query,
3568                               &sm->query,
3569                               sizeof (GNUNET_HashCode))) ||
3570                 (crl->req->type != type) ) )
3571         crl = crl->next;
3572       if (crl != NULL)  
3573         { 
3574 #if DEBUG_FS
3575           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3576                       "Have existing request, merging content-seen lists.\n");
3577 #endif
3578           pr = crl->req;
3579           /* Duplicate request (used to send long list of
3580              known/blocked results); merge 'pr->replies_seen'
3581              and update bloom filter */
3582           GNUNET_array_grow (pr->replies_seen,
3583                              pr->replies_seen_size,
3584                              pr->replies_seen_off + sc);
3585           memcpy (&pr->replies_seen[pr->replies_seen_off],
3586                   &sm[1],
3587                   sc * sizeof (GNUNET_HashCode));
3588           pr->replies_seen_off += sc;
3589           refresh_bloomfilter (pr);
3590           GNUNET_STATISTICS_update (stats,
3591                                     gettext_noop ("# client searches updated (merged content seen list)"),
3592                                     1,
3593                                     GNUNET_NO);
3594           GNUNET_SERVER_receive_done (client,
3595                                       GNUNET_OK);
3596           return;
3597         }
3598     }
3599   GNUNET_STATISTICS_update (stats,
3600                             gettext_noop ("# client searches active"),
3601                             1,
3602                             GNUNET_NO);
3603   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3604                       ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3605   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3606   memset (crl, 0, sizeof (struct ClientRequestList));
3607   crl->client_list = cl;
3608   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3609                                cl->rl_tail,
3610                                crl);  
3611   crl->req = pr;
3612   pr->type = type;
3613   pr->client_request_list = crl;
3614   GNUNET_array_grow (pr->replies_seen,
3615                      pr->replies_seen_size,
3616                      sc);
3617   memcpy (pr->replies_seen,
3618           &sm[1],
3619           sc * sizeof (GNUNET_HashCode));
3620   pr->replies_seen_off = sc;
3621   pr->anonymity_level = ntohl (sm->anonymity_level); 
3622   refresh_bloomfilter (pr);
3623   pr->query = sm->query;
3624   if (0 == (1 & ntohl (sm->options)))
3625     pr->local_only = GNUNET_NO;
3626   else
3627     pr->local_only = GNUNET_YES;
3628   switch (type)
3629     {
3630     case GNUNET_BLOCK_TYPE_DBLOCK:
3631     case GNUNET_BLOCK_TYPE_IBLOCK:
3632       if (0 != memcmp (&sm->target,
3633                        &all_zeros,
3634                        sizeof (GNUNET_HashCode)))
3635         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3636       break;
3637     case GNUNET_BLOCK_TYPE_SBLOCK:
3638       pr->namespace = (GNUNET_HashCode*) &pr[1];
3639       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3640       break;
3641     default:
3642       break;
3643     }
3644   GNUNET_break (GNUNET_OK ==
3645                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3646                                                    &sm->query,
3647                                                    pr,
3648                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3649   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3650     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3651   pr->qe = GNUNET_DATASTORE_get (dsh,
3652                                  &sm->query,
3653                                  type,
3654                                  -3, -1,
3655                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
3656                                  &process_local_reply,
3657                                  pr);
3658 }
3659
3660
3661 /* **************************** Startup ************************ */
3662
3663 /**
3664  * Process fs requests.
3665  *
3666  * @param s scheduler to use
3667  * @param server the initialized server
3668  * @param c configuration to use
3669  */
3670 static int
3671 main_init (struct GNUNET_SCHEDULER_Handle *s,
3672            struct GNUNET_SERVER_Handle *server,
3673            const struct GNUNET_CONFIGURATION_Handle *c)
3674 {
3675   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3676     {
3677       { &handle_p2p_get, 
3678         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3679       { &handle_p2p_put, 
3680         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3681       { NULL, 0, 0 }
3682     };
3683   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
3684     {&GNUNET_FS_handle_index_start, NULL, 
3685      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
3686     {&GNUNET_FS_handle_index_list_get, NULL, 
3687      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
3688     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
3689      sizeof (struct UnindexMessage) },
3690     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
3691      0 },
3692     {NULL, NULL, 0, 0}
3693   };
3694   unsigned long long enc = 128;
3695
3696   sched = s;
3697   cfg = c;
3698   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
3699   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
3700   if ( (GNUNET_OK !=
3701         GNUNET_CONFIGURATION_get_value_number (cfg,
3702                                                "fs",
3703                                                "MAX_PENDING_REQUESTS",
3704                                                &max_pending_requests)) ||
3705        (GNUNET_OK !=
3706         GNUNET_CONFIGURATION_get_value_number (cfg,
3707                                                "fs",
3708                                                "EXPECTED_NEIGHBOUR_COUNT",
3709                                                &enc)) ||
3710        (GNUNET_OK != 
3711         GNUNET_CONFIGURATION_get_value_time (cfg,
3712                                              "fs",
3713                                              "MIN_MIGRATION_DELAY",
3714                                              &min_migration_delay)) )
3715     {
3716       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3717                   _("Configuration fails to specify certain parameters, assuming default values."));
3718     }
3719   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
3720   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
3721   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
3722   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3723   core = GNUNET_CORE_connect (sched,
3724                               cfg,
3725                               GNUNET_TIME_UNIT_FOREVER_REL,
3726                               NULL,
3727                               NULL,
3728                               &peer_connect_handler,
3729                               &peer_disconnect_handler,
3730                               NULL,
3731                               NULL, GNUNET_NO,
3732                               NULL, GNUNET_NO,
3733                               p2p_handlers);
3734   if (NULL == core)
3735     {
3736       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3737                   _("Failed to connect to `%s' service.\n"),
3738                   "core");
3739       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
3740       connected_peers = NULL;
3741       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
3742       query_request_map = NULL;
3743       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
3744       requests_by_expiration_heap = NULL;
3745       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
3746       peer_request_map = NULL;
3747       if (dsh != NULL)
3748         {
3749           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3750           dsh = NULL;
3751         }
3752       return GNUNET_SYSERR;
3753     }
3754   /* FIXME: distinguish between sending and storing in options? */
3755   if (active_migration) 
3756     {
3757       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3758                   _("Content migration is enabled, will start to gather data\n"));
3759       consider_migration_gathering ();
3760     }
3761   GNUNET_SERVER_disconnect_notify (server, 
3762                                    &handle_client_disconnect,
3763                                    NULL);
3764   GNUNET_assert (GNUNET_OK ==
3765                  GNUNET_CONFIGURATION_get_value_filename (cfg,
3766                                                           "fs",
3767                                                           "TRUST",
3768                                                           &trustDirectory));
3769   GNUNET_DISK_directory_create (trustDirectory);
3770   GNUNET_SCHEDULER_add_with_priority (sched,
3771                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
3772                                       &cron_flush_trust, NULL);
3773
3774
3775   GNUNET_SERVER_add_handlers (server, handlers);
3776   GNUNET_SCHEDULER_add_delayed (sched,
3777                                 GNUNET_TIME_UNIT_FOREVER_REL,
3778                                 &shutdown_task,
3779                                 NULL);
3780   return GNUNET_OK;
3781 }
3782
3783
3784 /**
3785  * Process fs requests.
3786  *
3787  * @param cls closure
3788  * @param sched scheduler to use
3789  * @param server the initialized server
3790  * @param cfg configuration to use
3791  */
3792 static void
3793 run (void *cls,
3794      struct GNUNET_SCHEDULER_Handle *sched,
3795      struct GNUNET_SERVER_Handle *server,
3796      const struct GNUNET_CONFIGURATION_Handle *cfg)
3797 {
3798   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
3799                                                            "FS",
3800                                                            "ACTIVEMIGRATION");
3801   dsh = GNUNET_DATASTORE_connect (cfg,
3802                                   sched);
3803   if (dsh == NULL)
3804     {
3805       GNUNET_SCHEDULER_shutdown (sched);
3806       return;
3807     }
3808   block_cfg = GNUNET_CONFIGURATION_create ();
3809   GNUNET_CONFIGURATION_set_value_string (block_cfg,
3810                                          "block",
3811                                          "PLUGINS",
3812                                          "fs");
3813   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
3814   GNUNET_assert (NULL != block_ctx);
3815   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3816        (GNUNET_OK != main_init (sched, server, cfg)) )
3817     {    
3818       GNUNET_SCHEDULER_shutdown (sched);
3819       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3820       dsh = NULL;
3821       GNUNET_BLOCK_context_destroy (block_ctx);
3822       block_ctx = NULL;
3823       GNUNET_CONFIGURATION_destroy (block_cfg);
3824       block_cfg = NULL;
3825       return;   
3826     }
3827 }
3828
3829
3830 /**
3831  * The main function for the fs service.
3832  *
3833  * @param argc number of arguments from the command line
3834  * @param argv command line arguments
3835  * @return 0 ok, 1 on error
3836  */
3837 int
3838 main (int argc, char *const *argv)
3839 {
3840   return (GNUNET_OK ==
3841           GNUNET_SERVICE_run (argc,
3842                               argv,
3843                               "fs",
3844                               GNUNET_SERVICE_OPTION_NONE,
3845                               &run, NULL)) ? 0 : 1;
3846 }
3847
3848 /* end of gnunet-service-fs.c */