moving indexing out of main file
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix gazillion of minor FIXME's
28  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29  *   core to transmit to another peer; need to make sure this is bounded overall...
30  * - randomly delay processing for improved anonymity (can wait)
31  * - content migration (put in local DS) (can wait)
32  * - handle some special cases when forwarding replies based on tracked requests (can wait)
33  * - tracking of success correlations for hot-path routing (can wait)
34  * - various load-based actions (can wait)
35  * - validation of KSBLOCKS (can wait)
36  * - remove on-demand blocks if they keep failing (can wait)
37  * - check that we decrement PIDs always where necessary (can wait)
38  * - find out how to use core-pulling instead of pushing (at least for some cases)
39  */
40 #include "platform.h"
41 #include <float.h>
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "gnunet-service-fs_indexing.h"
49 #include "fs.h"
50
51 #define DEBUG_FS GNUNET_NO
52
53
54
55 /**
56  * Signature of a function that is called whenever a datastore
57  * request can be processed (or an entry put on the queue times out).
58  *
59  * @param cls closure
60  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
61  */
62 typedef void (*RequestFunction)(void *cls,
63                                 int ok);
64
65
66 /**
67  * Doubly-linked list of our requests for the datastore.
68  */
69 struct DatastoreRequestQueue
70 {
71
72   /**
73    * This is a doubly-linked list.
74    */
75   struct DatastoreRequestQueue *next;
76
77   /**
78    * This is a doubly-linked list.
79    */
80   struct DatastoreRequestQueue *prev;
81
82   /**
83    * Function to call (will issue the request).
84    */
85   RequestFunction req;
86
87   /**
88    * Closure for req.
89    */
90   void *req_cls;
91
92   /**
93    * When should this request time-out because we don't care anymore?
94    */
95   struct GNUNET_TIME_Absolute timeout;
96     
97   /**
98    * ID of task used for signaling timeout.
99    */
100   GNUNET_SCHEDULER_TaskIdentifier task;
101
102 };
103
104
105 /**
106  * Closure for processing START_SEARCH messages from a client.
107  */
108 struct LocalGetContext
109 {
110
111   /**
112    * This is a doubly-linked list.
113    */
114   struct LocalGetContext *next;
115
116   /**
117    * This is a doubly-linked list.
118    */
119   struct LocalGetContext *prev;
120
121   /**
122    * Client that initiated the search.
123    */
124   struct GNUNET_SERVER_Client *client;
125
126   /**
127    * Array of results that we've already received 
128    * (can be NULL).
129    */
130   GNUNET_HashCode *results; 
131
132   /**
133    * Bloomfilter over all results (for fast query construction);
134    * NULL if we don't have any results.
135    *
136    * FIXME: this member is not used, is that OK? If so, it should
137    * be removed!
138    */
139   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
140
141   /**
142    * DS request associated with this operation.
143    */
144   struct DatastoreRequestQueue *req;
145
146   /**
147    * Current result message to transmit to client (or NULL).
148    */
149   struct ContentMessage *result;
150   
151   /**
152    * Type of the content that we're looking for.
153    * 0 for any.
154    */
155   uint32_t type;
156
157   /**
158    * Desired anonymity level.
159    */
160   uint32_t anonymity_level;
161
162   /**
163    * Number of results actually stored in the results array.
164    */
165   unsigned int results_used;
166   
167   /**
168    * Size of the results array in memory.
169    */
170   unsigned int results_size;
171   
172   /**
173    * Size (in bytes) of the 'results_bf' bloomfilter.
174    *
175    * FIXME: this member is not used, is that OK? If so, it should
176    * be removed!
177    */
178   size_t results_bf_size;
179
180   /**
181    * If the request is for a DBLOCK or IBLOCK, this is the identity of
182    * the peer that is known to have a response.  Set to all-zeros if
183    * such a target is not known (note that even if OUR anonymity
184    * level is >0 we may happen to know the responder's identity;
185    * nevertheless, we should probably not use it for a DHT-lookup
186    * or similar blunt actions in order to avoid exposing ourselves).
187    */
188   struct GNUNET_PeerIdentity target;
189
190   /**
191    * If the request is for an SBLOCK, this is the identity of the
192    * pseudonym to which the SBLOCK belongs. 
193    */
194   GNUNET_HashCode namespace;
195
196   /**
197    * Hash of the keyword (aka query) for KBLOCKs; Hash of
198    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
199    * and hash of the identifier XORed with the target for
200    * SBLOCKS (aka query).
201    */
202   GNUNET_HashCode query;
203
204 };
205
206
207 /**
208  * Possible routing policies for an FS-GET request.
209  */
210 enum RoutingPolicy
211   {
212     /**
213      * Simply drop the request.
214      */
215     ROUTING_POLICY_NONE = 0,
216     
217     /**
218      * Answer it if we can from local datastore.
219      */
220     ROUTING_POLICY_ANSWER = 1,
221
222     /**
223      * Forward the request to other peers (if possible).
224      */
225     ROUTING_POLICY_FORWARD = 2,
226
227     /**
228      * Forward to other peers, and ask them to route
229      * the response via ourselves.
230      */
231     ROUTING_POLICY_INDIRECT = 6,
232     
233     /**
234      * Do everything we could possibly do (that would
235      * make sense).
236      */
237     ROUTING_POLICY_ALL = 7
238   };
239
240
241 /**
242  * Internal context we use for our initial processing
243  * of a GET request.
244  */
245 struct ProcessGetContext
246 {
247   /**
248    * The search query (used for datastore lookup).
249    */
250   GNUNET_HashCode query;
251   
252   /**
253    * Which peer we should forward the response to.
254    */
255   struct GNUNET_PeerIdentity reply_to;
256
257   /**
258    * Namespace for the result (only set for SKS requests)
259    */
260   GNUNET_HashCode namespace;
261
262   /**
263    * Peer that we should forward the query to if possible
264    * (since that peer likely has the content).
265    */
266   struct GNUNET_PeerIdentity prime_target;
267
268   /**
269    * When did we receive this request?
270    */
271   struct GNUNET_TIME_Absolute start_time;
272
273   /**
274    * Our entry in the DRQ (non-NULL while we wait for our
275    * turn to interact with the local database).
276    */
277   struct DatastoreRequestQueue *drq;
278
279   /**
280    * Filter used to eliminate duplicate results.  Can be NULL if we
281    * are not yet filtering any results.
282    */
283   struct GNUNET_CONTAINER_BloomFilter *bf;
284
285   /**
286    * Bitmap describing which of the optional
287    * hash codes / peer identities were given to us.
288    */
289   uint32_t bm;
290
291   /**
292    * Desired block type.
293    */
294   uint32_t type;
295
296   /**
297    * Priority of the request.
298    */
299   uint32_t priority;
300
301   /**
302    * Size of the 'bf' (in bytes).
303    */
304   size_t bf_size;
305
306   /**
307    * In what ways are we going to process
308    * the request?
309    */
310   enum RoutingPolicy policy;
311
312   /**
313    * Time-to-live for the request (value
314    * we use).
315    */
316   int32_t ttl;
317
318   /**
319    * Number to mingle hashes for bloom-filter
320    * tests with.
321    */
322   int32_t mingle;
323
324   /**
325    * Number of results that were found so far.
326    */
327   unsigned int results_found;
328 };
329
330
331 /**
332  * Information we keep for each pending reply.  The
333  * actual message follows at the end of this struct.
334  */
335 struct PendingMessage
336 {
337   /**
338    * This is a linked list.
339    */
340   struct PendingMessage *next;
341
342   /**
343    * Size of the reply; actual reply message follows
344    * at the end of this struct.
345    */
346   size_t msize;
347   
348   /**
349    * How important is this message for us?
350    */
351   uint32_t priority;
352
353 };
354
355
356 /**
357  * All requests from a client are kept in a doubly-linked list.
358  */
359 struct ClientRequestList;
360
361
362 /**
363  * Information we keep for each pending request.  We should try to
364  * keep this struct as small as possible since its memory consumption
365  * is key to how many requests we can have pending at once.
366  */
367 struct PendingRequest
368 {
369
370   /**
371    * ID of a client making a request, NULL if this entry is for a
372    * peer.
373    */
374   struct GNUNET_SERVER_Client *client;
375
376   /**
377    * If this request was made by a client,
378    * this is our entry in the client request
379    * list; otherwise NULL.
380    */
381   struct ClientRequestList *crl_entry;
382
383   /**
384    * If this is a namespace query, pointer to the hash of the public
385    * key of the namespace; otherwise NULL.
386    */
387   GNUNET_HashCode *namespace;
388
389   /**
390    * Bloomfilter we use to filter out replies that we don't care about
391    * (anymore).  NULL as long as we are interested in all replies.
392    */
393   struct GNUNET_CONTAINER_BloomFilter *bf;
394
395   /**
396    * Context of our GNUNET_CORE_peer_change_preference call.
397    */
398   struct GNUNET_CORE_InformationRequestContext *irc;
399
400   /**
401    * Replies that we have received but were unable to forward yet
402    * (typically non-null only if we have a pending transmission
403    * request with the client or the respective peer).
404    */
405   struct PendingMessage *replies_pending;
406
407   /**
408    * Pending transmission request with the core service for the target
409    * peer (for processing of 'replies_pending') or Handle for a
410    * pending query-request for P2P-transmission with the core service.
411    * If non-NULL, this request must be cancelled should this struct be
412    * destroyed!
413    */
414   struct GNUNET_CORE_TransmitHandle *cth;
415
416   /**
417    * Pending transmission request for the target client (for processing of
418    * 'replies_pending').
419    */
420   struct GNUNET_CONNECTION_TransmitHandle *th;
421
422   /**
423    * Hash code of all replies that we have seen so far (only valid
424    * if client is not NULL since we only track replies like this for
425    * our own clients).
426    */
427   GNUNET_HashCode *replies_seen;
428
429   /**
430    * Node in the heap representing this entry.
431    */
432   struct GNUNET_CONTAINER_HeapNode *hnode;
433
434   /**
435    * When did we first see this request (form this peer), or, if our
436    * client is initiating, when did we last initiate a search?
437    */
438   struct GNUNET_TIME_Absolute start_time;
439
440   /**
441    * The query that this request is for.
442    */
443   GNUNET_HashCode query;
444
445   /**
446    * The task responsible for transmitting queries
447    * for this request.
448    */
449   GNUNET_SCHEDULER_TaskIdentifier task;
450
451   /**
452    * (Interned) Peer identifier (only valid if "client" is NULL)
453    * that identifies a peer that gave us this request.
454    */
455   GNUNET_PEER_Id source_pid;
456
457   /**
458    * (Interned) Peer identifier that identifies a preferred target
459    * for requests.
460    */
461   GNUNET_PEER_Id target_pid;
462
463   /**
464    * (Interned) Peer identifiers of peers that have already
465    * received our query for this content.
466    */
467   GNUNET_PEER_Id *used_pids;
468
469   /**
470    * Size of the 'bf' (in bytes).
471    */
472   size_t bf_size;
473
474   /**
475    * Desired anonymity level; only valid for requests from a local client.
476    */
477   uint32_t anonymity_level;
478
479   /**
480    * How many entries in "used_pids" are actually valid?
481    */
482   unsigned int used_pids_off;
483
484   /**
485    * How long is the "used_pids" array?
486    */
487   unsigned int used_pids_size;
488
489   /**
490    * How many entries in "replies_seen" are actually valid?
491    */
492   unsigned int replies_seen_off;
493
494   /**
495    * How long is the "replies_seen" array?
496    */
497   unsigned int replies_seen_size;
498   
499   /**
500    * Priority with which this request was made.  If one of our clients
501    * made the request, then this is the current priority that we are
502    * using when initiating the request.  This value is used when
503    * we decide to reward other peers with trust for providing a reply.
504    */
505   uint32_t priority;
506
507   /**
508    * Priority points left for us to spend when forwarding this request
509    * to other peers.
510    */
511   uint32_t remaining_priority;
512
513   /**
514    * Number to mingle hashes for bloom-filter
515    * tests with.
516    */
517   int32_t mingle;
518
519   /**
520    * TTL with which we saw this request (or, if we initiated, TTL that
521    * we used for the request).
522    */
523   int32_t ttl;
524   
525   /**
526    * Type of the content that this request is for.
527    */
528   uint32_t type;
529
530 };
531
532
533 /**
534  * All requests from a client are kept in a doubly-linked list.
535  */
536 struct ClientRequestList
537 {
538   /**
539    * This is a doubly-linked list.
540    */
541   struct ClientRequestList *next;
542
543   /**
544    * This is a doubly-linked list.
545    */ 
546   struct ClientRequestList *prev;
547
548   /**
549    * A request from this client.
550    */
551   struct PendingRequest *req;
552
553   /**
554    * Client list with the head and tail of this DLL.
555    */
556   struct ClientList *cl;
557 };
558
559
560 /**
561  * Linked list of all clients that we are 
562  * currently processing requests for.
563  */
564 struct ClientList
565 {
566
567   /**
568    * This is a linked list.
569    */
570   struct ClientList *next;
571
572   /**
573    * What client is this entry for?
574    */
575   struct GNUNET_SERVER_Client* client;
576
577   /**
578    * Head of the DLL of requests from this client.
579    */
580   struct ClientRequestList *head;
581
582   /**
583    * Tail of the DLL of requests from this client.
584    */
585   struct ClientRequestList *tail;
586
587 };
588
589
590 /**
591  * Closure for "process_reply" function.
592  */
593 struct ProcessReplyClosure
594 {
595   /**
596    * The data for the reply.
597    */
598   const void *data;
599
600   /**
601    * When the reply expires.
602    */
603   struct GNUNET_TIME_Absolute expiration;
604
605   /**
606    * Size of data.
607    */
608   size_t size;
609
610   /**
611    * Namespace that this reply belongs to
612    * (if it is of type SBLOCK).
613    */
614   GNUNET_HashCode namespace;
615
616   /**
617    * Type of the block.
618    */
619   uint32_t type;
620
621   /**
622    * How much was this reply worth to us?
623    */
624   uint32_t priority;
625 };
626
627
628 /**
629  * Information about a peer that we are connected to.
630  * We track data that is useful for determining which
631  * peers should receive our requests.
632  */
633 struct ConnectedPeer
634 {
635
636   /**
637    * List of the last clients for which this peer
638    * successfully answered a query. 
639    */
640   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
641
642   /**
643    * List of the last PIDs for which
644    * this peer successfully answered a query;
645    * We use 0 to indicate no successful reply.
646    */
647   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
648
649   /**
650    * Average delay between sending the peer a request and
651    * getting a reply (only calculated over the requests for
652    * which we actually got a reply).   Calculated
653    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
654    */ 
655   struct GNUNET_TIME_Relative avg_delay;
656
657   /**
658    * Handle for an active request for transmission to this
659    * peer, or NULL.
660    */
661   struct GNUNET_CORE_PeerRequestHandle *prh;
662
663   /**
664    * Messages (replies, queries, content migration) we would like to
665    * send to this peer in the near future.  Sorted by priority.
666    */
667   struct PendingMessage *pending_messages;
668
669   /**
670    * Average priority of successful replies.  Calculated
671    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
672    */
673   double avg_priority;
674
675   /**
676    * The peer's identity.
677    */
678   GNUNET_PEER_Id pid;  
679
680   /**
681    * Number of requests we have currently pending with this peer (that
682    * is, requests that were transmitted so recently that we would not
683    * retransmit them right now).
684    */
685   unsigned int pending_requests;
686
687   /**
688    * Which offset in "last_p2p_replies" will be updated next?
689    * (we go round-robin).
690    */
691   unsigned int last_p2p_replies_woff;
692
693   /**
694    * Which offset in "last_client_replies" will be updated next?
695    * (we go round-robin).
696    */
697   unsigned int last_client_replies_woff;
698
699 };
700
701
702 /**
703  * Our connection to the datastore.
704  */
705 static struct GNUNET_DATASTORE_Handle *dsh;
706
707 /**
708  * Our scheduler.
709  */
710 static struct GNUNET_SCHEDULER_Handle *sched;
711
712 /**
713  * Our configuration.
714  */
715 const struct GNUNET_CONFIGURATION_Handle *cfg;
716
717 /**
718  * Handle to the core service (NULL until we've connected to it).
719  */
720 struct GNUNET_CORE_Handle *core;
721
722 /**
723  * Head of doubly-linked LGC list.
724  */
725 static struct LocalGetContext *lgc_head;
726
727 /**
728  * Tail of doubly-linked LGC list.
729  */
730 static struct LocalGetContext *lgc_tail;
731
732 /**
733  * Head of request queue for the datastore, sorted by timeout.
734  */
735 static struct DatastoreRequestQueue *drq_head;
736
737 /**
738  * Tail of request queue for the datastore.
739  */
740 static struct DatastoreRequestQueue *drq_tail;
741
742 /**
743  * Map of query hash codes to requests.
744  */
745 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
746
747 /**
748  * Map of peer IDs to requests (for those requests coming
749  * from other peers).
750  */
751 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
752
753 /**
754  * Linked list of all of our clients and their requests.
755  */
756 static struct ClientList *clients;
757
758 /**
759  * Heap with the request that will expire next at the top.  Contains
760  * pointers of type "struct PendingRequest*"; these will *also* be
761  * aliased from the "requests_by_peer" data structures and the
762  * "requests_by_query" table.  Note that requests from our clients
763  * don't expire and are thus NOT in the "requests_by_expiration"
764  * (or the "requests_by_peer" tables).
765  */
766 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
767
768 /**
769  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
770  */
771 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
772
773 /**
774  * Maximum number of requests (from other peers) that we're
775  * willing to have pending at any given point in time.
776  * FIXME: set from configuration (and 32 is a tiny value for testing only).
777  */
778 static uint64_t max_pending_requests = 32;
779
780
781
782
783
784
785 /**
786  * Run the next DS request in our
787  * queue, we're done with the current one.
788  */
789 static void
790 next_ds_request ()
791 {
792   struct DatastoreRequestQueue *e;
793   
794   while (NULL != (e = drq_head))
795     {
796       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
797         break;
798       if (e->task != GNUNET_SCHEDULER_NO_TASK)
799         GNUNET_SCHEDULER_cancel (sched, e->task);
800       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
801       e->req (e->req_cls, GNUNET_NO);
802       GNUNET_free (e);  
803     }
804   if (e == NULL)
805     return;
806   if (e->task != GNUNET_SCHEDULER_NO_TASK)
807     GNUNET_SCHEDULER_cancel (sched, e->task);
808   e->task = GNUNET_SCHEDULER_NO_TASK;
809   e->req (e->req_cls, GNUNET_YES);
810   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
811   GNUNET_free (e);  
812 }
813
814
815 /**
816  * A datastore request had to be timed out. 
817  *
818  * @param cls closure (of type "struct DatastoreRequestQueue*")
819  * @param tc task context, unused
820  */
821 static void
822 timeout_ds_request (void *cls,
823                     const struct GNUNET_SCHEDULER_TaskContext *tc)
824 {
825   struct DatastoreRequestQueue *e = cls;
826
827   e->task = GNUNET_SCHEDULER_NO_TASK;
828   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
829   e->req (e->req_cls, GNUNET_NO);
830   GNUNET_free (e);  
831 }
832
833
834 /**
835  * Queue a request for the datastore.
836  *
837  * @param deadline by when the request should run
838  * @param fun function to call once the request can be run
839  * @param fun_cls closure for fun
840  */
841 static struct DatastoreRequestQueue *
842 queue_ds_request (struct GNUNET_TIME_Relative deadline,
843                   RequestFunction fun,
844                   void *fun_cls)
845 {
846   struct DatastoreRequestQueue *e;
847   struct DatastoreRequestQueue *bef;
848
849   if (drq_head == NULL)
850     {
851       /* no other requests pending, run immediately */
852       fun (fun_cls, GNUNET_OK);
853       return NULL;
854     }
855   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
856   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
857   e->req = fun;
858   e->req_cls = fun_cls;
859   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
860     {
861       /* local request, highest prio, put at head of queue
862          regardless of deadline */
863       bef = NULL;
864     }
865   else
866     {
867       bef = drq_tail;
868       while ( (NULL != bef) &&
869               (e->timeout.value < bef->timeout.value) )
870         bef = bef->prev;
871     }
872   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
873   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
874     return e;
875   e->task = GNUNET_SCHEDULER_add_delayed (sched,
876                                           deadline,
877                                           &timeout_ds_request,
878                                           e);
879   return e;                                    
880 }
881
882
883 /**
884  * Free the state associated with a local get context.
885  *
886  * @param lgc the lgc to free
887  */
888 static void
889 local_get_context_free (struct LocalGetContext *lgc) 
890 {
891   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
892   GNUNET_SERVER_client_drop (lgc->client); 
893   GNUNET_free_non_null (lgc->results);
894   if (lgc->results_bf != NULL)
895     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
896   if (lgc->req != NULL)
897     {
898       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
899         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
900       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
901       GNUNET_free (lgc->req);
902     }
903   GNUNET_free (lgc);
904 }
905
906
907 /**
908  * We're able to transmit the next (local) result to the client.
909  * Do it and ask the datastore for more.  Or, on error, tell
910  * the datastore to stop giving us more.
911  *
912  * @param cls our closure (struct LocalGetContext)
913  * @param max maximum number of bytes we can transmit
914  * @param buf where to copy our message
915  * @return number of bytes copied to buf
916  */
917 static size_t
918 transmit_local_result (void *cls,
919                        size_t max,
920                        void *buf)
921 {
922   struct LocalGetContext *lgc = cls;  
923   uint16_t msize;
924
925   if (NULL == buf)
926     {
927 #if DEBUG_FS
928       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
929                   "Failed to transmit result to local client, aborting datastore iteration.\n");
930 #endif
931       /* error, abort! */
932       GNUNET_free (lgc->result);
933       lgc->result = NULL;
934       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
935       return 0;
936     }
937   msize = ntohs (lgc->result->header.size);
938 #if DEBUG_FS
939   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
940               "Transmitting %u bytes of result to local client.\n",
941               msize);
942 #endif
943   GNUNET_assert (max >= msize);
944   memcpy (buf, lgc->result, msize);
945   GNUNET_free (lgc->result);
946   lgc->result = NULL;
947   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
948   return msize;
949 }
950
951
952 /**
953  * Mingle hash with the mingle_number to
954  * produce different bits.
955  */
956 static void
957 mingle_hash (const GNUNET_HashCode * in,
958              int32_t mingle_number, 
959              GNUNET_HashCode * hc)
960 {
961   GNUNET_HashCode m;
962
963   GNUNET_CRYPTO_hash (&mingle_number, 
964                       sizeof (int32_t), 
965                       &m);
966   GNUNET_CRYPTO_hash_xor (&m, in, hc);
967 }
968
969
970 /**
971  * How many bytes should a bloomfilter be if we have already seen
972  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
973  * of bits set per entry.  Furthermore, we should not re-size the
974  * filter too often (to keep it cheap).
975  *
976  * Since other peers will also add entries but not resize the filter,
977  * we should generally pick a slightly larger size than what the
978  * strict math would suggest.
979  *
980  * @return must be a power of two and smaller or equal to 2^15.
981  */
982 static size_t
983 compute_bloomfilter_size (unsigned int entry_count)
984 {
985   size_t size;
986   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
987   uint16_t max = 1 << 15;
988
989   if (entry_count > max)
990     return max;
991   size = 8;
992   while ((size < max) && (size < ideal))
993     size *= 2;
994   if (size > max)
995     return max;
996   return size;
997 }
998
999
1000 /**
1001  * Recalculate our bloom filter for filtering replies.
1002  *
1003  * @param count number of entries we are filtering right now
1004  * @param mingle set to our new mingling value
1005  * @param bf_size set to the size of the bloomfilter
1006  * @param entries the entries to filter
1007  * @return updated bloomfilter, NULL for none
1008  */
1009 static struct GNUNET_CONTAINER_BloomFilter *
1010 refresh_bloomfilter (unsigned int count,
1011                      int32_t * mingle,
1012                      size_t *bf_size,
1013                      const GNUNET_HashCode *entries)
1014 {
1015   struct GNUNET_CONTAINER_BloomFilter *bf;
1016   size_t nsize;
1017   unsigned int i;
1018   GNUNET_HashCode mhash;
1019
1020   if (0 == count)
1021     return NULL;
1022   nsize = compute_bloomfilter_size (count);
1023   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1024   *bf_size = nsize;
1025   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1026                                           nsize,
1027                                           BLOOMFILTER_K);
1028   for (i=0;i<count;i++)
1029     {
1030       mingle_hash (&entries[i], *mingle, &mhash);
1031       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1032     }
1033   return bf;
1034 }
1035
1036
1037 /**
1038  * Closure used for "target_peer_select_cb".
1039  */
1040 struct PeerSelectionContext 
1041 {
1042   /**
1043    * The request for which we are selecting
1044    * peers.
1045    */
1046   struct PendingRequest *pr;
1047
1048   /**
1049    * Current "prime" target.
1050    */
1051   struct GNUNET_PeerIdentity target;
1052
1053   /**
1054    * How much do we like this target?
1055    */
1056   double target_score;
1057
1058 };
1059
1060
1061 /**
1062  * Function called for each connected peer to determine
1063  * which one(s) would make good targets for forwarding.
1064  *
1065  * @param cls closure (struct PeerSelectionContext)
1066  * @param key current key code (peer identity)
1067  * @param value value in the hash map (struct ConnectedPeer)
1068  * @return GNUNET_YES if we should continue to
1069  *         iterate,
1070  *         GNUNET_NO if not.
1071  */
1072 static int
1073 target_peer_select_cb (void *cls,
1074                        const GNUNET_HashCode * key,
1075                        void *value)
1076 {
1077   struct PeerSelectionContext *psc = cls;
1078   struct ConnectedPeer *cp = value;
1079   struct PendingRequest *pr = psc->pr;
1080   double score;
1081   unsigned int i;
1082
1083   /* 1) check if we have already (recently) forwarded to this peer */
1084   for (i=0;i<pr->used_pids_off;i++)
1085     if (pr->used_pids[i] == cp->pid)
1086       return GNUNET_YES; /* skip */
1087   // 2) calculate how much we'd like to forward to this peer
1088   score = 0; // FIXME!
1089   
1090   /* store best-fit in closure */
1091   if (score > psc->target_score)
1092     {
1093       psc->target_score = score;
1094       psc->target.hashPubKey = *key; 
1095     }
1096   return GNUNET_YES;
1097 }
1098
1099
1100 /**
1101  * We use a random delay to make the timing of requests
1102  * less predictable.  This function returns such a random
1103  * delay.
1104  *
1105  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1106  */
1107 static struct GNUNET_TIME_Relative
1108 get_processing_delay ()
1109 {
1110   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1111                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1112                                                                   TTL_DECREMENT));
1113 }
1114
1115
1116 /**
1117  * Task that is run for each request with the
1118  * goal of forwarding the associated query to
1119  * other peers.  The task should re-schedule
1120  * itself to be re-run once the TTL has expired.
1121  * (or at a later time if more peers should
1122  * be queried earlier).
1123  *
1124  * @param cls the requests "struct PendingRequest*"
1125  * @param tc task context (unused)
1126  */
1127 static void
1128 forward_request_task (void *cls,
1129                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1130
1131
1132 /**
1133  * We've selected a peer for forwarding of a query.
1134  * Construct the message and then re-schedule the
1135  * task to forward again to (other) peers.
1136  *
1137  * @param cls closure
1138  * @param size number of bytes available in buf
1139  * @param buf where the callee should write the message
1140  * @return number of bytes written to buf
1141  */
1142 static size_t
1143 transmit_request_cb (void *cls,
1144                      size_t size, 
1145                      void *buf)
1146 {
1147   struct PendingRequest *pr = cls;
1148   struct GetMessage *gm;
1149   GNUNET_HashCode *ext;
1150   char *bfdata;
1151   size_t msize;
1152   unsigned int k;
1153
1154   pr->cth = NULL;
1155   /* (1) check for timeout */
1156   if (NULL == buf)
1157     {
1158       /* timeout, try another peer immediately again */
1159       pr->task = GNUNET_SCHEDULER_add_with_priority (sched,
1160                                                      GNUNET_SCHEDULER_PRIORITY_IDLE,
1161                                                      &forward_request_task,
1162                                                      pr);
1163       return 0;
1164     }
1165   /* (2) build query message */
1166   k = 0; // FIXME: count hash codes!
1167   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1168   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1169   gm = (struct GetMessage*) buf;
1170   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1171   gm->header.size = htons (msize);
1172   gm->type = htonl (pr->type);
1173   pr->remaining_priority /= 2;
1174   gm->priority = htonl (pr->remaining_priority);
1175   gm->ttl = htonl (pr->ttl);
1176   gm->filter_mutator = htonl(pr->mingle);
1177   gm->hash_bitmap = htonl (42);
1178   gm->query = pr->query;
1179   ext = (GNUNET_HashCode*) &gm[1];
1180   // FIXME: setup "ext[0]..[k-1]"
1181   bfdata = (char *) &ext[k];
1182   if (pr->bf != NULL)
1183     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1184                                                bfdata,
1185                                                pr->bf_size);
1186   
1187   /* (3) schedule job to do it again (or another peer, etc.) */
1188   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1189                                            get_processing_delay (), // FIXME!
1190                                            &forward_request_task,
1191                                            pr);
1192
1193   return msize;
1194 }
1195
1196
1197 /**
1198  * Function called after we've tried to reserve
1199  * a certain amount of bandwidth for a reply.
1200  * Check if we succeeded and if so send our query.
1201  *
1202  * @param cls the requests "struct PendingRequest*"
1203  * @param peer identifies the peer
1204  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1205  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1206  * @param amount set to the amount that was actually reserved or unreserved
1207  * @param preference current traffic preference for the given peer
1208  */
1209 static void
1210 target_reservation_cb (void *cls,
1211                        const struct
1212                        GNUNET_PeerIdentity * peer,
1213                        unsigned int bpm_in,
1214                        unsigned int bpm_out,
1215                        int amount,
1216                        uint64_t preference)
1217 {
1218   struct PendingRequest *pr = cls;
1219   uint32_t priority;
1220   uint16_t size;
1221   struct GNUNET_TIME_Relative maxdelay;
1222
1223   pr->irc = NULL;
1224   GNUNET_assert (peer != NULL);
1225   if ( (amount != DBLOCK_SIZE) ||
1226        (pr->cth != NULL) )
1227     {
1228       /* try again later; FIXME: we may need to un-reserve "amount"? */
1229       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1230                                                get_processing_delay (), // FIXME: longer?
1231                                                &forward_request_task,
1232                                                pr);
1233       return;
1234     }
1235   // (2) transmit, update ttl/priority
1236   // FIXME: calculate priority, maxdelay, size properly!
1237   priority = 0;
1238   size = 60000;
1239   maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
1240   pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1241                                                priority,
1242                                                maxdelay,
1243                                                peer,
1244                                                size,
1245                                                &transmit_request_cb,
1246                                                pr);
1247   if (pr->cth == NULL)
1248     {
1249       /* try again later */
1250       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1251                                                get_processing_delay (), // FIXME: longer?
1252                                                &forward_request_task,
1253                                                pr);
1254     }
1255 }
1256
1257
1258 /**
1259  * Task that is run for each request with the
1260  * goal of forwarding the associated query to
1261  * other peers.  The task should re-schedule
1262  * itself to be re-run once the TTL has expired.
1263  * (or at a later time if more peers should
1264  * be queried earlier).
1265  *
1266  * @param cls the requests "struct PendingRequest*"
1267  * @param tc task context (unused)
1268  */
1269 static void
1270 forward_request_task (void *cls,
1271                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1272 {
1273   struct PendingRequest *pr = cls;
1274   struct PeerSelectionContext psc;
1275
1276   pr->task = GNUNET_SCHEDULER_NO_TASK;
1277   if (pr->cth != NULL) 
1278     {
1279       /* we're busy transmitting a result, wait a bit */
1280       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1281                                                get_processing_delay (), 
1282                                                &forward_request_task,
1283                                                pr);
1284       return;
1285     }
1286   /* (1) select target */
1287   psc.pr = pr;
1288   psc.target_score = DBL_MIN;
1289   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1290                                          &target_peer_select_cb,
1291                                          &psc);
1292   if (psc.target_score == DBL_MIN)
1293     {
1294       /* no possible target found, wait some time */
1295       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1296                                                get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1297                                                &forward_request_task,
1298                                                pr);
1299       return;
1300     }
1301   /* (2) reserve reply bandwidth */
1302   GNUNET_assert (NULL == pr->irc);
1303   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1304                                        &psc.target,
1305                                        GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1306                                        -1,
1307                                        DBLOCK_SIZE, // FIXME: make dependent on type?
1308                                        0,
1309                                        &target_reservation_cb,
1310                                        pr);
1311 }
1312
1313
1314 /**
1315  * We're processing (local) results for a search request
1316  * from a (local) client.  Pass applicable results to the
1317  * client and if we are done either clean up (operation
1318  * complete) or switch to P2P search (more results possible).
1319  *
1320  * @param cls our closure (struct LocalGetContext)
1321  * @param key key for the content
1322  * @param size number of bytes in data
1323  * @param data content stored
1324  * @param type type of the content
1325  * @param priority priority of the content
1326  * @param anonymity anonymity-level for the content
1327  * @param expiration expiration time for the content
1328  * @param uid unique identifier for the datum;
1329  *        maybe 0 if no unique identifier is available
1330  */
1331 static void
1332 process_local_get_result (void *cls,
1333                           const GNUNET_HashCode * key,
1334                           uint32_t size,
1335                           const void *data,
1336                           uint32_t type,
1337                           uint32_t priority,
1338                           uint32_t anonymity,
1339                           struct GNUNET_TIME_Absolute
1340                           expiration, 
1341                           uint64_t uid)
1342 {
1343   struct LocalGetContext *lgc = cls;
1344   struct PendingRequest *pr;
1345   struct ClientRequestList *crl;
1346   struct ClientList *cl;
1347   size_t msize;
1348   unsigned int i;
1349
1350   if (key == NULL)
1351     {
1352 #if DEBUG_FS
1353       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354                   "Received last result for `%s' from local datastore, deciding what to do next.\n",
1355                   GNUNET_h2s (&lgc->query));
1356 #endif
1357       /* no further results from datastore; continue
1358          processing further requests from the client and
1359          allow the next task to use the datastore; also,
1360          switch to P2P requests or clean up our state. */
1361       next_ds_request ();
1362       GNUNET_SERVER_receive_done (lgc->client,
1363                                   GNUNET_OK);
1364       if ( (lgc->results_used == 0) ||
1365            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1366            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1367            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1368         {
1369 #if DEBUG_FS
1370           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1371                       "Forwarding query for `%s' to network.\n",
1372                       GNUNET_h2s (&lgc->query));
1373 #endif
1374           cl = clients;
1375           while ( (NULL != cl) &&
1376                   (cl->client != lgc->client) )
1377             cl = cl->next;
1378           if (cl == NULL)
1379             {
1380               cl = GNUNET_malloc (sizeof (struct ClientList));
1381               cl->client = lgc->client;
1382               cl->next = clients;
1383               clients = cl;
1384             }
1385           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1386           crl->cl = cl;
1387           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1388           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1389           pr->client = lgc->client;
1390           GNUNET_SERVER_client_keep (pr->client);
1391           pr->crl_entry = crl;
1392           crl->req = pr;
1393           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1394             {
1395               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1396               *pr->namespace = lgc->namespace;
1397             }
1398           pr->replies_seen = lgc->results;
1399           lgc->results = NULL;
1400           pr->start_time = GNUNET_TIME_absolute_get ();
1401           pr->query = lgc->query;
1402           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1403           pr->replies_seen_off = lgc->results_used;
1404           pr->replies_seen_size = lgc->results_size;
1405           lgc->results_size = 0;
1406           pr->type = lgc->type;
1407           pr->anonymity_level = lgc->anonymity_level;
1408           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1409                                         &pr->mingle,
1410                                         &pr->bf_size,
1411                                         pr->replies_seen);
1412           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1413                                              &pr->query,
1414                                              pr,
1415                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1416           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1417                                                    get_processing_delay (),
1418                                                    &forward_request_task,
1419                                                    pr);
1420           local_get_context_free (lgc);
1421           return;
1422         }
1423       /* got all possible results, clean up! */
1424 #if DEBUG_FS
1425       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1426                   "Found all possible results for query for `%s', done!\n",
1427                   GNUNET_h2s (&lgc->query));
1428 #endif
1429       local_get_context_free (lgc);
1430       return;
1431     }
1432   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1433     {
1434 #if DEBUG_FS
1435       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1436                   "Received on-demand block for `%s' from local datastore, fetching data.\n",
1437                   GNUNET_h2s (&lgc->query));
1438 #endif
1439       GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
1440                                         anonymity, expiration, uid,
1441                                         dsh,
1442                                         &process_local_get_result,
1443                                         lgc);
1444       return;
1445     }
1446   if ( (type != lgc->type) &&
1447        (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
1448     {
1449       /* this should be virtually impossible to reach (DBLOCK 
1450          query hash being identical to KBLOCK/SBLOCK query hash);
1451          nevertheless, if it happens, the correct thing is to
1452          simply skip the result. */
1453 #if DEBUG_FS
1454       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1455                   "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
1456                   type,
1457                   lgc->type,
1458                   GNUNET_h2s (&lgc->query));
1459 #endif
1460       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1461       return;
1462     }
1463   /* check if this is a result we've alredy
1464      received */
1465   for (i=0;i<lgc->results_used;i++)
1466     if (0 == memcmp (key,
1467                      &lgc->results[i],
1468                      sizeof (GNUNET_HashCode)))
1469       {
1470 #if DEBUG_FS
1471         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1472                     "Received duplicate result for `%s' from local datastore, ignoring.\n",
1473                     GNUNET_h2s (&lgc->query));
1474 #endif
1475         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1476         return; 
1477       }
1478   if (lgc->results_used == lgc->results_size)
1479     GNUNET_array_grow (lgc->results,
1480                        lgc->results_size,
1481                        lgc->results_size * 2 + 2);
1482   GNUNET_CRYPTO_hash (data, 
1483                       size, 
1484                       &lgc->results[lgc->results_used++]);    
1485   msize = size + sizeof (struct ContentMessage);
1486   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1487   lgc->result = GNUNET_malloc (msize);
1488   lgc->result->header.size = htons (msize);
1489   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1490   lgc->result->type = htonl (type);
1491   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1492   memcpy (&lgc->result[1],
1493           data,
1494           size);
1495 #if DEBUG_FS
1496   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1497               "Received new result for `%s' from local datastore, passing to client.\n",
1498               GNUNET_h2s (&lgc->query));
1499 #endif
1500   GNUNET_SERVER_notify_transmit_ready (lgc->client,
1501                                        msize,
1502                                        GNUNET_TIME_UNIT_FOREVER_REL,
1503                                        &transmit_local_result,
1504                                        lgc);
1505 }
1506
1507
1508 /**
1509  * We're processing a search request from a local
1510  * client.  Now it is our turn to query the datastore.
1511  * 
1512  * @param cls our closure (struct LocalGetContext)
1513  * @param tc unused
1514  */
1515 static void
1516 transmit_local_get (void *cls,
1517                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1518 {
1519   struct LocalGetContext *lgc = cls;
1520   uint32_t type;
1521   
1522   type = lgc->type;
1523   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1524     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1525   GNUNET_DATASTORE_get (dsh,
1526                         &lgc->query,
1527                         type,
1528                         &process_local_get_result,
1529                         lgc,
1530                         GNUNET_TIME_UNIT_FOREVER_REL);
1531 }
1532
1533
1534 /**
1535  * We're processing a search request from a local
1536  * client.  Now it is our turn to query the datastore.
1537  * 
1538  * @param cls our closure (struct LocalGetContext)
1539  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
1540  */
1541 static void 
1542 transmit_local_get_ready (void *cls,
1543                           int ok)
1544 {
1545   struct LocalGetContext *lgc = cls;
1546
1547   GNUNET_assert (GNUNET_OK == ok);
1548   GNUNET_SCHEDULER_add_continuation (sched,
1549                                      &transmit_local_get,
1550                                      lgc,
1551                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1552 }
1553
1554
1555 /**
1556  * Handle START_SEARCH-message (search request from client).
1557  *
1558  * @param cls closure
1559  * @param client identification of the client
1560  * @param message the actual message
1561  */
1562 static void
1563 handle_start_search (void *cls,
1564                      struct GNUNET_SERVER_Client *client,
1565                      const struct GNUNET_MessageHeader *message)
1566 {
1567   const struct SearchMessage *sm;
1568   struct LocalGetContext *lgc;
1569   uint16_t msize;
1570   unsigned int sc;
1571   
1572   msize = ntohs (message->size);
1573   if ( (msize < sizeof (struct SearchMessage)) ||
1574        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
1575     {
1576       GNUNET_break (0);
1577       GNUNET_SERVER_receive_done (client,
1578                                   GNUNET_SYSERR);
1579       return;
1580     }
1581   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
1582   sm = (const struct SearchMessage*) message;
1583   GNUNET_SERVER_client_keep (client);
1584   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
1585   if  (sc > 0)
1586     {
1587       lgc->results_used = sc;
1588       GNUNET_array_grow (lgc->results,
1589                          lgc->results_size,
1590                          sc * 2);
1591       memcpy (lgc->results,
1592               &sm[1],
1593               sc * sizeof (GNUNET_HashCode));
1594     }
1595   lgc->client = client;
1596   lgc->type = ntohl (sm->type);
1597   lgc->anonymity_level = ntohl (sm->anonymity_level);
1598   switch (lgc->type)
1599     {
1600     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1601     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1602       lgc->target.hashPubKey = sm->target;
1603       break;
1604     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1605       lgc->namespace = sm->target;
1606       break;
1607     default:
1608       break;
1609     }
1610   lgc->query = sm->query;
1611   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
1612   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
1613                                &transmit_local_get_ready,
1614                                lgc);
1615 }
1616
1617
1618 /**
1619  * List of handlers for the messages understood by this
1620  * service.
1621  */
1622 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1623   {&GNUNET_FS_handle_index_start, NULL, 
1624    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
1625   {&GNUNET_FS_handle_index_list_get, NULL, 
1626    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
1627   {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
1628    sizeof (struct UnindexMessage) },
1629   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
1630    0 },
1631   {NULL, NULL, 0, 0}
1632 };
1633
1634
1635 /**
1636  * Clean up the memory used by the PendingRequest structure (except
1637  * for the client or peer list that the request may be part of).
1638  *
1639  * @param pr request to clean up
1640  */
1641 static void
1642 destroy_pending_request (struct PendingRequest *pr)
1643 {
1644   struct PendingMessage *reply;
1645   struct ClientList *cl;
1646
1647   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
1648                                         &pr->query,
1649                                         pr);
1650   // FIXME: not sure how this can work (efficiently)
1651   // also, what does the return value mean?
1652   if (pr->irc != NULL)
1653     {
1654       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1655       pr->irc = NULL;
1656     }
1657   if (pr->client == NULL)
1658     {
1659       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
1660                                          pr->hnode);
1661     }
1662   else
1663     {
1664       cl = pr->crl_entry->cl;
1665       GNUNET_CONTAINER_DLL_remove (cl->head,
1666                                    cl->tail,
1667                                    pr->crl_entry);
1668     }
1669   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
1670     GNUNET_SCHEDULER_cancel (sched, pr->task);
1671   if (NULL != pr->cth)
1672     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
1673   if (NULL != pr->bf)
1674     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1675   if (NULL != pr->th)
1676     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
1677   while (NULL != (reply = pr->replies_pending))
1678     {
1679       pr->replies_pending = reply->next;
1680       GNUNET_free (reply);
1681     }
1682   GNUNET_PEER_change_rc (pr->source_pid, -1);
1683   GNUNET_PEER_change_rc (pr->target_pid, -1);
1684   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1685   GNUNET_free_non_null (pr->used_pids);
1686   GNUNET_free_non_null (pr->replies_seen);
1687   GNUNET_free_non_null (pr->namespace);
1688   GNUNET_free (pr);
1689 }
1690
1691
1692 /**
1693  * A client disconnected.  Remove all of its pending queries.
1694  *
1695  * @param cls closure, NULL
1696  * @param client identification of the client
1697  */
1698 static void
1699 handle_client_disconnect (void *cls,
1700                           struct GNUNET_SERVER_Client
1701                           * client)
1702 {
1703   struct LocalGetContext *lgc;
1704   struct ClientList *cpos;
1705   struct ClientList *cprev;
1706   struct ClientRequestList *rl;
1707
1708   if (client == NULL)
1709     return;
1710   lgc = lgc_head;
1711   while ( (NULL != lgc) &&
1712           (lgc->client != client) )
1713     lgc = lgc->next;
1714   if (lgc != NULL)
1715     local_get_context_free (lgc);
1716   cprev = NULL;
1717   cpos = clients;
1718   while ( (NULL != cpos) &&
1719           (clients->client != client) )
1720     {
1721       cprev = cpos;
1722       cpos = cpos->next;
1723     }
1724   if (cpos != NULL)
1725     {
1726       if (cprev == NULL)
1727         clients = cpos->next;
1728       else
1729         cprev->next = cpos->next;
1730       while (NULL != (rl = cpos->head))
1731         {
1732           cpos->head = rl->next;
1733           destroy_pending_request (rl->req);
1734           GNUNET_free (rl);
1735         }
1736       GNUNET_free (cpos);
1737     }
1738 }
1739
1740
1741 /**
1742  * Iterator over entries in the "requests_by_query" map
1743  * that frees all the entries.
1744  *
1745  * @param cls closure, NULL
1746  * @param key current key code (the query, unused) 
1747  * @param value value in the hash map, of type "struct PendingRequest*"
1748  * @return GNUNET_YES (we should continue to  iterate)
1749  */
1750 static int 
1751 destroy_pending_request_cb (void *cls,
1752                             const GNUNET_HashCode * key,
1753                             void *value)
1754 {
1755   struct PendingRequest *pr = value;
1756
1757   destroy_pending_request (pr);
1758   return GNUNET_YES;
1759 }
1760
1761
1762 /**
1763  * Task run during shutdown.
1764  *
1765  * @param cls unused
1766  * @param tc unused
1767  */
1768 static void
1769 shutdown_task (void *cls,
1770                const struct GNUNET_SCHEDULER_TaskContext *tc)
1771 {
1772   if (NULL != core)
1773     {
1774       GNUNET_CORE_disconnect (core);
1775       core = NULL;
1776     }
1777   if (NULL != dsh)
1778     {
1779       GNUNET_DATASTORE_disconnect (dsh,
1780                                    GNUNET_NO);
1781       dsh = NULL;
1782     }
1783   GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
1784                                          &destroy_pending_request_cb,
1785                                          NULL);
1786   while (clients != NULL)
1787     handle_client_disconnect (NULL,
1788                               clients->client);
1789   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
1790   requests_by_query = NULL;
1791   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
1792   requests_by_peer = NULL;
1793   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
1794   requests_by_expiration = NULL;
1795   // FIXME: iterate over entries and free individually?
1796   // (or do we get disconnect notifications?)
1797   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1798   connected_peers = NULL;
1799 }
1800
1801
1802 /**
1803  * Free (each) request made by the peer.
1804  *
1805  * @param cls closure, points to peer that the request belongs to
1806  * @param key current key code
1807  * @param value value in the hash map
1808  * @return GNUNET_YES (we should continue to iterate)
1809  */
1810 static int
1811 destroy_request (void *cls,
1812                  const GNUNET_HashCode * key,
1813                  void *value)
1814 {
1815   const struct GNUNET_PeerIdentity * peer = cls;
1816   struct PendingRequest *pr = value;
1817   
1818   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1819                                         &peer->hashPubKey,
1820                                         pr);
1821   destroy_pending_request (pr);
1822   return GNUNET_YES;
1823 }
1824
1825
1826
1827 /**
1828  * Method called whenever a given peer connects.
1829  *
1830  * @param cls closure, not used
1831  * @param peer peer identity this notification is about
1832  * @param latency reported latency of the connection with 'other'
1833  * @param distance reported distance (DV) to 'other' 
1834  */
1835 static void 
1836 peer_connect_handler (void *cls,
1837                       const struct
1838                       GNUNET_PeerIdentity * peer,
1839                       struct GNUNET_TIME_Relative latency,
1840                       uint32_t distance)
1841 {
1842   struct ConnectedPeer *cp;
1843
1844   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1845   cp->pid = GNUNET_PEER_intern (peer);
1846   GNUNET_CONTAINER_multihashmap_put (connected_peers,
1847                                      &peer->hashPubKey,
1848                                      cp,
1849                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1850 }
1851
1852
1853 /**
1854  * Method called whenever a peer disconnects.
1855  *
1856  * @param cls closure, not used
1857  * @param peer peer identity this notification is about
1858  */
1859 static void
1860 peer_disconnect_handler (void *cls,
1861                          const struct
1862                          GNUNET_PeerIdentity * peer)
1863 {
1864   struct ConnectedPeer *cp;
1865
1866   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1867                                           &peer->hashPubKey);
1868   GNUNET_PEER_change_rc (cp->pid, -1);
1869   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1870   GNUNET_free (cp);
1871   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
1872                                               &peer->hashPubKey,
1873                                               &destroy_request,
1874                                               (void*) peer);
1875 }
1876
1877
1878 /**
1879  * We're processing a GET request from
1880  * another peer and have decided to forward
1881  * it to other peers.
1882  *
1883  * @param cls our "struct ProcessGetContext *"
1884  * @param tc unused
1885  */
1886 static void
1887 forward_get_request (void *cls,
1888                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1889 {
1890   struct ProcessGetContext *pgc = cls;
1891   struct PendingRequest *pr;
1892   struct PendingRequest *eer;
1893   struct GNUNET_PeerIdentity target;
1894
1895   pr = GNUNET_malloc (sizeof (struct PendingRequest));
1896   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
1897     {
1898       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
1899       *pr->namespace = pgc->namespace;
1900     }
1901   pr->bf = pgc->bf;
1902   pr->bf_size = pgc->bf_size;
1903   pgc->bf = NULL;
1904   pr->start_time = pgc->start_time;
1905   pr->query = pgc->query;
1906   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
1907   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
1908     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
1909   pr->anonymity_level = 1; /* default */
1910   pr->priority = pgc->priority;
1911   pr->remaining_priority = pr->priority;
1912   pr->mingle = pgc->mingle;
1913   pr->ttl = pgc->ttl; 
1914   pr->type = pgc->type;
1915   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1916                                      &pr->query,
1917                                      pr,
1918                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1919   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
1920                                      &pgc->reply_to.hashPubKey,
1921                                      pr,
1922                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1923   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration,
1924                                             pr,
1925                                             pr->start_time.value + pr->ttl);
1926   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
1927     {
1928       /* expire oldest request! */
1929       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
1930       GNUNET_PEER_resolve (eer->source_pid,
1931                            &target);    
1932       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1933                                             &target.hashPubKey,
1934                                             eer);
1935       destroy_pending_request (eer);     
1936     }
1937   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1938                                            get_processing_delay (),
1939                                            &forward_request_task,
1940                                            pr);
1941   GNUNET_free (pgc); 
1942 }
1943 /**
1944  * Transmit the given message by copying it to
1945  * the target buffer "buf".  "buf" will be
1946  * NULL and "size" zero if the socket was closed for
1947  * writing in the meantime.  In that case, only
1948
1949  * free the message
1950  *
1951  * @param cls closure, pointer to the message
1952  * @param size number of bytes available in buf
1953  * @param buf where the callee should write the message
1954  * @return number of bytes written to buf
1955  */
1956 static size_t
1957 transmit_message (void *cls,
1958                   size_t size, void *buf)
1959 {
1960   struct GNUNET_MessageHeader *msg = cls;
1961   uint16_t msize;
1962   
1963   if (NULL == buf)
1964     {
1965 #if DEBUG_FS
1966       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967                   "Dropping reply, core too busy.\n");
1968 #endif
1969       GNUNET_free (msg);
1970       return 0;
1971     }
1972   msize = ntohs (msg->size);
1973   GNUNET_assert (size >= msize);
1974   memcpy (buf, msg, msize);
1975   GNUNET_free (msg);
1976   return msize;
1977 }
1978
1979
1980 /**
1981  * Test if the load on this peer is too high
1982  * to even consider processing the query at
1983  * all.
1984  * 
1985  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1986  */
1987 static int
1988 test_load_too_high ()
1989 {
1990   return GNUNET_NO; // FIXME
1991 }
1992
1993
1994 /**
1995  * We're processing (local) results for a search request
1996  * from another peer.  Pass applicable results to the
1997  * peer and if we are done either clean up (operation
1998  * complete) or forward to other peers (more results possible).
1999  *
2000  * @param cls our closure (struct LocalGetContext)
2001  * @param key key for the content
2002  * @param size number of bytes in data
2003  * @param data content stored
2004  * @param type type of the content
2005  * @param priority priority of the content
2006  * @param anonymity anonymity-level for the content
2007  * @param expiration expiration time for the content
2008  * @param uid unique identifier for the datum;
2009  *        maybe 0 if no unique identifier is available
2010  */
2011 static void
2012 process_p2p_get_result (void *cls,
2013                         const GNUNET_HashCode * key,
2014                         uint32_t size,
2015                         const void *data,
2016                         uint32_t type,
2017                         uint32_t priority,
2018                         uint32_t anonymity,
2019                         struct GNUNET_TIME_Absolute
2020                         expiration, 
2021                         uint64_t uid)
2022 {
2023   struct ProcessGetContext *pgc = cls;
2024   GNUNET_HashCode dhash;
2025   GNUNET_HashCode mhash;
2026   struct PutMessage *reply;
2027   
2028   if (NULL == key)
2029     {
2030       /* no more results */
2031       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2032            ( (0 == pgc->results_found) ||
2033              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2034              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2035              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2036         {
2037           GNUNET_SCHEDULER_add_continuation (sched,
2038                                              &forward_get_request,
2039                                              pgc,
2040                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2041         }
2042       else
2043         {
2044           if (pgc->bf != NULL)
2045             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2046           GNUNET_free (pgc); 
2047         }
2048       next_ds_request ();
2049       return;
2050     }
2051   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2052     {
2053       GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2054                                         anonymity, expiration, uid, dsh,
2055                                         &process_p2p_get_result,
2056                                         pgc);
2057       return;
2058     }
2059   /* check for duplicates */
2060   GNUNET_CRYPTO_hash (data, size, &dhash);
2061   mingle_hash (&dhash, 
2062                pgc->mingle,
2063                &mhash);
2064   if ( (pgc->bf != NULL) &&
2065        (GNUNET_YES ==
2066         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2067                                            &mhash)) )
2068     {      
2069 #if DEBUG_FS
2070       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2071                   "Result from datastore filtered by bloomfilter.\n");
2072 #endif
2073       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2074       return;
2075     }
2076   pgc->results_found++;
2077   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2078        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2079        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2080     {
2081       if (pgc->bf == NULL)
2082         {
2083           pgc->bf_size = 32;
2084           pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2085                                                        pgc->bf_size, 
2086                                                        BLOOMFILTER_K);
2087         }
2088       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2089                                         &mhash);
2090     }
2091
2092   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2093   reply->header.size = htons (sizeof (struct PutMessage) + size);
2094   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2095   reply->type = htonl (type);
2096   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2097   memcpy (&reply[1], data, size);
2098   GNUNET_CORE_notify_transmit_ready (core,
2099                                      pgc->priority,
2100                                      ACCEPTABLE_REPLY_DELAY,
2101                                      &pgc->reply_to,
2102                                      sizeof (struct PutMessage) + size,
2103                                      &transmit_message,
2104                                      reply);
2105   if ( (GNUNET_YES == test_load_too_high()) ||
2106        (pgc->results_found > 5 + 2 * pgc->priority) )
2107     {
2108       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2109       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2110       return;
2111     }
2112   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2113 }
2114   
2115
2116 /**
2117  * We're processing a GET request from another peer.  Give it to our
2118  * local datastore.
2119  *
2120  * @param cls our "struct ProcessGetContext"
2121  * @param ok did we get a datastore slice or not?
2122  */
2123 static void
2124 ds_get_request (void *cls, 
2125                 int ok)
2126 {
2127   struct ProcessGetContext *pgc = cls;
2128   uint32_t type;
2129   struct GNUNET_TIME_Relative timeout;
2130
2131   if (GNUNET_OK != ok)
2132     {
2133       /* no point in doing P2P stuff if we can't even do local */
2134       GNUNET_free (dsh);
2135       return;
2136     }
2137   type = pgc->type;
2138   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2139     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2140   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2141                                            (pgc->priority + 1));
2142   GNUNET_DATASTORE_get (dsh,
2143                         &pgc->query,
2144                         type,
2145                         &process_p2p_get_result,
2146                         pgc,
2147                         timeout);
2148 }
2149
2150
2151 /**
2152  * The priority level imposes a bound on the maximum
2153  * value for the ttl that can be requested.
2154  *
2155  * @param ttl_in requested ttl
2156  * @param prio given priority
2157  * @return ttl_in if ttl_in is below the limit,
2158  *         otherwise the ttl-limit for the given priority
2159  */
2160 static int32_t
2161 bound_ttl (int32_t ttl_in, uint32_t prio)
2162 {
2163   unsigned long long allowed;
2164
2165   if (ttl_in <= 0)
2166     return ttl_in;
2167   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2168   if (ttl_in > allowed)      
2169     {
2170       if (allowed >= (1 << 30))
2171         return 1 << 30;
2172       return allowed;
2173     }
2174   return ttl_in;
2175 }
2176
2177
2178 /**
2179  * We've received a request with the specified
2180  * priority.  Bound it according to how much
2181  * we trust the given peer.
2182  * 
2183  * @param prio_in requested priority
2184  * @param peer the peer making the request
2185  * @return effective priority
2186  */
2187 static uint32_t
2188 bound_priority (uint32_t prio_in,
2189                 const struct GNUNET_PeerIdentity *peer)
2190 {
2191   return 0; // FIXME!
2192 }
2193
2194
2195 /**
2196  * Handle P2P "GET" request.
2197  *
2198  * @param cls closure, always NULL
2199  * @param other the other peer involved (sender or receiver, NULL
2200  *        for loopback messages where we are both sender and receiver)
2201  * @param message the actual message
2202  * @param latency reported latency of the connection with 'other'
2203  * @param distance reported distance (DV) to 'other' 
2204  * @return GNUNET_OK to keep the connection open,
2205  *         GNUNET_SYSERR to close it (signal serious error)
2206  */
2207 static int
2208 handle_p2p_get (void *cls,
2209                 const struct GNUNET_PeerIdentity *other,
2210                 const struct GNUNET_MessageHeader *message,
2211                                   struct GNUNET_TIME_Relative latency,
2212                                   uint32_t distance)
2213 {
2214   uint16_t msize;
2215   const struct GetMessage *gm;
2216   unsigned int bits;
2217   const GNUNET_HashCode *opt;
2218   struct ProcessGetContext *pgc;
2219   uint32_t bm;
2220   size_t bfsize;
2221   uint32_t ttl_decrement;
2222   double preference;
2223   int net_load_up;
2224   int net_load_down;
2225
2226   msize = ntohs(message->size);
2227   if (msize < sizeof (struct GetMessage))
2228     {
2229       GNUNET_break_op (0);
2230       return GNUNET_SYSERR;
2231     }
2232   gm = (const struct GetMessage*) message;
2233   bm = ntohl (gm->hash_bitmap);
2234   bits = 0;
2235   while (bm > 0)
2236     {
2237       if (1 == (bm & 1))
2238         bits++;
2239       bm >>= 1;
2240     }
2241   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2242     {
2243       GNUNET_break_op (0);
2244       return GNUNET_SYSERR;
2245     }  
2246   opt = (const GNUNET_HashCode*) &gm[1];
2247   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2248   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2249   if (bfsize > 0)
2250     {
2251       pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2252                                                    bfsize,
2253                                                    BLOOMFILTER_K);
2254       pgc->bf_size = bfsize;
2255     }
2256   pgc->type = ntohl (gm->type);
2257   pgc->bm = ntohl (gm->hash_bitmap);
2258   pgc->mingle = gm->filter_mutator;
2259   bits = 0;
2260   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2261     pgc->reply_to.hashPubKey = opt[bits++];
2262   else
2263     pgc->reply_to = *other;
2264   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2265     pgc->namespace = opt[bits++];
2266   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2267     {
2268       GNUNET_break_op (0);
2269       GNUNET_free (pgc);
2270       return GNUNET_SYSERR;
2271     }
2272   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2273     pgc->prime_target.hashPubKey = opt[bits++];
2274   /* note that we can really only check load here since otherwise
2275      peers could find out that we are overloaded by being disconnected
2276      after sending us a malformed query... */
2277   if (GNUNET_YES == test_load_too_high ())
2278     {
2279       if (NULL != pgc->bf)
2280         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2281       GNUNET_free (pgc);
2282 #if DEBUG_FS
2283       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2284                   "Dropping query from `%s', this peer is too busy.\n",
2285                   GNUNET_i2s (other));
2286 #endif
2287       return GNUNET_OK;
2288     }
2289   net_load_up = 50; // FIXME
2290   net_load_down = 50; // FIXME
2291   pgc->policy = ROUTING_POLICY_NONE;
2292   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2293        (net_load_down < IDLE_LOAD_THRESHOLD) )
2294     {
2295       pgc->policy |= ROUTING_POLICY_ALL;
2296       pgc->priority = 0; /* no charge */
2297     }
2298   else
2299     {
2300       pgc->priority = bound_priority (ntohl (gm->priority), other);
2301       if ( (net_load_up < 
2302             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2303            (net_load_down < 
2304             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2305         {
2306           pgc->policy |= ROUTING_POLICY_ALL;
2307         }
2308       else
2309         {
2310           // FIXME: is this sound?
2311           if (net_load_up < 90 + 10 * pgc->priority)
2312             pgc->policy |= ROUTING_POLICY_FORWARD;
2313           if (net_load_down < 90 + 10 * pgc->priority)
2314             pgc->policy |= ROUTING_POLICY_ANSWER;
2315         }
2316     }
2317   if (pgc->policy == ROUTING_POLICY_NONE)
2318     {
2319 #if DEBUG_FS
2320       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2321                   "Dropping query from `%s', network saturated.\n",
2322                   GNUNET_i2s (other));
2323 #endif
2324       if (NULL != pgc->bf)
2325         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2326       GNUNET_free (pgc);
2327       return GNUNET_OK;     /* drop */
2328     }
2329   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2330     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2331   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2332   /* decrement ttl (always) */
2333   ttl_decrement = 2 * TTL_DECREMENT +
2334     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2335                               TTL_DECREMENT);
2336   if ( (pgc->ttl < 0) &&
2337        (pgc->ttl - ttl_decrement > 0) )
2338     {
2339 #if DEBUG_FS
2340       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2341                   "Dropping query from `%s' due to TTL underflow.\n",
2342                   GNUNET_i2s (other));
2343 #endif
2344       /* integer underflow => drop (should be very rare)! */
2345       if (NULL != pgc->bf)
2346         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2347       GNUNET_free (pgc);
2348       return GNUNET_OK;
2349     }
2350   pgc->ttl -= ttl_decrement;
2351   pgc->start_time = GNUNET_TIME_absolute_get ();
2352   preference = (double) pgc->priority;
2353   if (preference < QUERY_BANDWIDTH_VALUE)
2354     preference = QUERY_BANDWIDTH_VALUE;
2355   // FIXME: also reserve bandwidth for reply?
2356   (void) GNUNET_CORE_peer_change_preference (sched, cfg,
2357                                              other,
2358                                              GNUNET_TIME_UNIT_FOREVER_REL,
2359                                              0, 0, preference, NULL, NULL);
2360   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2361     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2362                                  &ds_get_request,
2363                                  pgc);
2364   else
2365     GNUNET_SCHEDULER_add_continuation (sched,
2366                                        &forward_get_request,
2367                                        pgc,
2368                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2369   return GNUNET_OK;
2370 }
2371
2372
2373 /**
2374  * Function called to notify us that we can now transmit a reply to a
2375  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2376  * closed for writing in the meantime.
2377  *
2378  * @param cls closure, points to a "struct PendingRequest*" with
2379  *            one or more pending replies
2380  * @param size number of bytes available in buf
2381  * @param buf where the callee should write the message
2382  * @return number of bytes written to buf
2383  */
2384 static size_t
2385 transmit_result (void *cls,
2386                  size_t size, 
2387                  void *buf)
2388 {
2389   struct PendingRequest *pr = cls;
2390   char *cbuf = buf;
2391   struct PendingMessage *reply;
2392   size_t ret;
2393
2394   ret = 0;
2395   while (NULL != (reply = pr->replies_pending))
2396     {
2397       if ( (reply->msize + ret < ret) ||
2398            (reply->msize + ret > size) )
2399         break;
2400       pr->replies_pending = reply->next;
2401       memcpy (&cbuf[ret], &reply[1], reply->msize);
2402       ret += reply->msize;
2403       GNUNET_free (reply);
2404     }
2405   return ret;
2406 }
2407
2408
2409 /**
2410  * Iterator over pending requests.
2411  *
2412  * @param cls response (struct ProcessReplyClosure)
2413  * @param key our query
2414  * @param value value in the hash map (meta-info about the query)
2415  * @return GNUNET_YES (we should continue to iterate)
2416  */
2417 static int
2418 process_reply (void *cls,
2419                const GNUNET_HashCode * key,
2420                void *value)
2421 {
2422   struct ProcessReplyClosure *prq = cls;
2423   struct PendingRequest *pr = value;
2424   struct PendingRequest *eer;
2425   struct PendingMessage *reply;
2426   struct PutMessage *pm;
2427   struct ContentMessage *cm;
2428   GNUNET_HashCode chash;
2429   GNUNET_HashCode mhash;
2430   struct GNUNET_PeerIdentity target;
2431   size_t msize;
2432   uint32_t prio;
2433   struct GNUNET_TIME_Relative max_delay;
2434   
2435   GNUNET_CRYPTO_hash (prq->data,
2436                       prq->size,
2437                       &chash);
2438   switch (prq->type)
2439     {
2440     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2441     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2442       break;
2443     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2444       /* FIXME: does prq->namespace match our expectations? */
2445       /* then: fall-through??? */
2446     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2447       if (pr->bf != NULL) 
2448         {
2449           mingle_hash (&chash, pr->mingle, &mhash);
2450           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2451                                                                &mhash))
2452             return GNUNET_YES; /* duplicate */
2453           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2454                                             &mhash);
2455         }
2456       break;
2457     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2458       // FIXME: any checks against duplicates for SKBlocks?
2459       break;
2460     }
2461   prio = pr->priority;
2462   prq->priority += pr->remaining_priority;
2463   pr->remaining_priority = 0;
2464   if (pr->client != NULL)
2465     {
2466       if (pr->replies_seen_size == pr->replies_seen_off)
2467         GNUNET_array_grow (pr->replies_seen,
2468                            pr->replies_seen_size,
2469                            pr->replies_seen_size * 2 + 4);
2470       pr->replies_seen[pr->replies_seen_off++] = chash;
2471       // FIXME: possibly recalculate BF!
2472     }
2473   if (pr->client == NULL)
2474     {
2475       msize = sizeof (struct ContentMessage) + prq->size;
2476       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2477       reply->msize = msize;
2478       cm = (struct ContentMessage*) &reply[1];
2479       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2480       cm->header.size = htons (msize);
2481       cm->type = htonl (prq->type);
2482       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2483       reply->next = pr->replies_pending;
2484       pr->replies_pending = reply;
2485       memcpy (&reply[1], prq->data, prq->size);
2486       if (pr->cth != NULL)
2487         return GNUNET_YES;
2488       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2489       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
2490         {
2491           /* estimate expiration time from time difference between
2492              first request that will be discarded and this request */
2493           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2494           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
2495                                                            eer->start_time);
2496         }
2497       GNUNET_PEER_resolve (pr->source_pid,
2498                            &target);
2499       pr->cth = GNUNET_CORE_notify_transmit_ready (core,
2500                                                    prio,
2501                                                    max_delay,
2502                                                    &target,
2503                                                    msize,
2504                                                    &transmit_result,
2505                                                    pr);
2506       if (NULL == pr->cth)
2507         {
2508           // FIXME: now what? discard?
2509         }
2510     }
2511   else
2512     {
2513       msize = sizeof (struct PutMessage) + prq->size;
2514       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2515       reply->msize = msize;
2516       reply->next = pr->replies_pending;
2517       pm = (struct PutMessage*) &reply[1];
2518       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2519       pm->header.size = htons (msize);
2520       pm->type = htonl (prq->type);
2521       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
2522       pr->replies_pending = reply;
2523       memcpy (&reply[1], prq->data, prq->size);
2524       if (pr->th != NULL)
2525         return GNUNET_YES;
2526       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
2527                                                     msize,
2528                                                     GNUNET_TIME_UNIT_FOREVER_REL,
2529                                                     &transmit_result,
2530                                                     pr);
2531       if (pr->th == NULL)
2532         {
2533           // FIXME: need to try again later (not much
2534           // to do here specifically, but we need to
2535           // check somewhere else to handle this case!)
2536         }
2537     }
2538   // FIXME: implement hot-path routing statistics keeping!
2539   return GNUNET_YES;
2540 }
2541
2542
2543 /**
2544  * Check if the given KBlock is well-formed.
2545  *
2546  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
2547  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
2548  * @param query where to store the query that this block answers
2549  * @return GNUNET_OK if this is actually a well-formed KBlock
2550  */
2551 static int
2552 check_kblock (const struct KBlock *kb,
2553               size_t dsize,
2554               GNUNET_HashCode *query)
2555 {
2556   if (dsize < sizeof (struct KBlock))
2557     {
2558       GNUNET_break_op (0);
2559       return GNUNET_SYSERR;
2560     }
2561   if (dsize - sizeof (struct KBlock) !=
2562       ntohs (kb->purpose.size) 
2563       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
2564       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
2565     {
2566       GNUNET_break_op (0);
2567       return GNUNET_SYSERR;
2568     }
2569   if (GNUNET_OK !=
2570       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
2571                                 &kb->purpose,
2572                                 &kb->signature,
2573                                 &kb->keyspace)) 
2574     {
2575       GNUNET_break_op (0);
2576       return GNUNET_SYSERR;
2577     }
2578   if (query != NULL)
2579     GNUNET_CRYPTO_hash (&kb->keyspace,
2580                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2581                         query);
2582   return GNUNET_OK;
2583 }
2584
2585
2586 /**
2587  * Check if the given SBlock is well-formed.
2588  *
2589  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
2590  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
2591  * @param query where to store the query that this block answers
2592  * @param namespace where to store the namespace that this block belongs to
2593  * @return GNUNET_OK if this is actually a well-formed SBlock
2594  */
2595 static int
2596 check_sblock (const struct SBlock *sb,
2597               size_t dsize,
2598               GNUNET_HashCode *query,   
2599               GNUNET_HashCode *namespace)
2600 {
2601   if (dsize < sizeof (struct SBlock))
2602     {
2603       GNUNET_break_op (0);
2604       return GNUNET_SYSERR;
2605     }
2606   if (dsize !=
2607       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
2608     {
2609       GNUNET_break_op (0);
2610       return GNUNET_SYSERR;
2611     }
2612   if (GNUNET_OK !=
2613       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
2614                                 &sb->purpose,
2615                                 &sb->signature,
2616                                 &sb->subspace)) 
2617     {
2618       GNUNET_break_op (0);
2619       return GNUNET_SYSERR;
2620     }
2621   if (query != NULL)
2622     *query = sb->identifier;
2623   if (namespace != NULL)
2624     GNUNET_CRYPTO_hash (&sb->subspace,
2625                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2626                         namespace);
2627   return GNUNET_OK;
2628 }
2629
2630
2631 /**
2632  * Handle P2P "PUT" request.
2633  *
2634  * @param cls closure, always NULL
2635  * @param other the other peer involved (sender or receiver, NULL
2636  *        for loopback messages where we are both sender and receiver)
2637  * @param message the actual message
2638  * @param latency reported latency of the connection with 'other'
2639  * @param distance reported distance (DV) to 'other' 
2640  * @return GNUNET_OK to keep the connection open,
2641  *         GNUNET_SYSERR to close it (signal serious error)
2642  */
2643 static int
2644 handle_p2p_put (void *cls,
2645                 const struct GNUNET_PeerIdentity *other,
2646                 const struct GNUNET_MessageHeader *message,
2647                 struct GNUNET_TIME_Relative latency,
2648                 uint32_t distance)
2649 {
2650   const struct PutMessage *put;
2651   uint16_t msize;
2652   size_t dsize;
2653   uint32_t type;
2654   struct GNUNET_TIME_Absolute expiration;
2655   GNUNET_HashCode query;
2656   struct ProcessReplyClosure prq;
2657
2658   msize = ntohs (message->size);
2659   if (msize < sizeof (struct PutMessage))
2660     {
2661       GNUNET_break_op(0);
2662       return GNUNET_SYSERR;
2663     }
2664   put = (const struct PutMessage*) message;
2665   dsize = msize - sizeof (struct PutMessage);
2666   type = ntohl (put->type);
2667   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
2668
2669   /* first, validate! */
2670   switch (type)
2671     {
2672     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2673     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2674       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
2675       break;
2676     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2677       if (GNUNET_OK !=
2678           check_kblock ((const struct KBlock*) &put[1],
2679                         dsize,
2680                         &query))
2681         return GNUNET_SYSERR;
2682       break;
2683     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2684       if (GNUNET_OK !=
2685           check_sblock ((const struct SBlock*) &put[1],
2686                         dsize,
2687                         &query,
2688                         &prq.namespace))
2689         return GNUNET_SYSERR;
2690       break;
2691     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2692       // FIXME -- validate SKBLOCK!
2693       GNUNET_break (0);
2694       return GNUNET_OK;
2695     default:
2696       /* unknown block type */
2697       GNUNET_break_op (0);
2698       return GNUNET_SYSERR;
2699     }
2700
2701   /* now, lookup 'query' */
2702   prq.data = (const void*) &put[1];
2703   prq.size = dsize;
2704   prq.type = type;
2705   prq.expiration = expiration;
2706   prq.priority = 0;
2707   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
2708                                               &query,
2709                                               &process_reply,
2710                                               &prq);
2711   // FIXME: if migration is on and load is low,
2712   // queue to store data in datastore;
2713   // use "prq.priority" for that!
2714   return GNUNET_OK;
2715 }
2716
2717
2718 /**
2719  * List of handlers for P2P messages
2720  * that we care about.
2721  */
2722 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2723   {
2724     { &handle_p2p_get, 
2725       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2726     { &handle_p2p_put, 
2727       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2728     { NULL, 0, 0 }
2729   };
2730
2731
2732 /**
2733  * Process fs requests.
2734  *
2735  * @param cls closure
2736  * @param s scheduler to use
2737  * @param server the initialized server
2738  * @param c configuration to use
2739  */
2740 static void
2741 run (void *cls,
2742      struct GNUNET_SCHEDULER_Handle *s,
2743      struct GNUNET_SERVER_Handle *server,
2744      const struct GNUNET_CONFIGURATION_Handle *c)
2745 {
2746   sched = s;
2747   cfg = c;
2748
2749
2750   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2751   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2752   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
2753   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
2754   GNUNET_FS_init_indexing (sched, cfg);
2755   dsh = GNUNET_DATASTORE_connect (cfg,
2756                                   sched);
2757   core = GNUNET_CORE_connect (sched,
2758                               cfg,
2759                               GNUNET_TIME_UNIT_FOREVER_REL,
2760                               NULL,
2761                               NULL,
2762                               NULL,
2763                               &peer_connect_handler,
2764                               &peer_disconnect_handler,
2765                               NULL, GNUNET_NO,
2766                               NULL, GNUNET_NO,
2767                               p2p_handlers);
2768
2769   GNUNET_SERVER_disconnect_notify (server, 
2770                                    &handle_client_disconnect,
2771                                    NULL);
2772   GNUNET_SERVER_add_handlers (server, handlers);
2773   GNUNET_SCHEDULER_add_delayed (sched,
2774                                 GNUNET_TIME_UNIT_FOREVER_REL,
2775                                 &shutdown_task,
2776                                 NULL);
2777   if (NULL == dsh)
2778     {
2779       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2780                   _("Failed to connect to `%s' service.\n"),
2781                   "datastore");
2782       GNUNET_SCHEDULER_shutdown (sched);
2783       return;
2784     }
2785   if (NULL == core)
2786     {
2787       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2788                   _("Failed to connect to `%s' service.\n"),
2789                   "core");
2790       GNUNET_SCHEDULER_shutdown (sched);
2791       return;
2792     }
2793 }
2794
2795
2796 /**
2797  * The main function for the fs service.
2798  *
2799  * @param argc number of arguments from the command line
2800  * @param argv command line arguments
2801  * @return 0 ok, 1 on error
2802  */
2803 int
2804 main (int argc, char *const *argv)
2805 {
2806   return (GNUNET_OK ==
2807           GNUNET_SERVICE_run (argc,
2808                               argv,
2809                               "fs",
2810                               GNUNET_SERVICE_OPTION_NONE,
2811                               &run, NULL)) ? 0 : 1;
2812 }
2813
2814 /* end of gnunet-service-fs.c */