towards migration
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * FIXME:
27  * - TTL/priority calculations are absent!
28  * TODO:
29  * - have non-zero preference / priority for requests we initiate!
30  * - track stats for hot-path routing
31  * - implement hot-path routing decision procedure
32  * - implement: bound_priority, test_load_too_high, validate_nblock
33  * - add content migration support (store locally) [or create new service]
34  * - statistics
35  */
36 #include "platform.h"
37 #include <float.h>
38 #include "gnunet_constants.h"
39 #include "gnunet_core_service.h"
40 #include "gnunet_datastore_service.h"
41 #include "gnunet_peer_lib.h"
42 #include "gnunet_protocols.h"
43 #include "gnunet_signatures.h"
44 #include "gnunet_statistics_service.h"
45 #include "gnunet_util_lib.h"
46 #include "gnunet-service-fs_indexing.h"
47 #include "fs.h"
48
49 #define DEBUG_FS GNUNET_NO
50
51 /**
52  * Maximum number of outgoing messages we queue per peer.
53  * FIXME: make configurable?
54  */
55 #define MAX_QUEUE_PER_PEER 16
56
57 /**
58  * Inverse of the probability that we will submit the same query
59  * to the same peer again.  If the same peer already got the query
60  * repeatedly recently, the probability is multiplied by the inverse
61  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
62  * plus MAX_CORK_DELAY (so roughly every 3.5s).
63  */
64 #define RETRY_PROBABILITY_INV 3
65
66 /**
67  * What is the maximum delay for a P2P FS message (in our interaction
68  * with core)?  FS-internal delays are another story.  The value is
69  * chosen based on the 32k block size.  Given that peers typcially
70  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
71  * transmit one message even to the lowest-bandwidth peers.
72  */
73 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
74
75
76
77 /**
78  * Maximum number of requests (from other peers) that we're
79  * willing to have pending at any given point in time.
80  * FIXME: set from configuration.
81  */
82 static uint64_t max_pending_requests = (32 * 1024);
83
84
85 /**
86  * Information we keep for each pending reply.  The
87  * actual message follows at the end of this struct.
88  */
89 struct PendingMessage;
90
91 /**
92  * Our connection to the datastore.
93  */
94 static struct GNUNET_DATASTORE_Handle *dsh;
95
96
97 /**
98  * Function called upon completion of a transmission.
99  *
100  * @param cls closure
101  * @param pid ID of receiving peer, 0 on transmission error
102  */
103 typedef void (*TransmissionContinuation)(void * cls, 
104                                          GNUNET_PEER_Id tpid);
105
106
107 /**
108  * Information we keep for each pending message (GET/PUT).  The
109  * actual message follows at the end of this struct.
110  */
111 struct PendingMessage
112 {
113   /**
114    * This is a doubly-linked list of messages to the same peer.
115    */
116   struct PendingMessage *next;
117
118   /**
119    * This is a doubly-linked list of messages to the same peer.
120    */
121   struct PendingMessage *prev;
122
123   /**
124    * Entry in pending message list for this pending message.
125    */ 
126   struct PendingMessageList *pml;  
127
128   /**
129    * Function to call immediately once we have transmitted this
130    * message.
131    */
132   TransmissionContinuation cont;
133
134   /**
135    * Closure for cont.
136    */
137   void *cont_cls;
138
139   /**
140    * Size of the reply; actual reply message follows
141    * at the end of this struct.
142    */
143   size_t msize;
144   
145   /**
146    * How important is this message for us?
147    */
148   uint32_t priority;
149  
150 };
151
152
153 /**
154  * Information about a peer that we are connected to.
155  * We track data that is useful for determining which
156  * peers should receive our requests.  We also keep
157  * a list of messages to transmit to this peer.
158  */
159 struct ConnectedPeer
160 {
161
162   /**
163    * List of the last clients for which this peer successfully
164    * answered a query.
165    */
166   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
167
168   /**
169    * List of the last PIDs for which
170    * this peer successfully answered a query;
171    * We use 0 to indicate no successful reply.
172    */
173   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
174
175   /**
176    * Average delay between sending the peer a request and
177    * getting a reply (only calculated over the requests for
178    * which we actually got a reply).   Calculated
179    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
180    */ 
181   struct GNUNET_TIME_Relative avg_delay;
182
183   /**
184    * Handle for an active request for transmission to this
185    * peer, or NULL.
186    */
187   struct GNUNET_CORE_TransmitHandle *cth;
188
189   /**
190    * Messages (replies, queries, content migration) we would like to
191    * send to this peer in the near future.  Sorted by priority, head.
192    */
193   struct PendingMessage *pending_messages_head;
194
195   /**
196    * Messages (replies, queries, content migration) we would like to
197    * send to this peer in the near future.  Sorted by priority, tail.
198    */
199   struct PendingMessage *pending_messages_tail;
200
201   /**
202    * Average priority of successful replies.  Calculated
203    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
204    */
205   double avg_priority;
206
207   /**
208    * Increase in traffic preference still to be submitted
209    * to the core service for this peer. FIXME: double or 'uint64_t'?
210    */
211   double inc_preference;
212
213   /**
214    * The peer's identity.
215    */
216   GNUNET_PEER_Id pid;  
217
218   /**
219    * Size of the linked list of 'pending_messages'.
220    */
221   unsigned int pending_requests;
222
223   /**
224    * Which offset in "last_p2p_replies" will be updated next?
225    * (we go round-robin).
226    */
227   unsigned int last_p2p_replies_woff;
228
229   /**
230    * Which offset in "last_client_replies" will be updated next?
231    * (we go round-robin).
232    */
233   unsigned int last_client_replies_woff;
234
235 };
236
237
238 /**
239  * Information we keep for each pending request.  We should try to
240  * keep this struct as small as possible since its memory consumption
241  * is key to how many requests we can have pending at once.
242  */
243 struct PendingRequest;
244
245
246 /**
247  * Doubly-linked list of requests we are performing
248  * on behalf of the same client.
249  */
250 struct ClientRequestList
251 {
252
253   /**
254    * This is a doubly-linked list.
255    */
256   struct ClientRequestList *next;
257
258   /**
259    * This is a doubly-linked list.
260    */
261   struct ClientRequestList *prev;
262
263   /**
264    * Request this entry represents.
265    */
266   struct PendingRequest *req;
267
268   /**
269    * Client list this request belongs to.
270    */
271   struct ClientList *client_list;
272
273 };
274
275
276 /**
277  * Replies to be transmitted to the client.  The actual
278  * response message is allocated after this struct.
279  */
280 struct ClientResponseMessage
281 {
282   /**
283    * This is a doubly-linked list.
284    */
285   struct ClientResponseMessage *next;
286
287   /**
288    * This is a doubly-linked list.
289    */
290   struct ClientResponseMessage *prev;
291
292   /**
293    * Client list entry this response belongs to.
294    */
295   struct ClientList *client_list;
296
297   /**
298    * Number of bytes in the response.
299    */
300   size_t msize;
301 };
302
303
304 /**
305  * Linked list of clients we are performing requests
306  * for right now.
307  */
308 struct ClientList
309 {
310   /**
311    * This is a linked list.
312    */
313   struct ClientList *next;
314
315   /**
316    * ID of a client making a request, NULL if this entry is for a
317    * peer.
318    */
319   struct GNUNET_SERVER_Client *client;
320
321   /**
322    * Head of list of requests performed on behalf
323    * of this client right now.
324    */
325   struct ClientRequestList *rl_head;
326
327   /**
328    * Tail of list of requests performed on behalf
329    * of this client right now.
330    */
331   struct ClientRequestList *rl_tail;
332
333   /**
334    * Head of linked list of responses.
335    */
336   struct ClientResponseMessage *res_head;
337
338   /**
339    * Tail of linked list of responses.
340    */
341   struct ClientResponseMessage *res_tail;
342
343   /**
344    * Context for sending replies.
345    */
346   struct GNUNET_CONNECTION_TransmitHandle *th;
347
348 };
349
350
351 /**
352  * Doubly-linked list of messages we are performing
353  * due to a pending request.
354  */
355 struct PendingMessageList
356 {
357
358   /**
359    * This is a doubly-linked list of messages on behalf of the same request.
360    */
361   struct PendingMessageList *next;
362
363   /**
364    * This is a doubly-linked list of messages on behalf of the same request.
365    */
366   struct PendingMessageList *prev;
367
368   /**
369    * Message this entry represents.
370    */
371   struct PendingMessage *pm;
372
373   /**
374    * Request this entry belongs to.
375    */
376   struct PendingRequest *req;
377
378   /**
379    * Peer this message is targeted for.
380    */
381   struct ConnectedPeer *target;
382
383 };
384
385
386 /**
387  * Information we keep for each pending request.  We should try to
388  * keep this struct as small as possible since its memory consumption
389  * is key to how many requests we can have pending at once.
390  */
391 struct PendingRequest
392 {
393
394   /**
395    * If this request was made by a client, this is our entry in the
396    * client request list; otherwise NULL.
397    */
398   struct ClientRequestList *client_request_list;
399
400   /**
401    * Entry of peer responsible for this entry (if this request
402    * was made by a peer).
403    */
404   struct ConnectedPeer *cp;
405
406   /**
407    * If this is a namespace query, pointer to the hash of the public
408    * key of the namespace; otherwise NULL.  Pointer will be to the 
409    * end of this struct (so no need to free it).
410    */
411   const GNUNET_HashCode *namespace;
412
413   /**
414    * Bloomfilter we use to filter out replies that we don't care about
415    * (anymore).  NULL as long as we are interested in all replies.
416    */
417   struct GNUNET_CONTAINER_BloomFilter *bf;
418
419   /**
420    * Context of our GNUNET_CORE_peer_change_preference call.
421    */
422   struct GNUNET_CORE_InformationRequestContext *irc;
423
424   /**
425    * Hash code of all replies that we have seen so far (only valid
426    * if client is not NULL since we only track replies like this for
427    * our own clients).
428    */
429   GNUNET_HashCode *replies_seen;
430
431   /**
432    * Node in the heap representing this entry; NULL
433    * if we have no heap node.
434    */
435   struct GNUNET_CONTAINER_HeapNode *hnode;
436
437   /**
438    * Head of list of messages being performed on behalf of this
439    * request.
440    */
441   struct PendingMessageList *pending_head;
442
443   /**
444    * Tail of list of messages being performed on behalf of this
445    * request.
446    */
447   struct PendingMessageList *pending_tail;
448
449   /**
450    * When did we first see this request (form this peer), or, if our
451    * client is initiating, when did we last initiate a search?
452    */
453   struct GNUNET_TIME_Absolute start_time;
454
455   /**
456    * The query that this request is for.
457    */
458   GNUNET_HashCode query;
459
460   /**
461    * The task responsible for transmitting queries
462    * for this request.
463    */
464   GNUNET_SCHEDULER_TaskIdentifier task;
465
466   /**
467    * (Interned) Peer identifier that identifies a preferred target
468    * for requests.
469    */
470   GNUNET_PEER_Id target_pid;
471
472   /**
473    * (Interned) Peer identifiers of peers that have already
474    * received our query for this content.
475    */
476   GNUNET_PEER_Id *used_pids;
477   
478   /**
479    * Our entry in the queue (non-NULL while we wait for our
480    * turn to interact with the local database).
481    */
482   struct GNUNET_DATASTORE_QueueEntry *qe;
483
484   /**
485    * Size of the 'bf' (in bytes).
486    */
487   size_t bf_size;
488
489   /**
490    * Desired anonymity level; only valid for requests from a local client.
491    */
492   uint32_t anonymity_level;
493
494   /**
495    * How many entries in "used_pids" are actually valid?
496    */
497   unsigned int used_pids_off;
498
499   /**
500    * How long is the "used_pids" array?
501    */
502   unsigned int used_pids_size;
503
504   /**
505    * Number of results found for this request.
506    */
507   unsigned int results_found;
508
509   /**
510    * How many entries in "replies_seen" are actually valid?
511    */
512   unsigned int replies_seen_off;
513
514   /**
515    * How long is the "replies_seen" array?
516    */
517   unsigned int replies_seen_size;
518   
519   /**
520    * Priority with which this request was made.  If one of our clients
521    * made the request, then this is the current priority that we are
522    * using when initiating the request.  This value is used when
523    * we decide to reward other peers with trust for providing a reply.
524    */
525   uint32_t priority;
526
527   /**
528    * Priority points left for us to spend when forwarding this request
529    * to other peers.
530    */
531   uint32_t remaining_priority;
532
533   /**
534    * Number to mingle hashes for bloom-filter tests with.
535    */
536   int32_t mingle;
537
538   /**
539    * TTL with which we saw this request (or, if we initiated, TTL that
540    * we used for the request).
541    */
542   int32_t ttl;
543   
544   /**
545    * Type of the content that this request is for.
546    */
547   enum GNUNET_BLOCK_Type type;
548
549   /**
550    * Remove this request after transmission of the current response.
551    */
552   int16_t do_remove;
553
554   /**
555    * GNUNET_YES if we should not forward this request to other peers.
556    */
557   int16_t local_only;
558
559 };
560
561
562 /**
563  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
564  */
565 struct MigrationReadyBlock
566 {
567
568   /**
569    * This is a doubly-linked list.
570    */
571   struct MigrationReadyBlock *next;
572
573   /**
574    * This is a doubly-linked list.
575    */
576   struct MigrationReadyBlock *prev;
577
578   /**
579    * Query for the block.
580    */
581   GNUNET_HashCode query;
582
583   /**
584    * When does this block expire? 
585    */
586   struct GNUNET_TIME_Absolute expiration;
587
588   /**
589    * Size of the block.
590    */
591   size_t size;
592
593   /**
594    * Type of the block.
595    */
596   enum GNUNET_BLOCK_Type type;
597 };
598
599
600 /**
601  * Our scheduler.
602  */
603 static struct GNUNET_SCHEDULER_Handle *sched;
604
605 /**
606  * Our configuration.
607  */
608 static const struct GNUNET_CONFIGURATION_Handle *cfg;
609
610 /**
611  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
612  */
613 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
614
615 /**
616  * Map of peer identifiers to "struct PendingRequest" (for that peer).
617  */
618 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
619
620 /**
621  * Map of query identifiers to "struct PendingRequest" (for that query).
622  */
623 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
624
625 /**
626  * Heap with the request that will expire next at the top.  Contains
627  * pointers of type "struct PendingRequest*"; these will *also* be
628  * aliased from the "requests_by_peer" data structures and the
629  * "requests_by_query" table.  Note that requests from our clients
630  * don't expire and are thus NOT in the "requests_by_expiration"
631  * (or the "requests_by_peer" tables).
632  */
633 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
634
635 /**
636  * Handle for reporting statistics.
637  */
638 static struct GNUNET_STATISTICS_Handle *stats;
639
640 /**
641  * Linked list of clients we are currently processing requests for.
642  */
643 static struct ClientList *client_list;
644
645 /**
646  * Pointer to handle to the core service (points to NULL until we've
647  * connected to it).
648  */
649 static struct GNUNET_CORE_Handle *core;
650
651 /**
652  * Head of linked list of blocks that can be migrated.
653  */
654 static struct MigrationReadyBlock *mig_head;
655
656 /**
657  * Tail of linked list of blocks that can be migrated.
658  */
659 static struct MigrationReadyBlock *mig_tail;
660
661 /**
662  * Request to datastore for migration (or NULL).
663  */
664 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
665
666 /**
667  * ID of task that collects blocks for migration.
668  */
669 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
670
671 /**
672  * What is the maximum frequency at which we are allowed to
673  * poll the datastore for migration content?
674  */
675 static struct GNUNET_TIME_Relative min_migration_delay;
676
677 /**
678  * Size of the doubly-linked list of migration blocks.
679  */
680 static unsigned int mig_size;
681
682 /**
683  * Are we allowed to migrate content to this peer.
684  */
685 static int active_migration;
686
687 /* ******************* clean up functions ************************ */
688
689
690 /**
691  * Delete the given migration block.
692  *
693  * @param mb block to delete
694  */
695 static void
696 delete_migration_block (struct MigrationReadyBlock *mb)
697 {
698   GNUNET_CONTAINER_DLL_remove (mig_head,
699                                mig_tail,
700                                mb);
701   mig_size--;
702   GNUNET_free (mb);
703 }
704
705
706 /**
707  * Consider migrating content to a given peer.
708  *
709  * @param cls not used
710  * @param key ID of the peer (not used)
711  * @param value 'struct ConnectedPeer' of the peer
712  * @return GNUNET_YES (always continue iteration)2
713  */
714 static int
715 consider_migration (void *cls,
716                     const GNUNET_HashCode *key,
717                     void *value)
718 {
719   struct ConnectedPeer *cp = value;
720   
721   if (cp->cth != NULL)
722     return GNUNET_YES; /* or what? */
723   /* FIXME: not implemented! */
724   return GNUNET_YES;
725 }
726
727
728
729
730 /**
731  * Task that is run periodically to obtain blocks for content
732  * migration
733  * 
734  * @param cls unused
735  * @param tc scheduler context (also unused)
736  */
737 static void
738 gather_migration_blocks (void *cls,
739                          const struct GNUNET_SCHEDULER_TaskContext *tc);
740
741
742 /**
743  * Process content offered for migration.
744  *
745  * @param cls closure
746  * @param key key for the content
747  * @param size number of bytes in data
748  * @param data content stored
749  * @param type type of the content
750  * @param priority priority of the content
751  * @param anonymity anonymity-level for the content
752  * @param expiration expiration time for the content
753  * @param uid unique identifier for the datum;
754  *        maybe 0 if no unique identifier is available
755  */
756 static void
757 process_migration_content (void *cls,
758                            const GNUNET_HashCode * key,
759                            uint32_t size,
760                            const void *data,
761                            enum GNUNET_BLOCK_Type type,
762                            uint32_t priority,
763                            uint32_t anonymity,
764                            struct GNUNET_TIME_Absolute
765                            expiration, uint64_t uid)
766 {
767   struct MigrationReadyBlock *mb;
768   struct GNUNET_TIME_Relative delay;
769   
770   if (key == NULL)
771     {
772       mig_qe = NULL;
773       if (mig_size < MAX_MIGRATION_QUEUE)  
774         {
775           delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
776                                                  mig_size);
777           delay = GNUNET_TIME_relative_divide (GNUNET_TIME_UNIT_SECONDS,
778                                                MAX_MIGRATION_QUEUE);
779           delay = GNUNET_TIME_relative_max (delay,
780                                             min_migration_delay);
781           mig_task = GNUNET_SCHEDULER_add_delayed (sched,
782                                                    delay,
783                                                    &gather_migration_blocks,
784                                                    NULL);
785         }
786       return;
787     }
788   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
789   mb->query = *key;
790   mb->expiration = expiration;
791   mb->size = size;
792   mb->type = type;
793   memcpy (&mb[1], data, size);
794   GNUNET_CONTAINER_DLL_insert_after (mig_head,
795                                      mig_tail,
796                                      mig_tail,
797                                      mb);
798   mig_size++;
799   if (mig_size == 1)
800     GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
801                                            &consider_migration,
802                                            NULL);
803   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
804 }
805
806
807 /**
808  * Task that is run periodically to obtain blocks for content
809  * migration
810  * 
811  * @param cls unused
812  * @param tc scheduler context (also unused)
813  */
814 static void
815 gather_migration_blocks (void *cls,
816                          const struct GNUNET_SCHEDULER_TaskContext *tc)
817 {
818   mig_task = GNUNET_SCHEDULER_NO_TASK;
819   mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
820                                         GNUNET_TIME_UNIT_FOREVER_REL,
821                                         &process_migration_content, NULL);
822 }
823
824
825 /**
826  * We're done with a particular message list entry.
827  * Free all associated resources.
828  * 
829  * @param pml entry to destroy
830  */
831 static void
832 destroy_pending_message_list_entry (struct PendingMessageList *pml)
833 {
834   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
835                                pml->req->pending_tail,
836                                pml);
837   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
838                                pml->target->pending_messages_tail,
839                                pml->pm);
840   pml->target->pending_requests--;
841   GNUNET_free (pml->pm);
842   GNUNET_free (pml);
843 }
844
845
846 /**
847  * Destroy the given pending message (and call the respective
848  * continuation).
849  *
850  * @param pm message to destroy
851  * @param tpid id of peer that the message was delivered to, or 0 for none
852  */
853 static void
854 destroy_pending_message (struct PendingMessage *pm,
855                          GNUNET_PEER_Id tpid)
856 {
857   struct PendingMessageList *pml = pm->pml;
858   TransmissionContinuation cont;
859   void *cont_cls;
860
861   GNUNET_assert (pml->pm == pm);
862   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
863   cont = pm->cont;
864   cont_cls = pm->cont_cls;
865   destroy_pending_message_list_entry (pml);
866   cont (cont_cls, tpid);  
867 }
868
869
870 /**
871  * We're done processing a particular request.
872  * Free all associated resources.
873  *
874  * @param pr request to destroy
875  */
876 static void
877 destroy_pending_request (struct PendingRequest *pr)
878 {
879   struct GNUNET_PeerIdentity pid;
880
881   if (pr->hnode != NULL)
882     {
883       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
884                                          pr->hnode);
885       pr->hnode = NULL;
886     }
887   if (NULL == pr->client_request_list)
888     {
889       GNUNET_STATISTICS_update (stats,
890                                 gettext_noop ("# P2P searches active"),
891                                 -1,
892                                 GNUNET_NO);
893     }
894   else
895     {
896       GNUNET_STATISTICS_update (stats,
897                                 gettext_noop ("# client searches active"),
898                                 -1,
899                                 GNUNET_NO);
900     }
901   /* might have already been removed from map in 'process_reply' (if
902      there was a unique reply) or never inserted if it was a
903      duplicate; hence ignore the return value here */
904   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
905                                                &pr->query,
906                                                pr);
907   if (pr->qe != NULL)
908      {
909       GNUNET_DATASTORE_cancel (pr->qe);
910       pr->qe = NULL;
911     }
912   if (pr->client_request_list != NULL)
913     {
914       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
915                                    pr->client_request_list->client_list->rl_tail,
916                                    pr->client_request_list);
917       GNUNET_free (pr->client_request_list);
918       pr->client_request_list = NULL;
919     }
920   if (pr->cp != NULL)
921     {
922       GNUNET_PEER_resolve (pr->cp->pid,
923                            &pid);
924       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
925                                                    &pid.hashPubKey,
926                                                    pr);
927       pr->cp = NULL;
928     }
929   if (pr->bf != NULL)
930     {
931       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
932       pr->bf = NULL;
933     }
934   if (pr->irc != NULL)
935     {
936       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
937       pr->irc = NULL;
938     }
939   if (pr->replies_seen != NULL)
940     {
941       GNUNET_free (pr->replies_seen);
942       pr->replies_seen = NULL;
943     }
944   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
945     {
946       GNUNET_SCHEDULER_cancel (sched,
947                                pr->task);
948       pr->task = GNUNET_SCHEDULER_NO_TASK;
949     }
950   while (NULL != pr->pending_head)    
951     destroy_pending_message_list_entry (pr->pending_head);
952   GNUNET_PEER_change_rc (pr->target_pid, -1);
953   if (pr->used_pids != NULL)
954     {
955       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
956       GNUNET_free (pr->used_pids);
957       pr->used_pids_off = 0;
958       pr->used_pids_size = 0;
959       pr->used_pids = NULL;
960     }
961   GNUNET_free (pr);
962 }
963
964
965 /**
966  * Method called whenever a given peer connects.
967  *
968  * @param cls closure, not used
969  * @param peer peer identity this notification is about
970  * @param latency reported latency of the connection with 'other'
971  * @param distance reported distance (DV) to 'other' 
972  */
973 static void 
974 peer_connect_handler (void *cls,
975                       const struct
976                       GNUNET_PeerIdentity * peer,
977                       struct GNUNET_TIME_Relative latency,
978                       uint32_t distance)
979 {
980   struct ConnectedPeer *cp;
981
982   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
983   cp->pid = GNUNET_PEER_intern (peer);
984   GNUNET_break (GNUNET_OK ==
985                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
986                                                    &peer->hashPubKey,
987                                                    cp,
988                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
989   if (mig_size > 0)
990     (void) consider_migration (NULL, &peer->hashPubKey, cp);
991 }
992
993
994
995 /**
996  * Free (each) request made by the peer.
997  *
998  * @param cls closure, points to peer that the request belongs to
999  * @param key current key code
1000  * @param value value in the hash map
1001  * @return GNUNET_YES (we should continue to iterate)
1002  */
1003 static int
1004 destroy_request (void *cls,
1005                  const GNUNET_HashCode * key,
1006                  void *value)
1007 {
1008   const struct GNUNET_PeerIdentity * peer = cls;
1009   struct PendingRequest *pr = value;
1010   
1011   GNUNET_break (GNUNET_YES ==
1012                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1013                                                       &peer->hashPubKey,
1014                                                       pr));
1015   destroy_pending_request (pr);
1016   return GNUNET_YES;
1017 }
1018
1019
1020 /**
1021  * Method called whenever a peer disconnects.
1022  *
1023  * @param cls closure, not used
1024  * @param peer peer identity this notification is about
1025  */
1026 static void
1027 peer_disconnect_handler (void *cls,
1028                          const struct
1029                          GNUNET_PeerIdentity * peer)
1030 {
1031   struct ConnectedPeer *cp;
1032   struct PendingMessage *pm;
1033   unsigned int i;
1034
1035   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1036                                               &peer->hashPubKey,
1037                                               &destroy_request,
1038                                               (void*) peer);
1039   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1040                                           &peer->hashPubKey);
1041   if (cp == NULL)
1042     return;
1043   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1044     {
1045       if (NULL != cp->last_client_replies[i])
1046         {
1047           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1048           cp->last_client_replies[i] = NULL;
1049         }
1050     }
1051   GNUNET_break (GNUNET_YES ==
1052                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1053                                                       &peer->hashPubKey,
1054                                                       cp));
1055   GNUNET_PEER_change_rc (cp->pid, -1);
1056   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1057   if (NULL != cp->cth)
1058     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1059   while (NULL != (pm = cp->pending_messages_head))
1060     destroy_pending_message (pm, 0 /* delivery failed */);
1061   GNUNET_break (0 == cp->pending_requests);
1062   GNUNET_free (cp);
1063 }
1064
1065
1066 /**
1067  * Iterator over hash map entries that removes all occurences
1068  * of the given 'client' from the 'last_client_replies' of the
1069  * given connected peer.
1070  *
1071  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1072  * @param key current key code (unused)
1073  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1074  * @return GNUNET_YES (we should continue to iterate)
1075  */
1076 static int
1077 remove_client_from_last_client_replies (void *cls,
1078                                         const GNUNET_HashCode * key,
1079                                         void *value)
1080 {
1081   struct GNUNET_SERVER_Client *client = cls;
1082   struct ConnectedPeer *cp = value;
1083   unsigned int i;
1084
1085   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1086     {
1087       if (cp->last_client_replies[i] == client)
1088         {
1089           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1090           cp->last_client_replies[i] = NULL;
1091         }
1092     }  
1093   return GNUNET_YES;
1094 }
1095
1096
1097 /**
1098  * A client disconnected.  Remove all of its pending queries.
1099  *
1100  * @param cls closure, NULL
1101  * @param client identification of the client
1102  */
1103 static void
1104 handle_client_disconnect (void *cls,
1105                           struct GNUNET_SERVER_Client
1106                           * client)
1107 {
1108   struct ClientList *pos;
1109   struct ClientList *prev;
1110   struct ClientRequestList *rcl;
1111   struct ClientResponseMessage *creply;
1112
1113   if (client == NULL)
1114     return;
1115   prev = NULL;
1116   pos = client_list;
1117   while ( (NULL != pos) &&
1118           (pos->client != client) )
1119     {
1120       prev = pos;
1121       pos = pos->next;
1122     }
1123   if (pos == NULL)
1124     return; /* no requests pending for this client */
1125   while (NULL != (rcl = pos->rl_head))
1126     {
1127       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1128                   "Destroying pending request `%s' on disconnect\n",
1129                   GNUNET_h2s (&rcl->req->query));
1130       destroy_pending_request (rcl->req);
1131     }
1132   if (prev == NULL)
1133     client_list = pos->next;
1134   else
1135     prev->next = pos->next;
1136   if (pos->th != NULL)
1137     {
1138       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1139       pos->th = NULL;
1140     }
1141   while (NULL != (creply = pos->res_head))
1142     {
1143       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1144                                    pos->res_tail,
1145                                    creply);
1146       GNUNET_free (creply);
1147     }    
1148   GNUNET_SERVER_client_drop (pos->client);
1149   GNUNET_free (pos);
1150   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1151                                          &remove_client_from_last_client_replies,
1152                                          client);
1153 }
1154
1155
1156 /**
1157  * Iterator to free peer entries.
1158  *
1159  * @param cls closure, unused
1160  * @param key current key code
1161  * @param value value in the hash map (peer entry)
1162  * @return GNUNET_YES (we should continue to iterate)
1163  */
1164 static int 
1165 clean_peer (void *cls,
1166             const GNUNET_HashCode * key,
1167             void *value)
1168 {
1169   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1170   return GNUNET_YES;
1171 }
1172
1173
1174 /**
1175  * Task run during shutdown.
1176  *
1177  * @param cls unused
1178  * @param tc unused
1179  */
1180 static void
1181 shutdown_task (void *cls,
1182                const struct GNUNET_SCHEDULER_TaskContext *tc)
1183 {
1184   if (mig_qe != NULL)
1185     {
1186       GNUNET_DATASTORE_cancel (mig_qe);
1187       mig_qe = NULL;
1188     }
1189   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1190     {
1191       GNUNET_SCHEDULER_cancel (sched, mig_task);
1192       mig_task = GNUNET_SCHEDULER_NO_TASK;
1193     }
1194   while (client_list != NULL)
1195     handle_client_disconnect (NULL,
1196                               client_list->client);
1197   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1198                                          &clean_peer,
1199                                          NULL);
1200   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1201   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1202   requests_by_expiration_heap = 0;
1203   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1204   connected_peers = NULL;
1205   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1206   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1207   query_request_map = NULL;
1208   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1209   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1210   peer_request_map = NULL;
1211   GNUNET_assert (NULL != core);
1212   GNUNET_CORE_disconnect (core);
1213   core = NULL;
1214   if (stats != NULL)
1215     {
1216       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1217       stats = NULL;
1218     }
1219   GNUNET_DATASTORE_disconnect (dsh,
1220                                GNUNET_NO);
1221   while (mig_head != NULL)
1222     delete_migration_block (mig_head);
1223   GNUNET_assert (0 == mig_size);
1224   dsh = NULL;
1225   sched = NULL;
1226   cfg = NULL;  
1227 }
1228
1229
1230 /* ******************* Utility functions  ******************** */
1231
1232
1233 /**
1234  * Transmit the given message by copying it to the target buffer
1235  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1236  * for writing in the meantime.  In that case, do nothing
1237  * (the disconnect or shutdown handler will take care of the rest).
1238  * If we were able to transmit messages and there are still more
1239  * pending, ask core again for further calls to this function.
1240  *
1241  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1242  * @param size number of bytes available in buf
1243  * @param buf where the callee should write the message
1244  * @return number of bytes written to buf
1245  */
1246 static size_t
1247 transmit_to_peer (void *cls,
1248                   size_t size, void *buf)
1249 {
1250   struct ConnectedPeer *cp = cls;
1251   char *cbuf = buf;
1252   struct GNUNET_PeerIdentity pid;
1253   struct PendingMessage *pm;
1254   size_t msize;
1255  
1256   cp->cth = NULL;
1257   if (NULL == buf)
1258     {
1259 #if DEBUG_FS
1260       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1261                   "Dropping message, core too busy.\n");
1262 #endif
1263       return 0;
1264     }
1265   msize = 0;
1266   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1267           (pm->msize <= size) )
1268     {
1269       memcpy (&cbuf[msize], &pm[1], pm->msize);
1270       msize += pm->msize;
1271       size -= pm->msize;
1272       destroy_pending_message (pm, cp->pid);
1273     }
1274   if (NULL != pm)
1275     {
1276       GNUNET_PEER_resolve (cp->pid,
1277                            &pid);
1278       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1279                                                    pm->priority,
1280                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1281                                                    &pid,
1282                                                    pm->msize,
1283                                                    &transmit_to_peer,
1284                                                    cp);
1285     }
1286 #if DEBUG_FS
1287   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1288               "Transmitting %u bytes to peer %u\n",
1289               msize,
1290               cp->pid);
1291 #endif
1292   return msize;
1293 }
1294
1295
1296 /**
1297  * Add a message to the set of pending messages for the given peer.
1298  *
1299  * @param cp peer to send message to
1300  * @param pm message to queue
1301  * @param pr request on which behalf this message is being queued
1302  */
1303 static void
1304 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1305                                   struct PendingMessage *pm,
1306                                   struct PendingRequest *pr)
1307 {
1308   struct PendingMessage *pos;
1309   struct PendingMessageList *pml;
1310   struct GNUNET_PeerIdentity pid;
1311
1312   GNUNET_assert (pm->next == NULL);
1313   GNUNET_assert (pm->pml == NULL);    
1314   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1315   pml->req = pr;
1316   pml->target = cp;
1317   pml->pm = pm;
1318   pm->pml = pml;  
1319   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1320                                pr->pending_tail,
1321                                pml);
1322   pos = cp->pending_messages_head;
1323   while ( (pos != NULL) &&
1324           (pm->priority < pos->priority) )
1325     pos = pos->next;    
1326   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1327                                      cp->pending_messages_tail,
1328                                      pos,
1329                                      pm);
1330   cp->pending_requests++;
1331   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1332     destroy_pending_message (cp->pending_messages_tail, 0);  
1333   if (cp->cth == NULL)
1334     {
1335       /* need to schedule transmission */
1336       GNUNET_PEER_resolve (cp->pid, &pid);
1337       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1338                                                    cp->pending_messages_head->priority,
1339                                                    MAX_TRANSMIT_DELAY,
1340                                                    &pid,
1341                                                    cp->pending_messages_head->msize,
1342                                                    &transmit_to_peer,
1343                                                    cp);
1344     }
1345   if (cp->cth == NULL)
1346     {
1347 #if DEBUG_FS
1348       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1349                   "Failed to schedule transmission with core!\n");
1350 #endif
1351       /* FIXME: call stats (rare, bad case) */
1352     }
1353 }
1354
1355
1356 /**
1357  * Mingle hash with the mingle_number to produce different bits.
1358  */
1359 static void
1360 mingle_hash (const GNUNET_HashCode * in,
1361              int32_t mingle_number, 
1362              GNUNET_HashCode * hc)
1363 {
1364   GNUNET_HashCode m;
1365
1366   GNUNET_CRYPTO_hash (&mingle_number, 
1367                       sizeof (int32_t), 
1368                       &m);
1369   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1370 }
1371
1372
1373 /**
1374  * Test if the load on this peer is too high
1375  * to even consider processing the query at
1376  * all.
1377  * 
1378  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1379  */
1380 static int
1381 test_load_too_high ()
1382 {
1383   return GNUNET_NO; // FIXME
1384 }
1385
1386
1387 /* ******************* Pending Request Refresh Task ******************** */
1388
1389
1390
1391 /**
1392  * We use a random delay to make the timing of requests less
1393  * predictable.  This function returns such a random delay.  We add a base
1394  * delay of MAX_CORK_DELAY (1s).
1395  *
1396  * FIXME: make schedule dependent on the specifics of the request?
1397  * Or bandwidth and number of connected peers and load?
1398  *
1399  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1400  */
1401 static struct GNUNET_TIME_Relative
1402 get_processing_delay ()
1403 {
1404   return 
1405     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1406                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1407                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1408                                                                                        TTL_DECREMENT)));
1409 }
1410
1411
1412 /**
1413  * We're processing a GET request from another peer and have decided
1414  * to forward it to other peers.  This function is called periodically
1415  * and should forward the request to other peers until we have all
1416  * possible replies.  If we have transmitted the *only* reply to
1417  * the initiator we should destroy the pending request.  If we have
1418  * many replies in the queue to the initiator, we should delay sending
1419  * out more queries until the reply queue has shrunk some.
1420  *
1421  * @param cls our "struct ProcessGetContext *"
1422  * @param tc unused
1423  */
1424 static void
1425 forward_request_task (void *cls,
1426                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1427
1428
1429 /**
1430  * Function called after we either failed or succeeded
1431  * at transmitting a query to a peer.  
1432  *
1433  * @param cls the requests "struct PendingRequest*"
1434  * @param tpid ID of receiving peer, 0 on transmission error
1435  */
1436 static void
1437 transmit_query_continuation (void *cls,
1438                              GNUNET_PEER_Id tpid)
1439 {
1440   struct PendingRequest *pr = cls;
1441
1442   GNUNET_STATISTICS_update (stats,
1443                             gettext_noop ("# queries scheduled for forwarding"),
1444                             -1,
1445                             GNUNET_NO);
1446   if (tpid == 0)   
1447     {
1448 #if DEBUG_FS
1449       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1450                   "Transmission of request failed, will try again later.\n");
1451 #endif
1452       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1453         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1454                                                  get_processing_delay (),
1455                                                  &forward_request_task,
1456                                                  pr); 
1457       return;    
1458     }
1459   GNUNET_STATISTICS_update (stats,
1460                             gettext_noop ("# queries forwarded"),
1461                             1,
1462                             GNUNET_NO);
1463   GNUNET_PEER_change_rc (tpid, 1);
1464   if (pr->used_pids_off == pr->used_pids_size)
1465     GNUNET_array_grow (pr->used_pids,
1466                        pr->used_pids_size,
1467                        pr->used_pids_size * 2 + 2);
1468   pr->used_pids[pr->used_pids_off++] = tpid;
1469   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1470     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1471                                              get_processing_delay (),
1472                                              &forward_request_task,
1473                                              pr);
1474 }
1475
1476
1477 /**
1478  * How many bytes should a bloomfilter be if we have already seen
1479  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1480  * of bits set per entry.  Furthermore, we should not re-size the
1481  * filter too often (to keep it cheap).
1482  *
1483  * Since other peers will also add entries but not resize the filter,
1484  * we should generally pick a slightly larger size than what the
1485  * strict math would suggest.
1486  *
1487  * @return must be a power of two and smaller or equal to 2^15.
1488  */
1489 static size_t
1490 compute_bloomfilter_size (unsigned int entry_count)
1491 {
1492   size_t size;
1493   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1494   uint16_t max = 1 << 15;
1495
1496   if (entry_count > max)
1497     return max;
1498   size = 8;
1499   while ((size < max) && (size < ideal))
1500     size *= 2;
1501   if (size > max)
1502     return max;
1503   return size;
1504 }
1505
1506
1507 /**
1508  * Recalculate our bloom filter for filtering replies.  This function
1509  * will create a new bloom filter from scratch, so it should only be
1510  * called if we have no bloomfilter at all (and hence can create a
1511  * fresh one of minimal size without problems) OR if our peer is the
1512  * initiator (in which case we may resize to larger than mimimum size).
1513  *
1514  * @param pr request for which the BF is to be recomputed
1515  */
1516 static void
1517 refresh_bloomfilter (struct PendingRequest *pr)
1518 {
1519   unsigned int i;
1520   size_t nsize;
1521   GNUNET_HashCode mhash;
1522
1523   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1524   if (nsize == pr->bf_size)
1525     return; /* size not changed */
1526   if (pr->bf != NULL)
1527     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1528   pr->bf_size = nsize;
1529   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1530   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1531                                               pr->bf_size,
1532                                               BLOOMFILTER_K);
1533   for (i=0;i<pr->replies_seen_off;i++)
1534     {
1535       mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
1536       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
1537     }
1538 }
1539
1540
1541 /**
1542  * Function called after we've tried to reserve a certain amount of
1543  * bandwidth for a reply.  Check if we succeeded and if so send our
1544  * query.
1545  *
1546  * @param cls the requests "struct PendingRequest*"
1547  * @param peer identifies the peer
1548  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1549  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1550  * @param amount set to the amount that was actually reserved or unreserved
1551  * @param preference current traffic preference for the given peer
1552  */
1553 static void
1554 target_reservation_cb (void *cls,
1555                        const struct
1556                        GNUNET_PeerIdentity * peer,
1557                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1558                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1559                        int amount,
1560                        uint64_t preference)
1561 {
1562   struct PendingRequest *pr = cls;
1563   struct ConnectedPeer *cp;
1564   struct PendingMessage *pm;
1565   struct GetMessage *gm;
1566   GNUNET_HashCode *ext;
1567   char *bfdata;
1568   size_t msize;
1569   unsigned int k;
1570   int no_route;
1571   uint32_t bm;
1572
1573   pr->irc = NULL;
1574   if (peer == NULL)
1575     {
1576       /* error in communication with core, try again later */
1577       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1578         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1579                                                  get_processing_delay (),
1580                                                  &forward_request_task,
1581                                                  pr);
1582       return;
1583     }
1584   // (3) transmit, update ttl/priority
1585   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1586                                           &peer->hashPubKey);
1587   if (cp == NULL)
1588     {
1589       /* Peer must have just left */
1590 #if DEBUG_FS
1591       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1592                   "Selected peer disconnected!\n");
1593 #endif
1594       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1595         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1596                                                  get_processing_delay (),
1597                                                  &forward_request_task,
1598                                                  pr);
1599       return;
1600     }
1601   no_route = GNUNET_NO;
1602   /* FIXME: check against DBLOCK_SIZE and possibly return
1603      amount to reserve; however, this also needs to work
1604      with testcases which currently start out with a far
1605      too low per-peer bw limit, so they would never send
1606      anything.  Big issue. */
1607   if (amount == 0)
1608     {
1609       if (pr->cp == NULL)
1610         {
1611 #if DEBUG_FS > 1
1612           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1613                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
1614                       amount,
1615                       DBLOCK_SIZE);
1616 #endif
1617           GNUNET_STATISTICS_update (stats,
1618                                     gettext_noop ("# reply bandwidth reservation requests failed"),
1619                                     1,
1620                                     GNUNET_NO);
1621           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1622             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1623                                                      get_processing_delay (),
1624                                                      &forward_request_task,
1625                                                      pr);
1626           return;  /* this target round failed */
1627         }
1628       /* FIXME: if we are "quite" busy, we may still want to skip
1629          this round; need more load detection code! */
1630       no_route = GNUNET_YES;
1631     }
1632   
1633   GNUNET_STATISTICS_update (stats,
1634                             gettext_noop ("# queries scheduled for forwarding"),
1635                             1,
1636                             GNUNET_NO);
1637   /* build message and insert message into priority queue */
1638 #if DEBUG_FS
1639   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1640               "Forwarding request `%s' to `%4s'!\n",
1641               GNUNET_h2s (&pr->query),
1642               GNUNET_i2s (peer));
1643 #endif
1644   k = 0;
1645   bm = 0;
1646   if (GNUNET_YES == no_route)
1647     {
1648       bm |= GET_MESSAGE_BIT_RETURN_TO;
1649       k++;      
1650     }
1651   if (pr->namespace != NULL)
1652     {
1653       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
1654       k++;
1655     }
1656   if (pr->target_pid != 0)
1657     {
1658       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
1659       k++;
1660     }
1661   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1662   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1663   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1664   pm->msize = msize;
1665   gm = (struct GetMessage*) &pm[1];
1666   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1667   gm->header.size = htons (msize);
1668   gm->type = htonl (pr->type);
1669   pr->remaining_priority /= 2;
1670   gm->priority = htonl (pr->remaining_priority);
1671   gm->ttl = htonl (pr->ttl);
1672   gm->filter_mutator = htonl(pr->mingle); 
1673   gm->hash_bitmap = htonl (bm);
1674   gm->query = pr->query;
1675   ext = (GNUNET_HashCode*) &gm[1];
1676   k = 0;
1677   if (GNUNET_YES == no_route)
1678     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1679   if (pr->namespace != NULL)
1680     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
1681   if (pr->target_pid != 0)
1682     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1683   bfdata = (char *) &ext[k];
1684   if (pr->bf != NULL)
1685     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1686                                                bfdata,
1687                                                pr->bf_size);
1688   pm->cont = &transmit_query_continuation;
1689   pm->cont_cls = pr;
1690   add_to_pending_messages_for_peer (cp, pm, pr);
1691 }
1692
1693
1694 /**
1695  * Closure used for "target_peer_select_cb".
1696  */
1697 struct PeerSelectionContext 
1698 {
1699   /**
1700    * The request for which we are selecting
1701    * peers.
1702    */
1703   struct PendingRequest *pr;
1704
1705   /**
1706    * Current "prime" target.
1707    */
1708   struct GNUNET_PeerIdentity target;
1709
1710   /**
1711    * How much do we like this target?
1712    */
1713   double target_score;
1714
1715 };
1716
1717
1718 /**
1719  * Function called for each connected peer to determine
1720  * which one(s) would make good targets for forwarding.
1721  *
1722  * @param cls closure (struct PeerSelectionContext)
1723  * @param key current key code (peer identity)
1724  * @param value value in the hash map (struct ConnectedPeer)
1725  * @return GNUNET_YES if we should continue to
1726  *         iterate,
1727  *         GNUNET_NO if not.
1728  */
1729 static int
1730 target_peer_select_cb (void *cls,
1731                        const GNUNET_HashCode * key,
1732                        void *value)
1733 {
1734   struct PeerSelectionContext *psc = cls;
1735   struct ConnectedPeer *cp = value;
1736   struct PendingRequest *pr = psc->pr;
1737   double score;
1738   unsigned int i;
1739   unsigned int pc;
1740
1741   /* 1) check that this peer is not the initiator */
1742   if (cp == pr->cp)
1743     return GNUNET_YES; /* skip */          
1744
1745   /* 2) check if we have already (recently) forwarded to this peer */
1746   pc = 0;
1747   for (i=0;i<pr->used_pids_off;i++)
1748     if (pr->used_pids[i] == cp->pid) 
1749       {
1750         pc++;
1751         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1752                                            RETRY_PROBABILITY_INV))
1753           {
1754 #if DEBUG_FS
1755             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1756                         "NOT re-trying query that was previously transmitted %u times\n",
1757                         (unsigned int) pr->used_pids_off);
1758 #endif
1759             return GNUNET_YES; /* skip */
1760           }
1761       }
1762 #if DEBUG_FS
1763   if (0 < pc)
1764     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1765                 "Re-trying query that was previously transmitted %u times to this peer\n",
1766                 (unsigned int) pc);
1767 #endif
1768   // 3) calculate how much we'd like to forward to this peer
1769   score = 42; // FIXME!
1770   // FIXME: also need API to gather data on responsiveness
1771   // of this peer (we have fields for that in 'cp', but
1772   // they are never set!)
1773   
1774   /* store best-fit in closure */
1775   if (score > psc->target_score)
1776     {
1777       psc->target_score = score;
1778       psc->target.hashPubKey = *key; 
1779     }
1780   return GNUNET_YES;
1781 }
1782   
1783
1784 /**
1785  * The priority level imposes a bound on the maximum
1786  * value for the ttl that can be requested.
1787  *
1788  * @param ttl_in requested ttl
1789  * @param prio given priority
1790  * @return ttl_in if ttl_in is below the limit,
1791  *         otherwise the ttl-limit for the given priority
1792  */
1793 static int32_t
1794 bound_ttl (int32_t ttl_in, uint32_t prio)
1795 {
1796   unsigned long long allowed;
1797
1798   if (ttl_in <= 0)
1799     return ttl_in;
1800   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
1801   if (ttl_in > allowed)      
1802     {
1803       if (allowed >= (1 << 30))
1804         return 1 << 30;
1805       return allowed;
1806     }
1807   return ttl_in;
1808 }
1809
1810
1811 /**
1812  * We're processing a GET request from another peer and have decided
1813  * to forward it to other peers.  This function is called periodically
1814  * and should forward the request to other peers until we have all
1815  * possible replies.  If we have transmitted the *only* reply to
1816  * the initiator we should destroy the pending request.  If we have
1817  * many replies in the queue to the initiator, we should delay sending
1818  * out more queries until the reply queue has shrunk some.
1819  *
1820  * @param cls our "struct ProcessGetContext *"
1821  * @param tc unused
1822  */
1823 static void
1824 forward_request_task (void *cls,
1825                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1826 {
1827   struct PendingRequest *pr = cls;
1828   struct PeerSelectionContext psc;
1829   struct ConnectedPeer *cp; 
1830   struct GNUNET_TIME_Relative delay;
1831
1832   pr->task = GNUNET_SCHEDULER_NO_TASK;
1833   if (pr->irc != NULL)
1834     {
1835 #if DEBUG_FS
1836       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1837                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
1838                   GNUNET_h2s (&pr->query));
1839 #endif
1840       return; /* already pending */
1841     }
1842   if (GNUNET_YES == pr->local_only)
1843     return; /* configured to not do P2P search */
1844   /* (1) select target */
1845   psc.pr = pr;
1846   psc.target_score = DBL_MIN;
1847   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1848                                          &target_peer_select_cb,
1849                                          &psc);  
1850   if (psc.target_score == DBL_MIN)
1851     {
1852       delay = get_processing_delay ();
1853 #if DEBUG_FS 
1854       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1855                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
1856                   GNUNET_h2s (&pr->query),
1857                   delay.value);
1858 #endif
1859       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1860                                                delay,
1861                                                &forward_request_task,
1862                                                pr);
1863       return; /* nobody selected */
1864     }
1865   /* (3) update TTL/priority */
1866   
1867   if (pr->client_request_list != NULL)
1868     {
1869       /* FIXME: use better algorithm!? */
1870       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1871                                          4))
1872         pr->priority++;
1873       /* FIXME: bound priority by "customary" priority used by other peers
1874          at this time! */
1875       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
1876                            pr->priority);
1877 #if DEBUG_FS
1878       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1879                   "Trying query `%s' with priority %u and TTL %d.\n",
1880                   GNUNET_h2s (&pr->query),
1881                   pr->priority,
1882                   pr->ttl);
1883 #endif
1884     }
1885   else
1886     {
1887       /* FIXME: should we do something here as well!? */
1888     }
1889
1890   /* (3) reserve reply bandwidth */
1891   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1892                                           &psc.target.hashPubKey);
1893   GNUNET_assert (NULL != cp);
1894   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1895                                                 &psc.target,
1896                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1897                                                 GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
1898                                                 DBLOCK_SIZE * 2, 
1899                                                 (uint64_t) cp->inc_preference,
1900                                                 &target_reservation_cb,
1901                                                 pr);
1902   cp->inc_preference = 0.0;
1903 }
1904
1905
1906 /* **************************** P2P PUT Handling ************************ */
1907
1908
1909 /**
1910  * Function called after we either failed or succeeded
1911  * at transmitting a reply to a peer.  
1912  *
1913  * @param cls the requests "struct PendingRequest*"
1914  * @param tpid ID of receiving peer, 0 on transmission error
1915  */
1916 static void
1917 transmit_reply_continuation (void *cls,
1918                              GNUNET_PEER_Id tpid)
1919 {
1920   struct PendingRequest *pr = cls;
1921   
1922   switch (pr->type)
1923     {
1924     case GNUNET_BLOCK_TYPE_DBLOCK:
1925     case GNUNET_BLOCK_TYPE_IBLOCK:
1926       /* only one reply expected, done with the request! */
1927       destroy_pending_request (pr);
1928       break;
1929     case GNUNET_BLOCK_TYPE_ANY:
1930     case GNUNET_BLOCK_TYPE_KBLOCK:
1931     case GNUNET_BLOCK_TYPE_SBLOCK:
1932       break;
1933     default:
1934       GNUNET_break (0);
1935       break;
1936     }
1937 }
1938
1939
1940 /**
1941  * Transmit the given message by copying it to the target buffer
1942  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1943  * for writing in the meantime.  In that case, do nothing
1944  * (the disconnect or shutdown handler will take care of the rest).
1945  * If we were able to transmit messages and there are still more
1946  * pending, ask core again for further calls to this function.
1947  *
1948  * @param cls closure, pointer to the 'struct ClientList*'
1949  * @param size number of bytes available in buf
1950  * @param buf where the callee should write the message
1951  * @return number of bytes written to buf
1952  */
1953 static size_t
1954 transmit_to_client (void *cls,
1955                   size_t size, void *buf)
1956 {
1957   struct ClientList *cl = cls;
1958   char *cbuf = buf;
1959   struct ClientResponseMessage *creply;
1960   size_t msize;
1961   
1962   cl->th = NULL;
1963   if (NULL == buf)
1964     {
1965 #if DEBUG_FS
1966       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967                   "Not sending reply, client communication problem.\n");
1968 #endif
1969       return 0;
1970     }
1971   msize = 0;
1972   while ( (NULL != (creply = cl->res_head) ) &&
1973           (creply->msize <= size) )
1974     {
1975       memcpy (&cbuf[msize], &creply[1], creply->msize);
1976       msize += creply->msize;
1977       size -= creply->msize;
1978       GNUNET_CONTAINER_DLL_remove (cl->res_head,
1979                                    cl->res_tail,
1980                                    creply);
1981       GNUNET_free (creply);
1982     }
1983   if (NULL != creply)
1984     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
1985                                                   creply->msize,
1986                                                   GNUNET_TIME_UNIT_FOREVER_REL,
1987                                                   &transmit_to_client,
1988                                                   cl);
1989 #if DEBUG_FS
1990   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1991               "Transmitted %u bytes to client\n",
1992               (unsigned int) msize);
1993 #endif
1994   return msize;
1995 }
1996
1997
1998 /**
1999  * Closure for "process_reply" function.
2000  */
2001 struct ProcessReplyClosure
2002 {
2003   /**
2004    * The data for the reply.
2005    */
2006   const void *data;
2007
2008   // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
2009
2010   /**
2011    * When the reply expires.
2012    */
2013   struct GNUNET_TIME_Absolute expiration;
2014
2015   /**
2016    * Size of data.
2017    */
2018   size_t size;
2019
2020   /**
2021    * Namespace that this reply belongs to
2022    * (if it is of type SBLOCK).
2023    */
2024   GNUNET_HashCode namespace;
2025
2026   /**
2027    * Type of the block.
2028    */
2029   enum GNUNET_BLOCK_Type type;
2030
2031   /**
2032    * How much was this reply worth to us?
2033    */
2034   uint32_t priority;
2035 };
2036
2037
2038 /**
2039  * We have received a reply; handle it!
2040  *
2041  * @param cls response (struct ProcessReplyClosure)
2042  * @param key our query
2043  * @param value value in the hash map (info about the query)
2044  * @return GNUNET_YES (we should continue to iterate)
2045  */
2046 static int
2047 process_reply (void *cls,
2048                const GNUNET_HashCode * key,
2049                void *value)
2050 {
2051   struct ProcessReplyClosure *prq = cls;
2052   struct PendingRequest *pr = value;
2053   struct PendingMessage *reply;
2054   struct ClientResponseMessage *creply;
2055   struct ClientList *cl;
2056   struct PutMessage *pm;
2057   struct ConnectedPeer *cp;
2058   GNUNET_HashCode chash;
2059   GNUNET_HashCode mhash;
2060   size_t msize;
2061
2062 #if DEBUG_FS
2063   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2064               "Matched result (type %u) for query `%s' with pending request\n",
2065               (unsigned int) prq->type,
2066               GNUNET_h2s (key));
2067 #endif  
2068   GNUNET_STATISTICS_update (stats,
2069                             gettext_noop ("# replies received and matched"),
2070                             1,
2071                             GNUNET_NO);
2072   GNUNET_CRYPTO_hash (prq->data,
2073                       prq->size,
2074                       &chash);
2075   switch (prq->type)
2076     {
2077     case GNUNET_BLOCK_TYPE_DBLOCK:
2078     case GNUNET_BLOCK_TYPE_IBLOCK:
2079       /* only possible reply, stop requesting! */
2080       while (NULL != pr->pending_head)
2081         destroy_pending_message_list_entry (pr->pending_head);
2082       if (pr->qe != NULL)
2083         {
2084           if (pr->client_request_list != NULL)
2085             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2086                                         GNUNET_YES);
2087           GNUNET_DATASTORE_cancel (pr->qe);
2088           pr->qe = NULL;
2089         }
2090       pr->do_remove = GNUNET_YES;
2091       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2092         {
2093           GNUNET_SCHEDULER_cancel (sched,
2094                                    pr->task);
2095           pr->task = GNUNET_SCHEDULER_NO_TASK;
2096         }
2097       GNUNET_break (GNUNET_YES ==
2098                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2099                                                           key,
2100                                                           pr));
2101       break;
2102     case GNUNET_BLOCK_TYPE_SBLOCK:
2103       if (pr->namespace == NULL)
2104         {
2105           GNUNET_break (0);
2106           return GNUNET_YES;
2107         }
2108       if (0 != memcmp (pr->namespace,
2109                        &prq->namespace,
2110                        sizeof (GNUNET_HashCode)))
2111         {
2112           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2113                       _("Reply mismatched in terms of namespace.  Discarded.\n"));
2114           return GNUNET_YES; /* wrong namespace */      
2115         }
2116       /* then: fall-through! */
2117     case GNUNET_BLOCK_TYPE_KBLOCK:
2118     case GNUNET_BLOCK_TYPE_NBLOCK:
2119       if (pr->bf != NULL) 
2120         {
2121           mingle_hash (&chash, pr->mingle, &mhash);
2122           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2123                                                                &mhash))
2124             {
2125               GNUNET_STATISTICS_update (stats,
2126                                         gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2127                                         1,
2128                                         GNUNET_NO);
2129 #if DEBUG_FS
2130               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2131                           "Duplicate response `%s', discarding.\n",
2132                           GNUNET_h2s (&mhash));
2133 #endif
2134               return GNUNET_YES; /* duplicate */
2135             }
2136 #if DEBUG_FS
2137           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2138                       "New response `%s', adding to filter.\n",
2139                       GNUNET_h2s (&mhash));
2140 #endif
2141         }
2142       if (pr->client_request_list != NULL)
2143         {
2144           if (pr->replies_seen_size == pr->replies_seen_off)
2145             GNUNET_array_grow (pr->replies_seen,
2146                                pr->replies_seen_size,
2147                                pr->replies_seen_size * 2 + 4);  
2148             pr->replies_seen[pr->replies_seen_off++] = chash;         
2149         }
2150       if ( (pr->bf == NULL) ||
2151            (pr->client_request_list != NULL) )
2152         refresh_bloomfilter (pr);
2153       GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2154                                         &mhash);
2155       break;
2156     default:
2157       GNUNET_break (0);
2158       return GNUNET_YES;
2159     }
2160   prq->priority += pr->remaining_priority;
2161   pr->remaining_priority = 0;
2162   if (pr->client_request_list != NULL)
2163     {
2164       GNUNET_STATISTICS_update (stats,
2165                                 gettext_noop ("# replies received for local clients"),
2166                                 1,
2167                                 GNUNET_NO);
2168       cl = pr->client_request_list->client_list;
2169       msize = sizeof (struct PutMessage) + prq->size;
2170       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2171       creply->msize = msize;
2172       creply->client_list = cl;
2173       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2174                                          cl->res_tail,
2175                                          cl->res_tail,
2176                                          creply);      
2177       pm = (struct PutMessage*) &creply[1];
2178       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2179       pm->header.size = htons (msize);
2180       pm->type = htonl (prq->type);
2181       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2182       memcpy (&pm[1], prq->data, prq->size);      
2183       if (NULL == cl->th)
2184         {
2185 #if DEBUG_FS
2186           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2187                       "Transmitting result for query `%s' to client\n",
2188                       GNUNET_h2s (key));
2189 #endif  
2190           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2191                                                         msize,
2192                                                         GNUNET_TIME_UNIT_FOREVER_REL,
2193                                                         &transmit_to_client,
2194                                                         cl);
2195         }
2196       GNUNET_break (cl->th != NULL);
2197       if (pr->do_remove)                
2198         destroy_pending_request (pr);           
2199     }
2200   else
2201     {
2202       cp = pr->cp;
2203 #if DEBUG_FS
2204       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2205                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
2206                   GNUNET_h2s (key),
2207                   (unsigned int) cp->pid);
2208 #endif  
2209       GNUNET_STATISTICS_update (stats,
2210                                 gettext_noop ("# replies received for other peers"),
2211                                 1,
2212                                 GNUNET_NO);
2213       msize = sizeof (struct PutMessage) + prq->size;
2214       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2215       reply->cont = &transmit_reply_continuation;
2216       reply->cont_cls = pr;
2217       reply->msize = msize;
2218       reply->priority = (uint32_t) -1; /* send replies first! */
2219       pm = (struct PutMessage*) &reply[1];
2220       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2221       pm->header.size = htons (msize);
2222       pm->type = htonl (prq->type);
2223       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2224       memcpy (&pm[1], prq->data, prq->size);
2225       add_to_pending_messages_for_peer (cp, reply, pr);
2226     }
2227   // FIXME: implement hot-path routing statistics keeping!
2228   return GNUNET_YES;
2229 }
2230
2231
2232
2233 /**
2234  * Continuation called to notify client about result of the
2235  * operation.
2236  *
2237  * @param cls closure
2238  * @param success GNUNET_SYSERR on failure
2239  * @param msg NULL on success, otherwise an error message
2240  */
2241 static void 
2242 put_migration_continuation (void *cls,
2243                             int success,
2244                             const char *msg)
2245 {
2246   /* FIXME */
2247 }
2248
2249
2250 /**
2251  * Handle P2P "PUT" message.
2252  *
2253  * @param cls closure, always NULL
2254  * @param other the other peer involved (sender or receiver, NULL
2255  *        for loopback messages where we are both sender and receiver)
2256  * @param message the actual message
2257  * @param latency reported latency of the connection with 'other'
2258  * @param distance reported distance (DV) to 'other' 
2259  * @return GNUNET_OK to keep the connection open,
2260  *         GNUNET_SYSERR to close it (signal serious error)
2261  */
2262 static int
2263 handle_p2p_put (void *cls,
2264                 const struct GNUNET_PeerIdentity *other,
2265                 const struct GNUNET_MessageHeader *message,
2266                 struct GNUNET_TIME_Relative latency,
2267                 uint32_t distance)
2268 {
2269   const struct PutMessage *put;
2270   uint16_t msize;
2271   size_t dsize;
2272   enum GNUNET_BLOCK_Type type;
2273   struct GNUNET_TIME_Absolute expiration;
2274   GNUNET_HashCode query;
2275   struct ProcessReplyClosure prq;
2276   const struct SBlock *sb;
2277
2278   msize = ntohs (message->size);
2279   if (msize < sizeof (struct PutMessage))
2280     {
2281       GNUNET_break_op(0);
2282       return GNUNET_SYSERR;
2283     }
2284   put = (const struct PutMessage*) message;
2285   dsize = msize - sizeof (struct PutMessage);
2286   type = ntohl (put->type);
2287   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2288
2289   if (GNUNET_OK !=
2290       GNUNET_BLOCK_check_block (type,
2291                                 &put[1],
2292                                 dsize,
2293                                 &query))
2294     {
2295       GNUNET_break_op (0);
2296       return GNUNET_SYSERR;
2297     }
2298   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2299     return GNUNET_SYSERR;
2300   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2301     { 
2302       sb = (const struct SBlock*) &put[1];
2303       GNUNET_CRYPTO_hash (&sb->subspace,
2304                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2305                           &prq.namespace);
2306     }
2307
2308 #if DEBUG_FS
2309   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2310               "Received result for query `%s' from peer `%4s'\n",
2311               GNUNET_h2s (&query),
2312               GNUNET_i2s (other));
2313 #endif
2314   GNUNET_STATISTICS_update (stats,
2315                             gettext_noop ("# replies received (overall)"),
2316                             1,
2317                             GNUNET_NO);
2318   /* now, lookup 'query' */
2319   prq.data = (const void*) &put[1];
2320   prq.size = dsize;
2321   prq.type = type;
2322   prq.expiration = expiration;
2323   prq.priority = 0;
2324   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2325                                               &query,
2326                                               &process_reply,
2327                                               &prq);
2328   if (GNUNET_YES == active_migration)
2329     {
2330       GNUNET_DATASTORE_put (dsh,
2331                             0, &query, dsize, &put[1],
2332                             type, prq.priority, 1 /* anonymity */, 
2333                             expiration, 
2334                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2335                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2336                             &put_migration_continuation, 
2337                             NULL);
2338     }
2339   return GNUNET_OK;
2340 }
2341
2342
2343 /* **************************** P2P GET Handling ************************ */
2344
2345
2346 /**
2347  * Closure for 'check_duplicate_request_{peer,client}'.
2348  */
2349 struct CheckDuplicateRequestClosure
2350 {
2351   /**
2352    * The new request we should check if it already exists.
2353    */
2354   const struct PendingRequest *pr;
2355
2356   /**
2357    * Existing request found by the checker, NULL if none.
2358    */
2359   struct PendingRequest *have;
2360 };
2361
2362
2363 /**
2364  * Iterator over entries in the 'query_request_map' that
2365  * tries to see if we have the same request pending from
2366  * the same client already.
2367  *
2368  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2369  * @param key current key code (query, ignored, must match)
2370  * @param value value in the hash map (a 'struct PendingRequest' 
2371  *              that already exists)
2372  * @return GNUNET_YES if we should continue to
2373  *         iterate (no match yet)
2374  *         GNUNET_NO if not (match found).
2375  */
2376 static int
2377 check_duplicate_request_client (void *cls,
2378                                 const GNUNET_HashCode * key,
2379                                 void *value)
2380 {
2381   struct CheckDuplicateRequestClosure *cdc = cls;
2382   struct PendingRequest *have = value;
2383
2384   if (have->client_request_list == NULL)
2385     return GNUNET_YES;
2386   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2387        (cdc->pr != have) )
2388     {
2389       cdc->have = have;
2390       return GNUNET_NO;
2391     }
2392   return GNUNET_YES;
2393 }
2394
2395
2396 /**
2397  * We're processing (local) results for a search request
2398  * from another peer.  Pass applicable results to the
2399  * peer and if we are done either clean up (operation
2400  * complete) or forward to other peers (more results possible).
2401  *
2402  * @param cls our closure (struct LocalGetContext)
2403  * @param key key for the content
2404  * @param size number of bytes in data
2405  * @param data content stored
2406  * @param type type of the content
2407  * @param priority priority of the content
2408  * @param anonymity anonymity-level for the content
2409  * @param expiration expiration time for the content
2410  * @param uid unique identifier for the datum;
2411  *        maybe 0 if no unique identifier is available
2412  */
2413 static void
2414 process_local_reply (void *cls,
2415                      const GNUNET_HashCode * key,
2416                      uint32_t size,
2417                      const void *data,
2418                      enum GNUNET_BLOCK_Type type,
2419                      uint32_t priority,
2420                      uint32_t anonymity,
2421                      struct GNUNET_TIME_Absolute
2422                      expiration, 
2423                      uint64_t uid)
2424 {
2425   struct PendingRequest *pr = cls;
2426   struct ProcessReplyClosure prq;
2427   struct CheckDuplicateRequestClosure cdrc;
2428   const struct SBlock *sb;
2429   GNUNET_HashCode dhash;
2430   GNUNET_HashCode mhash;
2431   GNUNET_HashCode query;
2432   
2433   if (NULL == key)
2434     {
2435 #if DEBUG_FS > 1
2436       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2437                   "Done processing local replies, forwarding request to other peers.\n");
2438 #endif
2439       pr->qe = NULL;
2440       if (pr->client_request_list != NULL)
2441         {
2442           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2443                                       GNUNET_YES);
2444           /* Figure out if this is a duplicate request and possibly
2445              merge 'struct PendingRequest' entries */
2446           cdrc.have = NULL;
2447           cdrc.pr = pr;
2448           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2449                                                       &pr->query,
2450                                                       &check_duplicate_request_client,
2451                                                       &cdrc);
2452           if (cdrc.have != NULL)
2453             {
2454 #if DEBUG_FS
2455               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2456                           "Received request for block `%s' twice from client, will only request once.\n",
2457                           GNUNET_h2s (&pr->query));
2458 #endif
2459               
2460               destroy_pending_request (pr);
2461               return;
2462             }
2463         }
2464
2465       /* no more results */
2466       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2467         pr->task = GNUNET_SCHEDULER_add_now (sched,
2468                                              &forward_request_task,
2469                                              pr);      
2470       return;
2471     }
2472 #if DEBUG_FS
2473   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2474               "New local response to `%s' of type %u.\n",
2475               GNUNET_h2s (key),
2476               type);
2477 #endif
2478   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2479     {
2480 #if DEBUG_FS
2481       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2482                   "Found ONDEMAND block, performing on-demand encoding\n");
2483 #endif
2484       GNUNET_STATISTICS_update (stats,
2485                                 gettext_noop ("# on-demand blocks matched requests"),
2486                                 1,
2487                                 GNUNET_NO);
2488       if (GNUNET_OK != 
2489           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2490                                             anonymity, expiration, uid, 
2491                                             &process_local_reply,
2492                                             pr))
2493       if (pr->qe != NULL)
2494         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2495       return;
2496     }
2497   /* check for duplicates */
2498   GNUNET_CRYPTO_hash (data, size, &dhash);
2499   mingle_hash (&dhash, 
2500                pr->mingle,
2501                &mhash);
2502   if ( (pr->bf != NULL) &&
2503        (GNUNET_YES ==
2504         GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2505                                            &mhash)) )
2506     {      
2507 #if DEBUG_FS
2508       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2509                   "Result from datastore filtered by bloomfilter (duplicate).\n");
2510 #endif
2511       GNUNET_STATISTICS_update (stats,
2512                                 gettext_noop ("# results filtered by query bloomfilter"),
2513                                 1,
2514                                 GNUNET_NO);
2515       if (pr->qe != NULL)
2516         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2517       return;
2518     }
2519 #if DEBUG_FS
2520   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2521               "Found result for query `%s' in local datastore\n",
2522               GNUNET_h2s (key));
2523 #endif
2524   GNUNET_STATISTICS_update (stats,
2525                             gettext_noop ("# results found locally"),
2526                             1,
2527                             GNUNET_NO);
2528   pr->results_found++;
2529   memset (&prq, 0, sizeof (prq));
2530   prq.data = data;
2531   prq.expiration = expiration;
2532   prq.size = size;  
2533   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2534     { 
2535       sb = (const struct SBlock*) data;
2536       GNUNET_CRYPTO_hash (&sb->subspace,
2537                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2538                           &prq.namespace);
2539     }
2540   if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
2541                                              data,
2542                                              size,
2543                                              &query))
2544     {
2545       GNUNET_break (0);
2546       GNUNET_DATASTORE_remove (dsh,
2547                                key,
2548                                size, data,
2549                                -1, -1, 
2550                                GNUNET_TIME_UNIT_FOREVER_REL,
2551                                NULL, NULL);
2552       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2553       return;
2554     }
2555   prq.type = type;
2556   prq.priority = priority;  
2557   process_reply (&prq, key, pr);
2558
2559   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
2560        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
2561     {
2562       if (pr->qe != NULL)
2563         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2564       return;
2565     }
2566   if ( (pr->client_request_list == NULL) &&
2567        ( (GNUNET_YES == test_load_too_high()) ||
2568          (pr->results_found > 5 + 2 * pr->priority) ) )
2569     {
2570 #if DEBUG_FS > 2
2571       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2572                   "Load too high, done with request\n");
2573 #endif
2574       GNUNET_STATISTICS_update (stats,
2575                                 gettext_noop ("# processing result set cut short due to load"),
2576                                 1,
2577                                 GNUNET_NO);
2578       if (pr->qe != NULL)
2579         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2580       return;
2581     }
2582   if (pr->qe != NULL)
2583     GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2584 }
2585
2586
2587 /**
2588  * We've received a request with the specified priority.  Bound it
2589  * according to how much we trust the given peer.
2590  * 
2591  * @param prio_in requested priority
2592  * @param cp the peer making the request
2593  * @return effective priority
2594  */
2595 static uint32_t
2596 bound_priority (uint32_t prio_in,
2597                 struct ConnectedPeer *cp)
2598 {
2599   return 0; // FIXME!
2600 }
2601
2602
2603 /**
2604  * Iterator over entries in the 'query_request_map' that
2605  * tries to see if we have the same request pending from
2606  * the same peer already.
2607  *
2608  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2609  * @param key current key code (query, ignored, must match)
2610  * @param value value in the hash map (a 'struct PendingRequest' 
2611  *              that already exists)
2612  * @return GNUNET_YES if we should continue to
2613  *         iterate (no match yet)
2614  *         GNUNET_NO if not (match found).
2615  */
2616 static int
2617 check_duplicate_request_peer (void *cls,
2618                               const GNUNET_HashCode * key,
2619                               void *value)
2620 {
2621   struct CheckDuplicateRequestClosure *cdc = cls;
2622   struct PendingRequest *have = value;
2623
2624   if (cdc->pr->target_pid == have->target_pid)
2625     {
2626       cdc->have = have;
2627       return GNUNET_NO;
2628     }
2629   return GNUNET_YES;
2630 }
2631
2632
2633 /**
2634  * Handle P2P "GET" request.
2635  *
2636  * @param cls closure, always NULL
2637  * @param other the other peer involved (sender or receiver, NULL
2638  *        for loopback messages where we are both sender and receiver)
2639  * @param message the actual message
2640  * @param latency reported latency of the connection with 'other'
2641  * @param distance reported distance (DV) to 'other' 
2642  * @return GNUNET_OK to keep the connection open,
2643  *         GNUNET_SYSERR to close it (signal serious error)
2644  */
2645 static int
2646 handle_p2p_get (void *cls,
2647                 const struct GNUNET_PeerIdentity *other,
2648                 const struct GNUNET_MessageHeader *message,
2649                 struct GNUNET_TIME_Relative latency,
2650                 uint32_t distance)
2651 {
2652   struct PendingRequest *pr;
2653   struct ConnectedPeer *cp;
2654   struct ConnectedPeer *cps;
2655   struct CheckDuplicateRequestClosure cdc;
2656   struct GNUNET_TIME_Relative timeout;
2657   uint16_t msize;
2658   const struct GetMessage *gm;
2659   unsigned int bits;
2660   const GNUNET_HashCode *opt;
2661   uint32_t bm;
2662   size_t bfsize;
2663   uint32_t ttl_decrement;
2664   enum GNUNET_BLOCK_Type type;
2665   double preference;
2666   int have_ns;
2667
2668   msize = ntohs(message->size);
2669   if (msize < sizeof (struct GetMessage))
2670     {
2671       GNUNET_break_op (0);
2672       return GNUNET_SYSERR;
2673     }
2674   gm = (const struct GetMessage*) message;
2675   type = ntohl (gm->type);
2676   switch (type)
2677     {
2678     case GNUNET_BLOCK_TYPE_ANY:
2679     case GNUNET_BLOCK_TYPE_DBLOCK:
2680     case GNUNET_BLOCK_TYPE_IBLOCK:
2681     case GNUNET_BLOCK_TYPE_KBLOCK:
2682     case GNUNET_BLOCK_TYPE_SBLOCK:
2683       break;
2684     default:
2685       GNUNET_break_op (0);
2686       return GNUNET_SYSERR;
2687     }
2688   bm = ntohl (gm->hash_bitmap);
2689   bits = 0;
2690   while (bm > 0)
2691     {
2692       if (1 == (bm & 1))
2693         bits++;
2694       bm >>= 1;
2695     }
2696   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2697     {
2698       GNUNET_break_op (0);
2699       return GNUNET_SYSERR;
2700     }  
2701   opt = (const GNUNET_HashCode*) &gm[1];
2702   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2703   bm = ntohl (gm->hash_bitmap);
2704   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
2705        (type != GNUNET_BLOCK_TYPE_SBLOCK) )
2706     {
2707       GNUNET_break_op (0);
2708       return GNUNET_SYSERR;      
2709     }
2710   bits = 0;
2711   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2712                                            &other->hashPubKey);
2713   if (NULL == cps)
2714     {
2715       /* peer must have just disconnected */
2716       GNUNET_STATISTICS_update (stats,
2717                                 gettext_noop ("# requests dropped due to initiator not being connected"),
2718                                 1,
2719                                 GNUNET_NO);
2720       return GNUNET_SYSERR;
2721     }
2722   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
2723     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2724                                             &opt[bits++]);
2725   else
2726     cp = cps;
2727   if (cp == NULL)
2728     {
2729 #if DEBUG_FS
2730       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
2731         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2732                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
2733                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
2734       
2735       else
2736         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2737                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
2738                     GNUNET_i2s (other));
2739 #endif
2740       GNUNET_STATISTICS_update (stats,
2741                                 gettext_noop ("# requests dropped due to missing reverse route"),
2742                                 1,
2743                                 GNUNET_NO);
2744      /* FIXME: try connect? */
2745       return GNUNET_OK;
2746     }
2747   /* note that we can really only check load here since otherwise
2748      peers could find out that we are overloaded by not being
2749      disconnected after sending us a malformed query... */
2750   if (GNUNET_YES == test_load_too_high ())
2751     {
2752 #if DEBUG_FS
2753       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2754                   "Dropping query from `%s', this peer is too busy.\n",
2755                   GNUNET_i2s (other));
2756 #endif
2757       GNUNET_STATISTICS_update (stats,
2758                                 gettext_noop ("# requests dropped due to high load"),
2759                                 1,
2760                                 GNUNET_NO);
2761       return GNUNET_OK;
2762     }
2763
2764 #if DEBUG_FS 
2765   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2766               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
2767               GNUNET_h2s (&gm->query),
2768               (unsigned int) type,
2769               GNUNET_i2s (other),
2770               (unsigned int) bm);
2771 #endif
2772   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
2773   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
2774                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
2775   if (have_ns)
2776     pr->namespace = (GNUNET_HashCode*) &pr[1];
2777   pr->type = type;
2778   pr->mingle = ntohl (gm->filter_mutator);
2779   if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))    
2780     memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
2781   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2782     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
2783
2784   pr->anonymity_level = 1;
2785   pr->priority = bound_priority (ntohl (gm->priority), cps);
2786   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
2787   pr->query = gm->query;
2788   /* decrement ttl (always) */
2789   ttl_decrement = 2 * TTL_DECREMENT +
2790     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2791                               TTL_DECREMENT);
2792   if ( (pr->ttl < 0) &&
2793        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
2794     {
2795 #if DEBUG_FS
2796       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2797                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
2798                   GNUNET_i2s (other),
2799                   pr->ttl,
2800                   ttl_decrement);
2801 #endif
2802       GNUNET_STATISTICS_update (stats,
2803                                 gettext_noop ("# requests dropped due TTL underflow"),
2804                                 1,
2805                                 GNUNET_NO);
2806       /* integer underflow => drop (should be very rare)! */
2807       GNUNET_free (pr);
2808       return GNUNET_OK;
2809     } 
2810   pr->ttl -= ttl_decrement;
2811   pr->start_time = GNUNET_TIME_absolute_get ();
2812
2813   /* get bloom filter */
2814   if (bfsize > 0)
2815     {
2816       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
2817                                                   bfsize,
2818                                                   BLOOMFILTER_K);
2819       pr->bf_size = bfsize;
2820     }
2821
2822   cdc.have = NULL;
2823   cdc.pr = pr;
2824   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2825                                               &gm->query,
2826                                               &check_duplicate_request_peer,
2827                                               &cdc);
2828   if (cdc.have != NULL)
2829     {
2830       if (cdc.have->start_time.value + cdc.have->ttl >=
2831           pr->start_time.value + pr->ttl)
2832         {
2833           /* existing request has higher TTL, drop new one! */
2834           cdc.have->priority += pr->priority;
2835           destroy_pending_request (pr);
2836 #if DEBUG_FS
2837           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2838                       "Have existing request with higher TTL, dropping new request.\n",
2839                       GNUNET_i2s (other));
2840 #endif
2841           GNUNET_STATISTICS_update (stats,
2842                                     gettext_noop ("# requests dropped due to higher-TTL request"),
2843                                     1,
2844                                     GNUNET_NO);
2845           return GNUNET_OK;
2846         }
2847       else
2848         {
2849           /* existing request has lower TTL, drop old one! */
2850           pr->priority += cdc.have->priority;
2851           /* Possible optimization: if we have applicable pending
2852              replies in 'cdc.have', we might want to move those over
2853              (this is a really rare special-case, so it is not clear
2854              that this would be worth it) */
2855           destroy_pending_request (cdc.have);
2856           /* keep processing 'pr'! */
2857         }
2858     }
2859
2860   pr->cp = cp;
2861   GNUNET_break (GNUNET_OK ==
2862                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
2863                                                    &gm->query,
2864                                                    pr,
2865                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2866   GNUNET_break (GNUNET_OK ==
2867                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
2868                                                    &other->hashPubKey,
2869                                                    pr,
2870                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2871   
2872   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
2873                                             pr,
2874                                             pr->start_time.value + pr->ttl);
2875
2876   GNUNET_STATISTICS_update (stats,
2877                             gettext_noop ("# P2P searches received"),
2878                             1,
2879                             GNUNET_NO);
2880   GNUNET_STATISTICS_update (stats,
2881                             gettext_noop ("# P2P searches active"),
2882                             1,
2883                             GNUNET_NO);
2884
2885   /* calculate change in traffic preference */
2886   preference = (double) pr->priority;
2887   if (preference < QUERY_BANDWIDTH_VALUE)
2888     preference = QUERY_BANDWIDTH_VALUE;
2889   cps->inc_preference += preference;
2890
2891   /* process locally */
2892   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
2893     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
2894   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2895                                            (pr->priority + 1)); 
2896   pr->qe = GNUNET_DATASTORE_get (dsh,
2897                                  &gm->query,
2898                                  type,                         
2899                                  pr->priority + 1,
2900                                  MAX_DATASTORE_QUEUE,                            
2901                                  timeout,
2902                                  &process_local_reply,
2903                                  pr);
2904
2905   /* Are multiple results possible?  If so, start processing remotely now! */
2906   switch (pr->type)
2907     {
2908     case GNUNET_BLOCK_TYPE_DBLOCK:
2909     case GNUNET_BLOCK_TYPE_IBLOCK:
2910       /* only one result, wait for datastore */
2911       break;
2912     default:
2913       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2914         pr->task = GNUNET_SCHEDULER_add_now (sched,
2915                                              &forward_request_task,
2916                                              pr);
2917     }
2918
2919   /* make sure we don't track too many requests */
2920   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
2921     {
2922       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
2923       destroy_pending_request (pr);
2924     }
2925   return GNUNET_OK;
2926 }
2927
2928
2929 /* **************************** CS GET Handling ************************ */
2930
2931
2932 /**
2933  * Handle START_SEARCH-message (search request from client).
2934  *
2935  * @param cls closure
2936  * @param client identification of the client
2937  * @param message the actual message
2938  */
2939 static void
2940 handle_start_search (void *cls,
2941                      struct GNUNET_SERVER_Client *client,
2942                      const struct GNUNET_MessageHeader *message)
2943 {
2944   static GNUNET_HashCode all_zeros;
2945   const struct SearchMessage *sm;
2946   struct ClientList *cl;
2947   struct ClientRequestList *crl;
2948   struct PendingRequest *pr;
2949   uint16_t msize;
2950   unsigned int sc;
2951   enum GNUNET_BLOCK_Type type;
2952
2953   msize = ntohs (message->size);
2954   if ( (msize < sizeof (struct SearchMessage)) ||
2955        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2956     {
2957       GNUNET_break (0);
2958       GNUNET_SERVER_receive_done (client,
2959                                   GNUNET_SYSERR);
2960       return;
2961     }
2962   GNUNET_STATISTICS_update (stats,
2963                             gettext_noop ("# client searches received"),
2964                             1,
2965                             GNUNET_NO);
2966   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2967   sm = (const struct SearchMessage*) message;
2968   type = ntohl (sm->type);
2969 #if DEBUG_FS
2970   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2971               "Received request for `%s' of type %u from local client\n",
2972               GNUNET_h2s (&sm->query),
2973               (unsigned int) type);
2974 #endif
2975   switch (type)
2976     {
2977     case GNUNET_BLOCK_TYPE_ANY:
2978     case GNUNET_BLOCK_TYPE_DBLOCK:
2979     case GNUNET_BLOCK_TYPE_IBLOCK:
2980     case GNUNET_BLOCK_TYPE_KBLOCK:
2981     case GNUNET_BLOCK_TYPE_SBLOCK:
2982     case GNUNET_BLOCK_TYPE_NBLOCK:
2983       break;
2984     default:
2985       GNUNET_break (0);
2986       GNUNET_SERVER_receive_done (client,
2987                                   GNUNET_SYSERR);
2988       return;
2989     }  
2990
2991   cl = client_list;
2992   while ( (cl != NULL) &&
2993           (cl->client != client) )
2994     cl = cl->next;
2995   if (cl == NULL)
2996     {
2997       cl = GNUNET_malloc (sizeof (struct ClientList));
2998       cl->client = client;
2999       GNUNET_SERVER_client_keep (client);
3000       cl->next = client_list;
3001       client_list = cl;
3002     }
3003   /* detect duplicate KBLOCK requests */
3004   if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
3005        (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
3006        (type == GNUNET_BLOCK_TYPE_ANY) )
3007     {
3008       crl = cl->rl_head;
3009       while ( (crl != NULL) &&
3010               ( (0 != memcmp (&crl->req->query,
3011                               &sm->query,
3012                               sizeof (GNUNET_HashCode))) ||
3013                 (crl->req->type != type) ) )
3014         crl = crl->next;
3015       if (crl != NULL)  
3016         { 
3017 #if DEBUG_FS
3018           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3019                       "Have existing request, merging content-seen lists.\n");
3020 #endif
3021           pr = crl->req;
3022           /* Duplicate request (used to send long list of
3023              known/blocked results); merge 'pr->replies_seen'
3024              and update bloom filter */
3025           GNUNET_array_grow (pr->replies_seen,
3026                              pr->replies_seen_size,
3027                              pr->replies_seen_off + sc);
3028           memcpy (&pr->replies_seen[pr->replies_seen_off],
3029                   &sm[1],
3030                   sc * sizeof (GNUNET_HashCode));
3031           pr->replies_seen_off += sc;
3032           refresh_bloomfilter (pr);
3033           GNUNET_STATISTICS_update (stats,
3034                                     gettext_noop ("# client searches updated (merged content seen list)"),
3035                                     1,
3036                                     GNUNET_NO);
3037           GNUNET_SERVER_receive_done (client,
3038                                       GNUNET_OK);
3039           return;
3040         }
3041     }
3042   GNUNET_STATISTICS_update (stats,
3043                             gettext_noop ("# client searches active"),
3044                             1,
3045                             GNUNET_NO);
3046   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3047                       ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3048   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3049   memset (crl, 0, sizeof (struct ClientRequestList));
3050   crl->client_list = cl;
3051   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3052                                cl->rl_tail,
3053                                crl);  
3054   crl->req = pr;
3055   pr->type = type;
3056   pr->client_request_list = crl;
3057   GNUNET_array_grow (pr->replies_seen,
3058                      pr->replies_seen_size,
3059                      sc);
3060   memcpy (pr->replies_seen,
3061           &sm[1],
3062           sc * sizeof (GNUNET_HashCode));
3063   pr->replies_seen_off = sc;
3064   pr->anonymity_level = ntohl (sm->anonymity_level); 
3065   refresh_bloomfilter (pr);
3066   pr->query = sm->query;
3067   if (0 == (1 & ntohl (sm->options)))
3068     pr->local_only = GNUNET_NO;
3069   else
3070     pr->local_only = GNUNET_YES;
3071   switch (type)
3072     {
3073     case GNUNET_BLOCK_TYPE_DBLOCK:
3074     case GNUNET_BLOCK_TYPE_IBLOCK:
3075       if (0 != memcmp (&sm->target,
3076                        &all_zeros,
3077                        sizeof (GNUNET_HashCode)))
3078         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3079       break;
3080     case GNUNET_BLOCK_TYPE_SBLOCK:
3081       pr->namespace = (GNUNET_HashCode*) &pr[1];
3082       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3083       break;
3084     default:
3085       break;
3086     }
3087   GNUNET_break (GNUNET_OK ==
3088                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3089                                                    &sm->query,
3090                                                    pr,
3091                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3092   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3093     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3094   pr->qe = GNUNET_DATASTORE_get (dsh,
3095                                  &sm->query,
3096                                  type,
3097                                  -3, -1,
3098                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
3099                                  &process_local_reply,
3100                                  pr);
3101 }
3102
3103
3104 /* **************************** Startup ************************ */
3105
3106
3107 /**
3108  * List of handlers for P2P messages
3109  * that we care about.
3110  */
3111 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3112   {
3113     { &handle_p2p_get, 
3114       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3115     { &handle_p2p_put, 
3116       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3117     { NULL, 0, 0 }
3118   };
3119
3120
3121 /**
3122  * List of handlers for the messages understood by this
3123  * service.
3124  */
3125 static struct GNUNET_SERVER_MessageHandler handlers[] = {
3126   {&GNUNET_FS_handle_index_start, NULL, 
3127    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
3128   {&GNUNET_FS_handle_index_list_get, NULL, 
3129    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
3130   {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
3131    sizeof (struct UnindexMessage) },
3132   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
3133    0 },
3134   {NULL, NULL, 0, 0}
3135 };
3136
3137
3138 /**
3139  * Process fs requests.
3140  *
3141  * @param s scheduler to use
3142  * @param server the initialized server
3143  * @param c configuration to use
3144  */
3145 static int
3146 main_init (struct GNUNET_SCHEDULER_Handle *s,
3147            struct GNUNET_SERVER_Handle *server,
3148            const struct GNUNET_CONFIGURATION_Handle *c)
3149 {
3150   sched = s;
3151   cfg = c;
3152   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
3153   min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
3154   connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3155   query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3156   peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3157   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3158   core = GNUNET_CORE_connect (sched,
3159                               cfg,
3160                               GNUNET_TIME_UNIT_FOREVER_REL,
3161                               NULL,
3162                               NULL,
3163                               &peer_connect_handler,
3164                               &peer_disconnect_handler,
3165                               NULL, GNUNET_NO,
3166                               NULL, GNUNET_NO,
3167                               p2p_handlers);
3168   if (NULL == core)
3169     {
3170       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3171                   _("Failed to connect to `%s' service.\n"),
3172                   "core");
3173       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
3174       connected_peers = NULL;
3175       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
3176       query_request_map = NULL;
3177       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
3178       requests_by_expiration_heap = NULL;
3179       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
3180       peer_request_map = NULL;
3181       if (dsh != NULL)
3182         {
3183           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3184           dsh = NULL;
3185         }
3186       return GNUNET_SYSERR;
3187     }
3188   /* FIXME: distinguish between sending and storing in options? */
3189   if (active_migration) 
3190     {
3191       mig_task = GNUNET_SCHEDULER_add_now (sched,
3192                                            &gather_migration_blocks,
3193                                            NULL);
3194     }
3195   GNUNET_SERVER_disconnect_notify (server, 
3196                                    &handle_client_disconnect,
3197                                    NULL);
3198   GNUNET_SERVER_add_handlers (server, handlers);
3199   GNUNET_SCHEDULER_add_delayed (sched,
3200                                 GNUNET_TIME_UNIT_FOREVER_REL,
3201                                 &shutdown_task,
3202                                 NULL);
3203   return GNUNET_OK;
3204 }
3205
3206
3207 /**
3208  * Process fs requests.
3209  *
3210  * @param cls closure
3211  * @param sched scheduler to use
3212  * @param server the initialized server
3213  * @param cfg configuration to use
3214  */
3215 static void
3216 run (void *cls,
3217      struct GNUNET_SCHEDULER_Handle *sched,
3218      struct GNUNET_SERVER_Handle *server,
3219      const struct GNUNET_CONFIGURATION_Handle *cfg)
3220 {
3221   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
3222                                                            "FS",
3223                                                            "ACTIVEMIGRATION");
3224   dsh = GNUNET_DATASTORE_connect (cfg,
3225                                   sched);
3226   if (dsh == NULL)
3227     {
3228       GNUNET_SCHEDULER_shutdown (sched);
3229       return;
3230     }
3231   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3232        (GNUNET_OK != main_init (sched, server, cfg)) )
3233     {    
3234       GNUNET_SCHEDULER_shutdown (sched);
3235       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3236       dsh = NULL;
3237       return;   
3238     }
3239 }
3240
3241
3242 /**
3243  * The main function for the fs service.
3244  *
3245  * @param argc number of arguments from the command line
3246  * @param argv command line arguments
3247  * @return 0 ok, 1 on error
3248  */
3249 int
3250 main (int argc, char *const *argv)
3251 {
3252   return (GNUNET_OK ==
3253           GNUNET_SERVICE_run (argc,
3254                               argv,
3255                               "fs",
3256                               GNUNET_SERVICE_OPTION_NONE,
3257                               &run, NULL)) ? 0 : 1;
3258 }
3259
3260 /* end of gnunet-service-fs.c */