stuff
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix gazillion of minor FIXME's
28  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29  *   core to transmit to another peer; need to make sure this is bounded overall...
30  * - randomly delay processing for improved anonymity (can wait)
31  * - content migration (put in local DS) (can wait)
32  * - handle some special cases when forwarding replies based on tracked requests (can wait)
33  * - tracking of success correlations for hot-path routing (can wait)
34  * - various load-based actions (can wait)
35  * - validation of KSBLOCKS (can wait)
36  * - remove on-demand blocks if they keep failing (can wait)
37  * - check that we decrement PIDs always where necessary (can wait)
38  * - find out how to use core-pulling instead of pushing (at least for some cases)
39  */
40 #include "platform.h"
41 #include <float.h>
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "gnunet-service-fs_indexing.h"
49 #include "fs.h"
50
51 #define DEBUG_FS GNUNET_NO
52
53 /**
54  * Signature of a function that is called whenever a datastore
55  * request can be processed (or an entry put on the queue times out).
56  *
57  * @param cls closure
58  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
59  */
60 typedef void (*RequestFunction)(void *cls,
61                                 int ok);
62
63
64 /**
65  * Doubly-linked list of our requests for the datastore.
66  */
67 struct DatastoreRequestQueue
68 {
69
70   /**
71    * This is a doubly-linked list.
72    */
73   struct DatastoreRequestQueue *next;
74
75   /**
76    * This is a doubly-linked list.
77    */
78   struct DatastoreRequestQueue *prev;
79
80   /**
81    * Function to call (will issue the request).
82    */
83   RequestFunction req;
84
85   /**
86    * Closure for req.
87    */
88   void *req_cls;
89
90   /**
91    * When should this request time-out because we don't care anymore?
92    */
93   struct GNUNET_TIME_Absolute timeout;
94     
95   /**
96    * ID of task used for signaling timeout.
97    */
98   GNUNET_SCHEDULER_TaskIdentifier task;
99
100 };
101
102
103 /**
104  * Closure for processing START_SEARCH messages from a client.
105  */
106 struct LocalGetContext
107 {
108
109   /**
110    * This is a doubly-linked list.
111    */
112   struct LocalGetContext *next;
113
114   /**
115    * This is a doubly-linked list.
116    */
117   struct LocalGetContext *prev;
118
119   /**
120    * Client that initiated the search.
121    */
122   struct GNUNET_SERVER_Client *client;
123
124   /**
125    * Array of results that we've already received 
126    * (can be NULL).
127    */
128   GNUNET_HashCode *results; 
129
130   /**
131    * Bloomfilter over all results (for fast query construction);
132    * NULL if we don't have any results.
133    *
134    * FIXME: this member is not used, is that OK? If so, it should
135    * be removed!
136    */
137   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
138
139   /**
140    * DS request associated with this operation.
141    */
142   struct DatastoreRequestQueue *req;
143
144   /**
145    * Current result message to transmit to client (or NULL).
146    */
147   struct ContentMessage *result;
148   
149   /**
150    * Type of the content that we're looking for.
151    * 0 for any.
152    */
153   uint32_t type;
154
155   /**
156    * Desired anonymity level.
157    */
158   uint32_t anonymity_level;
159
160   /**
161    * Number of results actually stored in the results array.
162    */
163   unsigned int results_used;
164   
165   /**
166    * Size of the results array in memory.
167    */
168   unsigned int results_size;
169   
170   /**
171    * Size (in bytes) of the 'results_bf' bloomfilter.
172    *
173    * FIXME: this member is not used, is that OK? If so, it should
174    * be removed!
175    */
176   size_t results_bf_size;
177
178   /**
179    * If the request is for a DBLOCK or IBLOCK, this is the identity of
180    * the peer that is known to have a response.  Set to all-zeros if
181    * such a target is not known (note that even if OUR anonymity
182    * level is >0 we may happen to know the responder's identity;
183    * nevertheless, we should probably not use it for a DHT-lookup
184    * or similar blunt actions in order to avoid exposing ourselves).
185    */
186   struct GNUNET_PeerIdentity target;
187
188   /**
189    * If the request is for an SBLOCK, this is the identity of the
190    * pseudonym to which the SBLOCK belongs. 
191    */
192   GNUNET_HashCode namespace;
193
194   /**
195    * Hash of the keyword (aka query) for KBLOCKs; Hash of
196    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
197    * and hash of the identifier XORed with the target for
198    * SBLOCKS (aka query).
199    */
200   GNUNET_HashCode query;
201
202 };
203
204
205 /**
206  * Possible routing policies for an FS-GET request.
207  */
208 enum RoutingPolicy
209   {
210     /**
211      * Simply drop the request.
212      */
213     ROUTING_POLICY_NONE = 0,
214     
215     /**
216      * Answer it if we can from local datastore.
217      */
218     ROUTING_POLICY_ANSWER = 1,
219
220     /**
221      * Forward the request to other peers (if possible).
222      */
223     ROUTING_POLICY_FORWARD = 2,
224
225     /**
226      * Forward to other peers, and ask them to route
227      * the response via ourselves.
228      */
229     ROUTING_POLICY_INDIRECT = 6,
230     
231     /**
232      * Do everything we could possibly do (that would
233      * make sense).
234      */
235     ROUTING_POLICY_ALL = 7
236   };
237
238
239 /**
240  * Internal context we use for our initial processing
241  * of a GET request.
242  */
243 struct ProcessGetContext
244 {
245   /**
246    * The search query (used for datastore lookup).
247    */
248   GNUNET_HashCode query;
249   
250   /**
251    * Which peer we should forward the response to.
252    */
253   struct GNUNET_PeerIdentity reply_to;
254
255   /**
256    * Namespace for the result (only set for SKS requests)
257    */
258   GNUNET_HashCode namespace;
259
260   /**
261    * Peer that we should forward the query to if possible
262    * (since that peer likely has the content).
263    */
264   struct GNUNET_PeerIdentity prime_target;
265
266   /**
267    * When did we receive this request?
268    */
269   struct GNUNET_TIME_Absolute start_time;
270
271   /**
272    * Our entry in the DRQ (non-NULL while we wait for our
273    * turn to interact with the local database).
274    */
275   struct DatastoreRequestQueue *drq;
276
277   /**
278    * Filter used to eliminate duplicate results.  Can be NULL if we
279    * are not yet filtering any results.
280    */
281   struct GNUNET_CONTAINER_BloomFilter *bf;
282
283   /**
284    * Bitmap describing which of the optional
285    * hash codes / peer identities were given to us.
286    */
287   uint32_t bm;
288
289   /**
290    * Desired block type.
291    */
292   uint32_t type;
293
294   /**
295    * Priority of the request.
296    */
297   uint32_t priority;
298
299   /**
300    * Size of the 'bf' (in bytes).
301    */
302   size_t bf_size;
303
304   /**
305    * In what ways are we going to process
306    * the request?
307    */
308   enum RoutingPolicy policy;
309
310   /**
311    * Time-to-live for the request (value
312    * we use).
313    */
314   int32_t ttl;
315
316   /**
317    * Number to mingle hashes for bloom-filter
318    * tests with.
319    */
320   int32_t mingle;
321
322   /**
323    * Number of results that were found so far.
324    */
325   unsigned int results_found;
326 };
327
328
329 /**
330  * Information we keep for each pending reply.  The
331  * actual message follows at the end of this struct.
332  */
333 struct PendingMessage
334 {
335   /**
336    * This is a linked list.
337    */
338   struct PendingMessage *next;
339
340   /**
341    * Size of the reply; actual reply message follows
342    * at the end of this struct.
343    */
344   size_t msize;
345   
346   /**
347    * How important is this message for us?
348    */
349   uint32_t priority;
350
351 };
352
353
354 /**
355  * All requests from a client are kept in a doubly-linked list.
356  */
357 struct ClientRequestList;
358
359
360 /**
361  * Information we keep for each pending request.  We should try to
362  * keep this struct as small as possible since its memory consumption
363  * is key to how many requests we can have pending at once.
364  */
365 struct PendingRequest
366 {
367
368   /**
369    * ID of a client making a request, NULL if this entry is for a
370    * peer.
371    */
372   struct GNUNET_SERVER_Client *client;
373
374   /**
375    * If this request was made by a client,
376    * this is our entry in the client request
377    * list; otherwise NULL.
378    */
379   struct ClientRequestList *crl_entry;
380
381   /**
382    * If this is a namespace query, pointer to the hash of the public
383    * key of the namespace; otherwise NULL.
384    */
385   GNUNET_HashCode *namespace;
386
387   /**
388    * Bloomfilter we use to filter out replies that we don't care about
389    * (anymore).  NULL as long as we are interested in all replies.
390    */
391   struct GNUNET_CONTAINER_BloomFilter *bf;
392
393   /**
394    * Context of our GNUNET_CORE_peer_change_preference call.
395    */
396   struct GNUNET_CORE_InformationRequestContext *irc;
397
398   /**
399    * Handle for an active request for transmission to this peer, or
400    * NULL.  Only used for replies that we are trying to send to a peer
401    * that we are not yet connected to.
402    */
403   struct GNUNET_CORE_TransmitHandle *cth;
404
405   /**
406    * Replies that we have received but were unable to forward yet
407    * (typically non-null only if we have a pending transmission
408    * request with the client or the respective peer).
409    */
410   struct PendingMessage *replies_pending;
411
412   /**
413    * Pending transmission request for the target client (for processing of
414    * 'replies_pending').
415    */
416   struct GNUNET_CONNECTION_TransmitHandle *th;
417
418   /**
419    * Hash code of all replies that we have seen so far (only valid
420    * if client is not NULL since we only track replies like this for
421    * our own clients).
422    */
423   GNUNET_HashCode *replies_seen;
424
425   /**
426    * Node in the heap representing this entry.
427    */
428   struct GNUNET_CONTAINER_HeapNode *hnode;
429
430   /**
431    * When did we first see this request (form this peer), or, if our
432    * client is initiating, when did we last initiate a search?
433    */
434   struct GNUNET_TIME_Absolute start_time;
435
436   /**
437    * The query that this request is for.
438    */
439   GNUNET_HashCode query;
440
441   /**
442    * The task responsible for transmitting queries
443    * for this request.
444    */
445   GNUNET_SCHEDULER_TaskIdentifier task;
446
447   /**
448    * (Interned) Peer identifier (only valid if "client" is NULL)
449    * that identifies a peer that gave us this request.
450    */
451   GNUNET_PEER_Id source_pid;
452
453   /**
454    * (Interned) Peer identifier that identifies a preferred target
455    * for requests.
456    */
457   GNUNET_PEER_Id target_pid;
458
459   /**
460    * (Interned) Peer identifiers of peers that have already
461    * received our query for this content.
462    */
463   GNUNET_PEER_Id *used_pids;
464
465   /**
466    * Size of the 'bf' (in bytes).
467    */
468   size_t bf_size;
469
470   /**
471    * Desired anonymity level; only valid for requests from a local client.
472    */
473   uint32_t anonymity_level;
474
475   /**
476    * How many entries in "used_pids" are actually valid?
477    */
478   unsigned int used_pids_off;
479
480   /**
481    * How long is the "used_pids" array?
482    */
483   unsigned int used_pids_size;
484
485   /**
486    * How many entries in "replies_seen" are actually valid?
487    */
488   unsigned int replies_seen_off;
489
490   /**
491    * How long is the "replies_seen" array?
492    */
493   unsigned int replies_seen_size;
494   
495   /**
496    * Priority with which this request was made.  If one of our clients
497    * made the request, then this is the current priority that we are
498    * using when initiating the request.  This value is used when
499    * we decide to reward other peers with trust for providing a reply.
500    */
501   uint32_t priority;
502
503   /**
504    * Priority points left for us to spend when forwarding this request
505    * to other peers.
506    */
507   uint32_t remaining_priority;
508
509   /**
510    * Number to mingle hashes for bloom-filter
511    * tests with.
512    */
513   int32_t mingle;
514
515   /**
516    * TTL with which we saw this request (or, if we initiated, TTL that
517    * we used for the request).
518    */
519   int32_t ttl;
520   
521   /**
522    * Type of the content that this request is for.
523    */
524   uint32_t type;
525
526 };
527
528
529 /**
530  * All requests from a client are kept in a doubly-linked list.
531  */
532 struct ClientRequestList
533 {
534   /**
535    * This is a doubly-linked list.
536    */
537   struct ClientRequestList *next;
538
539   /**
540    * This is a doubly-linked list.
541    */ 
542   struct ClientRequestList *prev;
543
544   /**
545    * A request from this client.
546    */
547   struct PendingRequest *req;
548
549   /**
550    * Client list with the head and tail of this DLL.
551    */
552   struct ClientList *cl;
553 };
554
555
556 /**
557  * Linked list of all clients that we are currently processing
558  * requests for.
559  */
560 struct ClientList
561 {
562
563   /**
564    * This is a linked list.
565    */
566   struct ClientList *next;
567
568   /**
569    * What client is this entry for?
570    */
571   struct GNUNET_SERVER_Client* client;
572
573   /**
574    * Head of the DLL of requests from this client.
575    */
576   struct ClientRequestList *head;
577
578   /**
579    * Tail of the DLL of requests from this client.
580    */
581   struct ClientRequestList *tail;
582
583 };
584
585
586 /**
587  * Closure for "process_reply" function.
588  */
589 struct ProcessReplyClosure
590 {
591   /**
592    * The data for the reply.
593    */
594   const void *data;
595
596   /**
597    * When the reply expires.
598    */
599   struct GNUNET_TIME_Absolute expiration;
600
601   /**
602    * Size of data.
603    */
604   size_t size;
605
606   /**
607    * Namespace that this reply belongs to
608    * (if it is of type SBLOCK).
609    */
610   GNUNET_HashCode namespace;
611
612   /**
613    * Type of the block.
614    */
615   uint32_t type;
616
617   /**
618    * How much was this reply worth to us?
619    */
620   uint32_t priority;
621 };
622
623
624 /**
625  * Information about a peer that we are connected to.
626  * We track data that is useful for determining which
627  * peers should receive our requests.
628  */
629 struct ConnectedPeer
630 {
631
632   /**
633    * List of the last clients for which this peer
634    * successfully answered a query. 
635    */
636   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
637
638   /**
639    * List of the last PIDs for which
640    * this peer successfully answered a query;
641    * We use 0 to indicate no successful reply.
642    */
643   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
644
645   /**
646    * Average delay between sending the peer a request and
647    * getting a reply (only calculated over the requests for
648    * which we actually got a reply).   Calculated
649    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
650    */ 
651   struct GNUNET_TIME_Relative avg_delay;
652
653   /**
654    * Handle for an active request for transmission to this
655    * peer, or NULL.
656    */
657   struct GNUNET_CORE_TransmitHandle *cth;
658
659   /**
660    * Messages (replies, queries, content migration) we would like to
661    * send to this peer in the near future.  Sorted by priority.
662    */
663   struct PendingMessage *pending_messages;
664
665   /**
666    * Average priority of successful replies.  Calculated
667    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
668    */
669   double avg_priority;
670
671   /**
672    * The peer's identity.
673    */
674   GNUNET_PEER_Id pid;  
675
676   /**
677    * Number of requests we have currently pending with this peer (that
678    * is, requests that were transmitted so recently that we would not
679    * retransmit them right now).
680    */
681   unsigned int pending_requests;
682
683   /**
684    * Which offset in "last_p2p_replies" will be updated next?
685    * (we go round-robin).
686    */
687   unsigned int last_p2p_replies_woff;
688
689   /**
690    * Which offset in "last_client_replies" will be updated next?
691    * (we go round-robin).
692    */
693   unsigned int last_client_replies_woff;
694
695 };
696
697
698 /**
699  * Our connection to the datastore.
700  */
701 static struct GNUNET_DATASTORE_Handle *dsh;
702
703 /**
704  * Our scheduler.
705  */
706 static struct GNUNET_SCHEDULER_Handle *sched;
707
708 /**
709  * Our configuration.
710  */
711 const struct GNUNET_CONFIGURATION_Handle *cfg;
712
713 /**
714  * Handle to the core service (NULL until we've connected to it).
715  */
716 struct GNUNET_CORE_Handle *core;
717
718 /**
719  * Head of doubly-linked LGC list.
720  */
721 static struct LocalGetContext *lgc_head;
722
723 /**
724  * Tail of doubly-linked LGC list.
725  */
726 static struct LocalGetContext *lgc_tail;
727
728 /**
729  * Head of request queue for the datastore, sorted by timeout.
730  */
731 static struct DatastoreRequestQueue *drq_head;
732
733 /**
734  * Tail of request queue for the datastore.
735  */
736 static struct DatastoreRequestQueue *drq_tail;
737
738 /**
739  * Map of query hash codes to requests.
740  */
741 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
742
743 /**
744  * Map of peer IDs to requests (for those requests coming
745  * from other peers).
746  */
747 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
748
749 /**
750  * Linked list of all of our clients and their requests.
751  */
752 static struct ClientList *clients;
753
754 /**
755  * Heap with the request that will expire next at the top.  Contains
756  * pointers of type "struct PendingRequest*"; these will *also* be
757  * aliased from the "requests_by_peer" data structures and the
758  * "requests_by_query" table.  Note that requests from our clients
759  * don't expire and are thus NOT in the "requests_by_expiration"
760  * (or the "requests_by_peer" tables).
761  */
762 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
763
764 /**
765  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
766  */
767 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
768
769 /**
770  * Maximum number of requests (from other peers) that we're
771  * willing to have pending at any given point in time.
772  * FIXME: set from configuration (and 32 is a tiny value for testing only).
773  */
774 static uint64_t max_pending_requests = 32;
775
776
777
778 /**
779  * Run the next DS request in our
780  * queue, we're done with the current one.
781  */
782 static void
783 next_ds_request ()
784 {
785   struct DatastoreRequestQueue *e;
786   
787   while (NULL != (e = drq_head))
788     {
789       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
790         break;
791       if (e->task != GNUNET_SCHEDULER_NO_TASK)
792         GNUNET_SCHEDULER_cancel (sched, e->task);
793       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
794       e->req (e->req_cls, GNUNET_NO);
795       GNUNET_free (e);  
796     }
797   if (e == NULL)
798     return;
799   if (e->task != GNUNET_SCHEDULER_NO_TASK)
800     GNUNET_SCHEDULER_cancel (sched, e->task);
801   e->task = GNUNET_SCHEDULER_NO_TASK;
802   e->req (e->req_cls, GNUNET_YES);
803   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
804   GNUNET_free (e);  
805 }
806
807
808 /**
809  * A datastore request had to be timed out. 
810  *
811  * @param cls closure (of type "struct DatastoreRequestQueue*")
812  * @param tc task context, unused
813  */
814 static void
815 timeout_ds_request (void *cls,
816                     const struct GNUNET_SCHEDULER_TaskContext *tc)
817 {
818   struct DatastoreRequestQueue *e = cls;
819
820   e->task = GNUNET_SCHEDULER_NO_TASK;
821   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
822   e->req (e->req_cls, GNUNET_NO);
823   GNUNET_free (e);  
824 }
825
826
827 /**
828  * Queue a request for the datastore.
829  *
830  * @param deadline by when the request should run
831  * @param fun function to call once the request can be run
832  * @param fun_cls closure for fun
833  */
834 static struct DatastoreRequestQueue *
835 queue_ds_request (struct GNUNET_TIME_Relative deadline,
836                   RequestFunction fun,
837                   void *fun_cls)
838 {
839   struct DatastoreRequestQueue *e;
840   struct DatastoreRequestQueue *bef;
841
842   if (drq_head == NULL)
843     {
844       /* no other requests pending, run immediately */
845       fun (fun_cls, GNUNET_OK);
846       return NULL;
847     }
848   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
849   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
850   e->req = fun;
851   e->req_cls = fun_cls;
852   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
853     {
854       /* local request, highest prio, put at head of queue
855          regardless of deadline */
856       bef = NULL;
857     }
858   else
859     {
860       bef = drq_tail;
861       while ( (NULL != bef) &&
862               (e->timeout.value < bef->timeout.value) )
863         bef = bef->prev;
864     }
865   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
866   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
867     return e;
868   e->task = GNUNET_SCHEDULER_add_delayed (sched,
869                                           deadline,
870                                           &timeout_ds_request,
871                                           e);
872   return e;                                    
873 }
874
875
876 /**
877  * Free the state associated with a local get context.
878  *
879  * @param lgc the lgc to free
880  */
881 static void
882 local_get_context_free (struct LocalGetContext *lgc) 
883 {
884   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
885   GNUNET_SERVER_client_drop (lgc->client); 
886   GNUNET_free_non_null (lgc->results);
887   if (lgc->results_bf != NULL)
888     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
889   if (lgc->req != NULL)
890     {
891       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
892         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
893       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
894       GNUNET_free (lgc->req);
895     }
896   GNUNET_free (lgc);
897 }
898
899
900 /**
901  * We're able to transmit the next (local) result to the client.
902  * Do it and ask the datastore for more.  Or, on error, tell
903  * the datastore to stop giving us more.
904  *
905  * @param cls our closure (struct LocalGetContext)
906  * @param max maximum number of bytes we can transmit
907  * @param buf where to copy our message
908  * @return number of bytes copied to buf
909  */
910 static size_t
911 transmit_local_result (void *cls,
912                        size_t max,
913                        void *buf)
914 {
915   struct LocalGetContext *lgc = cls;  
916   uint16_t msize;
917
918   if (NULL == buf)
919     {
920 #if DEBUG_FS
921       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922                   "Failed to transmit result to local client, aborting datastore iteration.\n");
923 #endif
924       /* error, abort! */
925       GNUNET_free (lgc->result);
926       lgc->result = NULL;
927       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
928       return 0;
929     }
930   msize = ntohs (lgc->result->header.size);
931 #if DEBUG_FS
932   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
933               "Transmitting %u bytes of result to local client.\n",
934               msize);
935 #endif
936   GNUNET_assert (max >= msize);
937   memcpy (buf, lgc->result, msize);
938   GNUNET_free (lgc->result);
939   lgc->result = NULL;
940   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
941   return msize;
942 }
943
944
945 /**
946  * Mingle hash with the mingle_number to produce different bits.
947  */
948 static void
949 mingle_hash (const GNUNET_HashCode * in,
950              int32_t mingle_number, 
951              GNUNET_HashCode * hc)
952 {
953   GNUNET_HashCode m;
954
955   GNUNET_CRYPTO_hash (&mingle_number, 
956                       sizeof (int32_t), 
957                       &m);
958   GNUNET_CRYPTO_hash_xor (&m, in, hc);
959 }
960
961
962 /**
963  * How many bytes should a bloomfilter be if we have already seen
964  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
965  * of bits set per entry.  Furthermore, we should not re-size the
966  * filter too often (to keep it cheap).
967  *
968  * Since other peers will also add entries but not resize the filter,
969  * we should generally pick a slightly larger size than what the
970  * strict math would suggest.
971  *
972  * @return must be a power of two and smaller or equal to 2^15.
973  */
974 static size_t
975 compute_bloomfilter_size (unsigned int entry_count)
976 {
977   size_t size;
978   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
979   uint16_t max = 1 << 15;
980
981   if (entry_count > max)
982     return max;
983   size = 8;
984   while ((size < max) && (size < ideal))
985     size *= 2;
986   if (size > max)
987     return max;
988   return size;
989 }
990
991
992 /**
993  * Recalculate our bloom filter for filtering replies.
994  *
995  * @param count number of entries we are filtering right now
996  * @param mingle set to our new mingling value
997  * @param bf_size set to the size of the bloomfilter
998  * @param entries the entries to filter
999  * @return updated bloomfilter, NULL for none
1000  */
1001 static struct GNUNET_CONTAINER_BloomFilter *
1002 refresh_bloomfilter (unsigned int count,
1003                      int32_t * mingle,
1004                      size_t *bf_size,
1005                      const GNUNET_HashCode *entries)
1006 {
1007   struct GNUNET_CONTAINER_BloomFilter *bf;
1008   size_t nsize;
1009   unsigned int i;
1010   GNUNET_HashCode mhash;
1011
1012   if (0 == count)
1013     return NULL;
1014   nsize = compute_bloomfilter_size (count);
1015   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1016   *bf_size = nsize;
1017   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1018                                           nsize,
1019                                           BLOOMFILTER_K);
1020   for (i=0;i<count;i++)
1021     {
1022       mingle_hash (&entries[i], *mingle, &mhash);
1023       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1024     }
1025   return bf;
1026 }
1027
1028
1029 /**
1030  * Closure used for "target_peer_select_cb".
1031  */
1032 struct PeerSelectionContext 
1033 {
1034   /**
1035    * The request for which we are selecting
1036    * peers.
1037    */
1038   struct PendingRequest *pr;
1039
1040   /**
1041    * Current "prime" target.
1042    */
1043   struct GNUNET_PeerIdentity target;
1044
1045   /**
1046    * How much do we like this target?
1047    */
1048   double target_score;
1049
1050 };
1051
1052
1053 /**
1054  * Function called for each connected peer to determine
1055  * which one(s) would make good targets for forwarding.
1056  *
1057  * @param cls closure (struct PeerSelectionContext)
1058  * @param key current key code (peer identity)
1059  * @param value value in the hash map (struct ConnectedPeer)
1060  * @return GNUNET_YES if we should continue to
1061  *         iterate,
1062  *         GNUNET_NO if not.
1063  */
1064 static int
1065 target_peer_select_cb (void *cls,
1066                        const GNUNET_HashCode * key,
1067                        void *value)
1068 {
1069   struct PeerSelectionContext *psc = cls;
1070   struct ConnectedPeer *cp = value;
1071   struct PendingRequest *pr = psc->pr;
1072   double score;
1073   unsigned int i;
1074
1075   /* 1) check if we have already (recently) forwarded to this peer */
1076   for (i=0;i<pr->used_pids_off;i++)
1077     if (pr->used_pids[i] == cp->pid)
1078       return GNUNET_YES; /* skip */
1079   // 2) calculate how much we'd like to forward to this peer
1080   score = 0; // FIXME!
1081   
1082   /* store best-fit in closure */
1083   if (score > psc->target_score)
1084     {
1085       psc->target_score = score;
1086       psc->target.hashPubKey = *key; 
1087     }
1088   return GNUNET_YES;
1089 }
1090
1091
1092 /**
1093  * We use a random delay to make the timing of requests
1094  * less predictable.  This function returns such a random
1095  * delay.
1096  *
1097  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1098  */
1099 static struct GNUNET_TIME_Relative
1100 get_processing_delay ()
1101 {
1102   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1103                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1104                                                                   TTL_DECREMENT));
1105 }
1106
1107
1108 /**
1109  * Task that is run for each request with the goal of forwarding the
1110  * associated query to other peers.  The task should re-schedule
1111  * itself to be re-run once the TTL has expired.  (or at a later time
1112  * if more peers should be queried earlier).
1113  *
1114  * @param cls the requests "struct PendingRequest*"
1115  * @param tc task context (unused)
1116  */
1117 static void
1118 forward_request_task (void *cls,
1119                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1120
1121
1122 /**
1123  * We've selected a peer for forwarding of a query.  Construct the
1124  * message and then re-schedule the task to forward again to (other)
1125  * peers.
1126  *
1127  * @param cls closure
1128  * @param size number of bytes available in buf
1129  * @param buf where the callee should write the message
1130  * @return number of bytes written to buf
1131  */
1132 static size_t
1133 transmit_request_cb (void *cls,
1134                      size_t size, 
1135                      void *buf)
1136 {
1137   struct ConnectedPeer *cp = cls;
1138   char *cbuf = buf;
1139   struct GNUNET_PeerIdentity target;
1140   struct PendingMessage *pr;
1141   size_t tot;
1142
1143   cp->cth = NULL;
1144   tot = 0;
1145   while ( (NULL != (pr = cp->pending_messages)) &&
1146           (pr->msize < size - tot) )
1147     {
1148       memcpy (&cbuf[tot],
1149               &pr[1],
1150               pr->msize);
1151       tot += pr->msize;
1152       cp->pending_messages = pr->next;
1153       GNUNET_free (pr);
1154     }
1155   if (NULL != pr)
1156     {
1157       GNUNET_PEER_resolve (cp->pid,
1158                            &target);    
1159       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1160                                                    pr->priority,
1161                                                    GNUNET_TIME_UNIT_FOREVER_REL,
1162                                                    &target,
1163                                                    pr->msize,
1164                                                    &transmit_request_cb,
1165                                                    cp);
1166     }
1167   return tot;
1168 }
1169
1170
1171 /**
1172  * Function called after we've tried to reserve a certain amount of
1173  * bandwidth for a reply.  Check if we succeeded and if so send our
1174  * query.
1175  *
1176  * @param cls the requests "struct PendingRequest*"
1177  * @param peer identifies the peer
1178  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1179  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1180  * @param amount set to the amount that was actually reserved or unreserved
1181  * @param preference current traffic preference for the given peer
1182  */
1183 static void
1184 target_reservation_cb (void *cls,
1185                        const struct
1186                        GNUNET_PeerIdentity * peer,
1187                        unsigned int bpm_in,
1188                        unsigned int bpm_out,
1189                        int amount,
1190                        uint64_t preference)
1191 {
1192   struct PendingRequest *pr = cls;
1193   struct ConnectedPeer *cp;
1194   struct PendingMessage *pm;
1195   struct PendingMessage *pos;
1196   struct PendingMessage *prev;
1197   struct GetMessage *gm;
1198   GNUNET_HashCode *ext;
1199   char *bfdata;
1200   size_t msize;
1201   unsigned int k;
1202
1203   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1204                                            get_processing_delay (), // FIXME: longer?
1205                                            &forward_request_task,
1206                                            pr);
1207   pr->irc = NULL;
1208   GNUNET_assert (peer != NULL);
1209   if (amount != DBLOCK_SIZE) 
1210     {
1211       /* FIXME: call stats... */
1212       return; /* this target round failed */    
1213     }
1214   // (2) transmit, update ttl/priority
1215   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1216                                           &peer->hashPubKey);
1217   if (cp == NULL)
1218     {
1219       /* Peer must have just left; try again immediately */
1220       pr->task = GNUNET_SCHEDULER_add_now (sched,
1221                                            &forward_request_task,
1222                                            pr);
1223       return;
1224     }
1225   /* build message and insert message into priority queue */
1226   k = 0; // FIXME: count hash codes!
1227   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1228   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1229   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1230   pm->msize = msize;
1231   pm->priority = 0; // FIXME: calculate priority properly!
1232   gm = (struct GetMessage*) &pm[1];
1233   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1234   gm->header.size = htons (msize);
1235   gm->type = htonl (pr->type);
1236   pr->remaining_priority /= 2;
1237   gm->priority = htonl (pr->remaining_priority);
1238   gm->ttl = htonl (pr->ttl);
1239   gm->filter_mutator = htonl(pr->mingle);
1240   gm->hash_bitmap = htonl (42);
1241   gm->query = pr->query;
1242   ext = (GNUNET_HashCode*) &gm[1];
1243
1244   // FIXME: setup "ext[0]..[k-1]"
1245   bfdata = (char *) &ext[k];
1246   if (pr->bf != NULL)
1247     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1248                                                bfdata,
1249                                                pr->bf_size);
1250
1251
1252   prev = NULL;
1253   pos = cp->pending_messages;
1254   while ( (pos != NULL) &&
1255           (pm->priority < pos->priority) )
1256     {
1257       prev = pos;
1258       pos = pos->next;
1259     }
1260   if (prev == NULL)
1261     cp->pending_messages = pm;
1262   else
1263     prev->next = pm;
1264   pm->next = pos;
1265   if (cp->cth == NULL)
1266     cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1267                                                  cp->pending_messages->priority,
1268                                                  GNUNET_TIME_UNIT_FOREVER_REL,
1269                                                  peer,
1270                                                  msize,
1271                                                  &transmit_request_cb,
1272                                                  cp);
1273   if (cp->cth == NULL)
1274     {
1275       /* technically, this should not be a 'break'; but
1276          we don't handle this (rare) case yet, so let's warn
1277          about it... */
1278       GNUNET_break (0);
1279       // FIXME: now what?
1280     }
1281 }
1282
1283
1284 /**
1285  * Task that is run for each request with the goal of forwarding the
1286  * associated query to other peers.  The task should re-schedule
1287  * itself to be re-run once the TTL has expired.  (or at a later time
1288  * if more peers should be queried earlier).
1289  *
1290  * @param cls the requests "struct PendingRequest*"
1291  * @param tc task context (unused)
1292  */
1293 static void
1294 forward_request_task (void *cls,
1295                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1296 {
1297   struct PendingRequest *pr = cls;
1298   struct PeerSelectionContext psc;
1299
1300   pr->task = GNUNET_SCHEDULER_NO_TASK;
1301   /* (1) select target */
1302   psc.pr = pr;
1303   psc.target_score = DBL_MIN;
1304   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1305                                          &target_peer_select_cb,
1306                                          &psc);
1307   if (psc.target_score == DBL_MIN)
1308     {
1309       /* no possible target found, wait some time */
1310       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1311                                                get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1312                                                &forward_request_task,
1313                                                pr);
1314       return;
1315     }
1316   /* (2) reserve reply bandwidth */
1317   GNUNET_assert (NULL == pr->irc);
1318   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1319                                                 &psc.target,
1320                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1321                                                 -1,
1322                                                 DBLOCK_SIZE, // FIXME: make dependent on type?
1323                                                 0,
1324                                                 &target_reservation_cb,
1325                                                 pr);
1326 }
1327
1328
1329 /**
1330  * We're processing (local) results for a search request
1331  * from a (local) client.  Pass applicable results to the
1332  * client and if we are done either clean up (operation
1333  * complete) or switch to P2P search (more results possible).
1334  *
1335  * @param cls our closure (struct LocalGetContext)
1336  * @param key key for the content
1337  * @param size number of bytes in data
1338  * @param data content stored
1339  * @param type type of the content
1340  * @param priority priority of the content
1341  * @param anonymity anonymity-level for the content
1342  * @param expiration expiration time for the content
1343  * @param uid unique identifier for the datum;
1344  *        maybe 0 if no unique identifier is available
1345  */
1346 static void
1347 process_local_get_result (void *cls,
1348                           const GNUNET_HashCode * key,
1349                           uint32_t size,
1350                           const void *data,
1351                           uint32_t type,
1352                           uint32_t priority,
1353                           uint32_t anonymity,
1354                           struct GNUNET_TIME_Absolute
1355                           expiration, 
1356                           uint64_t uid)
1357 {
1358   struct LocalGetContext *lgc = cls;
1359   struct PendingRequest *pr;
1360   struct ClientRequestList *crl;
1361   struct ClientList *cl;
1362   size_t msize;
1363   unsigned int i;
1364
1365   if (key == NULL)
1366     {
1367 #if DEBUG_FS
1368       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1369                   "Received last result for `%s' from local datastore, deciding what to do next.\n",
1370                   GNUNET_h2s (&lgc->query));
1371 #endif
1372       /* no further results from datastore; continue
1373          processing further requests from the client and
1374          allow the next task to use the datastore; also,
1375          switch to P2P requests or clean up our state. */
1376       next_ds_request ();
1377       GNUNET_SERVER_receive_done (lgc->client,
1378                                   GNUNET_OK);
1379       if ( (lgc->results_used == 0) ||
1380            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1381            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1382            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1383         {
1384 #if DEBUG_FS
1385           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1386                       "Forwarding query for `%s' to network.\n",
1387                       GNUNET_h2s (&lgc->query));
1388 #endif
1389           cl = clients;
1390           while ( (NULL != cl) &&
1391                   (cl->client != lgc->client) )
1392             cl = cl->next;
1393           if (cl == NULL)
1394             {
1395               cl = GNUNET_malloc (sizeof (struct ClientList));
1396               cl->client = lgc->client;
1397               cl->next = clients;
1398               clients = cl;
1399             }
1400           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1401           crl->cl = cl;
1402           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1403           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1404           pr->client = lgc->client;
1405           GNUNET_SERVER_client_keep (pr->client);
1406           pr->crl_entry = crl;
1407           crl->req = pr;
1408           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1409             {
1410               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1411               *pr->namespace = lgc->namespace;
1412             }
1413           pr->replies_seen = lgc->results;
1414           lgc->results = NULL;
1415           pr->start_time = GNUNET_TIME_absolute_get ();
1416           pr->query = lgc->query;
1417           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1418           pr->replies_seen_off = lgc->results_used;
1419           pr->replies_seen_size = lgc->results_size;
1420           lgc->results_size = 0;
1421           pr->type = lgc->type;
1422           pr->anonymity_level = lgc->anonymity_level;
1423           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1424                                         &pr->mingle,
1425                                         &pr->bf_size,
1426                                         pr->replies_seen);
1427           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1428                                              &pr->query,
1429                                              pr,
1430                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1431           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1432                                                    get_processing_delay (),
1433                                                    &forward_request_task,
1434                                                    pr);
1435           local_get_context_free (lgc);
1436           return;
1437         }
1438       /* got all possible results, clean up! */
1439 #if DEBUG_FS
1440       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1441                   "Found all possible results for query for `%s', done!\n",
1442                   GNUNET_h2s (&lgc->query));
1443 #endif
1444       local_get_context_free (lgc);
1445       return;
1446     }
1447   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1448     {
1449 #if DEBUG_FS
1450       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1451                   "Received on-demand block for `%s' from local datastore, fetching data.\n",
1452                   GNUNET_h2s (&lgc->query));
1453 #endif
1454       GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
1455                                         anonymity, expiration, uid,
1456                                         dsh,
1457                                         &process_local_get_result,
1458                                         lgc);
1459       return;
1460     }
1461   if ( (type != lgc->type) &&
1462        (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
1463     {
1464       /* this should be virtually impossible to reach (DBLOCK 
1465          query hash being identical to KBLOCK/SBLOCK query hash);
1466          nevertheless, if it happens, the correct thing is to
1467          simply skip the result. */
1468 #if DEBUG_FS
1469       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1470                   "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
1471                   type,
1472                   lgc->type,
1473                   GNUNET_h2s (&lgc->query));
1474 #endif
1475       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1476       return;
1477     }
1478   /* check if this is a result we've alredy
1479      received */
1480   for (i=0;i<lgc->results_used;i++)
1481     if (0 == memcmp (key,
1482                      &lgc->results[i],
1483                      sizeof (GNUNET_HashCode)))
1484       {
1485 #if DEBUG_FS
1486         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1487                     "Received duplicate result for `%s' from local datastore, ignoring.\n",
1488                     GNUNET_h2s (&lgc->query));
1489 #endif
1490         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1491         return; 
1492       }
1493   if (lgc->results_used == lgc->results_size)
1494     GNUNET_array_grow (lgc->results,
1495                        lgc->results_size,
1496                        lgc->results_size * 2 + 2);
1497   GNUNET_CRYPTO_hash (data, 
1498                       size, 
1499                       &lgc->results[lgc->results_used++]);    
1500   msize = size + sizeof (struct ContentMessage);
1501   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1502   lgc->result = GNUNET_malloc (msize);
1503   lgc->result->header.size = htons (msize);
1504   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1505   lgc->result->type = htonl (type);
1506   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1507   memcpy (&lgc->result[1],
1508           data,
1509           size);
1510 #if DEBUG_FS
1511   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1512               "Received new result for `%s' from local datastore, passing to client.\n",
1513               GNUNET_h2s (&lgc->query));
1514 #endif
1515   GNUNET_SERVER_notify_transmit_ready (lgc->client,
1516                                        msize,
1517                                        GNUNET_TIME_UNIT_FOREVER_REL,
1518                                        &transmit_local_result,
1519                                        lgc);
1520 }
1521
1522
1523 /**
1524  * We're processing a search request from a local
1525  * client.  Now it is our turn to query the datastore.
1526  * 
1527  * @param cls our closure (struct LocalGetContext)
1528  * @param tc unused
1529  */
1530 static void
1531 transmit_local_get (void *cls,
1532                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1533 {
1534   struct LocalGetContext *lgc = cls;
1535   uint32_t type;
1536   
1537   type = lgc->type;
1538   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1539     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1540   GNUNET_DATASTORE_get (dsh,
1541                         &lgc->query,
1542                         type,
1543                         &process_local_get_result,
1544                         lgc,
1545                         GNUNET_TIME_UNIT_FOREVER_REL);
1546 }
1547
1548
1549 /**
1550  * We're processing a search request from a local
1551  * client.  Now it is our turn to query the datastore.
1552  * 
1553  * @param cls our closure (struct LocalGetContext)
1554  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
1555  */
1556 static void 
1557 transmit_local_get_ready (void *cls,
1558                           int ok)
1559 {
1560   struct LocalGetContext *lgc = cls;
1561
1562   GNUNET_assert (GNUNET_OK == ok);
1563   GNUNET_SCHEDULER_add_continuation (sched,
1564                                      &transmit_local_get,
1565                                      lgc,
1566                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1567 }
1568
1569
1570 /**
1571  * Handle START_SEARCH-message (search request from client).
1572  *
1573  * @param cls closure
1574  * @param client identification of the client
1575  * @param message the actual message
1576  */
1577 static void
1578 handle_start_search (void *cls,
1579                      struct GNUNET_SERVER_Client *client,
1580                      const struct GNUNET_MessageHeader *message)
1581 {
1582   const struct SearchMessage *sm;
1583   struct LocalGetContext *lgc;
1584   uint16_t msize;
1585   unsigned int sc;
1586   
1587   msize = ntohs (message->size);
1588   if ( (msize < sizeof (struct SearchMessage)) ||
1589        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
1590     {
1591       GNUNET_break (0);
1592       GNUNET_SERVER_receive_done (client,
1593                                   GNUNET_SYSERR);
1594       return;
1595     }
1596   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
1597   sm = (const struct SearchMessage*) message;
1598   GNUNET_SERVER_client_keep (client);
1599   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
1600   if  (sc > 0)
1601     {
1602       lgc->results_used = sc;
1603       GNUNET_array_grow (lgc->results,
1604                          lgc->results_size,
1605                          sc * 2);
1606       memcpy (lgc->results,
1607               &sm[1],
1608               sc * sizeof (GNUNET_HashCode));
1609     }
1610   lgc->client = client;
1611   lgc->type = ntohl (sm->type);
1612   lgc->anonymity_level = ntohl (sm->anonymity_level);
1613   switch (lgc->type)
1614     {
1615     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1616     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1617       lgc->target.hashPubKey = sm->target;
1618       break;
1619     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1620       lgc->namespace = sm->target;
1621       break;
1622     default:
1623       break;
1624     }
1625   lgc->query = sm->query;
1626   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
1627   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
1628                                &transmit_local_get_ready,
1629                                lgc);
1630 }
1631
1632
1633 /**
1634  * List of handlers for the messages understood by this
1635  * service.
1636  */
1637 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1638   {&GNUNET_FS_handle_index_start, NULL, 
1639    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
1640   {&GNUNET_FS_handle_index_list_get, NULL, 
1641    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
1642   {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
1643    sizeof (struct UnindexMessage) },
1644   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
1645    0 },
1646   {NULL, NULL, 0, 0}
1647 };
1648
1649
1650 /**
1651  * Clean up the memory used by the PendingRequest structure (except
1652  * for the client or peer list that the request may be part of).
1653  *
1654  * @param pr request to clean up
1655  */
1656 static void
1657 destroy_pending_request (struct PendingRequest *pr)
1658 {
1659   struct PendingMessage *reply;
1660   struct ClientList *cl;
1661
1662   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
1663                                         &pr->query,
1664                                         pr);
1665   // FIXME: not sure how this can work (efficiently)
1666   // also, what does the return value mean?
1667   if (pr->irc != NULL)
1668     {
1669       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1670       pr->irc = NULL;
1671     }
1672   if (pr->client == NULL)
1673     {
1674       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
1675                                          pr->hnode);
1676     }
1677   else
1678     {
1679       cl = pr->crl_entry->cl;
1680       GNUNET_CONTAINER_DLL_remove (cl->head,
1681                                    cl->tail,
1682                                    pr->crl_entry);
1683     }
1684   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
1685     GNUNET_SCHEDULER_cancel (sched, pr->task);
1686   if (NULL != pr->bf)
1687     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1688   if (NULL != pr->th)
1689     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
1690   while (NULL != (reply = pr->replies_pending))
1691     {
1692       pr->replies_pending = reply->next;
1693       GNUNET_free (reply);
1694     }
1695   if (NULL != pr->cth)
1696     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
1697   GNUNET_PEER_change_rc (pr->source_pid, -1);
1698   GNUNET_PEER_change_rc (pr->target_pid, -1);
1699   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1700   GNUNET_free_non_null (pr->used_pids);
1701   GNUNET_free_non_null (pr->replies_seen);
1702   GNUNET_free_non_null (pr->namespace);
1703   GNUNET_free (pr);
1704 }
1705
1706
1707 /**
1708  * A client disconnected.  Remove all of its pending queries.
1709  *
1710  * @param cls closure, NULL
1711  * @param client identification of the client
1712  */
1713 static void
1714 handle_client_disconnect (void *cls,
1715                           struct GNUNET_SERVER_Client
1716                           * client)
1717 {
1718   struct LocalGetContext *lgc;
1719   struct ClientList *cpos;
1720   struct ClientList *cprev;
1721   struct ClientRequestList *rl;
1722
1723   if (client == NULL)
1724     return;
1725   lgc = lgc_head;
1726   while ( (NULL != lgc) &&
1727           (lgc->client != client) )
1728     lgc = lgc->next;
1729   if (lgc != NULL)
1730     local_get_context_free (lgc);
1731   cprev = NULL;
1732   cpos = clients;
1733   while ( (NULL != cpos) &&
1734           (clients->client != client) )
1735     {
1736       cprev = cpos;
1737       cpos = cpos->next;
1738     }
1739   if (cpos != NULL)
1740     {
1741       if (cprev == NULL)
1742         clients = cpos->next;
1743       else
1744         cprev->next = cpos->next;
1745       while (NULL != (rl = cpos->head))
1746         {
1747           cpos->head = rl->next;
1748           destroy_pending_request (rl->req);
1749           GNUNET_free (rl);
1750         }
1751       GNUNET_free (cpos);
1752     }
1753 }
1754
1755
1756 /**
1757  * Iterator over entries in the "requests_by_query" map
1758  * that frees all the entries.
1759  *
1760  * @param cls closure, NULL
1761  * @param key current key code (the query, unused) 
1762  * @param value value in the hash map, of type "struct PendingRequest*"
1763  * @return GNUNET_YES (we should continue to  iterate)
1764  */
1765 static int 
1766 destroy_pending_request_cb (void *cls,
1767                             const GNUNET_HashCode * key,
1768                             void *value)
1769 {
1770   struct PendingRequest *pr = value;
1771
1772   destroy_pending_request (pr);
1773   return GNUNET_YES;
1774 }
1775
1776
1777 /**
1778  * Task run during shutdown.
1779  *
1780  * @param cls unused
1781  * @param tc unused
1782  */
1783 static void
1784 shutdown_task (void *cls,
1785                const struct GNUNET_SCHEDULER_TaskContext *tc)
1786 {
1787   if (NULL != core)
1788     {
1789       GNUNET_CORE_disconnect (core);
1790       core = NULL;
1791     }
1792   if (NULL != dsh)
1793     {
1794       GNUNET_DATASTORE_disconnect (dsh,
1795                                    GNUNET_NO);
1796       dsh = NULL;
1797     }
1798   GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
1799                                          &destroy_pending_request_cb,
1800                                          NULL);
1801   while (clients != NULL)
1802     handle_client_disconnect (NULL,
1803                               clients->client);
1804   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
1805   requests_by_query = NULL;
1806   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
1807   requests_by_peer = NULL;
1808   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
1809   requests_by_expiration = NULL;
1810   // FIXME: iterate over entries and free individually?
1811   // (or do we get disconnect notifications?)
1812   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1813   connected_peers = NULL;
1814 }
1815
1816
1817 /**
1818  * Free (each) request made by the peer.
1819  *
1820  * @param cls closure, points to peer that the request belongs to
1821  * @param key current key code
1822  * @param value value in the hash map
1823  * @return GNUNET_YES (we should continue to iterate)
1824  */
1825 static int
1826 destroy_request (void *cls,
1827                  const GNUNET_HashCode * key,
1828                  void *value)
1829 {
1830   const struct GNUNET_PeerIdentity * peer = cls;
1831   struct PendingRequest *pr = value;
1832   
1833   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1834                                         &peer->hashPubKey,
1835                                         pr);
1836   destroy_pending_request (pr);
1837   return GNUNET_YES;
1838 }
1839
1840
1841
1842 /**
1843  * Method called whenever a given peer connects.
1844  *
1845  * @param cls closure, not used
1846  * @param peer peer identity this notification is about
1847  * @param latency reported latency of the connection with 'other'
1848  * @param distance reported distance (DV) to 'other' 
1849  */
1850 static void 
1851 peer_connect_handler (void *cls,
1852                       const struct
1853                       GNUNET_PeerIdentity * peer,
1854                       struct GNUNET_TIME_Relative latency,
1855                       uint32_t distance)
1856 {
1857   struct ConnectedPeer *cp;
1858
1859   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1860   cp->pid = GNUNET_PEER_intern (peer);
1861   GNUNET_CONTAINER_multihashmap_put (connected_peers,
1862                                      &peer->hashPubKey,
1863                                      cp,
1864                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1865 }
1866
1867
1868 /**
1869  * Method called whenever a peer disconnects.
1870  *
1871  * @param cls closure, not used
1872  * @param peer peer identity this notification is about
1873  */
1874 static void
1875 peer_disconnect_handler (void *cls,
1876                          const struct
1877                          GNUNET_PeerIdentity * peer)
1878 {
1879   struct ConnectedPeer *cp;
1880   struct PendingMessage *pm;
1881
1882   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1883                                           &peer->hashPubKey);
1884   if (cp != NULL)
1885     {
1886       GNUNET_PEER_change_rc (cp->pid, -1);
1887       GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1888       if (NULL != cp->cth)
1889         GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1890       while (NULL != (pm = cp->pending_messages))
1891         {
1892           cp->pending_messages = pm->next;
1893           GNUNET_free (pm);
1894         }
1895       GNUNET_free (cp);
1896     }
1897   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
1898                                               &peer->hashPubKey,
1899                                               &destroy_request,
1900                                               (void*) peer);
1901 }
1902
1903
1904 /**
1905  * We're processing a GET request from another peer and have decided
1906  * to forward it to other peers.
1907  *
1908  * @param cls our "struct ProcessGetContext *"
1909  * @param tc unused
1910  */
1911 static void
1912 forward_get_request (void *cls,
1913                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1914 {
1915   struct ProcessGetContext *pgc = cls;
1916   struct PendingRequest *pr;
1917   struct PendingRequest *eer;
1918   struct GNUNET_PeerIdentity target;
1919
1920   pr = GNUNET_malloc (sizeof (struct PendingRequest));
1921   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
1922     {
1923       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
1924       *pr->namespace = pgc->namespace;
1925     }
1926   pr->bf = pgc->bf;
1927   pr->bf_size = pgc->bf_size;
1928   pgc->bf = NULL;
1929   pr->start_time = pgc->start_time;
1930   pr->query = pgc->query;
1931   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
1932   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
1933     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
1934   pr->anonymity_level = 1; /* default */
1935   pr->priority = pgc->priority;
1936   pr->remaining_priority = pr->priority;
1937   pr->mingle = pgc->mingle;
1938   pr->ttl = pgc->ttl; 
1939   pr->type = pgc->type;
1940   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1941                                      &pr->query,
1942                                      pr,
1943                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1944   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
1945                                      &pgc->reply_to.hashPubKey,
1946                                      pr,
1947                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1948   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration,
1949                                             pr,
1950                                             pr->start_time.value + pr->ttl);
1951   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
1952     {
1953       /* expire oldest request! */
1954       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
1955       GNUNET_PEER_resolve (eer->source_pid,
1956                            &target);    
1957       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1958                                             &target.hashPubKey,
1959                                             eer);
1960       destroy_pending_request (eer);     
1961     }
1962   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1963                                            get_processing_delay (),
1964                                            &forward_request_task,
1965                                            pr);
1966   GNUNET_free (pgc); 
1967 }
1968 /**
1969  * Transmit the given message by copying it to
1970  * the target buffer "buf".  "buf" will be
1971  * NULL and "size" zero if the socket was closed for
1972  * writing in the meantime.  In that case, only
1973
1974  * free the message
1975  *
1976  * @param cls closure, pointer to the message
1977  * @param size number of bytes available in buf
1978  * @param buf where the callee should write the message
1979  * @return number of bytes written to buf
1980  */
1981 static size_t
1982 transmit_message (void *cls,
1983                   size_t size, void *buf)
1984 {
1985   struct GNUNET_MessageHeader *msg = cls;
1986   uint16_t msize;
1987   
1988   if (NULL == buf)
1989     {
1990 #if DEBUG_FS
1991       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1992                   "Dropping reply, core too busy.\n");
1993 #endif
1994       GNUNET_free (msg);
1995       return 0;
1996     }
1997   msize = ntohs (msg->size);
1998   GNUNET_assert (size >= msize);
1999   memcpy (buf, msg, msize);
2000   GNUNET_free (msg);
2001   return msize;
2002 }
2003
2004
2005 /**
2006  * Test if the load on this peer is too high
2007  * to even consider processing the query at
2008  * all.
2009  * 
2010  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2011  */
2012 static int
2013 test_load_too_high ()
2014 {
2015   return GNUNET_NO; // FIXME
2016 }
2017
2018
2019 /**
2020  * We're processing (local) results for a search request
2021  * from another peer.  Pass applicable results to the
2022  * peer and if we are done either clean up (operation
2023  * complete) or forward to other peers (more results possible).
2024  *
2025  * @param cls our closure (struct LocalGetContext)
2026  * @param key key for the content
2027  * @param size number of bytes in data
2028  * @param data content stored
2029  * @param type type of the content
2030  * @param priority priority of the content
2031  * @param anonymity anonymity-level for the content
2032  * @param expiration expiration time for the content
2033  * @param uid unique identifier for the datum;
2034  *        maybe 0 if no unique identifier is available
2035  */
2036 static void
2037 process_p2p_get_result (void *cls,
2038                         const GNUNET_HashCode * key,
2039                         uint32_t size,
2040                         const void *data,
2041                         uint32_t type,
2042                         uint32_t priority,
2043                         uint32_t anonymity,
2044                         struct GNUNET_TIME_Absolute
2045                         expiration, 
2046                         uint64_t uid)
2047 {
2048   struct ProcessGetContext *pgc = cls;
2049   GNUNET_HashCode dhash;
2050   GNUNET_HashCode mhash;
2051   struct PutMessage *reply;
2052   
2053   if (NULL == key)
2054     {
2055       /* no more results */
2056       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2057            ( (0 == pgc->results_found) ||
2058              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2059              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2060              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2061         {
2062           GNUNET_SCHEDULER_add_continuation (sched,
2063                                              &forward_get_request,
2064                                              pgc,
2065                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2066         }
2067       else
2068         {
2069           if (pgc->bf != NULL)
2070             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2071           GNUNET_free (pgc); 
2072         }
2073       next_ds_request ();
2074       return;
2075     }
2076   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2077     {
2078       GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2079                                         anonymity, expiration, uid, dsh,
2080                                         &process_p2p_get_result,
2081                                         pgc);
2082       return;
2083     }
2084   /* check for duplicates */
2085   GNUNET_CRYPTO_hash (data, size, &dhash);
2086   mingle_hash (&dhash, 
2087                pgc->mingle,
2088                &mhash);
2089   if ( (pgc->bf != NULL) &&
2090        (GNUNET_YES ==
2091         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2092                                            &mhash)) )
2093     {      
2094 #if DEBUG_FS
2095       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2096                   "Result from datastore filtered by bloomfilter.\n");
2097 #endif
2098       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2099       return;
2100     }
2101   pgc->results_found++;
2102   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2103        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2104        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2105     {
2106       if (pgc->bf == NULL)
2107         {
2108           pgc->bf_size = 32;
2109           pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2110                                                        pgc->bf_size, 
2111                                                        BLOOMFILTER_K);
2112         }
2113       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2114                                         &mhash);
2115     }
2116
2117   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2118   reply->header.size = htons (sizeof (struct PutMessage) + size);
2119   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2120   reply->type = htonl (type);
2121   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2122   memcpy (&reply[1], data, size);
2123   GNUNET_CORE_notify_transmit_ready (core,
2124                                      pgc->priority,
2125                                      ACCEPTABLE_REPLY_DELAY,
2126                                      &pgc->reply_to,
2127                                      sizeof (struct PutMessage) + size,
2128                                      &transmit_message,
2129                                      reply);
2130   if ( (GNUNET_YES == test_load_too_high()) ||
2131        (pgc->results_found > 5 + 2 * pgc->priority) )
2132     {
2133       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2134       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2135       return;
2136     }
2137   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2138 }
2139   
2140
2141 /**
2142  * We're processing a GET request from another peer.  Give it to our
2143  * local datastore.
2144  *
2145  * @param cls our "struct ProcessGetContext"
2146  * @param ok did we get a datastore slice or not?
2147  */
2148 static void
2149 ds_get_request (void *cls, 
2150                 int ok)
2151 {
2152   struct ProcessGetContext *pgc = cls;
2153   uint32_t type;
2154   struct GNUNET_TIME_Relative timeout;
2155
2156   if (GNUNET_OK != ok)
2157     {
2158       /* no point in doing P2P stuff if we can't even do local */
2159       GNUNET_free (dsh);
2160       return;
2161     }
2162   type = pgc->type;
2163   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2164     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2165   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2166                                            (pgc->priority + 1));
2167   GNUNET_DATASTORE_get (dsh,
2168                         &pgc->query,
2169                         type,
2170                         &process_p2p_get_result,
2171                         pgc,
2172                         timeout);
2173 }
2174
2175
2176 /**
2177  * The priority level imposes a bound on the maximum
2178  * value for the ttl that can be requested.
2179  *
2180  * @param ttl_in requested ttl
2181  * @param prio given priority
2182  * @return ttl_in if ttl_in is below the limit,
2183  *         otherwise the ttl-limit for the given priority
2184  */
2185 static int32_t
2186 bound_ttl (int32_t ttl_in, uint32_t prio)
2187 {
2188   unsigned long long allowed;
2189
2190   if (ttl_in <= 0)
2191     return ttl_in;
2192   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2193   if (ttl_in > allowed)      
2194     {
2195       if (allowed >= (1 << 30))
2196         return 1 << 30;
2197       return allowed;
2198     }
2199   return ttl_in;
2200 }
2201
2202
2203 /**
2204  * We've received a request with the specified
2205  * priority.  Bound it according to how much
2206  * we trust the given peer.
2207  * 
2208  * @param prio_in requested priority
2209  * @param peer the peer making the request
2210  * @return effective priority
2211  */
2212 static uint32_t
2213 bound_priority (uint32_t prio_in,
2214                 const struct GNUNET_PeerIdentity *peer)
2215 {
2216   return 0; // FIXME!
2217 }
2218
2219
2220 /**
2221  * Handle P2P "GET" request.
2222  *
2223  * @param cls closure, always NULL
2224  * @param other the other peer involved (sender or receiver, NULL
2225  *        for loopback messages where we are both sender and receiver)
2226  * @param message the actual message
2227  * @param latency reported latency of the connection with 'other'
2228  * @param distance reported distance (DV) to 'other' 
2229  * @return GNUNET_OK to keep the connection open,
2230  *         GNUNET_SYSERR to close it (signal serious error)
2231  */
2232 static int
2233 handle_p2p_get (void *cls,
2234                 const struct GNUNET_PeerIdentity *other,
2235                 const struct GNUNET_MessageHeader *message,
2236                                   struct GNUNET_TIME_Relative latency,
2237                                   uint32_t distance)
2238 {
2239   uint16_t msize;
2240   const struct GetMessage *gm;
2241   unsigned int bits;
2242   const GNUNET_HashCode *opt;
2243   struct ProcessGetContext *pgc;
2244   uint32_t bm;
2245   size_t bfsize;
2246   uint32_t ttl_decrement;
2247   double preference;
2248   int net_load_up;
2249   int net_load_down;
2250
2251   msize = ntohs(message->size);
2252   if (msize < sizeof (struct GetMessage))
2253     {
2254       GNUNET_break_op (0);
2255       return GNUNET_SYSERR;
2256     }
2257   gm = (const struct GetMessage*) message;
2258   bm = ntohl (gm->hash_bitmap);
2259   bits = 0;
2260   while (bm > 0)
2261     {
2262       if (1 == (bm & 1))
2263         bits++;
2264       bm >>= 1;
2265     }
2266   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2267     {
2268       GNUNET_break_op (0);
2269       return GNUNET_SYSERR;
2270     }  
2271   opt = (const GNUNET_HashCode*) &gm[1];
2272   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2273   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2274   if (bfsize > 0)
2275     {
2276       pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2277                                                    bfsize,
2278                                                    BLOOMFILTER_K);
2279       pgc->bf_size = bfsize;
2280     }
2281   pgc->type = ntohl (gm->type);
2282   pgc->bm = ntohl (gm->hash_bitmap);
2283   pgc->mingle = gm->filter_mutator;
2284   bits = 0;
2285   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2286     pgc->reply_to.hashPubKey = opt[bits++];
2287   else
2288     pgc->reply_to = *other;
2289   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2290     pgc->namespace = opt[bits++];
2291   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2292     {
2293       GNUNET_break_op (0);
2294       GNUNET_free (pgc);
2295       return GNUNET_SYSERR;
2296     }
2297   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2298     pgc->prime_target.hashPubKey = opt[bits++];
2299   /* note that we can really only check load here since otherwise
2300      peers could find out that we are overloaded by being disconnected
2301      after sending us a malformed query... */
2302   if (GNUNET_YES == test_load_too_high ())
2303     {
2304       if (NULL != pgc->bf)
2305         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2306       GNUNET_free (pgc);
2307 #if DEBUG_FS
2308       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2309                   "Dropping query from `%s', this peer is too busy.\n",
2310                   GNUNET_i2s (other));
2311 #endif
2312       return GNUNET_OK;
2313     }
2314   net_load_up = 50; // FIXME
2315   net_load_down = 50; // FIXME
2316   pgc->policy = ROUTING_POLICY_NONE;
2317   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2318        (net_load_down < IDLE_LOAD_THRESHOLD) )
2319     {
2320       pgc->policy |= ROUTING_POLICY_ALL;
2321       pgc->priority = 0; /* no charge */
2322     }
2323   else
2324     {
2325       pgc->priority = bound_priority (ntohl (gm->priority), other);
2326       if ( (net_load_up < 
2327             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2328            (net_load_down < 
2329             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2330         {
2331           pgc->policy |= ROUTING_POLICY_ALL;
2332         }
2333       else
2334         {
2335           // FIXME: is this sound?
2336           if (net_load_up < 90 + 10 * pgc->priority)
2337             pgc->policy |= ROUTING_POLICY_FORWARD;
2338           if (net_load_down < 90 + 10 * pgc->priority)
2339             pgc->policy |= ROUTING_POLICY_ANSWER;
2340         }
2341     }
2342   if (pgc->policy == ROUTING_POLICY_NONE)
2343     {
2344 #if DEBUG_FS
2345       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2346                   "Dropping query from `%s', network saturated.\n",
2347                   GNUNET_i2s (other));
2348 #endif
2349       if (NULL != pgc->bf)
2350         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2351       GNUNET_free (pgc);
2352       return GNUNET_OK;     /* drop */
2353     }
2354   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2355     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2356   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2357   /* decrement ttl (always) */
2358   ttl_decrement = 2 * TTL_DECREMENT +
2359     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2360                               TTL_DECREMENT);
2361   if ( (pgc->ttl < 0) &&
2362        (pgc->ttl - ttl_decrement > 0) )
2363     {
2364 #if DEBUG_FS
2365       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2366                   "Dropping query from `%s' due to TTL underflow.\n",
2367                   GNUNET_i2s (other));
2368 #endif
2369       /* integer underflow => drop (should be very rare)! */
2370       if (NULL != pgc->bf)
2371         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2372       GNUNET_free (pgc);
2373       return GNUNET_OK;
2374     }
2375   pgc->ttl -= ttl_decrement;
2376   pgc->start_time = GNUNET_TIME_absolute_get ();
2377   preference = (double) pgc->priority;
2378   if (preference < QUERY_BANDWIDTH_VALUE)
2379     preference = QUERY_BANDWIDTH_VALUE;
2380   // FIXME: also reserve bandwidth for reply?
2381   (void) GNUNET_CORE_peer_change_preference (sched, cfg,
2382                                              other,
2383                                              GNUNET_TIME_UNIT_FOREVER_REL,
2384                                              0, 0, preference, NULL, NULL);
2385   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2386     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2387                                  &ds_get_request,
2388                                  pgc);
2389   else
2390     GNUNET_SCHEDULER_add_continuation (sched,
2391                                        &forward_get_request,
2392                                        pgc,
2393                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2394   return GNUNET_OK;
2395 }
2396
2397
2398 /**
2399  * Function called to notify us that we can now transmit a reply to a
2400  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2401  * closed for writing in the meantime.
2402  *
2403  * @param cls closure, points to a "struct PendingRequest*" with
2404  *            one or more pending replies
2405  * @param size number of bytes available in buf
2406  * @param buf where the callee should write the message
2407  * @return number of bytes written to buf
2408  */
2409 static size_t
2410 transmit_result (void *cls,
2411                  size_t size, 
2412                  void *buf)
2413 {
2414   struct PendingRequest *pr = cls;
2415   char *cbuf = buf;
2416   struct PendingMessage *reply;
2417   size_t ret;
2418
2419   ret = 0;
2420   while (NULL != (reply = pr->replies_pending))
2421     {
2422       if ( (reply->msize + ret < ret) ||
2423            (reply->msize + ret > size) )
2424         break;
2425       pr->replies_pending = reply->next;
2426       memcpy (&cbuf[ret], &reply[1], reply->msize);
2427       ret += reply->msize;
2428       GNUNET_free (reply);
2429     }
2430   return ret;
2431 }
2432
2433
2434 /**
2435  * We have received a reply; handle it!
2436  *
2437  * @param cls response (struct ProcessReplyClosure)
2438  * @param key our query
2439  * @param value value in the hash map (meta-info about the query)
2440  * @return GNUNET_YES (we should continue to iterate)
2441  */
2442 static int
2443 process_reply (void *cls,
2444                const GNUNET_HashCode * key,
2445                void *value)
2446 {
2447   struct ProcessReplyClosure *prq = cls;
2448   struct PendingRequest *pr = value;
2449   struct PendingRequest *eer;
2450   struct PendingMessage *reply;
2451   struct PutMessage *pm;
2452   struct ContentMessage *cm;
2453   struct ConnectedPeer *cp;
2454   GNUNET_HashCode chash;
2455   GNUNET_HashCode mhash;
2456   struct GNUNET_PeerIdentity target;
2457   size_t msize;
2458   uint32_t prio;
2459   struct GNUNET_TIME_Relative max_delay;
2460   
2461   GNUNET_CRYPTO_hash (prq->data,
2462                       prq->size,
2463                       &chash);
2464   switch (prq->type)
2465     {
2466     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2467     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2468       break;
2469     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2470       /* FIXME: does prq->namespace match our expectations? */
2471       /* then: fall-through??? */
2472     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2473       if (pr->bf != NULL) 
2474         {
2475           mingle_hash (&chash, pr->mingle, &mhash);
2476           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2477                                                                &mhash))
2478             return GNUNET_YES; /* duplicate */
2479           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2480                                             &mhash);
2481         }
2482       break;
2483     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2484       // FIXME: any checks against duplicates for SKBlocks?
2485       break;
2486     }
2487   prio = pr->priority;
2488   prq->priority += pr->remaining_priority;
2489   pr->remaining_priority = 0;
2490   if (pr->client != NULL)
2491     {
2492       if (pr->replies_seen_size == pr->replies_seen_off)
2493         GNUNET_array_grow (pr->replies_seen,
2494                            pr->replies_seen_size,
2495                            pr->replies_seen_size * 2 + 4);
2496       pr->replies_seen[pr->replies_seen_off++] = chash;
2497       // FIXME: possibly recalculate BF!
2498     }
2499   if (pr->client == NULL)
2500     {
2501       GNUNET_PEER_resolve (pr->source_pid,
2502                            &target);
2503       cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2504                                               &target.hashPubKey);
2505       msize = sizeof (struct ContentMessage) + prq->size;
2506       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2507       reply->msize = msize;
2508       reply->priority = (uint32_t) -1; /* send replies first! */
2509       cm = (struct ContentMessage*) &reply[1];
2510       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2511       cm->header.size = htons (msize);
2512       cm->type = htonl (prq->type);
2513       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2514       memcpy (&reply[1], prq->data, prq->size);
2515       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2516       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
2517         {
2518           /* estimate expiration time from time difference between
2519              first request that will be discarded and this request */
2520           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2521           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
2522                                                            eer->start_time);
2523         }
2524
2525       if (cp == NULL)
2526         {
2527           /* FIXME: bound queue size! */
2528           reply->next = pr->replies_pending;
2529           pr->replies_pending = reply;
2530           if (pr->cth == NULL)
2531             {
2532               /* implicitly tries to connect */
2533               pr->cth = GNUNET_CORE_notify_transmit_ready (core,
2534                                                            prio,
2535                                                            max_delay,
2536                                                            &target,
2537                                                            msize,
2538                                                            &transmit_result,
2539                                                            pr);
2540             }
2541         }
2542       else
2543         {
2544           /* insert replies always at the head */
2545           reply->next = cp->pending_messages;
2546           cp->pending_messages = reply;
2547           if (cp->cth == NULL)
2548             cp->cth = GNUNET_CORE_notify_transmit_ready (core,
2549                                                          reply->priority,
2550                                                          GNUNET_TIME_UNIT_FOREVER_REL,
2551                                                          &target,
2552                                                          msize,
2553                                                          &transmit_request_cb,
2554                                                          cp);
2555         }
2556     }
2557   else
2558     {
2559       msize = sizeof (struct PutMessage) + prq->size;
2560       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2561       reply->msize = msize;
2562       reply->next = pr->replies_pending;
2563       pm = (struct PutMessage*) &reply[1];
2564       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2565       pm->header.size = htons (msize);
2566       pm->type = htonl (prq->type);
2567       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
2568       pr->replies_pending = reply;
2569       memcpy (&reply[1], prq->data, prq->size);
2570       if (pr->th != NULL)
2571         return GNUNET_YES;
2572       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
2573                                                     msize,
2574                                                     GNUNET_TIME_UNIT_FOREVER_REL,
2575                                                     &transmit_result,
2576                                                     pr);
2577       if (pr->th == NULL)
2578         {
2579           // FIXME: need to try again later (not much
2580           // to do here specifically, but we need to
2581           // check somewhere else to handle this case!)
2582         }
2583     }
2584   // FIXME: implement hot-path routing statistics keeping!
2585   return GNUNET_YES;
2586 }
2587
2588
2589 /**
2590  * Check if the given KBlock is well-formed.
2591  *
2592  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
2593  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
2594  * @param query where to store the query that this block answers
2595  * @return GNUNET_OK if this is actually a well-formed KBlock
2596  */
2597 static int
2598 check_kblock (const struct KBlock *kb,
2599               size_t dsize,
2600               GNUNET_HashCode *query)
2601 {
2602   if (dsize < sizeof (struct KBlock))
2603     {
2604       GNUNET_break_op (0);
2605       return GNUNET_SYSERR;
2606     }
2607   if (dsize - sizeof (struct KBlock) !=
2608       ntohs (kb->purpose.size) 
2609       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
2610       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
2611     {
2612       GNUNET_break_op (0);
2613       return GNUNET_SYSERR;
2614     }
2615   if (GNUNET_OK !=
2616       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
2617                                 &kb->purpose,
2618                                 &kb->signature,
2619                                 &kb->keyspace)) 
2620     {
2621       GNUNET_break_op (0);
2622       return GNUNET_SYSERR;
2623     }
2624   if (query != NULL)
2625     GNUNET_CRYPTO_hash (&kb->keyspace,
2626                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2627                         query);
2628   return GNUNET_OK;
2629 }
2630
2631
2632 /**
2633  * Check if the given SBlock is well-formed.
2634  *
2635  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
2636  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
2637  * @param query where to store the query that this block answers
2638  * @param namespace where to store the namespace that this block belongs to
2639  * @return GNUNET_OK if this is actually a well-formed SBlock
2640  */
2641 static int
2642 check_sblock (const struct SBlock *sb,
2643               size_t dsize,
2644               GNUNET_HashCode *query,   
2645               GNUNET_HashCode *namespace)
2646 {
2647   if (dsize < sizeof (struct SBlock))
2648     {
2649       GNUNET_break_op (0);
2650       return GNUNET_SYSERR;
2651     }
2652   if (dsize !=
2653       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
2654     {
2655       GNUNET_break_op (0);
2656       return GNUNET_SYSERR;
2657     }
2658   if (GNUNET_OK !=
2659       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
2660                                 &sb->purpose,
2661                                 &sb->signature,
2662                                 &sb->subspace)) 
2663     {
2664       GNUNET_break_op (0);
2665       return GNUNET_SYSERR;
2666     }
2667   if (query != NULL)
2668     *query = sb->identifier;
2669   if (namespace != NULL)
2670     GNUNET_CRYPTO_hash (&sb->subspace,
2671                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2672                         namespace);
2673   return GNUNET_OK;
2674 }
2675
2676
2677 /**
2678  * Handle P2P "PUT" request.
2679  *
2680  * @param cls closure, always NULL
2681  * @param other the other peer involved (sender or receiver, NULL
2682  *        for loopback messages where we are both sender and receiver)
2683  * @param message the actual message
2684  * @param latency reported latency of the connection with 'other'
2685  * @param distance reported distance (DV) to 'other' 
2686  * @return GNUNET_OK to keep the connection open,
2687  *         GNUNET_SYSERR to close it (signal serious error)
2688  */
2689 static int
2690 handle_p2p_put (void *cls,
2691                 const struct GNUNET_PeerIdentity *other,
2692                 const struct GNUNET_MessageHeader *message,
2693                 struct GNUNET_TIME_Relative latency,
2694                 uint32_t distance)
2695 {
2696   const struct PutMessage *put;
2697   uint16_t msize;
2698   size_t dsize;
2699   uint32_t type;
2700   struct GNUNET_TIME_Absolute expiration;
2701   GNUNET_HashCode query;
2702   struct ProcessReplyClosure prq;
2703
2704   msize = ntohs (message->size);
2705   if (msize < sizeof (struct PutMessage))
2706     {
2707       GNUNET_break_op(0);
2708       return GNUNET_SYSERR;
2709     }
2710   put = (const struct PutMessage*) message;
2711   dsize = msize - sizeof (struct PutMessage);
2712   type = ntohl (put->type);
2713   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
2714
2715   /* first, validate! */
2716   switch (type)
2717     {
2718     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2719     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2720       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
2721       break;
2722     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2723       if (GNUNET_OK !=
2724           check_kblock ((const struct KBlock*) &put[1],
2725                         dsize,
2726                         &query))
2727         return GNUNET_SYSERR;
2728       break;
2729     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2730       if (GNUNET_OK !=
2731           check_sblock ((const struct SBlock*) &put[1],
2732                         dsize,
2733                         &query,
2734                         &prq.namespace))
2735         return GNUNET_SYSERR;
2736       break;
2737     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2738       // FIXME -- validate SKBLOCK!
2739       GNUNET_break (0);
2740       return GNUNET_OK;
2741     default:
2742       /* unknown block type */
2743       GNUNET_break_op (0);
2744       return GNUNET_SYSERR;
2745     }
2746
2747   /* now, lookup 'query' */
2748   prq.data = (const void*) &put[1];
2749   prq.size = dsize;
2750   prq.type = type;
2751   prq.expiration = expiration;
2752   prq.priority = 0;
2753   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
2754                                               &query,
2755                                               &process_reply,
2756                                               &prq);
2757   // FIXME: if migration is on and load is low,
2758   // queue to store data in datastore;
2759   // use "prq.priority" for that!
2760   return GNUNET_OK;
2761 }
2762
2763
2764 /**
2765  * List of handlers for P2P messages
2766  * that we care about.
2767  */
2768 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2769   {
2770     { &handle_p2p_get, 
2771       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2772     { &handle_p2p_put, 
2773       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2774     { NULL, 0, 0 }
2775   };
2776
2777
2778 /**
2779  * Process fs requests.
2780  *
2781  * @param cls closure
2782  * @param s scheduler to use
2783  * @param server the initialized server
2784  * @param c configuration to use
2785  */
2786 static void
2787 run (void *cls,
2788      struct GNUNET_SCHEDULER_Handle *s,
2789      struct GNUNET_SERVER_Handle *server,
2790      const struct GNUNET_CONFIGURATION_Handle *c)
2791 {
2792   sched = s;
2793   cfg = c;
2794
2795   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2796   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2797   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
2798   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
2799   GNUNET_FS_init_indexing (sched, cfg);
2800   dsh = GNUNET_DATASTORE_connect (cfg,
2801                                   sched);
2802   core = GNUNET_CORE_connect (sched,
2803                               cfg,
2804                               GNUNET_TIME_UNIT_FOREVER_REL,
2805                               NULL,
2806                               NULL,
2807                               NULL,
2808                               &peer_connect_handler,
2809                               &peer_disconnect_handler,
2810                               NULL, GNUNET_NO,
2811                               NULL, GNUNET_NO,
2812                               p2p_handlers);
2813
2814   GNUNET_SERVER_disconnect_notify (server, 
2815                                    &handle_client_disconnect,
2816                                    NULL);
2817   GNUNET_SERVER_add_handlers (server, handlers);
2818   GNUNET_SCHEDULER_add_delayed (sched,
2819                                 GNUNET_TIME_UNIT_FOREVER_REL,
2820                                 &shutdown_task,
2821                                 NULL);
2822   if (NULL == dsh)
2823     {
2824       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2825                   _("Failed to connect to `%s' service.\n"),
2826                   "datastore");
2827       GNUNET_SCHEDULER_shutdown (sched);
2828       return;
2829     }
2830   if (NULL == core)
2831     {
2832       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2833                   _("Failed to connect to `%s' service.\n"),
2834                   "core");
2835       GNUNET_SCHEDULER_shutdown (sched);
2836       return;
2837     }
2838 }
2839
2840
2841 /**
2842  * The main function for the fs service.
2843  *
2844  * @param argc number of arguments from the command line
2845  * @param argv command line arguments
2846  * @return 0 ok, 1 on error
2847  */
2848 int
2849 main (int argc, char *const *argv)
2850 {
2851   return (GNUNET_OK ==
2852           GNUNET_SERVICE_run (argc,
2853                               argv,
2854                               "fs",
2855                               GNUNET_SERVICE_OPTION_NONE,
2856                               &run, NULL)) ? 0 : 1;
2857 }
2858
2859 /* end of gnunet-service-fs.c */