code simplification
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - implement test_load_too_high, make decision priority-based, implement forwarding, etc.
28  * - consider more precise latency estimation (per-peer & request)
29  * - introduce random latency in processing
30  * - tell other peers to stop migration if our PUTs fail (or if
31  *   we don't support migration per configuration?)
32  * - more statistics
33  */
34 #include "platform.h"
35 #include <float.h>
36 #include "gnunet_constants.h"
37 #include "gnunet_core_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_peer_lib.h"
40 #include "gnunet_protocols.h"
41 #include "gnunet_signatures.h"
42 #include "gnunet_statistics_service.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet-service-fs_indexing.h"
45 #include "fs.h"
46
47 #define DEBUG_FS GNUNET_NO
48
49 /**
50  * Maximum number of outgoing messages we queue per peer.
51  */
52 #define MAX_QUEUE_PER_PEER 16
53
54 /**
55  * How often do we flush trust values to disk?
56  */
57 #define TRUST_FLUSH_FREQ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
58
59 /**
60  * Inverse of the probability that we will submit the same query
61  * to the same peer again.  If the same peer already got the query
62  * repeatedly recently, the probability is multiplied by the inverse
63  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
64  * plus MAX_CORK_DELAY (so roughly every 3.5s).
65  */
66 #define RETRY_PROBABILITY_INV 3
67
68 /**
69  * What is the maximum delay for a P2P FS message (in our interaction
70  * with core)?  FS-internal delays are another story.  The value is
71  * chosen based on the 32k block size.  Given that peers typcially
72  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
73  * transmit one message even to the lowest-bandwidth peers.
74  */
75 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
76
77 /**
78  * Maximum number of requests (from other peers) that we're
79  * willing to have pending at any given point in time.
80  */
81 static unsigned long long max_pending_requests = (32 * 1024);
82
83
84 /**
85  * Information we keep for each pending reply.  The
86  * actual message follows at the end of this struct.
87  */
88 struct PendingMessage;
89
90 /**
91  * Function called upon completion of a transmission.
92  *
93  * @param cls closure
94  * @param pid ID of receiving peer, 0 on transmission error
95  */
96 typedef void (*TransmissionContinuation)(void * cls, 
97                                          GNUNET_PEER_Id tpid);
98
99
100 /**
101  * Information we keep for each pending message (GET/PUT).  The
102  * actual message follows at the end of this struct.
103  */
104 struct PendingMessage
105 {
106   /**
107    * This is a doubly-linked list of messages to the same peer.
108    */
109   struct PendingMessage *next;
110
111   /**
112    * This is a doubly-linked list of messages to the same peer.
113    */
114   struct PendingMessage *prev;
115
116   /**
117    * Entry in pending message list for this pending message.
118    */ 
119   struct PendingMessageList *pml;  
120
121   /**
122    * Function to call immediately once we have transmitted this
123    * message.
124    */
125   TransmissionContinuation cont;
126
127   /**
128    * Closure for cont.
129    */
130   void *cont_cls;
131
132   /**
133    * Size of the reply; actual reply message follows
134    * at the end of this struct.
135    */
136   size_t msize;
137   
138   /**
139    * How important is this message for us?
140    */
141   uint32_t priority;
142  
143 };
144
145
146 /**
147  * Information about a peer that we are connected to.
148  * We track data that is useful for determining which
149  * peers should receive our requests.  We also keep
150  * a list of messages to transmit to this peer.
151  */
152 struct ConnectedPeer
153 {
154
155   /**
156    * List of the last clients for which this peer successfully
157    * answered a query.
158    */
159   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
160
161   /**
162    * List of the last PIDs for which
163    * this peer successfully answered a query;
164    * We use 0 to indicate no successful reply.
165    */
166   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
167
168   /**
169    * Average delay between sending the peer a request and
170    * getting a reply (only calculated over the requests for
171    * which we actually got a reply).   Calculated
172    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
173    */ 
174   struct GNUNET_TIME_Relative avg_delay;
175
176   /**
177    * Handle for an active request for transmission to this
178    * peer, or NULL.
179    */
180   struct GNUNET_CORE_TransmitHandle *cth;
181
182   /**
183    * Messages (replies, queries, content migration) we would like to
184    * send to this peer in the near future.  Sorted by priority, head.
185    */
186   struct PendingMessage *pending_messages_head;
187
188   /**
189    * Messages (replies, queries, content migration) we would like to
190    * send to this peer in the near future.  Sorted by priority, tail.
191    */
192   struct PendingMessage *pending_messages_tail;
193
194   /**
195    * Average priority of successful replies.  Calculated
196    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
197    */
198   double avg_priority;
199
200   /**
201    * Increase in traffic preference still to be submitted
202    * to the core service for this peer.
203    */
204   uint64_t inc_preference;
205
206   /**
207    * Trust rating for this peer
208    */
209   uint32_t trust;
210
211   /**
212    * Trust rating for this peer on disk.
213    */
214   uint32_t disk_trust;
215
216   /**
217    * The peer's identity.
218    */
219   GNUNET_PEER_Id pid;  
220
221   /**
222    * Size of the linked list of 'pending_messages'.
223    */
224   unsigned int pending_requests;
225
226   /**
227    * Which offset in "last_p2p_replies" will be updated next?
228    * (we go round-robin).
229    */
230   unsigned int last_p2p_replies_woff;
231
232   /**
233    * Which offset in "last_client_replies" will be updated next?
234    * (we go round-robin).
235    */
236   unsigned int last_client_replies_woff;
237
238 };
239
240
241 /**
242  * Information we keep for each pending request.  We should try to
243  * keep this struct as small as possible since its memory consumption
244  * is key to how many requests we can have pending at once.
245  */
246 struct PendingRequest;
247
248
249 /**
250  * Doubly-linked list of requests we are performing
251  * on behalf of the same client.
252  */
253 struct ClientRequestList
254 {
255
256   /**
257    * This is a doubly-linked list.
258    */
259   struct ClientRequestList *next;
260
261   /**
262    * This is a doubly-linked list.
263    */
264   struct ClientRequestList *prev;
265
266   /**
267    * Request this entry represents.
268    */
269   struct PendingRequest *req;
270
271   /**
272    * Client list this request belongs to.
273    */
274   struct ClientList *client_list;
275
276 };
277
278
279 /**
280  * Replies to be transmitted to the client.  The actual
281  * response message is allocated after this struct.
282  */
283 struct ClientResponseMessage
284 {
285   /**
286    * This is a doubly-linked list.
287    */
288   struct ClientResponseMessage *next;
289
290   /**
291    * This is a doubly-linked list.
292    */
293   struct ClientResponseMessage *prev;
294
295   /**
296    * Client list entry this response belongs to.
297    */
298   struct ClientList *client_list;
299
300   /**
301    * Number of bytes in the response.
302    */
303   size_t msize;
304 };
305
306
307 /**
308  * Linked list of clients we are performing requests
309  * for right now.
310  */
311 struct ClientList
312 {
313   /**
314    * This is a linked list.
315    */
316   struct ClientList *next;
317
318   /**
319    * ID of a client making a request, NULL if this entry is for a
320    * peer.
321    */
322   struct GNUNET_SERVER_Client *client;
323
324   /**
325    * Head of list of requests performed on behalf
326    * of this client right now.
327    */
328   struct ClientRequestList *rl_head;
329
330   /**
331    * Tail of list of requests performed on behalf
332    * of this client right now.
333    */
334   struct ClientRequestList *rl_tail;
335
336   /**
337    * Head of linked list of responses.
338    */
339   struct ClientResponseMessage *res_head;
340
341   /**
342    * Tail of linked list of responses.
343    */
344   struct ClientResponseMessage *res_tail;
345
346   /**
347    * Context for sending replies.
348    */
349   struct GNUNET_CONNECTION_TransmitHandle *th;
350
351 };
352
353
354 /**
355  * Doubly-linked list of messages we are performing
356  * due to a pending request.
357  */
358 struct PendingMessageList
359 {
360
361   /**
362    * This is a doubly-linked list of messages on behalf of the same request.
363    */
364   struct PendingMessageList *next;
365
366   /**
367    * This is a doubly-linked list of messages on behalf of the same request.
368    */
369   struct PendingMessageList *prev;
370
371   /**
372    * Message this entry represents.
373    */
374   struct PendingMessage *pm;
375
376   /**
377    * Request this entry belongs to.
378    */
379   struct PendingRequest *req;
380
381   /**
382    * Peer this message is targeted for.
383    */
384   struct ConnectedPeer *target;
385
386 };
387
388
389 /**
390  * Information we keep for each pending request.  We should try to
391  * keep this struct as small as possible since its memory consumption
392  * is key to how many requests we can have pending at once.
393  */
394 struct PendingRequest
395 {
396
397   /**
398    * If this request was made by a client, this is our entry in the
399    * client request list; otherwise NULL.
400    */
401   struct ClientRequestList *client_request_list;
402
403   /**
404    * Entry of peer responsible for this entry (if this request
405    * was made by a peer).
406    */
407   struct ConnectedPeer *cp;
408
409   /**
410    * If this is a namespace query, pointer to the hash of the public
411    * key of the namespace; otherwise NULL.  Pointer will be to the 
412    * end of this struct (so no need to free it).
413    */
414   const GNUNET_HashCode *namespace;
415
416   /**
417    * Bloomfilter we use to filter out replies that we don't care about
418    * (anymore).  NULL as long as we are interested in all replies.
419    */
420   struct GNUNET_CONTAINER_BloomFilter *bf;
421
422   /**
423    * Context of our GNUNET_CORE_peer_change_preference call.
424    */
425   struct GNUNET_CORE_InformationRequestContext *irc;
426
427   /**
428    * Hash code of all replies that we have seen so far (only valid
429    * if client is not NULL since we only track replies like this for
430    * our own clients).
431    */
432   GNUNET_HashCode *replies_seen;
433
434   /**
435    * Node in the heap representing this entry; NULL
436    * if we have no heap node.
437    */
438   struct GNUNET_CONTAINER_HeapNode *hnode;
439
440   /**
441    * Head of list of messages being performed on behalf of this
442    * request.
443    */
444   struct PendingMessageList *pending_head;
445
446   /**
447    * Tail of list of messages being performed on behalf of this
448    * request.
449    */
450   struct PendingMessageList *pending_tail;
451
452   /**
453    * When did we first see this request (form this peer), or, if our
454    * client is initiating, when did we last initiate a search?
455    */
456   struct GNUNET_TIME_Absolute start_time;
457
458   /**
459    * The query that this request is for.
460    */
461   GNUNET_HashCode query;
462
463   /**
464    * The task responsible for transmitting queries
465    * for this request.
466    */
467   GNUNET_SCHEDULER_TaskIdentifier task;
468
469   /**
470    * (Interned) Peer identifier that identifies a preferred target
471    * for requests.
472    */
473   GNUNET_PEER_Id target_pid;
474
475   /**
476    * (Interned) Peer identifiers of peers that have already
477    * received our query for this content.
478    */
479   GNUNET_PEER_Id *used_pids;
480   
481   /**
482    * Our entry in the queue (non-NULL while we wait for our
483    * turn to interact with the local database).
484    */
485   struct GNUNET_DATASTORE_QueueEntry *qe;
486
487   /**
488    * Size of the 'bf' (in bytes).
489    */
490   size_t bf_size;
491
492   /**
493    * Desired anonymity level; only valid for requests from a local client.
494    */
495   uint32_t anonymity_level;
496
497   /**
498    * How many entries in "used_pids" are actually valid?
499    */
500   unsigned int used_pids_off;
501
502   /**
503    * How long is the "used_pids" array?
504    */
505   unsigned int used_pids_size;
506
507   /**
508    * Number of results found for this request.
509    */
510   unsigned int results_found;
511
512   /**
513    * How many entries in "replies_seen" are actually valid?
514    */
515   unsigned int replies_seen_off;
516
517   /**
518    * How long is the "replies_seen" array?
519    */
520   unsigned int replies_seen_size;
521   
522   /**
523    * Priority with which this request was made.  If one of our clients
524    * made the request, then this is the current priority that we are
525    * using when initiating the request.  This value is used when
526    * we decide to reward other peers with trust for providing a reply.
527    */
528   uint32_t priority;
529
530   /**
531    * Priority points left for us to spend when forwarding this request
532    * to other peers.
533    */
534   uint32_t remaining_priority;
535
536   /**
537    * Number to mingle hashes for bloom-filter tests with.
538    */
539   int32_t mingle;
540
541   /**
542    * TTL with which we saw this request (or, if we initiated, TTL that
543    * we used for the request).
544    */
545   int32_t ttl;
546   
547   /**
548    * Type of the content that this request is for.
549    */
550   enum GNUNET_BLOCK_Type type;
551
552   /**
553    * Remove this request after transmission of the current response.
554    */
555   int16_t do_remove;
556
557   /**
558    * GNUNET_YES if we should not forward this request to other peers.
559    */
560   int16_t local_only;
561
562 };
563
564
565 /**
566  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
567  */
568 struct MigrationReadyBlock
569 {
570
571   /**
572    * This is a doubly-linked list.
573    */
574   struct MigrationReadyBlock *next;
575
576   /**
577    * This is a doubly-linked list.
578    */
579   struct MigrationReadyBlock *prev;
580
581   /**
582    * Query for the block.
583    */
584   GNUNET_HashCode query;
585
586   /**
587    * When does this block expire? 
588    */
589   struct GNUNET_TIME_Absolute expiration;
590
591   /**
592    * Peers we would consider forwarding this
593    * block to.  Zero for empty entries.
594    */
595   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
596
597   /**
598    * Size of the block.
599    */
600   size_t size;
601
602   /**
603    *  Number of targets already used.
604    */
605   unsigned int used_targets;
606
607   /**
608    * Type of the block.
609    */
610   enum GNUNET_BLOCK_Type type;
611 };
612
613
614 /**
615  * Our connection to the datastore.
616  */
617 static struct GNUNET_DATASTORE_Handle *dsh;
618
619 /**
620  * Our block context.
621  */
622 static struct GNUNET_BLOCK_Context *block_ctx;
623
624 /**
625  * Our block configuration.
626  */
627 static struct GNUNET_CONFIGURATION_Handle *block_cfg;
628
629 /**
630  * Our scheduler.
631  */
632 static struct GNUNET_SCHEDULER_Handle *sched;
633
634 /**
635  * Our configuration.
636  */
637 static const struct GNUNET_CONFIGURATION_Handle *cfg;
638
639 /**
640  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
641  */
642 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
643
644 /**
645  * Map of peer identifiers to "struct PendingRequest" (for that peer).
646  */
647 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
648
649 /**
650  * Map of query identifiers to "struct PendingRequest" (for that query).
651  */
652 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
653
654 /**
655  * Heap with the request that will expire next at the top.  Contains
656  * pointers of type "struct PendingRequest*"; these will *also* be
657  * aliased from the "requests_by_peer" data structures and the
658  * "requests_by_query" table.  Note that requests from our clients
659  * don't expire and are thus NOT in the "requests_by_expiration"
660  * (or the "requests_by_peer" tables).
661  */
662 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
663
664 /**
665  * Handle for reporting statistics.
666  */
667 static struct GNUNET_STATISTICS_Handle *stats;
668
669 /**
670  * Linked list of clients we are currently processing requests for.
671  */
672 static struct ClientList *client_list;
673
674 /**
675  * Pointer to handle to the core service (points to NULL until we've
676  * connected to it).
677  */
678 static struct GNUNET_CORE_Handle *core;
679
680 /**
681  * Head of linked list of blocks that can be migrated.
682  */
683 static struct MigrationReadyBlock *mig_head;
684
685 /**
686  * Tail of linked list of blocks that can be migrated.
687  */
688 static struct MigrationReadyBlock *mig_tail;
689
690 /**
691  * Request to datastore for migration (or NULL).
692  */
693 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
694
695 /**
696  * Where do we store trust information?
697  */
698 static char *trustDirectory;
699
700 /**
701  * ID of task that collects blocks for migration.
702  */
703 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
704
705 /**
706  * What is the maximum frequency at which we are allowed to
707  * poll the datastore for migration content?
708  */
709 static struct GNUNET_TIME_Relative min_migration_delay;
710
711 /**
712  * Size of the doubly-linked list of migration blocks.
713  */
714 static unsigned int mig_size;
715
716 /**
717  * Are we allowed to migrate content to this peer.
718  */
719 static int active_migration;
720
721 /**
722  * Typical priorities we're seeing from other peers right now.  Since
723  * most priorities will be zero, this value is the weighted average of
724  * non-zero priorities seen "recently".  In order to ensure that new
725  * values do not dramatically change the ratio, values are first
726  * "capped" to a reasonable range (+N of the current value) and then
727  * averaged into the existing value by a ratio of 1:N.  Hence
728  * receiving the largest possible priority can still only raise our
729  * "current_priorities" by at most 1.
730  */
731 static double current_priorities;
732
733 /**
734  * Get the filename under which we would store the GNUNET_HELLO_Message
735  * for the given host and protocol.
736  * @return filename of the form DIRECTORY/HOSTID
737  */
738 static char *
739 get_trust_filename (const struct GNUNET_PeerIdentity *id)
740 {
741   struct GNUNET_CRYPTO_HashAsciiEncoded fil;
742   char *fn;
743
744   GNUNET_CRYPTO_hash_to_enc (&id->hashPubKey, &fil);
745   GNUNET_asprintf (&fn, "%s%s%s", trustDirectory, DIR_SEPARATOR_STR, &fil);
746   return fn;
747 }
748
749
750
751 /**
752  * Transmit messages by copying it to the target buffer
753  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
754  * for writing in the meantime.  In that case, do nothing
755  * (the disconnect or shutdown handler will take care of the rest).
756  * If we were able to transmit messages and there are still more
757  * pending, ask core again for further calls to this function.
758  *
759  * @param cls closure, pointer to the 'struct ConnectedPeer*'
760  * @param size number of bytes available in buf
761  * @param buf where the callee should write the message
762  * @return number of bytes written to buf
763  */
764 static size_t
765 transmit_to_peer (void *cls,
766                   size_t size, void *buf);
767
768
769 /* ******************* clean up functions ************************ */
770
771
772 /**
773  * Delete the given migration block.
774  *
775  * @param mb block to delete
776  */
777 static void
778 delete_migration_block (struct MigrationReadyBlock *mb)
779 {
780   GNUNET_CONTAINER_DLL_remove (mig_head,
781                                mig_tail,
782                                mb);
783   GNUNET_PEER_decrement_rcs (mb->target_list,
784                              MIGRATION_LIST_SIZE);
785   mig_size--;
786   GNUNET_free (mb);
787 }
788
789
790 /**
791  * Compare the distance of two peers to a key.
792  *
793  * @param key key
794  * @param p1 first peer
795  * @param p2 second peer
796  * @return GNUNET_YES if P1 is closer to key than P2
797  */
798 static int
799 is_closer (const GNUNET_HashCode *key,
800            const struct GNUNET_PeerIdentity *p1,
801            const struct GNUNET_PeerIdentity *p2)
802 {
803   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
804                                     &p2->hashPubKey,
805                                     key);
806 }
807
808
809 /**
810  * Consider migrating content to a given peer.
811  *
812  * @param cls 'struct MigrationReadyBlock*' to select
813  *            targets for (or NULL for none)
814  * @param key ID of the peer 
815  * @param value 'struct ConnectedPeer' of the peer
816  * @return GNUNET_YES (always continue iteration)
817  */
818 static int
819 consider_migration (void *cls,
820                     const GNUNET_HashCode *key,
821                     void *value)
822 {
823   struct MigrationReadyBlock *mb = cls;
824   struct ConnectedPeer *cp = value;
825   struct MigrationReadyBlock *pos;
826   struct GNUNET_PeerIdentity cppid;
827   struct GNUNET_PeerIdentity otherpid;
828   struct GNUNET_PeerIdentity worstpid;
829   size_t msize;
830   unsigned int i;
831   unsigned int repl;
832   
833   /* consider 'cp' as a migration target for mb */
834   if (mb != NULL)
835     {
836       GNUNET_PEER_resolve (cp->pid,
837                            &cppid);
838       repl = MIGRATION_LIST_SIZE;
839       for (i=0;i<MIGRATION_LIST_SIZE;i++)
840         {
841           if (mb->target_list[i] == 0)
842             {
843               mb->target_list[i] = cp->pid;
844               GNUNET_PEER_change_rc (mb->target_list[i], 1);
845               repl = MIGRATION_LIST_SIZE;
846               break;
847             }
848           GNUNET_PEER_resolve (mb->target_list[i],
849                                &otherpid);
850           if ( (repl == MIGRATION_LIST_SIZE) &&
851                is_closer (&mb->query,
852                           &cppid,
853                           &otherpid)) 
854             {
855               repl = i;
856               worstpid = otherpid;
857             }
858           else if ( (repl != MIGRATION_LIST_SIZE) &&
859                     (is_closer (&mb->query,
860                                 &worstpid,
861                                 &otherpid) ) )
862             {
863               repl = i;
864               worstpid = otherpid;
865             }       
866         }
867       if (repl != MIGRATION_LIST_SIZE) 
868         {
869           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
870           mb->target_list[repl] = cp->pid;
871           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
872         }
873     }
874
875   /* consider scheduling transmission to cp for content migration */
876   if (cp->cth != NULL)
877     return GNUNET_YES; 
878   msize = 0;
879   pos = mig_head;
880   while (pos != NULL)
881     {
882       for (i=0;i<MIGRATION_LIST_SIZE;i++)
883         {
884           if (cp->pid == pos->target_list[i])
885             {
886               if (msize == 0)
887                 msize = pos->size;
888               else
889                 msize = GNUNET_MIN (msize,
890                                     pos->size);
891               break;
892             }
893         }
894       pos = pos->next;
895     }
896   if (msize == 0)
897     return GNUNET_YES; /* no content available */
898 #if DEBUG_FS
899   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900               "Trying to migrate at least %u bytes to peer `%s'\n",
901               msize,
902               GNUNET_h2s (key));
903 #endif
904   cp->cth 
905     = GNUNET_CORE_notify_transmit_ready (core,
906                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
907                                          (const struct GNUNET_PeerIdentity*) key,
908                                          msize + sizeof (struct PutMessage),
909                                          &transmit_to_peer,
910                                          cp);
911   return GNUNET_YES;
912 }
913
914
915 /**
916  * Task that is run periodically to obtain blocks for content
917  * migration
918  * 
919  * @param cls unused
920  * @param tc scheduler context (also unused)
921  */
922 static void
923 gather_migration_blocks (void *cls,
924                          const struct GNUNET_SCHEDULER_TaskContext *tc);
925
926
927 /**
928  * If the migration task is not currently running, consider
929  * (re)scheduling it with the appropriate delay.
930  */
931 static void
932 consider_migration_gathering ()
933 {
934   struct GNUNET_TIME_Relative delay;
935
936   if (dsh == NULL)
937     return;
938   if (mig_qe != NULL)
939     return;
940   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
941     return;
942   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
943                                          mig_size);
944   delay = GNUNET_TIME_relative_divide (delay,
945                                        MAX_MIGRATION_QUEUE);
946   delay = GNUNET_TIME_relative_max (delay,
947                                     min_migration_delay);
948   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
949                                            delay,
950                                            &gather_migration_blocks,
951                                            NULL);
952 }
953
954
955 /**
956  * Process content offered for migration.
957  *
958  * @param cls closure
959  * @param key key for the content
960  * @param size number of bytes in data
961  * @param data content stored
962  * @param type type of the content
963  * @param priority priority of the content
964  * @param anonymity anonymity-level for the content
965  * @param expiration expiration time for the content
966  * @param uid unique identifier for the datum;
967  *        maybe 0 if no unique identifier is available
968  */
969 static void
970 process_migration_content (void *cls,
971                            const GNUNET_HashCode * key,
972                            uint32_t size,
973                            const void *data,
974                            enum GNUNET_BLOCK_Type type,
975                            uint32_t priority,
976                            uint32_t anonymity,
977                            struct GNUNET_TIME_Absolute
978                            expiration, uint64_t uid)
979 {
980   struct MigrationReadyBlock *mb;
981   
982   if (key == NULL)
983     {
984       mig_qe = NULL;
985       if (mig_size < MAX_MIGRATION_QUEUE)  
986         consider_migration_gathering ();
987       return;
988     }
989   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
990     {
991       if (GNUNET_OK !=
992           GNUNET_FS_handle_on_demand_block (key, size, data,
993                                             type, priority, anonymity,
994                                             expiration, uid, 
995                                             &process_migration_content,
996                                             NULL))
997         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
998       return;
999     }
1000 #if DEBUG_FS
1001   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1002               "Retrieved block `%s' of type %u for migration\n",
1003               GNUNET_h2s (key),
1004               type);
1005 #endif
1006   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
1007   mb->query = *key;
1008   mb->expiration = expiration;
1009   mb->size = size;
1010   mb->type = type;
1011   memcpy (&mb[1], data, size);
1012   GNUNET_CONTAINER_DLL_insert_after (mig_head,
1013                                      mig_tail,
1014                                      mig_tail,
1015                                      mb);
1016   mig_size++;
1017   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1018                                          &consider_migration,
1019                                          mb);
1020   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1021 }
1022
1023
1024 /**
1025  * Task that is run periodically to obtain blocks for content
1026  * migration
1027  * 
1028  * @param cls unused
1029  * @param tc scheduler context (also unused)
1030  */
1031 static void
1032 gather_migration_blocks (void *cls,
1033                          const struct GNUNET_SCHEDULER_TaskContext *tc)
1034 {
1035   mig_task = GNUNET_SCHEDULER_NO_TASK;
1036   if (dsh != NULL)
1037     {
1038       mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
1039                                             GNUNET_TIME_UNIT_FOREVER_REL,
1040                                             &process_migration_content, NULL);
1041       GNUNET_assert (mig_qe != NULL);
1042     }
1043 }
1044
1045
1046 /**
1047  * We're done with a particular message list entry.
1048  * Free all associated resources.
1049  * 
1050  * @param pml entry to destroy
1051  */
1052 static void
1053 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1054 {
1055   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1056                                pml->req->pending_tail,
1057                                pml);
1058   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1059                                pml->target->pending_messages_tail,
1060                                pml->pm);
1061   pml->target->pending_requests--;
1062   GNUNET_free (pml->pm);
1063   GNUNET_free (pml);
1064 }
1065
1066
1067 /**
1068  * Destroy the given pending message (and call the respective
1069  * continuation).
1070  *
1071  * @param pm message to destroy
1072  * @param tpid id of peer that the message was delivered to, or 0 for none
1073  */
1074 static void
1075 destroy_pending_message (struct PendingMessage *pm,
1076                          GNUNET_PEER_Id tpid)
1077 {
1078   struct PendingMessageList *pml = pm->pml;
1079   TransmissionContinuation cont;
1080   void *cont_cls;
1081
1082   GNUNET_assert (pml->pm == pm);
1083   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1084   cont = pm->cont;
1085   cont_cls = pm->cont_cls;
1086   destroy_pending_message_list_entry (pml);
1087   cont (cont_cls, tpid);  
1088 }
1089
1090
1091 /**
1092  * We're done processing a particular request.
1093  * Free all associated resources.
1094  *
1095  * @param pr request to destroy
1096  */
1097 static void
1098 destroy_pending_request (struct PendingRequest *pr)
1099 {
1100   struct GNUNET_PeerIdentity pid;
1101
1102   if (pr->hnode != NULL)
1103     {
1104       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1105                                          pr->hnode);
1106       pr->hnode = NULL;
1107     }
1108   if (NULL == pr->client_request_list)
1109     {
1110       GNUNET_STATISTICS_update (stats,
1111                                 gettext_noop ("# P2P searches active"),
1112                                 -1,
1113                                 GNUNET_NO);
1114     }
1115   else
1116     {
1117       GNUNET_STATISTICS_update (stats,
1118                                 gettext_noop ("# client searches active"),
1119                                 -1,
1120                                 GNUNET_NO);
1121     }
1122   /* might have already been removed from map in 'process_reply' (if
1123      there was a unique reply) or never inserted if it was a
1124      duplicate; hence ignore the return value here */
1125   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1126                                                &pr->query,
1127                                                pr);
1128   if (pr->qe != NULL)
1129      {
1130       GNUNET_DATASTORE_cancel (pr->qe);
1131       pr->qe = NULL;
1132     }
1133   if (pr->client_request_list != NULL)
1134     {
1135       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1136                                    pr->client_request_list->client_list->rl_tail,
1137                                    pr->client_request_list);
1138       GNUNET_free (pr->client_request_list);
1139       pr->client_request_list = NULL;
1140     }
1141   if (pr->cp != NULL)
1142     {
1143       GNUNET_PEER_resolve (pr->cp->pid,
1144                            &pid);
1145       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1146                                                    &pid.hashPubKey,
1147                                                    pr);
1148       pr->cp = NULL;
1149     }
1150   if (pr->bf != NULL)
1151     {
1152       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1153       pr->bf = NULL;
1154     }
1155   if (pr->irc != NULL)
1156     {
1157       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1158       pr->irc = NULL;
1159     }
1160   if (pr->replies_seen != NULL)
1161     {
1162       GNUNET_free (pr->replies_seen);
1163       pr->replies_seen = NULL;
1164     }
1165   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1166     {
1167       GNUNET_SCHEDULER_cancel (sched,
1168                                pr->task);
1169       pr->task = GNUNET_SCHEDULER_NO_TASK;
1170     }
1171   while (NULL != pr->pending_head)    
1172     destroy_pending_message_list_entry (pr->pending_head);
1173   GNUNET_PEER_change_rc (pr->target_pid, -1);
1174   if (pr->used_pids != NULL)
1175     {
1176       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1177       GNUNET_free (pr->used_pids);
1178       pr->used_pids_off = 0;
1179       pr->used_pids_size = 0;
1180       pr->used_pids = NULL;
1181     }
1182   GNUNET_free (pr);
1183 }
1184
1185
1186 /**
1187  * Method called whenever a given peer connects.
1188  *
1189  * @param cls closure, not used
1190  * @param peer peer identity this notification is about
1191  * @param latency reported latency of the connection with 'other'
1192  * @param distance reported distance (DV) to 'other' 
1193  */
1194 static void 
1195 peer_connect_handler (void *cls,
1196                       const struct
1197                       GNUNET_PeerIdentity * peer,
1198                       struct GNUNET_TIME_Relative latency,
1199                       uint32_t distance)
1200 {
1201   struct ConnectedPeer *cp;
1202   struct MigrationReadyBlock *pos;
1203   char *fn;
1204   uint32_t trust;
1205   
1206   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1207   cp->pid = GNUNET_PEER_intern (peer);
1208
1209   fn = get_trust_filename (peer);
1210   if ((GNUNET_DISK_file_test (fn) == GNUNET_YES) &&
1211       (sizeof (trust) == GNUNET_DISK_fn_read (fn, &trust, sizeof (trust))))
1212     cp->disk_trust = cp->trust = ntohl (trust);
1213   GNUNET_free (fn);
1214
1215   GNUNET_break (GNUNET_OK ==
1216                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1217                                                    &peer->hashPubKey,
1218                                                    cp,
1219                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1220
1221   pos = mig_head;
1222   while (NULL != pos)
1223     {
1224       (void) consider_migration (pos, &peer->hashPubKey, cp);
1225       pos = pos->next;
1226     }
1227 }
1228
1229
1230 /**
1231  * Increase the host credit by a value.
1232  *
1233  * @param host which peer to change the trust value on
1234  * @param value is the int value by which the
1235  *  host credit is to be increased or decreased
1236  * @returns the actual change in trust (positive or negative)
1237  */
1238 static int
1239 change_host_trust (struct ConnectedPeer *host, int value)
1240 {
1241   unsigned int old_trust;
1242
1243   if (value == 0)
1244     return 0;
1245   GNUNET_assert (host != NULL);
1246   old_trust = host->trust;
1247   if (value > 0)
1248     {
1249       if (host->trust + value < host->trust)
1250         {
1251           value = UINT32_MAX - host->trust;
1252           host->trust = UINT32_MAX;
1253         }
1254       else
1255         host->trust += value;
1256     }
1257   else
1258     {
1259       if (host->trust < -value)
1260         {
1261           value = -host->trust;
1262           host->trust = 0;
1263         }
1264       else
1265         host->trust += value;
1266     }
1267   return value;
1268 }
1269
1270
1271 /**
1272  * Write host-trust information to a file - flush the buffer entry!
1273  */
1274 static int
1275 flush_trust (void *cls,
1276              const GNUNET_HashCode *key,
1277              void *value)
1278 {
1279   struct ConnectedPeer *host = value;
1280   char *fn;
1281   uint32_t trust;
1282   struct GNUNET_PeerIdentity pid;
1283
1284   if (host->trust == host->disk_trust)
1285     return GNUNET_OK;                     /* unchanged */
1286   GNUNET_PEER_resolve (host->pid,
1287                        &pid);
1288   fn = get_trust_filename (&pid);
1289   if (host->trust == 0)
1290     {
1291       if ((0 != UNLINK (fn)) && (errno != ENOENT))
1292         GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING |
1293                                   GNUNET_ERROR_TYPE_BULK, "unlink", fn);
1294     }
1295   else
1296     {
1297       trust = htonl (host->trust);
1298       if (sizeof(uint32_t) == GNUNET_DISK_fn_write (fn, &trust, 
1299                                                     sizeof(uint32_t),
1300                                                     GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
1301                                                     | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ))
1302         host->disk_trust = host->trust;
1303     }
1304   GNUNET_free (fn);
1305   return GNUNET_OK;
1306 }
1307
1308 /**
1309  * Call this method periodically to scan data/hosts for new hosts.
1310  */
1311 static void
1312 cron_flush_trust (void *cls,
1313                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1314 {
1315
1316   if (NULL == connected_peers)
1317     return;
1318   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1319                                          &flush_trust,
1320                                          NULL);
1321   if (NULL == tc)
1322     return;
1323   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1324     return;
1325   GNUNET_SCHEDULER_add_delayed (tc->sched,
1326                                 TRUST_FLUSH_FREQ, &cron_flush_trust, NULL);
1327 }
1328
1329
1330 /**
1331  * Free (each) request made by the peer.
1332  *
1333  * @param cls closure, points to peer that the request belongs to
1334  * @param key current key code
1335  * @param value value in the hash map
1336  * @return GNUNET_YES (we should continue to iterate)
1337  */
1338 static int
1339 destroy_request (void *cls,
1340                  const GNUNET_HashCode * key,
1341                  void *value)
1342 {
1343   const struct GNUNET_PeerIdentity * peer = cls;
1344   struct PendingRequest *pr = value;
1345   
1346   GNUNET_break (GNUNET_YES ==
1347                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1348                                                       &peer->hashPubKey,
1349                                                       pr));
1350   destroy_pending_request (pr);
1351   return GNUNET_YES;
1352 }
1353
1354
1355 /**
1356  * Method called whenever a peer disconnects.
1357  *
1358  * @param cls closure, not used
1359  * @param peer peer identity this notification is about
1360  */
1361 static void
1362 peer_disconnect_handler (void *cls,
1363                          const struct
1364                          GNUNET_PeerIdentity * peer)
1365 {
1366   struct ConnectedPeer *cp;
1367   struct PendingMessage *pm;
1368   unsigned int i;
1369   struct MigrationReadyBlock *pos;
1370   struct MigrationReadyBlock *next;
1371
1372   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1373                                               &peer->hashPubKey,
1374                                               &destroy_request,
1375                                               (void*) peer);
1376   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1377                                           &peer->hashPubKey);
1378   if (cp == NULL)
1379     return;
1380   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1381     {
1382       if (NULL != cp->last_client_replies[i])
1383         {
1384           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1385           cp->last_client_replies[i] = NULL;
1386         }
1387     }
1388   GNUNET_break (GNUNET_YES ==
1389                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1390                                                       &peer->hashPubKey,
1391                                                       cp));
1392   /* remove this peer from migration considerations; schedule
1393      alternatives */
1394   next = mig_head;
1395   while (NULL != (pos = next))
1396     {
1397       next = pos->next;
1398       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1399         {
1400           if (pos->target_list[i] == cp->pid)
1401             {
1402               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1403               pos->target_list[i] = 0;
1404             }
1405          }
1406       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1407         {
1408           delete_migration_block (pos);
1409           consider_migration_gathering ();
1410           continue;
1411         }
1412       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1413                                              &consider_migration,
1414                                              pos);
1415     }
1416   GNUNET_PEER_change_rc (cp->pid, -1);
1417   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1418   if (NULL != cp->cth)
1419     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1420   while (NULL != (pm = cp->pending_messages_head))
1421     destroy_pending_message (pm, 0 /* delivery failed */);
1422   GNUNET_break (0 == cp->pending_requests);
1423   GNUNET_free (cp);
1424 }
1425
1426
1427 /**
1428  * Iterator over hash map entries that removes all occurences
1429  * of the given 'client' from the 'last_client_replies' of the
1430  * given connected peer.
1431  *
1432  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1433  * @param key current key code (unused)
1434  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1435  * @return GNUNET_YES (we should continue to iterate)
1436  */
1437 static int
1438 remove_client_from_last_client_replies (void *cls,
1439                                         const GNUNET_HashCode * key,
1440                                         void *value)
1441 {
1442   struct GNUNET_SERVER_Client *client = cls;
1443   struct ConnectedPeer *cp = value;
1444   unsigned int i;
1445
1446   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1447     {
1448       if (cp->last_client_replies[i] == client)
1449         {
1450           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1451           cp->last_client_replies[i] = NULL;
1452         }
1453     }  
1454   return GNUNET_YES;
1455 }
1456
1457
1458 /**
1459  * A client disconnected.  Remove all of its pending queries.
1460  *
1461  * @param cls closure, NULL
1462  * @param client identification of the client
1463  */
1464 static void
1465 handle_client_disconnect (void *cls,
1466                           struct GNUNET_SERVER_Client
1467                           * client)
1468 {
1469   struct ClientList *pos;
1470   struct ClientList *prev;
1471   struct ClientRequestList *rcl;
1472   struct ClientResponseMessage *creply;
1473
1474   if (client == NULL)
1475     return;
1476   prev = NULL;
1477   pos = client_list;
1478   while ( (NULL != pos) &&
1479           (pos->client != client) )
1480     {
1481       prev = pos;
1482       pos = pos->next;
1483     }
1484   if (pos == NULL)
1485     return; /* no requests pending for this client */
1486   while (NULL != (rcl = pos->rl_head))
1487     {
1488       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1489                   "Destroying pending request `%s' on disconnect\n",
1490                   GNUNET_h2s (&rcl->req->query));
1491       destroy_pending_request (rcl->req);
1492     }
1493   if (prev == NULL)
1494     client_list = pos->next;
1495   else
1496     prev->next = pos->next;
1497   if (pos->th != NULL)
1498     {
1499       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1500       pos->th = NULL;
1501     }
1502   while (NULL != (creply = pos->res_head))
1503     {
1504       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1505                                    pos->res_tail,
1506                                    creply);
1507       GNUNET_free (creply);
1508     }    
1509   GNUNET_SERVER_client_drop (pos->client);
1510   GNUNET_free (pos);
1511   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1512                                          &remove_client_from_last_client_replies,
1513                                          client);
1514 }
1515
1516
1517 /**
1518  * Iterator to free peer entries.
1519  *
1520  * @param cls closure, unused
1521  * @param key current key code
1522  * @param value value in the hash map (peer entry)
1523  * @return GNUNET_YES (we should continue to iterate)
1524  */
1525 static int 
1526 clean_peer (void *cls,
1527             const GNUNET_HashCode * key,
1528             void *value)
1529 {
1530   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1531   return GNUNET_YES;
1532 }
1533
1534
1535 /**
1536  * Task run during shutdown.
1537  *
1538  * @param cls unused
1539  * @param tc unused
1540  */
1541 static void
1542 shutdown_task (void *cls,
1543                const struct GNUNET_SCHEDULER_TaskContext *tc)
1544 {
1545   if (mig_qe != NULL)
1546     {
1547       GNUNET_DATASTORE_cancel (mig_qe);
1548       mig_qe = NULL;
1549     }
1550   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1551     {
1552       GNUNET_SCHEDULER_cancel (sched, mig_task);
1553       mig_task = GNUNET_SCHEDULER_NO_TASK;
1554     }
1555   while (client_list != NULL)
1556     handle_client_disconnect (NULL,
1557                               client_list->client);
1558   cron_flush_trust (NULL, NULL);
1559   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1560                                          &clean_peer,
1561                                          NULL);
1562   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1563   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1564   requests_by_expiration_heap = 0;
1565   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1566   connected_peers = NULL;
1567   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1568   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1569   query_request_map = NULL;
1570   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1571   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1572   peer_request_map = NULL;
1573   GNUNET_assert (NULL != core);
1574   GNUNET_CORE_disconnect (core);
1575   core = NULL;
1576   if (stats != NULL)
1577     {
1578       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1579       stats = NULL;
1580     }
1581   if (dsh != NULL)
1582     {
1583       GNUNET_DATASTORE_disconnect (dsh,
1584                                    GNUNET_NO);
1585       dsh = NULL;
1586     }
1587   while (mig_head != NULL)
1588     delete_migration_block (mig_head);
1589   GNUNET_assert (0 == mig_size);
1590   GNUNET_BLOCK_context_destroy (block_ctx);
1591   block_ctx = NULL;
1592   GNUNET_CONFIGURATION_destroy (block_cfg);
1593   block_cfg = NULL;
1594   sched = NULL;
1595   cfg = NULL;  
1596   GNUNET_free_non_null (trustDirectory);
1597   trustDirectory = NULL;
1598 }
1599
1600
1601 /* ******************* Utility functions  ******************** */
1602
1603
1604 /**
1605  * Transmit messages by copying it to the target buffer
1606  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1607  * for writing in the meantime.  In that case, do nothing
1608  * (the disconnect or shutdown handler will take care of the rest).
1609  * If we were able to transmit messages and there are still more
1610  * pending, ask core again for further calls to this function.
1611  *
1612  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1613  * @param size number of bytes available in buf
1614  * @param buf where the callee should write the message
1615  * @return number of bytes written to buf
1616  */
1617 static size_t
1618 transmit_to_peer (void *cls,
1619                   size_t size, void *buf)
1620 {
1621   struct ConnectedPeer *cp = cls;
1622   char *cbuf = buf;
1623   struct GNUNET_PeerIdentity pid;
1624   struct PendingMessage *pm;
1625   struct MigrationReadyBlock *mb;
1626   struct MigrationReadyBlock *next;
1627   struct PutMessage migm;
1628   size_t msize;
1629   unsigned int i;
1630  
1631   cp->cth = NULL;
1632   if (NULL == buf)
1633     {
1634 #if DEBUG_FS
1635       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636                   "Dropping message, core too busy.\n");
1637 #endif
1638       return 0;
1639     }
1640   msize = 0;
1641   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1642           (pm->msize <= size) )
1643     {
1644       memcpy (&cbuf[msize], &pm[1], pm->msize);
1645       msize += pm->msize;
1646       size -= pm->msize;
1647       destroy_pending_message (pm, cp->pid);
1648     }
1649   if (NULL != pm)
1650     {
1651       GNUNET_PEER_resolve (cp->pid,
1652                            &pid);
1653       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1654                                                    pm->priority,
1655                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1656                                                    &pid,
1657                                                    pm->msize,
1658                                                    &transmit_to_peer,
1659                                                    cp);
1660     }
1661   else
1662     {      
1663       next = mig_head;
1664       while (NULL != (mb = next))
1665         {
1666           next = mb->next;
1667           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1668             {
1669               if ( (cp->pid == mb->target_list[i]) &&
1670                    (mb->size + sizeof (migm) <= size) )
1671                 {
1672                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1673                   mb->target_list[i] = 0;
1674                   mb->used_targets++;
1675                   memset (&migm, 0, sizeof (migm));
1676                   migm.header.size = htons (sizeof (migm) + mb->size);
1677                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1678                   migm.type = htonl (mb->type);
1679                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1680                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1681                   msize += sizeof (migm);
1682                   size -= sizeof (migm);
1683                   memcpy (&cbuf[msize], &mb[1], mb->size);
1684                   msize += mb->size;
1685                   size -= mb->size;
1686 #if DEBUG_FS
1687                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1688                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1689                               GNUNET_h2s (&mb->query),
1690                               mb->size,
1691                               GNUNET_i2s (&pid));
1692 #endif    
1693                   break;
1694                 }
1695               else
1696                 {
1697 #if DEBUG_FS
1698                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1699                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1700                               GNUNET_h2s (&mb->query),
1701                               mb->size,
1702                               GNUNET_i2s (&pid));
1703 #endif    
1704                 }
1705             }
1706           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1707                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1708             {
1709               delete_migration_block (mb);
1710               consider_migration_gathering ();
1711             }
1712         }
1713       consider_migration (NULL, 
1714                           &pid.hashPubKey,
1715                           cp);
1716     }
1717 #if DEBUG_FS
1718   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1719               "Transmitting %u bytes to peer %u\n",
1720               msize,
1721               cp->pid);
1722 #endif
1723   return msize;
1724 }
1725
1726
1727 /**
1728  * Add a message to the set of pending messages for the given peer.
1729  *
1730  * @param cp peer to send message to
1731  * @param pm message to queue
1732  * @param pr request on which behalf this message is being queued
1733  */
1734 static void
1735 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1736                                   struct PendingMessage *pm,
1737                                   struct PendingRequest *pr)
1738 {
1739   struct PendingMessage *pos;
1740   struct PendingMessageList *pml;
1741   struct GNUNET_PeerIdentity pid;
1742
1743   GNUNET_assert (pm->next == NULL);
1744   GNUNET_assert (pm->pml == NULL);    
1745   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1746   pml->req = pr;
1747   pml->target = cp;
1748   pml->pm = pm;
1749   pm->pml = pml;  
1750   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1751                                pr->pending_tail,
1752                                pml);
1753   pos = cp->pending_messages_head;
1754   while ( (pos != NULL) &&
1755           (pm->priority < pos->priority) )
1756     pos = pos->next;    
1757   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1758                                      cp->pending_messages_tail,
1759                                      pos,
1760                                      pm);
1761   cp->pending_requests++;
1762   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1763     destroy_pending_message (cp->pending_messages_tail, 0);  
1764   GNUNET_PEER_resolve (cp->pid, &pid);
1765   if (NULL != cp->cth)
1766     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1767   /* need to schedule transmission */
1768   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1769                                                cp->pending_messages_head->priority,
1770                                                MAX_TRANSMIT_DELAY,
1771                                                &pid,
1772                                                cp->pending_messages_head->msize,
1773                                                &transmit_to_peer,
1774                                                cp);
1775   if (cp->cth == NULL)
1776     {
1777 #if DEBUG_FS
1778       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1779                   "Failed to schedule transmission with core!\n");
1780 #endif
1781       GNUNET_STATISTICS_update (stats,
1782                                 gettext_noop ("# CORE transmission failures"),
1783                                 1,
1784                                 GNUNET_NO);
1785     }
1786 }
1787
1788
1789 /**
1790  * Test if the load on this peer is too high
1791  * to even consider processing the query at
1792  * all.
1793  * 
1794  * @return GNUNET_YES if the load is too high to do anything, GNUNET_NO to forward (load high, but not too high), GNUNET_SYSERR to indirect (load low)
1795  */
1796 static int
1797 test_load_too_high ()
1798 {
1799   return GNUNET_SYSERR; // FIXME
1800 }
1801
1802
1803 /* ******************* Pending Request Refresh Task ******************** */
1804
1805
1806
1807 /**
1808  * We use a random delay to make the timing of requests less
1809  * predictable.  This function returns such a random delay.  We add a base
1810  * delay of MAX_CORK_DELAY (1s).
1811  *
1812  * FIXME: make schedule dependent on the specifics of the request?
1813  * Or bandwidth and number of connected peers and load?
1814  *
1815  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1816  */
1817 static struct GNUNET_TIME_Relative
1818 get_processing_delay ()
1819 {
1820   return 
1821     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1822                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1823                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1824                                                                                        TTL_DECREMENT)));
1825 }
1826
1827
1828 /**
1829  * We're processing a GET request from another peer and have decided
1830  * to forward it to other peers.  This function is called periodically
1831  * and should forward the request to other peers until we have all
1832  * possible replies.  If we have transmitted the *only* reply to
1833  * the initiator we should destroy the pending request.  If we have
1834  * many replies in the queue to the initiator, we should delay sending
1835  * out more queries until the reply queue has shrunk some.
1836  *
1837  * @param cls our "struct ProcessGetContext *"
1838  * @param tc unused
1839  */
1840 static void
1841 forward_request_task (void *cls,
1842                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1843
1844
1845 /**
1846  * Function called after we either failed or succeeded
1847  * at transmitting a query to a peer.  
1848  *
1849  * @param cls the requests "struct PendingRequest*"
1850  * @param tpid ID of receiving peer, 0 on transmission error
1851  */
1852 static void
1853 transmit_query_continuation (void *cls,
1854                              GNUNET_PEER_Id tpid)
1855 {
1856   struct PendingRequest *pr = cls;
1857
1858   GNUNET_STATISTICS_update (stats,
1859                             gettext_noop ("# queries scheduled for forwarding"),
1860                             -1,
1861                             GNUNET_NO);
1862   if (tpid == 0)   
1863     {
1864 #if DEBUG_FS
1865       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1866                   "Transmission of request failed, will try again later.\n");
1867 #endif
1868       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1869         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1870                                                  get_processing_delay (),
1871                                                  &forward_request_task,
1872                                                  pr); 
1873       return;    
1874     }
1875   GNUNET_STATISTICS_update (stats,
1876                             gettext_noop ("# queries forwarded"),
1877                             1,
1878                             GNUNET_NO);
1879   GNUNET_PEER_change_rc (tpid, 1);
1880   if (pr->used_pids_off == pr->used_pids_size)
1881     GNUNET_array_grow (pr->used_pids,
1882                        pr->used_pids_size,
1883                        pr->used_pids_size * 2 + 2);
1884   pr->used_pids[pr->used_pids_off++] = tpid;
1885   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1886     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1887                                              get_processing_delay (),
1888                                              &forward_request_task,
1889                                              pr);
1890 }
1891
1892
1893 /**
1894  * How many bytes should a bloomfilter be if we have already seen
1895  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1896  * of bits set per entry.  Furthermore, we should not re-size the
1897  * filter too often (to keep it cheap).
1898  *
1899  * Since other peers will also add entries but not resize the filter,
1900  * we should generally pick a slightly larger size than what the
1901  * strict math would suggest.
1902  *
1903  * @return must be a power of two and smaller or equal to 2^15.
1904  */
1905 static size_t
1906 compute_bloomfilter_size (unsigned int entry_count)
1907 {
1908   size_t size;
1909   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1910   uint16_t max = 1 << 15;
1911
1912   if (entry_count > max)
1913     return max;
1914   size = 8;
1915   while ((size < max) && (size < ideal))
1916     size *= 2;
1917   if (size > max)
1918     return max;
1919   return size;
1920 }
1921
1922
1923 /**
1924  * Recalculate our bloom filter for filtering replies.  This function
1925  * will create a new bloom filter from scratch, so it should only be
1926  * called if we have no bloomfilter at all (and hence can create a
1927  * fresh one of minimal size without problems) OR if our peer is the
1928  * initiator (in which case we may resize to larger than mimimum size).
1929  *
1930  * @param pr request for which the BF is to be recomputed
1931  */
1932 static void
1933 refresh_bloomfilter (struct PendingRequest *pr)
1934 {
1935   unsigned int i;
1936   size_t nsize;
1937   GNUNET_HashCode mhash;
1938
1939   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1940   if (nsize == pr->bf_size)
1941     return; /* size not changed */
1942   if (pr->bf != NULL)
1943     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1944   pr->bf_size = nsize;
1945   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1946   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1947                                               pr->bf_size,
1948                                               BLOOMFILTER_K);
1949   for (i=0;i<pr->replies_seen_off;i++)
1950     {
1951       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
1952                                 pr->mingle,
1953                                 &mhash);
1954       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
1955     }
1956 }
1957
1958
1959 /**
1960  * Function called after we've tried to reserve a certain amount of
1961  * bandwidth for a reply.  Check if we succeeded and if so send our
1962  * query.
1963  *
1964  * @param cls the requests "struct PendingRequest*"
1965  * @param peer identifies the peer
1966  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1967  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1968  * @param amount set to the amount that was actually reserved or unreserved
1969  * @param preference current traffic preference for the given peer
1970  */
1971 static void
1972 target_reservation_cb (void *cls,
1973                        const struct
1974                        GNUNET_PeerIdentity * peer,
1975                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1976                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1977                        int amount,
1978                        uint64_t preference)
1979 {
1980   struct PendingRequest *pr = cls;
1981   struct ConnectedPeer *cp;
1982   struct PendingMessage *pm;
1983   struct GetMessage *gm;
1984   GNUNET_HashCode *ext;
1985   char *bfdata;
1986   size_t msize;
1987   unsigned int k;
1988   int no_route;
1989   uint32_t bm;
1990
1991   pr->irc = NULL;
1992   if (peer == NULL)
1993     {
1994       /* error in communication with core, try again later */
1995       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1996         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1997                                                  get_processing_delay (),
1998                                                  &forward_request_task,
1999                                                  pr);
2000       return;
2001     }
2002   // (3) transmit, update ttl/priority
2003   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2004                                           &peer->hashPubKey);
2005   if (cp == NULL)
2006     {
2007       /* Peer must have just left */
2008 #if DEBUG_FS
2009       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2010                   "Selected peer disconnected!\n");
2011 #endif
2012       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2013         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2014                                                  get_processing_delay (),
2015                                                  &forward_request_task,
2016                                                  pr);
2017       return;
2018     }
2019   no_route = GNUNET_NO;
2020   if (amount == 0)
2021     {
2022       if (pr->cp == NULL)
2023         {
2024 #if DEBUG_FS > 1
2025           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2026                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
2027                       amount,
2028                       DBLOCK_SIZE);
2029 #endif
2030           GNUNET_STATISTICS_update (stats,
2031                                     gettext_noop ("# reply bandwidth reservation requests failed"),
2032                                     1,
2033                                     GNUNET_NO);
2034           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2035             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2036                                                      get_processing_delay (),
2037                                                      &forward_request_task,
2038                                                      pr);
2039           return;  /* this target round failed */
2040         }
2041       /* FIXME: if we are "quite" busy, we may still want to skip
2042          this round; need more load detection code! */
2043       no_route = GNUNET_YES;
2044     }
2045   
2046   GNUNET_STATISTICS_update (stats,
2047                             gettext_noop ("# queries scheduled for forwarding"),
2048                             1,
2049                             GNUNET_NO);
2050   /* build message and insert message into priority queue */
2051 #if DEBUG_FS
2052   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2053               "Forwarding request `%s' to `%4s'!\n",
2054               GNUNET_h2s (&pr->query),
2055               GNUNET_i2s (peer));
2056 #endif
2057   k = 0;
2058   bm = 0;
2059   if (GNUNET_YES == no_route)
2060     {
2061       bm |= GET_MESSAGE_BIT_RETURN_TO;
2062       k++;      
2063     }
2064   if (pr->namespace != NULL)
2065     {
2066       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
2067       k++;
2068     }
2069   if (pr->target_pid != 0)
2070     {
2071       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
2072       k++;
2073     }
2074   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
2075   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2076   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
2077   pm->msize = msize;
2078   gm = (struct GetMessage*) &pm[1];
2079   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
2080   gm->header.size = htons (msize);
2081   gm->type = htonl (pr->type);
2082   pr->remaining_priority /= 2;
2083   gm->priority = htonl (pr->remaining_priority);
2084   gm->ttl = htonl (pr->ttl);
2085   gm->filter_mutator = htonl(pr->mingle); 
2086   gm->hash_bitmap = htonl (bm);
2087   gm->query = pr->query;
2088   ext = (GNUNET_HashCode*) &gm[1];
2089   k = 0;
2090   if (GNUNET_YES == no_route)
2091     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2092   if (pr->namespace != NULL)
2093     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
2094   if (pr->target_pid != 0)
2095     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
2096   bfdata = (char *) &ext[k];
2097   if (pr->bf != NULL)
2098     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
2099                                                bfdata,
2100                                                pr->bf_size);
2101   pm->cont = &transmit_query_continuation;
2102   pm->cont_cls = pr;
2103   add_to_pending_messages_for_peer (cp, pm, pr);
2104 }
2105
2106
2107 /**
2108  * Closure used for "target_peer_select_cb".
2109  */
2110 struct PeerSelectionContext 
2111 {
2112   /**
2113    * The request for which we are selecting
2114    * peers.
2115    */
2116   struct PendingRequest *pr;
2117
2118   /**
2119    * Current "prime" target.
2120    */
2121   struct GNUNET_PeerIdentity target;
2122
2123   /**
2124    * How much do we like this target?
2125    */
2126   double target_score;
2127
2128 };
2129
2130
2131 /**
2132  * Function called for each connected peer to determine
2133  * which one(s) would make good targets for forwarding.
2134  *
2135  * @param cls closure (struct PeerSelectionContext)
2136  * @param key current key code (peer identity)
2137  * @param value value in the hash map (struct ConnectedPeer)
2138  * @return GNUNET_YES if we should continue to
2139  *         iterate,
2140  *         GNUNET_NO if not.
2141  */
2142 static int
2143 target_peer_select_cb (void *cls,
2144                        const GNUNET_HashCode * key,
2145                        void *value)
2146 {
2147   struct PeerSelectionContext *psc = cls;
2148   struct ConnectedPeer *cp = value;
2149   struct PendingRequest *pr = psc->pr;
2150   double score;
2151   unsigned int i;
2152   unsigned int pc;
2153
2154   /* 1) check that this peer is not the initiator */
2155   if (cp == pr->cp)
2156     {
2157 #if DEBUG_FS
2158       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2159                   "Skipping initiator in forwarding selection\n");
2160 #endif
2161       return GNUNET_YES; /* skip */        
2162     }
2163
2164   /* 2) check if we have already (recently) forwarded to this peer */
2165   pc = 0;
2166   for (i=0;i<pr->used_pids_off;i++)
2167     if (pr->used_pids[i] == cp->pid) 
2168       {
2169         pc++;
2170         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2171                                            RETRY_PROBABILITY_INV))
2172           {
2173 #if DEBUG_FS
2174             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2175                         "NOT re-trying query that was previously transmitted %u times\n",
2176                         (unsigned int) pr->used_pids_off);
2177 #endif
2178             return GNUNET_YES; /* skip */
2179           }
2180       }
2181 #if DEBUG_FS
2182   if (0 < pc)
2183     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2184                 "Re-trying query that was previously transmitted %u times to this peer\n",
2185                 (unsigned int) pc);
2186 #endif
2187   /* 3) calculate how much we'd like to forward to this peer,
2188      starting with a random value that is strong enough
2189      to at least give any peer a chance sometimes 
2190      (compared to the other factors that come later) */
2191   /* 3a) count successful (recent) routes from cp for same source */
2192   if (pr->cp != NULL)
2193     {
2194       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2195                                         P2P_SUCCESS_LIST_SIZE);
2196       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2197         if (cp->last_p2p_replies[i] == pr->cp->pid)
2198           score += 1; /* likely successful based on hot path */
2199     }
2200   else
2201     {
2202       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2203                                         CS2P_SUCCESS_LIST_SIZE);
2204       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2205         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2206           score += 1; /* likely successful based on hot path */
2207     }
2208   /* 3b) include latency */
2209   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2210     score += 1; /* likely fast based on latency */
2211   /* 3c) include priorities */
2212   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2213     score += 1; /* likely successful based on priorities */
2214   /* 3d) penalize for queue size */  
2215   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2216   /* 3e) include peer proximity */
2217   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2218                                                     &pr->query)) / (double) UINT32_MAX);
2219   /* 4) super-bonus for being the known target */
2220   if (pr->target_pid == cp->pid)
2221     score += 100.0;
2222   /* store best-fit in closure */
2223 #if DEBUG_FS
2224   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2225               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2226               GNUNET_h2s (key),
2227               score,
2228               psc->target_score);
2229 #endif  
2230   score++; /* avoid zero */
2231   if (score > psc->target_score)
2232     {
2233       psc->target_score = score;
2234       psc->target.hashPubKey = *key; 
2235     }
2236   return GNUNET_YES;
2237 }
2238   
2239
2240 /**
2241  * The priority level imposes a bound on the maximum
2242  * value for the ttl that can be requested.
2243  *
2244  * @param ttl_in requested ttl
2245  * @param prio given priority
2246  * @return ttl_in if ttl_in is below the limit,
2247  *         otherwise the ttl-limit for the given priority
2248  */
2249 static int32_t
2250 bound_ttl (int32_t ttl_in, uint32_t prio)
2251 {
2252   unsigned long long allowed;
2253
2254   if (ttl_in <= 0)
2255     return ttl_in;
2256   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2257   if (ttl_in > allowed)      
2258     {
2259       if (allowed >= (1 << 30))
2260         return 1 << 30;
2261       return allowed;
2262     }
2263   return ttl_in;
2264 }
2265
2266
2267 /**
2268  * We're processing a GET request and have decided
2269  * to forward it to other peers.  This function is called periodically
2270  * and should forward the request to other peers until we have all
2271  * possible replies.  If we have transmitted the *only* reply to
2272  * the initiator we should destroy the pending request.  If we have
2273  * many replies in the queue to the initiator, we should delay sending
2274  * out more queries until the reply queue has shrunk some.
2275  *
2276  * @param cls our "struct ProcessGetContext *"
2277  * @param tc unused
2278  */
2279 static void
2280 forward_request_task (void *cls,
2281                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2282 {
2283   struct PendingRequest *pr = cls;
2284   struct PeerSelectionContext psc;
2285   struct ConnectedPeer *cp; 
2286   struct GNUNET_TIME_Relative delay;
2287
2288   pr->task = GNUNET_SCHEDULER_NO_TASK;
2289   if (pr->irc != NULL)
2290     {
2291 #if DEBUG_FS
2292       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2293                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2294                   GNUNET_h2s (&pr->query));
2295 #endif
2296       return; /* already pending */
2297     }
2298   if (GNUNET_YES == pr->local_only)
2299     return; /* configured to not do P2P search */
2300   /* (1) select target */
2301   psc.pr = pr;
2302   psc.target_score = -DBL_MAX;
2303   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2304                                          &target_peer_select_cb,
2305                                          &psc);  
2306   if (psc.target_score == -DBL_MAX)
2307     {
2308       delay = get_processing_delay ();
2309 #if DEBUG_FS 
2310       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2311                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2312                   GNUNET_h2s (&pr->query),
2313                   delay.value);
2314 #endif
2315       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2316                                                delay,
2317                                                &forward_request_task,
2318                                                pr);
2319       return; /* nobody selected */
2320     }
2321   /* (3) update TTL/priority */
2322   if (pr->client_request_list != NULL)
2323     {
2324       /* FIXME: use better algorithm!? */
2325       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2326                                          4))
2327         pr->priority++;
2328       /* bound priority we use by priorities we see from other peers
2329          rounded up (must round up so that we can see non-zero
2330          priorities, but round up as little as possible to make it
2331          plausible that we forwarded another peers request) */
2332       if (pr->priority > current_priorities + 1.0)
2333         pr->priority = (uint32_t) current_priorities + 1.0;
2334       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2335                            pr->priority);
2336 #if DEBUG_FS
2337       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2338                   "Trying query `%s' with priority %u and TTL %d.\n",
2339                   GNUNET_h2s (&pr->query),
2340                   pr->priority,
2341                   pr->ttl);
2342 #endif
2343     }
2344
2345   /* (3) reserve reply bandwidth */
2346   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2347                                           &psc.target.hashPubKey);
2348   GNUNET_assert (NULL != cp);
2349   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2350                                                 &psc.target,
2351                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2352                                                 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2353                                                 DBLOCK_SIZE * 2, 
2354                                                 cp->inc_preference,
2355                                                 &target_reservation_cb,
2356                                                 pr);
2357   cp->inc_preference = 0;
2358 }
2359
2360
2361 /* **************************** P2P PUT Handling ************************ */
2362
2363
2364 /**
2365  * Function called after we either failed or succeeded
2366  * at transmitting a reply to a peer.  
2367  *
2368  * @param cls the requests "struct PendingRequest*"
2369  * @param tpid ID of receiving peer, 0 on transmission error
2370  */
2371 static void
2372 transmit_reply_continuation (void *cls,
2373                              GNUNET_PEER_Id tpid)
2374 {
2375   struct PendingRequest *pr = cls;
2376   
2377   switch (pr->type)
2378     {
2379     case GNUNET_BLOCK_TYPE_DBLOCK:
2380     case GNUNET_BLOCK_TYPE_IBLOCK:
2381       /* only one reply expected, done with the request! */
2382       destroy_pending_request (pr);
2383       break;
2384     case GNUNET_BLOCK_TYPE_ANY:
2385     case GNUNET_BLOCK_TYPE_KBLOCK:
2386     case GNUNET_BLOCK_TYPE_SBLOCK:
2387       break;
2388     default:
2389       GNUNET_break (0);
2390       break;
2391     }
2392 }
2393
2394
2395 /**
2396  * Transmit the given message by copying it to the target buffer
2397  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2398  * for writing in the meantime.  In that case, do nothing
2399  * (the disconnect or shutdown handler will take care of the rest).
2400  * If we were able to transmit messages and there are still more
2401  * pending, ask core again for further calls to this function.
2402  *
2403  * @param cls closure, pointer to the 'struct ClientList*'
2404  * @param size number of bytes available in buf
2405  * @param buf where the callee should write the message
2406  * @return number of bytes written to buf
2407  */
2408 static size_t
2409 transmit_to_client (void *cls,
2410                   size_t size, void *buf)
2411 {
2412   struct ClientList *cl = cls;
2413   char *cbuf = buf;
2414   struct ClientResponseMessage *creply;
2415   size_t msize;
2416   
2417   cl->th = NULL;
2418   if (NULL == buf)
2419     {
2420 #if DEBUG_FS
2421       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2422                   "Not sending reply, client communication problem.\n");
2423 #endif
2424       return 0;
2425     }
2426   msize = 0;
2427   while ( (NULL != (creply = cl->res_head) ) &&
2428           (creply->msize <= size) )
2429     {
2430       memcpy (&cbuf[msize], &creply[1], creply->msize);
2431       msize += creply->msize;
2432       size -= creply->msize;
2433       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2434                                    cl->res_tail,
2435                                    creply);
2436       GNUNET_free (creply);
2437     }
2438   if (NULL != creply)
2439     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2440                                                   creply->msize,
2441                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2442                                                   &transmit_to_client,
2443                                                   cl);
2444 #if DEBUG_FS
2445   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2446               "Transmitted %u bytes to client\n",
2447               (unsigned int) msize);
2448 #endif
2449   return msize;
2450 }
2451
2452
2453 /**
2454  * Closure for "process_reply" function.
2455  */
2456 struct ProcessReplyClosure
2457 {
2458   /**
2459    * The data for the reply.
2460    */
2461   const void *data;
2462
2463   /**
2464    * Who gave us this reply? NULL for local host.
2465    */
2466   struct ConnectedPeer *sender;
2467
2468   /**
2469    * When the reply expires.
2470    */
2471   struct GNUNET_TIME_Absolute expiration;
2472
2473   /**
2474    * Size of data.
2475    */
2476   size_t size;
2477
2478   /**
2479    * Namespace that this reply belongs to
2480    * (if it is of type SBLOCK).
2481    */
2482   GNUNET_HashCode namespace;
2483
2484   /**
2485    * Type of the block.
2486    */
2487   enum GNUNET_BLOCK_Type type;
2488
2489   /**
2490    * How much was this reply worth to us?
2491    */
2492   uint32_t priority;
2493
2494   /**
2495    * Did we finish processing the associated request?
2496    */ 
2497   int finished;
2498 };
2499
2500
2501 /**
2502  * We have received a reply; handle it!
2503  *
2504  * @param cls response (struct ProcessReplyClosure)
2505  * @param key our query
2506  * @param value value in the hash map (info about the query)
2507  * @return GNUNET_YES (we should continue to iterate)
2508  */
2509 static int
2510 process_reply (void *cls,
2511                const GNUNET_HashCode * key,
2512                void *value)
2513 {
2514   struct ProcessReplyClosure *prq = cls;
2515   struct PendingRequest *pr = value;
2516   struct PendingMessage *reply;
2517   struct ClientResponseMessage *creply;
2518   struct ClientList *cl;
2519   struct PutMessage *pm;
2520   struct ConnectedPeer *cp;
2521   struct GNUNET_TIME_Relative cur_delay;
2522   enum GNUNET_BLOCK_EvaluationResult eval;
2523   size_t msize;
2524
2525 #if DEBUG_FS
2526   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2527               "Matched result (type %u) for query `%s' with pending request\n",
2528               (unsigned int) prq->type,
2529               GNUNET_h2s (key));
2530 #endif  
2531   GNUNET_STATISTICS_update (stats,
2532                             gettext_noop ("# replies received and matched"),
2533                             1,
2534                             GNUNET_NO);
2535   if (prq->sender != NULL)
2536     {
2537       /* FIXME: should we be more precise here and not use
2538          "start_time" but a peer-specific time stamp? */
2539       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2540       prq->sender->avg_delay.value
2541         = (prq->sender->avg_delay.value * 
2542            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2543       prq->sender->avg_priority
2544         = (prq->sender->avg_priority * 
2545            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2546       if (pr->cp != NULL)
2547         {
2548           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2549                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2550                                  -1);
2551           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2552           prq->sender->last_p2p_replies
2553             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2554             = pr->cp->pid;
2555         }
2556       else
2557         {
2558           if (NULL != prq->sender->last_client_replies
2559               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2560             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2561                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2562           prq->sender->last_client_replies
2563             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2564             = pr->client_request_list->client_list->client;
2565           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2566         }
2567     }
2568   eval = GNUNET_BLOCK_evaluate (block_ctx,
2569                                 prq->type,
2570                                 key,
2571                                 &pr->bf,
2572                                 pr->mingle,
2573                                 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
2574                                 prq->data,
2575                                 prq->size);
2576   switch (eval)
2577     {
2578     case GNUNET_BLOCK_EVALUATION_OK_MORE:
2579       break;
2580     case GNUNET_BLOCK_EVALUATION_OK_LAST:
2581       while (NULL != pr->pending_head)
2582         destroy_pending_message_list_entry (pr->pending_head);
2583       if (pr->qe != NULL)
2584         {
2585           if (pr->client_request_list != NULL)
2586             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2587                                         GNUNET_YES);
2588           GNUNET_DATASTORE_cancel (pr->qe);
2589           pr->qe = NULL;
2590         }
2591       pr->do_remove = GNUNET_YES;
2592       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2593         {
2594           GNUNET_SCHEDULER_cancel (sched,
2595                                    pr->task);
2596           pr->task = GNUNET_SCHEDULER_NO_TASK;
2597         }
2598       GNUNET_break (GNUNET_YES ==
2599                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2600                                                           key,
2601                                                           pr));
2602       break;
2603     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
2604       GNUNET_STATISTICS_update (stats,
2605                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2606                                 1,
2607                                 GNUNET_NO);
2608 #if DEBUG_FS
2609       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2610                   "Duplicate response `%s', discarding.\n",
2611                   GNUNET_h2s (&mhash));
2612 #endif
2613       return GNUNET_YES; /* duplicate */
2614     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
2615       return GNUNET_YES; /* wrong namespace */  
2616     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
2617       GNUNET_break (0);
2618       return GNUNET_YES;
2619     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
2620       GNUNET_break (0);
2621       return GNUNET_YES;
2622     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
2623       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2624                   _("Unsupported block type %u\n"),
2625                   prq->type);
2626       return GNUNET_NO;
2627     }
2628   if (pr->client_request_list != NULL)
2629     {
2630       if (pr->replies_seen_size == pr->replies_seen_off)
2631         GNUNET_array_grow (pr->replies_seen,
2632                            pr->replies_seen_size,
2633                            pr->replies_seen_size * 2 + 4);      
2634       GNUNET_CRYPTO_hash (prq->data,
2635                           prq->size,
2636                           &pr->replies_seen[pr->replies_seen_off++]);         
2637       refresh_bloomfilter (pr);
2638     }
2639   prq->priority += pr->remaining_priority;
2640   pr->remaining_priority = 0;
2641   if (NULL != pr->client_request_list)
2642     {
2643       GNUNET_STATISTICS_update (stats,
2644                                 gettext_noop ("# replies received for local clients"),
2645                                 1,
2646                                 GNUNET_NO);
2647       cl = pr->client_request_list->client_list;
2648       msize = sizeof (struct PutMessage) + prq->size;
2649       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2650       creply->msize = msize;
2651       creply->client_list = cl;
2652       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2653                                          cl->res_tail,
2654                                          cl->res_tail,
2655                                          creply);      
2656       pm = (struct PutMessage*) &creply[1];
2657       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2658       pm->header.size = htons (msize);
2659       pm->type = htonl (prq->type);
2660       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2661       memcpy (&pm[1], prq->data, prq->size);      
2662       if (NULL == cl->th)
2663         {
2664 #if DEBUG_FS
2665           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2666                       "Transmitting result for query `%s' to client\n",
2667                       GNUNET_h2s (key));
2668 #endif  
2669           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2670                                                         msize,
2671                                                         GNUNET_TIME_UNIT_FOREVER_REL,
2672                                                         &transmit_to_client,
2673                                                         cl);
2674         }
2675       GNUNET_break (cl->th != NULL);
2676       if (pr->do_remove)                
2677         {
2678           prq->finished = GNUNET_YES;
2679           destroy_pending_request (pr);         
2680         }
2681     }
2682   else
2683     {
2684       cp = pr->cp;
2685 #if DEBUG_FS
2686       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2687                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
2688                   GNUNET_h2s (key),
2689                   (unsigned int) cp->pid);
2690 #endif  
2691       GNUNET_STATISTICS_update (stats,
2692                                 gettext_noop ("# replies received for other peers"),
2693                                 1,
2694                                 GNUNET_NO);
2695       msize = sizeof (struct PutMessage) + prq->size;
2696       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2697       reply->cont = &transmit_reply_continuation;
2698       reply->cont_cls = pr;
2699       reply->msize = msize;
2700       reply->priority = UINT32_MAX; /* send replies first! */
2701       pm = (struct PutMessage*) &reply[1];
2702       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2703       pm->header.size = htons (msize);
2704       pm->type = htonl (prq->type);
2705       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2706       memcpy (&pm[1], prq->data, prq->size);
2707       add_to_pending_messages_for_peer (cp, reply, pr);
2708     }
2709   return GNUNET_YES;
2710 }
2711
2712
2713 /**
2714  * Continuation called to notify client about result of the
2715  * operation.
2716  *
2717  * @param cls closure
2718  * @param success GNUNET_SYSERR on failure
2719  * @param msg NULL on success, otherwise an error message
2720  */
2721 static void 
2722 put_migration_continuation (void *cls,
2723                             int success,
2724                             const char *msg)
2725 {
2726   /* FIXME */
2727 }
2728
2729
2730 /**
2731  * Handle P2P "PUT" message.
2732  *
2733  * @param cls closure, always NULL
2734  * @param other the other peer involved (sender or receiver, NULL
2735  *        for loopback messages where we are both sender and receiver)
2736  * @param message the actual message
2737  * @param latency reported latency of the connection with 'other'
2738  * @param distance reported distance (DV) to 'other' 
2739  * @return GNUNET_OK to keep the connection open,
2740  *         GNUNET_SYSERR to close it (signal serious error)
2741  */
2742 static int
2743 handle_p2p_put (void *cls,
2744                 const struct GNUNET_PeerIdentity *other,
2745                 const struct GNUNET_MessageHeader *message,
2746                 struct GNUNET_TIME_Relative latency,
2747                 uint32_t distance)
2748 {
2749   const struct PutMessage *put;
2750   uint16_t msize;
2751   size_t dsize;
2752   enum GNUNET_BLOCK_Type type;
2753   struct GNUNET_TIME_Absolute expiration;
2754   GNUNET_HashCode query;
2755   struct ProcessReplyClosure prq;
2756   const struct SBlock *sb;
2757
2758   msize = ntohs (message->size);
2759   if (msize < sizeof (struct PutMessage))
2760     {
2761       GNUNET_break_op(0);
2762       return GNUNET_SYSERR;
2763     }
2764   put = (const struct PutMessage*) message;
2765   dsize = msize - sizeof (struct PutMessage);
2766   type = ntohl (put->type);
2767   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2768
2769   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2770     return GNUNET_SYSERR;
2771   if (GNUNET_OK !=
2772       GNUNET_BLOCK_get_key (block_ctx,
2773                             type,
2774                             &put[1],
2775                             dsize,
2776                             &query))
2777     {
2778       GNUNET_break_op (0);
2779       return GNUNET_SYSERR;
2780     }
2781   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2782     { 
2783       sb = (const struct SBlock*) &put[1];
2784       GNUNET_CRYPTO_hash (&sb->subspace,
2785                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2786                           &prq.namespace);
2787     }
2788
2789 #if DEBUG_FS
2790   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2791               "Received result for query `%s' from peer `%4s'\n",
2792               GNUNET_h2s (&query),
2793               GNUNET_i2s (other));
2794 #endif
2795   GNUNET_STATISTICS_update (stats,
2796                             gettext_noop ("# replies received (overall)"),
2797                             1,
2798                             GNUNET_NO);
2799   /* now, lookup 'query' */
2800   prq.data = (const void*) &put[1];
2801   if (other != NULL)
2802     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2803                                                     &other->hashPubKey);
2804   else
2805     prq.sender = NULL;
2806   prq.size = dsize;
2807   prq.type = type;
2808   prq.expiration = expiration;
2809   prq.priority = 0;
2810   prq.finished = GNUNET_NO;
2811   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2812                                               &query,
2813                                               &process_reply,
2814                                               &prq);
2815   if (prq.sender != NULL)
2816     {
2817       prq.sender->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
2818       prq.sender->trust += prq.priority;
2819     }
2820   if (GNUNET_YES == active_migration)
2821     {
2822 #if DEBUG_FS
2823       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2824                   "Replicating result for query `%s' with priority %u\n",
2825                   GNUNET_h2s (&query),
2826                   prq.priority);
2827 #endif
2828       GNUNET_DATASTORE_put (dsh,
2829                             0, &query, dsize, &put[1],
2830                             type, prq.priority, 1 /* anonymity */, 
2831                             expiration, 
2832                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2833                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2834                             &put_migration_continuation, 
2835                             NULL);
2836     }
2837   return GNUNET_OK;
2838 }
2839
2840
2841 /* **************************** P2P GET Handling ************************ */
2842
2843
2844 /**
2845  * Closure for 'check_duplicate_request_{peer,client}'.
2846  */
2847 struct CheckDuplicateRequestClosure
2848 {
2849   /**
2850    * The new request we should check if it already exists.
2851    */
2852   const struct PendingRequest *pr;
2853
2854   /**
2855    * Existing request found by the checker, NULL if none.
2856    */
2857   struct PendingRequest *have;
2858 };
2859
2860
2861 /**
2862  * Iterator over entries in the 'query_request_map' that
2863  * tries to see if we have the same request pending from
2864  * the same client already.
2865  *
2866  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2867  * @param key current key code (query, ignored, must match)
2868  * @param value value in the hash map (a 'struct PendingRequest' 
2869  *              that already exists)
2870  * @return GNUNET_YES if we should continue to
2871  *         iterate (no match yet)
2872  *         GNUNET_NO if not (match found).
2873  */
2874 static int
2875 check_duplicate_request_client (void *cls,
2876                                 const GNUNET_HashCode * key,
2877                                 void *value)
2878 {
2879   struct CheckDuplicateRequestClosure *cdc = cls;
2880   struct PendingRequest *have = value;
2881
2882   if (have->client_request_list == NULL)
2883     return GNUNET_YES;
2884   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2885        (cdc->pr != have) )
2886     {
2887       cdc->have = have;
2888       return GNUNET_NO;
2889     }
2890   return GNUNET_YES;
2891 }
2892
2893
2894 /**
2895  * We're processing (local) results for a search request
2896  * from another peer.  Pass applicable results to the
2897  * peer and if we are done either clean up (operation
2898  * complete) or forward to other peers (more results possible).
2899  *
2900  * @param cls our closure (struct LocalGetContext)
2901  * @param key key for the content
2902  * @param size number of bytes in data
2903  * @param data content stored
2904  * @param type type of the content
2905  * @param priority priority of the content
2906  * @param anonymity anonymity-level for the content
2907  * @param expiration expiration time for the content
2908  * @param uid unique identifier for the datum;
2909  *        maybe 0 if no unique identifier is available
2910  */
2911 static void
2912 process_local_reply (void *cls,
2913                      const GNUNET_HashCode * key,
2914                      uint32_t size,
2915                      const void *data,
2916                      enum GNUNET_BLOCK_Type type,
2917                      uint32_t priority,
2918                      uint32_t anonymity,
2919                      struct GNUNET_TIME_Absolute
2920                      expiration, 
2921                      uint64_t uid)
2922 {
2923   struct PendingRequest *pr = cls;
2924   struct ProcessReplyClosure prq;
2925   struct CheckDuplicateRequestClosure cdrc;
2926   const struct SBlock *sb;
2927   GNUNET_HashCode dhash;
2928   GNUNET_HashCode mhash;
2929   GNUNET_HashCode query;
2930   
2931   if (NULL == key)
2932     {
2933 #if DEBUG_FS > 1
2934       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2935                   "Done processing local replies, forwarding request to other peers.\n");
2936 #endif
2937       pr->qe = NULL;
2938       if (pr->client_request_list != NULL)
2939         {
2940           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2941                                       GNUNET_YES);
2942           /* Figure out if this is a duplicate request and possibly
2943              merge 'struct PendingRequest' entries */
2944           cdrc.have = NULL;
2945           cdrc.pr = pr;
2946           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2947                                                       &pr->query,
2948                                                       &check_duplicate_request_client,
2949                                                       &cdrc);
2950           if (cdrc.have != NULL)
2951             {
2952 #if DEBUG_FS
2953               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2954                           "Received request for block `%s' twice from client, will only request once.\n",
2955                           GNUNET_h2s (&pr->query));
2956 #endif
2957               
2958               destroy_pending_request (pr);
2959               return;
2960             }
2961         }
2962
2963       /* no more results */
2964       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2965         pr->task = GNUNET_SCHEDULER_add_now (sched,
2966                                              &forward_request_task,
2967                                              pr);      
2968       return;
2969     }
2970 #if DEBUG_FS
2971   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2972               "New local response to `%s' of type %u.\n",
2973               GNUNET_h2s (key),
2974               type);
2975 #endif
2976   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2977     {
2978 #if DEBUG_FS
2979       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2980                   "Found ONDEMAND block, performing on-demand encoding\n");
2981 #endif
2982       GNUNET_STATISTICS_update (stats,
2983                                 gettext_noop ("# on-demand blocks matched requests"),
2984                                 1,
2985                                 GNUNET_NO);
2986       if (GNUNET_OK != 
2987           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2988                                             anonymity, expiration, uid, 
2989                                             &process_local_reply,
2990                                             pr))
2991       if (pr->qe != NULL)
2992         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2993       return;
2994     }
2995   /* check for duplicates */
2996   GNUNET_CRYPTO_hash (data, size, &dhash);
2997   GNUNET_BLOCK_mingle_hash (&dhash, 
2998                             pr->mingle,
2999                             &mhash);
3000   if ( (pr->bf != NULL) &&
3001        (GNUNET_YES ==
3002         GNUNET_CONTAINER_bloomfilter_test (pr->bf,
3003                                            &mhash)) )
3004     {      
3005 #if DEBUG_FS
3006       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3007                   "Result from datastore filtered by bloomfilter (duplicate).\n");
3008 #endif
3009       GNUNET_STATISTICS_update (stats,
3010                                 gettext_noop ("# results filtered by query bloomfilter"),
3011                                 1,
3012                                 GNUNET_NO);
3013       if (pr->qe != NULL)
3014         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3015       return;
3016     }
3017 #if DEBUG_FS
3018   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3019               "Found result for query `%s' in local datastore\n",
3020               GNUNET_h2s (key));
3021 #endif
3022   GNUNET_STATISTICS_update (stats,
3023                             gettext_noop ("# results found locally"),
3024                             1,
3025                             GNUNET_NO);
3026   pr->results_found++;
3027   memset (&prq, 0, sizeof (prq));
3028   prq.data = data;
3029   prq.expiration = expiration;
3030   prq.size = size;  
3031   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
3032     { 
3033       sb = (const struct SBlock*) data;
3034       GNUNET_CRYPTO_hash (&sb->subspace,
3035                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3036                           &prq.namespace);
3037     }
3038   if (GNUNET_OK != 
3039       GNUNET_BLOCK_get_key (block_ctx,
3040                             type,
3041                             data,
3042                             size,
3043                             &query))
3044     {
3045       GNUNET_break (0);
3046       GNUNET_DATASTORE_remove (dsh,
3047                                key,
3048                                size, data,
3049                                -1, -1, 
3050                                GNUNET_TIME_UNIT_FOREVER_REL,
3051                                NULL, NULL);
3052       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3053       return;
3054     }
3055   prq.type = type;
3056   prq.priority = priority;  
3057   prq.finished = GNUNET_NO;
3058   process_reply (&prq, key, pr);
3059   if (prq.finished == GNUNET_YES)
3060     return;
3061   if (pr->qe == NULL)
3062     return; /* done here */
3063   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
3064        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
3065     {
3066       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3067       return;
3068     }
3069   if ( (pr->client_request_list == NULL) &&
3070        ( (GNUNET_YES == test_load_too_high()) ||
3071          (pr->results_found > 5 + 2 * pr->priority) ) )
3072     {
3073 #if DEBUG_FS > 2
3074       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3075                   "Load too high, done with request\n");
3076 #endif
3077       GNUNET_STATISTICS_update (stats,
3078                                 gettext_noop ("# processing result set cut short due to load"),
3079                                 1,
3080                                 GNUNET_NO);
3081       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
3082       return;
3083     }
3084   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
3085 }
3086
3087
3088 /**
3089  * We've received a request with the specified priority.  Bound it
3090  * according to how much we trust the given peer.
3091  * 
3092  * @param prio_in requested priority
3093  * @param cp the peer making the request
3094  * @return effective priority
3095  */
3096 static uint32_t
3097 bound_priority (uint32_t prio_in,
3098                 struct ConnectedPeer *cp)
3099 {
3100 #define N ((double)128.0)
3101   uint32_t ret;
3102   double rret;
3103   int ld;
3104
3105   ld = test_load_too_high ();
3106   if (ld == GNUNET_SYSERR)
3107     return 0; /* excess resources */
3108   ret = change_host_trust (cp, prio_in);
3109   if (ret > 0)
3110     {
3111       if (ret > current_priorities + N)
3112         rret = current_priorities + N;
3113       else
3114         rret = ret;
3115       current_priorities 
3116         = (current_priorities * (N-1) + rret)/N;
3117     }
3118 #undef N
3119   return ret;
3120 }
3121
3122
3123 /**
3124  * Iterator over entries in the 'query_request_map' that
3125  * tries to see if we have the same request pending from
3126  * the same peer already.
3127  *
3128  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
3129  * @param key current key code (query, ignored, must match)
3130  * @param value value in the hash map (a 'struct PendingRequest' 
3131  *              that already exists)
3132  * @return GNUNET_YES if we should continue to
3133  *         iterate (no match yet)
3134  *         GNUNET_NO if not (match found).
3135  */
3136 static int
3137 check_duplicate_request_peer (void *cls,
3138                               const GNUNET_HashCode * key,
3139                               void *value)
3140 {
3141   struct CheckDuplicateRequestClosure *cdc = cls;
3142   struct PendingRequest *have = value;
3143
3144   if (cdc->pr->target_pid == have->target_pid)
3145     {
3146       cdc->have = have;
3147       return GNUNET_NO;
3148     }
3149   return GNUNET_YES;
3150 }
3151
3152
3153 /**
3154  * Handle P2P "GET" request.
3155  *
3156  * @param cls closure, always NULL
3157  * @param other the other peer involved (sender or receiver, NULL
3158  *        for loopback messages where we are both sender and receiver)
3159  * @param message the actual message
3160  * @param latency reported latency of the connection with 'other'
3161  * @param distance reported distance (DV) to 'other' 
3162  * @return GNUNET_OK to keep the connection open,
3163  *         GNUNET_SYSERR to close it (signal serious error)
3164  */
3165 static int
3166 handle_p2p_get (void *cls,
3167                 const struct GNUNET_PeerIdentity *other,
3168                 const struct GNUNET_MessageHeader *message,
3169                 struct GNUNET_TIME_Relative latency,
3170                 uint32_t distance)
3171 {
3172   struct PendingRequest *pr;
3173   struct ConnectedPeer *cp;
3174   struct ConnectedPeer *cps;
3175   struct CheckDuplicateRequestClosure cdc;
3176   struct GNUNET_TIME_Relative timeout;
3177   uint16_t msize;
3178   const struct GetMessage *gm;
3179   unsigned int bits;
3180   const GNUNET_HashCode *opt;
3181   uint32_t bm;
3182   size_t bfsize;
3183   uint32_t ttl_decrement;
3184   enum GNUNET_BLOCK_Type type;
3185   int have_ns;
3186   int ld;
3187
3188   msize = ntohs(message->size);
3189   if (msize < sizeof (struct GetMessage))
3190     {
3191       GNUNET_break_op (0);
3192       return GNUNET_SYSERR;
3193     }
3194   gm = (const struct GetMessage*) message;
3195   type = ntohl (gm->type);
3196   bm = ntohl (gm->hash_bitmap);
3197   bits = 0;
3198   while (bm > 0)
3199     {
3200       if (1 == (bm & 1))
3201         bits++;
3202       bm >>= 1;
3203     }
3204   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3205     {
3206       GNUNET_break_op (0);
3207       return GNUNET_SYSERR;
3208     }  
3209   opt = (const GNUNET_HashCode*) &gm[1];
3210   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3211   bm = ntohl (gm->hash_bitmap);
3212   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
3213        (type != GNUNET_BLOCK_TYPE_SBLOCK) )
3214     {
3215       GNUNET_break_op (0);
3216       return GNUNET_SYSERR;      
3217     }
3218   bits = 0;
3219   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3220                                            &other->hashPubKey);
3221   if (NULL == cps)
3222     {
3223       /* peer must have just disconnected */
3224       GNUNET_STATISTICS_update (stats,
3225                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3226                                 1,
3227                                 GNUNET_NO);
3228       return GNUNET_SYSERR;
3229     }
3230   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3231     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3232                                             &opt[bits++]);
3233   else
3234     cp = cps;
3235   if (cp == NULL)
3236     {
3237 #if DEBUG_FS
3238       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3239         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3240                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3241                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3242       
3243       else
3244         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3245                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3246                     GNUNET_i2s (other));
3247 #endif
3248       GNUNET_STATISTICS_update (stats,
3249                                 gettext_noop ("# requests dropped due to missing reverse route"),
3250                                 1,
3251                                 GNUNET_NO);
3252      /* FIXME: try connect? */
3253       return GNUNET_OK;
3254     }
3255   /* note that we can really only check load here since otherwise
3256      peers could find out that we are overloaded by not being
3257      disconnected after sending us a malformed query... */
3258
3259   /* FIXME: query priority should play
3260      a major role here! */
3261   ld = test_load_too_high ();
3262   if (GNUNET_YES == ld)
3263     {
3264 #if DEBUG_FS
3265       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3266                   "Dropping query from `%s', this peer is too busy.\n",
3267                   GNUNET_i2s (other));
3268 #endif
3269       GNUNET_STATISTICS_update (stats,
3270                                 gettext_noop ("# requests dropped due to high load"),
3271                                 1,
3272                                 GNUNET_NO);
3273       return GNUNET_OK;
3274     }
3275   /* FIXME: if ld == GNUNET_NO, forward
3276      instead of indirecting! */
3277
3278 #if DEBUG_FS 
3279   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3280               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3281               GNUNET_h2s (&gm->query),
3282               (unsigned int) type,
3283               GNUNET_i2s (other),
3284               (unsigned int) bm);
3285 #endif
3286   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3287   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3288                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3289   if (have_ns)
3290     {
3291       pr->namespace = (GNUNET_HashCode*) &pr[1];
3292       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3293     }
3294   pr->type = type;
3295   pr->mingle = ntohl (gm->filter_mutator);
3296   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3297     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3298
3299   pr->anonymity_level = 1;
3300   pr->priority = bound_priority (ntohl (gm->priority), cps);
3301   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3302   pr->query = gm->query;
3303   /* decrement ttl (always) */
3304   ttl_decrement = 2 * TTL_DECREMENT +
3305     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3306                               TTL_DECREMENT);
3307   if ( (pr->ttl < 0) &&
3308        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3309     {
3310 #if DEBUG_FS
3311       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3312                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3313                   GNUNET_i2s (other),
3314                   pr->ttl,
3315                   ttl_decrement);
3316 #endif
3317       GNUNET_STATISTICS_update (stats,
3318                                 gettext_noop ("# requests dropped due TTL underflow"),
3319                                 1,
3320                                 GNUNET_NO);
3321       /* integer underflow => drop (should be very rare)! */      
3322       GNUNET_free (pr);
3323       return GNUNET_OK;
3324     } 
3325   pr->ttl -= ttl_decrement;
3326   pr->start_time = GNUNET_TIME_absolute_get ();
3327
3328   /* get bloom filter */
3329   if (bfsize > 0)
3330     {
3331       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3332                                                   bfsize,
3333                                                   BLOOMFILTER_K);
3334       pr->bf_size = bfsize;
3335     }
3336
3337   cdc.have = NULL;
3338   cdc.pr = pr;
3339   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3340                                               &gm->query,
3341                                               &check_duplicate_request_peer,
3342                                               &cdc);
3343   if (cdc.have != NULL)
3344     {
3345       if (cdc.have->start_time.value + cdc.have->ttl >=
3346           pr->start_time.value + pr->ttl)
3347         {
3348           /* existing request has higher TTL, drop new one! */
3349           cdc.have->priority += pr->priority;
3350           destroy_pending_request (pr);
3351 #if DEBUG_FS
3352           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3353                       "Have existing request with higher TTL, dropping new request.\n",
3354                       GNUNET_i2s (other));
3355 #endif
3356           GNUNET_STATISTICS_update (stats,
3357                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3358                                     1,
3359                                     GNUNET_NO);
3360           return GNUNET_OK;
3361         }
3362       else
3363         {
3364           /* existing request has lower TTL, drop old one! */
3365           pr->priority += cdc.have->priority;
3366           /* Possible optimization: if we have applicable pending
3367              replies in 'cdc.have', we might want to move those over
3368              (this is a really rare special-case, so it is not clear
3369              that this would be worth it) */
3370           destroy_pending_request (cdc.have);
3371           /* keep processing 'pr'! */
3372         }
3373     }
3374
3375   pr->cp = cp;
3376   GNUNET_break (GNUNET_OK ==
3377                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3378                                                    &gm->query,
3379                                                    pr,
3380                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3381   GNUNET_break (GNUNET_OK ==
3382                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3383                                                    &other->hashPubKey,
3384                                                    pr,
3385                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3386   
3387   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3388                                             pr,
3389                                             pr->start_time.value + pr->ttl);
3390
3391   GNUNET_STATISTICS_update (stats,
3392                             gettext_noop ("# P2P searches received"),
3393                             1,
3394                             GNUNET_NO);
3395   GNUNET_STATISTICS_update (stats,
3396                             gettext_noop ("# P2P searches active"),
3397                             1,
3398                             GNUNET_NO);
3399
3400   /* calculate change in traffic preference */
3401   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3402   /* process locally */
3403   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3404     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3405   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3406                                            (pr->priority + 1)); 
3407   pr->qe = GNUNET_DATASTORE_get (dsh,
3408                                  &gm->query,
3409                                  type,                         
3410                                  pr->priority + 1,
3411                                  MAX_DATASTORE_QUEUE,                            
3412                                  timeout,
3413                                  &process_local_reply,
3414                                  pr);
3415
3416   /* Are multiple results possible?  If so, start processing remotely now! */
3417   switch (pr->type)
3418     {
3419     case GNUNET_BLOCK_TYPE_DBLOCK:
3420     case GNUNET_BLOCK_TYPE_IBLOCK:
3421       /* only one result, wait for datastore */
3422       break;
3423     default:
3424       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3425         pr->task = GNUNET_SCHEDULER_add_now (sched,
3426                                              &forward_request_task,
3427                                              pr);
3428     }
3429
3430   /* make sure we don't track too many requests */
3431   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3432     {
3433       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3434       GNUNET_assert (pr != NULL);
3435       destroy_pending_request (pr);
3436     }
3437   return GNUNET_OK;
3438 }
3439
3440
3441 /* **************************** CS GET Handling ************************ */
3442
3443
3444 /**
3445  * Handle START_SEARCH-message (search request from client).
3446  *
3447  * @param cls closure
3448  * @param client identification of the client
3449  * @param message the actual message
3450  */
3451 static void
3452 handle_start_search (void *cls,
3453                      struct GNUNET_SERVER_Client *client,
3454                      const struct GNUNET_MessageHeader *message)
3455 {
3456   static GNUNET_HashCode all_zeros;
3457   const struct SearchMessage *sm;
3458   struct ClientList *cl;
3459   struct ClientRequestList *crl;
3460   struct PendingRequest *pr;
3461   uint16_t msize;
3462   unsigned int sc;
3463   enum GNUNET_BLOCK_Type type;
3464
3465   msize = ntohs (message->size);
3466   if ( (msize < sizeof (struct SearchMessage)) ||
3467        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3468     {
3469       GNUNET_break (0);
3470       GNUNET_SERVER_receive_done (client,
3471                                   GNUNET_SYSERR);
3472       return;
3473     }
3474   GNUNET_STATISTICS_update (stats,
3475                             gettext_noop ("# client searches received"),
3476                             1,
3477                             GNUNET_NO);
3478   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3479   sm = (const struct SearchMessage*) message;
3480   type = ntohl (sm->type);
3481 #if DEBUG_FS
3482   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3483               "Received request for `%s' of type %u from local client\n",
3484               GNUNET_h2s (&sm->query),
3485               (unsigned int) type);
3486 #endif
3487   cl = client_list;
3488   while ( (cl != NULL) &&
3489           (cl->client != client) )
3490     cl = cl->next;
3491   if (cl == NULL)
3492     {
3493       cl = GNUNET_malloc (sizeof (struct ClientList));
3494       cl->client = client;
3495       GNUNET_SERVER_client_keep (client);
3496       cl->next = client_list;
3497       client_list = cl;
3498     }
3499   /* detect duplicate KBLOCK requests */
3500   if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
3501        (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
3502        (type == GNUNET_BLOCK_TYPE_ANY) )
3503     {
3504       crl = cl->rl_head;
3505       while ( (crl != NULL) &&
3506               ( (0 != memcmp (&crl->req->query,
3507                               &sm->query,
3508                               sizeof (GNUNET_HashCode))) ||
3509                 (crl->req->type != type) ) )
3510         crl = crl->next;
3511       if (crl != NULL)  
3512         { 
3513 #if DEBUG_FS
3514           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3515                       "Have existing request, merging content-seen lists.\n");
3516 #endif
3517           pr = crl->req;
3518           /* Duplicate request (used to send long list of
3519              known/blocked results); merge 'pr->replies_seen'
3520              and update bloom filter */
3521           GNUNET_array_grow (pr->replies_seen,
3522                              pr->replies_seen_size,
3523                              pr->replies_seen_off + sc);
3524           memcpy (&pr->replies_seen[pr->replies_seen_off],
3525                   &sm[1],
3526                   sc * sizeof (GNUNET_HashCode));
3527           pr->replies_seen_off += sc;
3528           refresh_bloomfilter (pr);
3529           GNUNET_STATISTICS_update (stats,
3530                                     gettext_noop ("# client searches updated (merged content seen list)"),
3531                                     1,
3532                                     GNUNET_NO);
3533           GNUNET_SERVER_receive_done (client,
3534                                       GNUNET_OK);
3535           return;
3536         }
3537     }
3538   GNUNET_STATISTICS_update (stats,
3539                             gettext_noop ("# client searches active"),
3540                             1,
3541                             GNUNET_NO);
3542   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3543                       ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3544   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3545   memset (crl, 0, sizeof (struct ClientRequestList));
3546   crl->client_list = cl;
3547   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3548                                cl->rl_tail,
3549                                crl);  
3550   crl->req = pr;
3551   pr->type = type;
3552   pr->client_request_list = crl;
3553   GNUNET_array_grow (pr->replies_seen,
3554                      pr->replies_seen_size,
3555                      sc);
3556   memcpy (pr->replies_seen,
3557           &sm[1],
3558           sc * sizeof (GNUNET_HashCode));
3559   pr->replies_seen_off = sc;
3560   pr->anonymity_level = ntohl (sm->anonymity_level); 
3561   refresh_bloomfilter (pr);
3562   pr->query = sm->query;
3563   if (0 == (1 & ntohl (sm->options)))
3564     pr->local_only = GNUNET_NO;
3565   else
3566     pr->local_only = GNUNET_YES;
3567   switch (type)
3568     {
3569     case GNUNET_BLOCK_TYPE_DBLOCK:
3570     case GNUNET_BLOCK_TYPE_IBLOCK:
3571       if (0 != memcmp (&sm->target,
3572                        &all_zeros,
3573                        sizeof (GNUNET_HashCode)))
3574         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3575       break;
3576     case GNUNET_BLOCK_TYPE_SBLOCK:
3577       pr->namespace = (GNUNET_HashCode*) &pr[1];
3578       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3579       break;
3580     default:
3581       break;
3582     }
3583   GNUNET_break (GNUNET_OK ==
3584                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3585                                                    &sm->query,
3586                                                    pr,
3587                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3588   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3589     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3590   pr->qe = GNUNET_DATASTORE_get (dsh,
3591                                  &sm->query,
3592                                  type,
3593                                  -3, -1,
3594                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
3595                                  &process_local_reply,
3596                                  pr);
3597 }
3598
3599
3600 /* **************************** Startup ************************ */
3601
3602 /**
3603  * Process fs requests.
3604  *
3605  * @param s scheduler to use
3606  * @param server the initialized server
3607  * @param c configuration to use
3608  */
3609 static int
3610 main_init (struct GNUNET_SCHEDULER_Handle *s,
3611            struct GNUNET_SERVER_Handle *server,
3612            const struct GNUNET_CONFIGURATION_Handle *c)
3613 {
3614   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3615     {
3616       { &handle_p2p_get, 
3617         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3618       { &handle_p2p_put, 
3619         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3620       { NULL, 0, 0 }
3621     };
3622   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
3623     {&GNUNET_FS_handle_index_start, NULL, 
3624      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
3625     {&GNUNET_FS_handle_index_list_get, NULL, 
3626      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
3627     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
3628      sizeof (struct UnindexMessage) },
3629     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
3630      0 },
3631     {NULL, NULL, 0, 0}
3632   };
3633   unsigned long long enc = 128;
3634
3635   sched = s;
3636   cfg = c;
3637   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
3638   min_migration_delay = GNUNET_TIME_UNIT_SECONDS;
3639   if ( (GNUNET_OK !=
3640         GNUNET_CONFIGURATION_get_value_number (cfg,
3641                                                "fs",
3642                                                "MAX_PENDING_REQUESTS",
3643                                                &max_pending_requests)) ||
3644        (GNUNET_OK !=
3645         GNUNET_CONFIGURATION_get_value_number (cfg,
3646                                                "fs",
3647                                                "EXPECTED_NEIGHBOUR_COUNT",
3648                                                &enc)) ||
3649        (GNUNET_OK != 
3650         GNUNET_CONFIGURATION_get_value_time (cfg,
3651                                              "fs",
3652                                              "MIN_MIGRATION_DELAY",
3653                                              &min_migration_delay)) )
3654     {
3655       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3656                   _("Configuration fails to specify certain parameters, assuming default values."));
3657     }
3658   connected_peers = GNUNET_CONTAINER_multihashmap_create (enc); 
3659   query_request_map = GNUNET_CONTAINER_multihashmap_create (max_pending_requests);
3660   peer_request_map = GNUNET_CONTAINER_multihashmap_create (enc);
3661   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3662   core = GNUNET_CORE_connect (sched,
3663                               cfg,
3664                               GNUNET_TIME_UNIT_FOREVER_REL,
3665                               NULL,
3666                               NULL,
3667                               &peer_connect_handler,
3668                               &peer_disconnect_handler,
3669                               NULL,
3670                               NULL, GNUNET_NO,
3671                               NULL, GNUNET_NO,
3672                               p2p_handlers);
3673   if (NULL == core)
3674     {
3675       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3676                   _("Failed to connect to `%s' service.\n"),
3677                   "core");
3678       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
3679       connected_peers = NULL;
3680       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
3681       query_request_map = NULL;
3682       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
3683       requests_by_expiration_heap = NULL;
3684       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
3685       peer_request_map = NULL;
3686       if (dsh != NULL)
3687         {
3688           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3689           dsh = NULL;
3690         }
3691       return GNUNET_SYSERR;
3692     }
3693   /* FIXME: distinguish between sending and storing in options? */
3694   if (active_migration) 
3695     {
3696       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3697                   _("Content migration is enabled, will start to gather data\n"));
3698       consider_migration_gathering ();
3699     }
3700   GNUNET_SERVER_disconnect_notify (server, 
3701                                    &handle_client_disconnect,
3702                                    NULL);
3703   GNUNET_assert (GNUNET_OK ==
3704                  GNUNET_CONFIGURATION_get_value_filename (cfg,
3705                                                           "fs",
3706                                                           "TRUST",
3707                                                           &trustDirectory));
3708   GNUNET_DISK_directory_create (trustDirectory);
3709   GNUNET_SCHEDULER_add_with_priority (sched,
3710                                       GNUNET_SCHEDULER_PRIORITY_HIGH,
3711                                       &cron_flush_trust, NULL);
3712
3713
3714   GNUNET_SERVER_add_handlers (server, handlers);
3715   GNUNET_SCHEDULER_add_delayed (sched,
3716                                 GNUNET_TIME_UNIT_FOREVER_REL,
3717                                 &shutdown_task,
3718                                 NULL);
3719   return GNUNET_OK;
3720 }
3721
3722
3723 /**
3724  * Process fs requests.
3725  *
3726  * @param cls closure
3727  * @param sched scheduler to use
3728  * @param server the initialized server
3729  * @param cfg configuration to use
3730  */
3731 static void
3732 run (void *cls,
3733      struct GNUNET_SCHEDULER_Handle *sched,
3734      struct GNUNET_SERVER_Handle *server,
3735      const struct GNUNET_CONFIGURATION_Handle *cfg)
3736 {
3737   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
3738                                                            "FS",
3739                                                            "ACTIVEMIGRATION");
3740   dsh = GNUNET_DATASTORE_connect (cfg,
3741                                   sched);
3742   if (dsh == NULL)
3743     {
3744       GNUNET_SCHEDULER_shutdown (sched);
3745       return;
3746     }
3747   block_cfg = GNUNET_CONFIGURATION_create ();
3748   GNUNET_CONFIGURATION_set_value_string (block_cfg,
3749                                          "block",
3750                                          "PLUGINS",
3751                                          "fs");
3752   block_ctx = GNUNET_BLOCK_context_create (block_cfg);
3753   GNUNET_assert (NULL != block_ctx);
3754   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3755        (GNUNET_OK != main_init (sched, server, cfg)) )
3756     {    
3757       GNUNET_SCHEDULER_shutdown (sched);
3758       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3759       dsh = NULL;
3760       GNUNET_BLOCK_context_destroy (block_ctx);
3761       block_ctx = NULL;
3762       GNUNET_CONFIGURATION_destroy (block_cfg);
3763       block_cfg = NULL;
3764       return;   
3765     }
3766 }
3767
3768
3769 /**
3770  * The main function for the fs service.
3771  *
3772  * @param argc number of arguments from the command line
3773  * @param argv command line arguments
3774  * @return 0 ok, 1 on error
3775  */
3776 int
3777 main (int argc, char *const *argv)
3778 {
3779   return (GNUNET_OK ==
3780           GNUNET_SERVICE_run (argc,
3781                               argv,
3782                               "fs",
3783                               GNUNET_SERVICE_OPTION_NONE,
3784                               &run, NULL)) ? 0 : 1;
3785 }
3786
3787 /* end of gnunet-service-fs.c */