more work on fs
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - tracking of PendingRequests (and defining that struct...)
28  * - setup P2P search on CS request
29  * - setup P2P search on P2P GET
30  * - forward replies based on tracked requests
31  * - validation of KBLOCKS (almost done)
32  * - validation of SBLOCKS
33  * - validation of KSBLOCKS
34  * - content migration (put in local DS)
35  * - possible major issue: we may
36  *   queue "gazillions" of (K|S)Blocks for the
37  *   core to transmit to another peer; need
38  *   to make sure this is bounded overall...
39  * - various load-based actions (can wait)
40  * - remove on-demand blocks if they keep failing (can wait)
41  */
42 #include "platform.h"
43 #include "gnunet_core_service.h"
44 #include "gnunet_datastore_service.h"
45 #include "gnunet_peer_lib.h"
46 #include "gnunet_protocols.h"
47 #include "gnunet_signatures.h"
48 #include "gnunet_util_lib.h"
49 #include "fs.h"
50
51
52 /**
53  * In-memory information about indexed files (also available
54  * on-disk).
55  */
56 struct IndexInfo
57 {
58   
59   /**
60    * This is a linked list.
61    */
62   struct IndexInfo *next;
63
64   /**
65    * Name of the indexed file.  Memory allocated
66    * at the end of this struct (do not free).
67    */
68   const char *filename;
69
70   /**
71    * Context for transmitting confirmation to client,
72    * NULL if we've done this already.
73    */
74   struct GNUNET_SERVER_TransmitContext *tc;
75   
76   /**
77    * Hash of the contents of the file.
78    */
79   GNUNET_HashCode file_id;
80
81 };
82
83
84 /**
85  * Signature of a function that is called whenever a datastore
86  * request can be processed (or an entry put on the queue times out).
87  *
88  * @param cls closure
89  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
90  */
91 typedef void (*RequestFunction)(void *cls,
92                                 int ok);
93
94
95 /**
96  * Doubly-linked list of our requests for the datastore.
97  */
98 struct DatastoreRequestQueue
99 {
100
101   /**
102    * This is a doubly-linked list.
103    */
104   struct DatastoreRequestQueue *next;
105
106   /**
107    * This is a doubly-linked list.
108    */
109   struct DatastoreRequestQueue *prev;
110
111   /**
112    * Function to call (will issue the request).
113    */
114   RequestFunction req;
115
116   /**
117    * Closure for req.
118    */
119   void *req_cls;
120
121   /**
122    * When should this request time-out because we don't care anymore?
123    */
124   struct GNUNET_TIME_Absolute timeout;
125     
126   /**
127    * ID of task used for signaling timeout.
128    */
129   GNUNET_SCHEDULER_TaskIdentifier task;
130
131 };
132
133
134 /**
135  * Closure for processing START_SEARCH messages from a client.
136  */
137 struct LocalGetContext
138 {
139
140   /**
141    * This is a doubly-linked list.
142    */
143   struct LocalGetContext *next;
144
145   /**
146    * This is a doubly-linked list.
147    */
148   struct LocalGetContext *prev;
149
150   /**
151    * Client that initiated the search.
152    */
153   struct GNUNET_SERVER_Client *client;
154
155   /**
156    * Array of results that we've already received 
157    * (can be NULL).
158    */
159   GNUNET_HashCode *results; 
160
161   /**
162    * Bloomfilter over all results (for fast query construction);
163    * NULL if we don't have any results.
164    */
165   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
166
167   /**
168    * DS request associated with this operation.
169    */
170   struct DatastoreRequestQueue *req;
171
172   /**
173    * Current result message to transmit to client (or NULL).
174    */
175   struct ContentMessage *result;
176   
177   /**
178    * Type of the content that we're looking for.
179    * 0 for any.
180    */
181   uint32_t type;
182
183   /**
184    * Desired anonymity level.
185    */
186   uint32_t anonymity_level;
187
188   /**
189    * Number of results actually stored in the results array.
190    */
191   unsigned int results_used;
192   
193   /**
194    * Size of the results array in memory.
195    */
196   unsigned int results_size;
197
198   /**
199    * If the request is for a DBLOCK or IBLOCK, this is the identity of
200    * the peer that is known to have a response.  Set to all-zeros if
201    * such a target is not known (note that even if OUR anonymity
202    * level is >0 we may happen to know the responder's identity;
203    * nevertheless, we should probably not use it for a DHT-lookup
204    * or similar blunt actions in order to avoid exposing ourselves).
205    * <p>
206    * If the request is for an SBLOCK, this is the identity of the
207    * pseudonym to which the SBLOCK belongs. 
208    * <p>
209    * If the request is for a KBLOCK, "target" must be all zeros.
210    */
211   GNUNET_HashCode target;
212
213   /**
214    * Hash of the keyword (aka query) for KBLOCKs; Hash of
215    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
216    * and hash of the identifier XORed with the target for
217    * SBLOCKS (aka query).
218    */
219   GNUNET_HashCode query;
220
221 };
222
223
224 /**
225  * Possible routing policies for an FS-GET request.
226  */
227 enum RoutingPolicy
228   {
229     /**
230      * Simply drop the request.
231      */
232     ROUTING_POLICY_NONE = 0,
233     
234     /**
235      * Answer it if we can from local datastore.
236      */
237     ROUTING_POLICY_ANSWER = 1,
238
239     /**
240      * Forward the request to other peers (if possible).
241      */
242     ROUTING_POLICY_FORWARD = 2,
243
244     /**
245      * Forward to other peers, and ask them to route
246      * the response via ourselves.
247      */
248     ROUTING_POLICY_INDIRECT = 6,
249     
250     /**
251      * Do everything we could possibly do (that would
252      * make sense).
253      */
254     ROUTING_POLICY_ALL = 7
255   };
256
257
258 /**
259  * Internal context we use for our initial processing
260  * of a GET request.
261  */
262 struct ProcessGetContext
263 {
264   /**
265    * The search query (used for datastore lookup).
266    */
267   GNUNET_HashCode query;
268   
269   /**
270    * Which peer we should forward the response to.
271    */
272   struct GNUNET_PeerIdentity reply_to;
273
274   /**
275    * Namespace for the result (only set for SKS requests)
276    */
277   GNUNET_HashCode namespace;
278
279   /**
280    * Peer that we should forward the query to if possible
281    * (since that peer likely has the content).
282    */
283   struct GNUNET_PeerIdentity prime_target;
284
285   /**
286    * When did we receive this request?
287    */
288   struct GNUNET_TIME_Absolute start_time;
289
290   /**
291    * Our entry in the DRQ (non-NULL while we wait for our
292    * turn to interact with the local database).
293    */
294   struct DatastoreRequestQueue *drq;
295
296   /**
297    * Filter used to eliminate duplicate
298    * results.   Can be NULL if we are
299    * not yet filtering any results.
300    */
301   struct GNUNET_CONTAINER_BloomFilter *bf;
302
303   /**
304    * Bitmap describing which of the optional
305    * hash codes / peer identities were given to us.
306    */
307   uint32_t bm;
308
309   /**
310    * Desired block type.
311    */
312   uint32_t type;
313
314   /**
315    * Priority of the request.
316    */
317   uint32_t priority;
318
319   /**
320    * In what ways are we going to process
321    * the request?
322    */
323   enum RoutingPolicy policy;
324
325   /**
326    * Time-to-live for the request (value
327    * we use).
328    */
329   int32_t ttl;
330
331   /**
332    * Number to mingle hashes for bloom-filter
333    * tests with.
334    */
335   int32_t mingle;
336
337   /**
338    * Number of results that were found so far.
339    */
340   unsigned int results_found;
341 };
342
343
344 /**
345  * Information we keep for each pending request.  We should try to
346  * keep this struct as small as possible since its memory consumption
347  * is key to how many requests we can have pending at once.
348  */
349 struct PendingRequest
350 {
351
352   /**
353    * ID of a client making a request, NULL if this entry is for a
354    * peer.
355    */
356   struct GNUNET_SERVER_Client *client;
357
358   /**
359    * If this is a namespace query, pointer to the hash of the public
360    * key of the namespace; otherwise NULL.
361    */
362   GNUNET_HashCode *namespace;
363
364   /**
365    * Bloomfilter we use to filter out replies that we don't care about
366    * (anymore).  NULL as long as we are interested in all replies.
367    */
368   struct GNUNET_CONTAINER_BloomFilter *bf;
369
370   /**
371    * Hash code of all replies that we have seen so far (only valid
372    * if client is not NULL since we only track replies like this for
373    * our own clients).
374    */
375   GNUNET_HashCode *replies_seen;
376
377   /**
378    * When did we first see this request (form this peer), or, if our
379    * client is initiating, when did we last initiate a search?
380    */
381   struct GNUNET_TIME_Absolute start_time;
382
383   /**
384    * The query that this request is for.
385    */
386   GNUNET_HashCode query;
387
388   /**
389    * (Interned) Peer identifier (only valid if "client" is NULL)
390    * that identifies a peer that gave us this request.
391    */
392   GNUNET_PEER_Id source_pid;
393
394   /**
395    * (Interned) Peer identifier that identifies a preferred target
396    * for requests.
397    */
398   GNUNET_PEER_Id target_pid;
399
400   /**
401    * (Interned) Peer identifiers of peers that have already
402    * received our query for this content.
403    */
404   GNUNET_PEER_Id *used_pids;
405
406   /**
407    * How many entries in "used_pids" are actually valid?
408    */
409   unsigned int used_pids_off;
410
411   /**
412    * How long is the "used_pids" array?
413    */
414   unsigned int used_pids_size;
415
416   /**
417    * How many entries in "replies_seen" are actually valid?
418    */
419   unsigned int replies_seen_off;
420
421   /**
422    * How long is the "replies_seen" array?
423    */
424   unsigned int replies_seen_size;
425   
426   /**
427    * Priority with which this request was made.  If one of our clients
428    * made the request, then this is the current priority that we are
429    * using when initiating the request.  This value is used when
430    * we decide to reward other peers with trust for providing a reply.
431    */
432   uint32_t priority;
433
434   /**
435    * Priority points left for us to spend when forwarding this request
436    * to other peers.
437    */
438   uint32_t remaining_priority;
439
440   /**
441    * TTL with which we saw this request (or, if we initiated, TTL that
442    * we used for the request).
443    */
444   int32_t ttl;
445   
446   /**
447    * Type of the content that this request is for.
448    */
449   uint32_t type;
450
451 };
452
453
454 /**
455  * Closure for "process_reply" function.
456  */
457 struct ProcessReplyClosure
458 {
459   /**
460    * The data for the reply.
461    */
462   const void *data;
463
464   /**
465    * When the reply expires.
466    */
467   struct GNUNET_TIME_Absolute expiration;
468
469   /**
470    * Size of data.
471    */
472   size_t size;
473
474   /**
475    * Type of the block.
476    */
477   uint32_t type;
478
479   /**
480    * How much was this reply worth to us?
481    */
482   uint32_t priority;
483 };
484
485
486 /**
487  * Map from queries to pending requests ("struct PendingRequest") for
488  * this query.
489  */
490 static struct GNUNET_CONTAINER_MultiHashMap *request_map;
491
492 /**
493  * Our connection to the datastore.
494  */
495 static struct GNUNET_DATASTORE_Handle *dsh;
496
497 /**
498  * Our scheduler.
499  */
500 static struct GNUNET_SCHEDULER_Handle *sched;
501
502 /**
503  * Our configuration.
504  */
505 const struct GNUNET_CONFIGURATION_Handle *cfg;
506
507 /**
508  * Handle to the core service (NULL until we've
509  * connected to it).
510  */
511 struct GNUNET_CORE_Handle *core;
512
513 /**
514  * Head of doubly-linked LGC list.
515  */
516 static struct LocalGetContext *lgc_head;
517
518 /**
519  * Tail of doubly-linked LGC list.
520  */
521 static struct LocalGetContext *lgc_tail;
522
523 /**
524  * Head of request queue for the datastore, sorted by timeout.
525  */
526 static struct DatastoreRequestQueue *drq_head;
527
528 /**
529  * Tail of request queue for the datastore.
530  */
531 static struct DatastoreRequestQueue *drq_tail;
532
533 /**
534  * Linked list of indexed files.
535  */
536 static struct IndexInfo *indexed_files;
537
538 /**
539  * Maps hash over content of indexed files
540  * to the respective filename.  The filenames
541  * are pointers into the indexed_files linked
542  * list and do not need to be freed.
543  */
544 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
545
546
547 /**
548  * Write the current index information list to disk.
549  */ 
550 static void
551 write_index_list ()
552 {
553   struct GNUNET_BIO_WriteHandle *wh;
554   char *fn;
555   struct IndexInfo *pos;  
556
557   if (GNUNET_OK !=
558       GNUNET_CONFIGURATION_get_value_filename (cfg,
559                                                "FS",
560                                                "INDEXDB",
561                                                &fn))
562     {
563       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
564                   _("Configuration option `%s' in section `%s' missing.\n"),
565                   "INDEXDB",
566                   "FS");
567       return;
568     }
569   wh = GNUNET_BIO_write_open (fn);
570   if (NULL == wh)
571     {
572       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
573                   _("Could not open `%s'.\n"),
574                   fn);
575       GNUNET_free (fn);
576       return;
577     }
578   pos = indexed_files;
579   while (pos != NULL)
580     {
581       if ( (GNUNET_OK !=
582             GNUNET_BIO_write (wh,
583                               &pos->file_id,
584                               sizeof (GNUNET_HashCode))) ||
585            (GNUNET_OK !=
586             GNUNET_BIO_write_string (wh,
587                                      pos->filename)) )
588         break;
589       pos = pos->next;
590     }
591   if (GNUNET_OK != 
592       GNUNET_BIO_write_close (wh))
593     {
594       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
595                   _("Error writing `%s'.\n"),
596                   fn);
597       GNUNET_free (fn);
598       return;
599     }
600   GNUNET_free (fn);
601 }
602
603
604 /**
605  * Read index information from disk.
606  */
607 static void
608 read_index_list ()
609 {
610   struct GNUNET_BIO_ReadHandle *rh;
611   char *fn;
612   struct IndexInfo *pos;  
613   char *fname;
614   GNUNET_HashCode hc;
615   size_t slen;
616   char *emsg;
617
618   if (GNUNET_OK !=
619       GNUNET_CONFIGURATION_get_value_filename (cfg,
620                                                "FS",
621                                                "INDEXDB",
622                                                &fn))
623     {
624       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
625                   _("Configuration option `%s' in section `%s' missing.\n"),
626                   "INDEXDB",
627                   "FS");
628       return;
629     }
630   rh = GNUNET_BIO_read_open (fn);
631   if (NULL == rh)
632     {
633       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
634                   _("Could not open `%s'.\n"),
635                   fn);
636       GNUNET_free (fn);
637       return;
638     }
639
640   while ( (GNUNET_OK ==
641            GNUNET_BIO_read (rh,
642                             "Hash of indexed file",
643                             &hc,
644                             sizeof (GNUNET_HashCode))) &&
645           (GNUNET_OK ==
646            GNUNET_BIO_read_string (rh, 
647                                    "Name of indexed file",
648                                    &fname,
649                                    1024 * 16)) )
650     {
651       slen = strlen (fname) + 1;
652       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
653       pos->file_id = hc;
654       pos->filename = (const char *) &pos[1];
655       memcpy (&pos[1], fname, slen);
656       if (GNUNET_SYSERR ==
657           GNUNET_CONTAINER_multihashmap_put (ifm,
658                                              &hc,
659                                              (void*) pos->filename,
660                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
661         {
662           GNUNET_free (pos);
663         }
664       else
665         {
666           pos->next = indexed_files;
667           indexed_files = pos;
668         }
669     }
670   if (GNUNET_OK != 
671       GNUNET_BIO_read_close (rh, &emsg))
672     GNUNET_free (emsg);
673   GNUNET_free (fn);
674 }
675
676
677 /**
678  * We've validated the hash of the file we're about to
679  * index.  Signal success to the client and update
680  * our internal data structures.
681  *
682  * @param ii the index info entry for the request
683  */
684 static void
685 signal_index_ok (struct IndexInfo *ii)
686 {
687   if (GNUNET_SYSERR ==
688       GNUNET_CONTAINER_multihashmap_put (ifm,
689                                          &ii->file_id,
690                                          (void*) ii->filename,
691                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
692     {
693       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
694                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
695                   ii->filename,
696                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
697                                                                    &ii->file_id));
698       GNUNET_SERVER_transmit_context_append (ii->tc,
699                                              NULL, 0,
700                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
701       GNUNET_SERVER_transmit_context_run (ii->tc,
702                                           GNUNET_TIME_UNIT_MINUTES);
703       GNUNET_free (ii);
704       return;
705     }
706   ii->next = indexed_files;
707   indexed_files = ii;
708   write_index_list ();
709   GNUNET_SERVER_transmit_context_append (ii->tc,
710                                          NULL, 0,
711                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
712   GNUNET_SERVER_transmit_context_run (ii->tc,
713                                       GNUNET_TIME_UNIT_MINUTES);
714   ii->tc = NULL;
715 }
716
717
718 /**
719  * Function called once the hash computation over an
720  * indexed file has completed.
721  *
722  * @param cls closure, our publishing context
723  * @param res resulting hash, NULL on error
724  */
725 static void 
726 hash_for_index_val (void *cls,
727                     const GNUNET_HashCode *
728                     res)
729 {
730   struct IndexInfo *ii = cls;
731   
732   if ( (res == NULL) ||
733        (0 != memcmp (res,
734                      &ii->file_id,
735                      sizeof(GNUNET_HashCode))) )
736     {
737       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
738                   _("Hash mismatch trying to index file `%s'\n"),
739                   ii->filename);
740       GNUNET_SERVER_transmit_context_append (ii->tc,
741                                              NULL, 0,
742                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
743       GNUNET_SERVER_transmit_context_run (ii->tc,
744                                           GNUNET_TIME_UNIT_MINUTES);
745       GNUNET_free (ii);
746       return;
747     }
748   signal_index_ok (ii);
749 }
750
751
752 /**
753  * Handle INDEX_START-message.
754  *
755  * @param cls closure
756  * @param client identification of the client
757  * @param message the actual message
758  */
759 static void
760 handle_index_start (void *cls,
761                     struct GNUNET_SERVER_Client *client,
762                     const struct GNUNET_MessageHeader *message)
763 {
764   const struct IndexStartMessage *ism;
765   const char *fn;
766   uint16_t msize;
767   struct IndexInfo *ii;
768   size_t slen;
769   uint32_t dev;
770   uint64_t ino;
771   uint32_t mydev;
772   uint64_t myino;
773
774   msize = ntohs(message->size);
775   if ( (msize <= sizeof (struct IndexStartMessage)) ||
776        ( ((const char *)message)[msize-1] != '\0') )
777     {
778       GNUNET_break (0);
779       GNUNET_SERVER_receive_done (client,
780                                   GNUNET_SYSERR);
781       return;
782     }
783   ism = (const struct IndexStartMessage*) message;
784   fn = (const char*) &ism[1];
785   dev = ntohl (ism->device);
786   ino = GNUNET_ntohll (ism->inode);
787   ism = (const struct IndexStartMessage*) message;
788   slen = strlen (fn) + 1;
789   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
790   ii->filename = (const char*) &ii[1];
791   memcpy (&ii[1], fn, slen);
792   ii->file_id = ism->file_id;  
793   ii->tc = GNUNET_SERVER_transmit_context_create (client);
794   if ( ( (dev != 0) ||
795          (ino != 0) ) &&
796        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
797                                                        &mydev,
798                                                        &myino)) &&
799        ( (dev == mydev) &&
800          (ino == myino) ) )
801     {      
802       /* fast validation OK! */
803       signal_index_ok (ii);
804       return;
805     }
806   /* slow validation, need to hash full file (again) */
807   GNUNET_CRYPTO_hash_file (sched,
808                            GNUNET_SCHEDULER_PRIORITY_IDLE,
809                            GNUNET_NO,
810                            fn,
811                            HASHING_BLOCKSIZE,
812                            &hash_for_index_val,
813                            ii);
814 }
815
816
817 /**
818  * Handle INDEX_LIST_GET-message.
819  *
820  * @param cls closure
821  * @param client identification of the client
822  * @param message the actual message
823  */
824 static void
825 handle_index_list_get (void *cls,
826                        struct GNUNET_SERVER_Client *client,
827                        const struct GNUNET_MessageHeader *message)
828 {
829   struct GNUNET_SERVER_TransmitContext *tc;
830   struct IndexInfoMessage *iim;
831   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
832   size_t slen;
833   const char *fn;
834   struct GNUNET_MessageHeader *msg;
835   struct IndexInfo *pos;
836
837   tc = GNUNET_SERVER_transmit_context_create (client);
838   iim = (struct IndexInfoMessage*) buf;
839   msg = &iim->header;
840   pos = indexed_files;
841   while (NULL != pos)
842     {
843       iim->reserved = 0;
844       iim->file_id = pos->file_id;
845       fn = pos->filename;
846       slen = strlen (fn) + 1;
847       if (slen + sizeof (struct IndexInfoMessage) > 
848           GNUNET_SERVER_MAX_MESSAGE_SIZE)
849         {
850           GNUNET_break (0);
851           break;
852         }
853       memcpy (&iim[1], fn, slen);
854       GNUNET_SERVER_transmit_context_append
855         (tc,
856          &msg[1],
857          sizeof (struct IndexInfoMessage) 
858          - sizeof (struct GNUNET_MessageHeader) + slen,
859          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
860       pos = pos->next;
861     }
862   GNUNET_SERVER_transmit_context_append (tc,
863                                          NULL, 0,
864                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
865   GNUNET_SERVER_transmit_context_run (tc,
866                                       GNUNET_TIME_UNIT_MINUTES);
867 }
868
869
870 /**
871  * Handle UNINDEX-message.
872  *
873  * @param cls closure
874  * @param client identification of the client
875  * @param message the actual message
876  */
877 static void
878 handle_unindex (void *cls,
879                 struct GNUNET_SERVER_Client *client,
880                 const struct GNUNET_MessageHeader *message)
881 {
882   const struct UnindexMessage *um;
883   struct IndexInfo *pos;
884   struct IndexInfo *prev;
885   struct IndexInfo *next;
886   struct GNUNET_SERVER_TransmitContext *tc;
887   int found;
888   
889   um = (const struct UnindexMessage*) message;
890   found = GNUNET_NO;
891   prev = NULL;
892   pos = indexed_files;
893   while (NULL != pos)
894     {
895       next = pos->next;
896       if (0 == memcmp (&pos->file_id,
897                        &um->file_id,
898                        sizeof (GNUNET_HashCode)))
899         {
900           if (prev == NULL)
901             indexed_files = pos->next;
902           else
903             prev->next = pos->next;
904           GNUNET_free (pos);
905           found = GNUNET_YES;
906         }
907       else
908         {
909           prev = pos;
910         }
911       pos = next;
912     }
913   if (GNUNET_YES == found)
914     write_index_list ();
915   tc = GNUNET_SERVER_transmit_context_create (client);
916   GNUNET_SERVER_transmit_context_append (tc,
917                                          NULL, 0,
918                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
919   GNUNET_SERVER_transmit_context_run (tc,
920                                       GNUNET_TIME_UNIT_MINUTES);
921 }
922
923
924 /**
925  * Run the next DS request in our
926  * queue, we're done with the current one.
927  */
928 static void
929 next_ds_request ()
930 {
931   struct DatastoreRequestQueue *e;
932   
933   while (NULL != (e = drq_head))
934     {
935       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
936         break;
937       if (e->task != GNUNET_SCHEDULER_NO_TASK)
938         GNUNET_SCHEDULER_cancel (sched, e->task);
939       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
940       e->req (e->req_cls, GNUNET_NO);
941       GNUNET_free (e);  
942     }
943   if (e == NULL)
944     return;
945   if (e->task != GNUNET_SCHEDULER_NO_TASK)
946     GNUNET_SCHEDULER_cancel (sched, e->task);
947   e->task = GNUNET_SCHEDULER_NO_TASK;
948   e->req (e->req_cls, GNUNET_YES);
949   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
950   GNUNET_free (e);  
951 }
952
953
954 /**
955  * A datastore request had to be timed out. 
956  *
957  * @param cls closure (of type "struct DatastoreRequestQueue*")
958  * @param tc task context, unused
959  */
960 static void
961 timeout_ds_request (void *cls,
962                     const struct GNUNET_SCHEDULER_TaskContext *tc)
963 {
964   struct DatastoreRequestQueue *e = cls;
965
966   e->task = GNUNET_SCHEDULER_NO_TASK;
967   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
968   e->req (e->req_cls, GNUNET_NO);
969   GNUNET_free (e);  
970 }
971
972
973 /**
974  * Queue a request for the datastore.
975  *
976  * @param deadline by when the request should run
977  * @param fun function to call once the request can be run
978  * @param fun_cls closure for fun
979  */
980 static struct DatastoreRequestQueue *
981 queue_ds_request (struct GNUNET_TIME_Relative deadline,
982                   RequestFunction fun,
983                   void *fun_cls)
984 {
985   struct DatastoreRequestQueue *e;
986   struct DatastoreRequestQueue *bef;
987
988   if (drq_head == NULL)
989     {
990       /* no other requests pending, run immediately */
991       fun (fun_cls, GNUNET_OK);
992       return NULL;
993     }
994   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
995   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
996   e->req = fun;
997   e->req_cls = fun_cls;
998   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
999     {
1000       /* local request, highest prio, put at head of queue
1001          regardless of deadline */
1002       bef = NULL;
1003     }
1004   else
1005     {
1006       bef = drq_tail;
1007       while ( (NULL != bef) &&
1008               (e->timeout.value < bef->timeout.value) )
1009         bef = bef->prev;
1010     }
1011   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1012   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1013     return e;
1014   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1015                                           GNUNET_NO,
1016                                           GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1017                                           GNUNET_SCHEDULER_NO_TASK,
1018                                           deadline,
1019                                           &timeout_ds_request,
1020                                           e);
1021   return e;                                    
1022 }
1023
1024
1025 /**
1026  * Free the state associated with a local get context.
1027  *
1028  * @param lgc the lgc to free
1029  */
1030 static void
1031 local_get_context_free (struct LocalGetContext *lgc) 
1032 {
1033   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1034   GNUNET_SERVER_client_drop (lgc->client); 
1035   GNUNET_free_non_null (lgc->results);
1036   if (lgc->results_bf != NULL)
1037     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1038   if (lgc->req != NULL)
1039     {
1040       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1041         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1042       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1043       GNUNET_free (lgc->req);
1044     }
1045   GNUNET_free (lgc);
1046 }
1047
1048
1049 /**
1050  * We're able to transmit the next (local) result to the client.
1051  * Do it and ask the datastore for more.  Or, on error, tell
1052  * the datastore to stop giving us more.
1053  *
1054  * @param cls our closure (struct LocalGetContext)
1055  * @param max maximum number of bytes we can transmit
1056  * @param buf where to copy our message
1057  * @return number of bytes copied to buf
1058  */
1059 static size_t
1060 transmit_local_result (void *cls,
1061                        size_t max,
1062                        void *buf)
1063 {
1064   struct LocalGetContext *lgc = cls;  
1065   uint16_t msize;
1066
1067   if (NULL == buf)
1068     {
1069       /* error, abort! */
1070       GNUNET_free (lgc->result);
1071       lgc->result = NULL;
1072       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1073       return 0;
1074     }
1075   msize = ntohs (lgc->result->header.size);
1076   GNUNET_assert (max >= msize);
1077   memcpy (buf, lgc->result, msize);
1078   GNUNET_free (lgc->result);
1079   lgc->result = NULL;
1080   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1081   return msize;
1082 }
1083
1084
1085 /**
1086  * Continuation called from datastore's remove
1087  * function.
1088  *
1089  * @param cls unused
1090  * @param success did the deletion work?
1091  * @param msg error message
1092  */
1093 static void
1094 remove_cont (void *cls,
1095              int success,
1096              const char *msg)
1097 {
1098   if (GNUNET_OK != success)
1099     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1100                 _("Failed to delete bogus block: %s\n"),
1101                 msg);
1102   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1103 }
1104
1105
1106 /**
1107  * Mingle hash with the mingle_number to
1108  * produce different bits.
1109  */
1110 static void
1111 mingle_hash (const GNUNET_HashCode * in,
1112              int32_t mingle_number, 
1113              GNUNET_HashCode * hc)
1114 {
1115   GNUNET_HashCode m;
1116
1117   GNUNET_CRYPTO_hash (&mingle_number, 
1118                       sizeof (int32_t), 
1119                       &m);
1120   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1121 }
1122
1123
1124 /**
1125  * We've received an on-demand encoded block
1126  * from the datastore.  Attempt to do on-demand
1127  * encoding and (if successful), call the 
1128  * continuation with the resulting block.  On
1129  * error, clean up and ask the datastore for
1130  * more results.
1131  *
1132  * @param key key for the content
1133  * @param size number of bytes in data
1134  * @param data content stored
1135  * @param type type of the content
1136  * @param priority priority of the content
1137  * @param anonymity anonymity-level for the content
1138  * @param expiration expiration time for the content
1139  * @param uid unique identifier for the datum;
1140  *        maybe 0 if no unique identifier is available
1141  * @param cont function to call with the actual block
1142  * @param cont_cls closure for cont
1143  */
1144 static void
1145 handle_on_demand_block (const GNUNET_HashCode * key,
1146                         uint32_t size,
1147                         const void *data,
1148                         uint32_t type,
1149                         uint32_t priority,
1150                         uint32_t anonymity,
1151                         struct GNUNET_TIME_Absolute
1152                         expiration, uint64_t uid,
1153                         GNUNET_DATASTORE_Iterator cont,
1154                         void *cont_cls)
1155 {
1156   const struct OnDemandBlock *odb;
1157   GNUNET_HashCode nkey;
1158   struct GNUNET_CRYPTO_AesSessionKey skey;
1159   struct GNUNET_CRYPTO_AesInitializationVector iv;
1160   GNUNET_HashCode query;
1161   ssize_t nsize;
1162   char ndata[DBLOCK_SIZE];
1163   char edata[DBLOCK_SIZE];
1164   const char *fn;
1165   struct GNUNET_DISK_FileHandle *fh;
1166   uint64_t off;
1167
1168   if (size != sizeof (struct OnDemandBlock))
1169     {
1170       GNUNET_break (0);
1171       GNUNET_DATASTORE_remove (dsh, 
1172                                key,
1173                                size,
1174                                data,
1175                                &remove_cont,
1176                                NULL,
1177                                GNUNET_TIME_UNIT_FOREVER_REL);     
1178       return;
1179     }
1180   odb = (const struct OnDemandBlock*) data;
1181   off = GNUNET_ntohll (odb->offset);
1182   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1183                                                         &odb->file_id);
1184   fh = NULL;
1185   if ( (NULL == fn) ||
1186        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1187                                              GNUNET_DISK_OPEN_READ))) ||
1188        (off !=
1189         GNUNET_DISK_file_seek (fh,
1190                                off,
1191                                GNUNET_DISK_SEEK_SET)) ||
1192        (-1 ==
1193         (nsize = GNUNET_DISK_file_read (fh,
1194                                         ndata,
1195                                         sizeof (ndata)))) )
1196     {
1197       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1198                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1199                   GNUNET_h2s (&odb->file_id),
1200                   (unsigned long long) off,
1201                   STRERROR (errno));
1202       if (fh != NULL)
1203         GNUNET_DISK_file_close (fh);
1204       /* FIXME: if this happens often, we need
1205          to remove the OnDemand block from the DS! */
1206       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1207       return;
1208     }
1209   GNUNET_DISK_file_close (fh);
1210   GNUNET_CRYPTO_hash (ndata,
1211                       nsize,
1212                       &nkey);
1213   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1214   GNUNET_CRYPTO_aes_encrypt (ndata,
1215                              nsize,
1216                              &skey,
1217                              &iv,
1218                              edata);
1219   GNUNET_CRYPTO_hash (edata,
1220                       nsize,
1221                       &query);
1222   if (0 != memcmp (&query, 
1223                    key,
1224                    sizeof (GNUNET_HashCode)))
1225     {
1226       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1227                   _("Indexed file `%s' changed at offset %llu\n"),
1228                   fn,
1229                   (unsigned long long) off);
1230       /* FIXME: if this happens often, we need
1231          to remove the OnDemand block from the DS! */
1232       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1233       return;
1234     }
1235   cont (cont_cls,
1236         key,
1237         nsize,
1238         edata,
1239         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1240         priority,
1241         anonymity,
1242         expiration,
1243         uid);
1244 }
1245
1246
1247 /**
1248  * We're processing (local) results for a search request
1249  * from a (local) client.  Pass applicable results to the
1250  * client and if we are done either clean up (operation
1251  * complete) or switch to P2P search (more results possible).
1252  *
1253  * @param cls our closure (struct LocalGetContext)
1254  * @param key key for the content
1255  * @param size number of bytes in data
1256  * @param data content stored
1257  * @param type type of the content
1258  * @param priority priority of the content
1259  * @param anonymity anonymity-level for the content
1260  * @param expiration expiration time for the content
1261  * @param uid unique identifier for the datum;
1262  *        maybe 0 if no unique identifier is available
1263  */
1264 static void
1265 process_local_get_result (void *cls,
1266                           const GNUNET_HashCode * key,
1267                           uint32_t size,
1268                           const void *data,
1269                           uint32_t type,
1270                           uint32_t priority,
1271                           uint32_t anonymity,
1272                           struct GNUNET_TIME_Absolute
1273                           expiration, 
1274                           uint64_t uid)
1275 {
1276   struct LocalGetContext *lgc = cls;
1277   size_t msize;
1278   unsigned int i;
1279
1280   if (key == NULL)
1281     {
1282       /* no further results from datastore; continue
1283          processing further requests from the client and
1284          allow the next task to use the datastore; also,
1285          switch to P2P requests or clean up our state. */
1286       next_ds_request ();
1287       GNUNET_SERVER_receive_done (lgc->client,
1288                                   GNUNET_OK);
1289       if ( (lgc->results_used == 0) ||
1290            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1291            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1292            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1293         {
1294           // FIXME: initiate P2P search
1295           return;
1296         }
1297       /* got all possible results, clean up! */
1298       local_get_context_free (lgc);
1299       return;
1300     }
1301   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1302     {
1303       handle_on_demand_block (key, size, data, type, priority, 
1304                               anonymity, expiration, uid,
1305                               &process_local_get_result,
1306                               lgc);
1307       return;
1308     }
1309   if (type != lgc->type)
1310     {
1311       /* this should be virtually impossible to reach (DBLOCK 
1312          query hash being identical to KBLOCK/SBLOCK query hash);
1313          nevertheless, if it happens, the correct thing is to
1314          simply skip the result. */
1315       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1316       return;
1317     }
1318   /* check if this is a result we've alredy
1319      received */
1320   for (i=0;i<lgc->results_used;i++)
1321     if (0 == memcmp (key,
1322                      &lgc->results[i],
1323                      sizeof (GNUNET_HashCode)))
1324       {
1325         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1326         return; 
1327       }
1328   if (lgc->results_used == lgc->results_size)
1329     GNUNET_array_grow (lgc->results,
1330                        lgc->results_size,
1331                        lgc->results_size * 2 + 2);
1332   GNUNET_CRYPTO_hash (data, 
1333                       size, 
1334                       &lgc->results[lgc->results_used++]);    
1335   msize = size + sizeof (struct ContentMessage);
1336   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1337   lgc->result = GNUNET_malloc (msize);
1338   lgc->result->header.size = htons (msize);
1339   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1340   lgc->result->type = htonl (type);
1341   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1342   memcpy (&lgc->result[1],
1343           data,
1344           size);
1345   GNUNET_SERVER_notify_transmit_ready (lgc->client,
1346                                        msize,
1347                                        GNUNET_TIME_UNIT_FOREVER_REL,
1348                                        &transmit_local_result,
1349                                        lgc);
1350 }
1351
1352
1353 /**
1354  * We're processing a search request from a local
1355  * client.  Now it is our turn to query the datastore.
1356  * 
1357  * @param cls our closure (struct LocalGetContext)
1358  * @param tc unused
1359  */
1360 static void
1361 transmit_local_get (void *cls,
1362                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1363 {
1364   struct LocalGetContext *lgc = cls;
1365   uint32_t type;
1366   
1367   type = lgc->type;
1368   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1369     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1370   GNUNET_DATASTORE_get (dsh,
1371                         &lgc->query,
1372                         type,
1373                         &process_local_get_result,
1374                         lgc,
1375                         GNUNET_TIME_UNIT_FOREVER_REL);
1376 }
1377
1378
1379 /**
1380  * We're processing a search request from a local
1381  * client.  Now it is our turn to query the datastore.
1382  * 
1383  * @param cls our closure (struct LocalGetContext)
1384  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
1385  */
1386 static void 
1387 transmit_local_get_ready (void *cls,
1388                           int ok)
1389 {
1390   struct LocalGetContext *lgc = cls;
1391
1392   GNUNET_assert (GNUNET_OK == ok);
1393   GNUNET_SCHEDULER_add_continuation (sched,
1394                                      GNUNET_NO,
1395                                      &transmit_local_get,
1396                                      lgc,
1397                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1398 }
1399
1400
1401 /**
1402  * Handle START_SEARCH-message (search request from client).
1403  *
1404  * @param cls closure
1405  * @param client identification of the client
1406  * @param message the actual message
1407  */
1408 static void
1409 handle_start_search (void *cls,
1410                      struct GNUNET_SERVER_Client *client,
1411                      const struct GNUNET_MessageHeader *message)
1412 {
1413   const struct SearchMessage *sm;
1414   struct LocalGetContext *lgc;
1415   uint16_t msize;
1416   unsigned int sc;
1417   
1418   msize = ntohs (message->size);
1419   if ( (msize < sizeof (struct SearchMessage)) ||
1420        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
1421     {
1422       GNUNET_break (0);
1423       GNUNET_SERVER_receive_done (client,
1424                                   GNUNET_SYSERR);
1425       return;
1426     }
1427   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
1428   sm = (const struct SearchMessage*) message;
1429   GNUNET_SERVER_client_keep (client);
1430   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
1431   if  (sc > 0)
1432     {
1433       lgc->results_used = sc;
1434       GNUNET_array_grow (lgc->results,
1435                          lgc->results_size,
1436                          sc * 2);
1437       memcpy (lgc->results,
1438               &sm[1],
1439               sc * sizeof (GNUNET_HashCode));
1440     }
1441   lgc->client = client;
1442   lgc->type = ntohl (sm->type);
1443   lgc->anonymity_level = ntohl (sm->anonymity_level);
1444   lgc->target = sm->target;
1445   lgc->query = sm->query;
1446   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
1447   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
1448                                &transmit_local_get_ready,
1449                                lgc);
1450 }
1451
1452
1453 /**
1454  * List of handlers for the messages understood by this
1455  * service.
1456  */
1457 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1458   {&handle_index_start, NULL, 
1459    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
1460   {&handle_index_list_get, NULL, 
1461    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
1462   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
1463    sizeof (struct UnindexMessage) },
1464   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
1465    0 },
1466   {NULL, NULL, 0, 0}
1467 };
1468
1469
1470 /**
1471  * A client disconnected.  Remove all of its pending queries.
1472  *
1473  * @param cls closure, NULL
1474  * @param client identification of the client
1475  */
1476 static void
1477 handle_client_disconnect (void *cls,
1478                           struct GNUNET_SERVER_Client
1479                           * client)
1480 {
1481   struct LocalGetContext *lgc;
1482
1483   lgc = lgc_head;
1484   while ( (NULL != lgc) &&
1485           (lgc->client != client) )
1486     lgc = lgc->next;
1487   if (lgc == NULL)
1488     return; /* not one of our clients */
1489   local_get_context_free (lgc);
1490 }
1491
1492
1493 /**
1494  * Task run during shutdown.
1495  *
1496  * @param cls unused
1497  * @param tc unused
1498  */
1499 static void
1500 shutdown_task (void *cls,
1501                const struct GNUNET_SCHEDULER_TaskContext *tc)
1502 {
1503   struct IndexInfo *pos;  
1504
1505   if (NULL != core)
1506     GNUNET_CORE_disconnect (core);
1507   GNUNET_DATASTORE_disconnect (dsh,
1508                                GNUNET_NO);
1509   dsh = NULL;
1510   // FIXME: iterate over 'request_map' to free entries!
1511   GNUNET_CONTAINER_multihashmap_destroy (request_map);
1512   request_map = NULL;
1513   GNUNET_CONTAINER_multihashmap_destroy (ifm);
1514   ifm = NULL;
1515   while (NULL != (pos = indexed_files))
1516     {
1517       indexed_files = pos->next;
1518       GNUNET_free (pos);
1519     }
1520 }
1521
1522
1523 /**
1524  * Method called whenever a peer disconnects.
1525  *
1526  * @param cls closure, not used
1527  * @param peer peer identity this notification is about
1528  */
1529 static void
1530 peer_disconnect_handler (void *cls,
1531                          const struct
1532                          GNUNET_PeerIdentity * peer)
1533 {
1534   // FIXME: remove all pending requests from this
1535   // peer from our memory
1536   // (iterate over request_map)
1537 }
1538
1539
1540 /**
1541  * We're processing a GET request from
1542  * another peer and have decided to forward
1543  * it to other peers.
1544  *
1545  * @param cls our "struct ProcessGetContext *"
1546  * @param tc unused
1547  */
1548 static void
1549 forward_get_request (void *cls,
1550                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1551 {
1552   struct ProcessGetContext *pgc = cls;
1553
1554   // FIXME: install entry in
1555   // 'request_map' and do actual
1556   // forwarding...
1557   if (pgc->bf != NULL)
1558     GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1559   GNUNET_free (pgc); 
1560 }
1561
1562
1563 /**
1564  * Transmit the given message by copying it to
1565  * the target buffer "buf".  "buf" will be
1566  * NULL and "size" zero if the socket was closed for
1567  * writing in the meantime.  In that case, only
1568  * free the message
1569  *
1570  * @param cls closure, pointer to the message
1571  * @param size number of bytes available in buf
1572  * @param buf where the callee should write the message
1573  * @return number of bytes written to buf
1574  */
1575 static size_t
1576 transmit_message (void *cls,
1577                   size_t size, void *buf)
1578 {
1579   struct GNUNET_MessageHeader *msg = cls;
1580   uint16_t msize;
1581   
1582   if (NULL == buf)
1583     {
1584 #if DEBUG_FS
1585       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1586                   "Dropping reply, core too busy.\n");
1587 #endif
1588       GNUNET_free (msg);
1589       return 0;
1590     }
1591   msize = ntohs (msg->size);
1592   GNUNET_assert (size >= msize);
1593   memcpy (buf, msg, msize);
1594   GNUNET_free (msg);
1595   return msize;
1596 }
1597
1598
1599 /**
1600  * Test if the load on this peer is too high
1601  * to even consider processing the query at
1602  * all.
1603  * 
1604  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1605  */
1606 static int
1607 test_load_too_high ()
1608 {
1609   return GNUNET_NO; // FIXME
1610 }
1611
1612
1613 /**
1614  * We're processing (local) results for a search request
1615  * from another peer.  Pass applicable results to the
1616  * peer and if we are done either clean up (operation
1617  * complete) or forward to other peers (more results possible).
1618  *
1619  * @param cls our closure (struct LocalGetContext)
1620  * @param key key for the content
1621  * @param size number of bytes in data
1622  * @param data content stored
1623  * @param type type of the content
1624  * @param priority priority of the content
1625  * @param anonymity anonymity-level for the content
1626  * @param expiration expiration time for the content
1627  * @param uid unique identifier for the datum;
1628  *        maybe 0 if no unique identifier is available
1629  */
1630 static void
1631 process_p2p_get_result (void *cls,
1632                         const GNUNET_HashCode * key,
1633                         uint32_t size,
1634                         const void *data,
1635                         uint32_t type,
1636                         uint32_t priority,
1637                         uint32_t anonymity,
1638                         struct GNUNET_TIME_Absolute
1639                         expiration, 
1640                         uint64_t uid)
1641 {
1642   struct ProcessGetContext *pgc = cls;
1643   GNUNET_HashCode dhash;
1644   GNUNET_HashCode mhash;
1645   struct PutMessage *reply;
1646   
1647   if (NULL == key)
1648     {
1649       /* no more results */
1650       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
1651            ( (0 == pgc->results_found) ||
1652              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1653              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1654              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
1655         {
1656           GNUNET_SCHEDULER_add_continuation (sched,
1657                                              GNUNET_NO,
1658                                              &forward_get_request,
1659                                              pgc,
1660                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1661         }
1662       else
1663         {
1664           if (pgc->bf != NULL)
1665             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1666           GNUNET_free (pgc); 
1667         }
1668       next_ds_request ();
1669       return;
1670     }
1671   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1672     {
1673       handle_on_demand_block (key, size, data, type, priority, 
1674                               anonymity, expiration, uid,
1675                               &process_p2p_get_result,
1676                               pgc);
1677       return;
1678     }
1679   /* check for duplicates */
1680   GNUNET_CRYPTO_hash (data, size, &dhash);
1681   mingle_hash (&dhash, 
1682                pgc->mingle,
1683                &mhash);
1684   if ( (pgc->bf != NULL) &&
1685        (GNUNET_YES ==
1686         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
1687                                            &mhash)) )
1688     {      
1689 #if DEBUG_FS
1690       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1691                   "Result from datastore filtered by bloomfilter.\n");
1692 #endif
1693       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1694       return;
1695     }
1696   pgc->results_found++;
1697   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1698        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1699        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1700     {
1701       if (pgc->bf == NULL)
1702         pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
1703                                                      32, 
1704                                                      BLOOMFILTER_K);
1705       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
1706                                         &mhash);
1707     }
1708
1709   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
1710   reply->header.size = htons (sizeof (struct PutMessage) + size);
1711   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1712   reply->type = htonl (type);
1713   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
1714   memcpy (&reply[1], data, size);
1715   GNUNET_CORE_notify_transmit_ready (core,
1716                                      pgc->priority,
1717                                      ACCEPTABLE_REPLY_DELAY,
1718                                      &pgc->reply_to,
1719                                      sizeof (struct PutMessage) + size,
1720                                      &transmit_message,
1721                                      reply);
1722   if ( (GNUNET_YES == test_load_too_high()) ||
1723        (pgc->results_found > 5 + 2 * pgc->priority) )
1724     {
1725       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1726       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
1727       return;
1728     }
1729   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1730 }
1731   
1732
1733 /**
1734  * We're processing a GET request from
1735  * another peer.  Give it to our local
1736  * datastore.
1737  *
1738  * @param cls our "struct ProcessGetContext"
1739  * @param ok did we get a datastore slice or not?
1740  */
1741 static void
1742 ds_get_request (void *cls, 
1743                 int ok)
1744 {
1745   struct ProcessGetContext *pgc = cls;
1746   uint32_t type;
1747   struct GNUNET_TIME_Relative timeout;
1748
1749   if (GNUNET_OK != ok)
1750     {
1751       /* no point in doing P2P stuff if we can't even do local */
1752       GNUNET_free (dsh);
1753       return;
1754     }
1755   type = pgc->type;
1756   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1757     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1758   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
1759                                            (pgc->priority + 1));
1760   GNUNET_DATASTORE_get (dsh,
1761                         &pgc->query,
1762                         type,
1763                         &process_p2p_get_result,
1764                         pgc,
1765                         timeout);
1766 }
1767
1768
1769 /**
1770  * The priority level imposes a bound on the maximum
1771  * value for the ttl that can be requested.
1772  *
1773  * @param ttl_in requested ttl
1774  * @param priority given priority
1775  * @return ttl_in if ttl_in is below the limit,
1776  *         otherwise the ttl-limit for the given priority
1777  */
1778 static int32_t
1779 bound_ttl (int32_t ttl_in, uint32_t prio)
1780 {
1781   unsigned long long allowed;
1782
1783   if (ttl_in <= 0)
1784     return ttl_in;
1785   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
1786   if (ttl_in > allowed)      
1787     {
1788       if (allowed >= (1 << 30))
1789         return 1 << 30;
1790       return allowed;
1791     }
1792   return ttl_in;
1793 }
1794
1795
1796 /**
1797  * We've received a request with the specified
1798  * priority.  Bound it according to how much
1799  * we trust the given peer.
1800  * 
1801  * @param prio_in requested priority
1802  * @param peer the peer making the request
1803  * @return effective priority
1804  */
1805 static uint32_t
1806 bound_priority (uint32_t prio_in,
1807                 const struct GNUNET_PeerIdentity *peer)
1808 {
1809   return 0; // FIXME!
1810 }
1811
1812
1813 /**
1814  * Handle P2P "GET" request.
1815  *
1816  * @param cls closure, always NULL
1817  * @param peer the other peer involved (sender or receiver, NULL
1818  *        for loopback messages where we are both sender and receiver)
1819  * @param message the actual message
1820  * @return GNUNET_OK to keep the connection open,
1821  *         GNUNET_SYSERR to close it (signal serious error)
1822  */
1823 static int
1824 handle_p2p_get (void *cls,
1825                 const struct GNUNET_PeerIdentity *other,
1826                 const struct GNUNET_MessageHeader *message)
1827 {
1828   uint16_t msize;
1829   const struct GetMessage *gm;
1830   unsigned int bits;
1831   const GNUNET_HashCode *opt;
1832   struct ProcessGetContext *pgc;
1833   uint32_t bm;
1834   size_t bfsize;
1835   uint32_t ttl_decrement;
1836   double preference;
1837   int net_load_up;
1838   int net_load_down;
1839
1840   msize = ntohs(message->size);
1841   if (msize < sizeof (struct GetMessage))
1842     {
1843       GNUNET_break_op (0);
1844       return GNUNET_SYSERR;
1845     }
1846   gm = (const struct GetMessage*) message;
1847   bm = ntohl (gm->hash_bitmap);
1848   bits = 0;
1849   while (bm > 0)
1850     {
1851       if (1 == (bm & 1))
1852         bits++;
1853       bm >>= 1;
1854     }
1855   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
1856     {
1857       GNUNET_break_op (0);
1858       return GNUNET_SYSERR;
1859     }  
1860   opt = (const GNUNET_HashCode*) &gm[1];
1861   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
1862   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
1863   if (bfsize > 0)
1864     pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
1865                                                  bfsize,
1866                                                  BLOOMFILTER_K);
1867   pgc->type = ntohl (gm->type);
1868   pgc->bm = ntohl (gm->hash_bitmap);
1869   pgc->mingle = gm->filter_mutator;
1870   bits = 0;
1871   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
1872     pgc->reply_to.hashPubKey = opt[bits++];
1873   else
1874     pgc->reply_to = *other;
1875   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
1876     pgc->namespace = opt[bits++];
1877   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1878     {
1879       GNUNET_break_op (0);
1880       GNUNET_free (pgc);
1881       return GNUNET_SYSERR;
1882     }
1883   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
1884     pgc->prime_target.hashPubKey = opt[bits++];
1885   /* note that we can really only check load here since otherwise
1886      peers could find out that we are overloaded by being disconnected
1887      after sending us a malformed query... */
1888   if (GNUNET_YES == test_load_too_high ())
1889     {
1890       if (NULL != pgc->bf)
1891         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1892       GNUNET_free (pgc);
1893 #if DEBUG_FS
1894       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1895                   "Dropping query from `%s', this peer is too busy.\n",
1896                   GNUNET_h2s (other));
1897 #endif
1898       return GNUNET_OK;
1899     }
1900   net_load_up = 50; // FIXME
1901   net_load_down = 50; // FIXME
1902   pgc->policy = ROUTING_POLICY_NONE;
1903   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
1904        (net_load_down < IDLE_LOAD_THRESHOLD) )
1905     {
1906       pgc->policy |= ROUTING_POLICY_ALL;
1907       pgc->priority = 0; /* no charge */
1908     }
1909   else
1910     {
1911       pgc->priority = bound_priority (ntohl (gm->priority), other);
1912       if ( (net_load_up < 
1913             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
1914            (net_load_down < 
1915             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
1916         {
1917           pgc->policy |= ROUTING_POLICY_ALL;
1918         }
1919       else
1920         {
1921           // FIXME: is this sound?
1922           if (net_load_up < 90 + 10 * pgc->priority)
1923             pgc->policy |= ROUTING_POLICY_FORWARD;
1924           if (net_load_down < 90 + 10 * pgc->priority)
1925             pgc->policy |= ROUTING_POLICY_ANSWER;
1926         }
1927     }
1928   if (pgc->policy == ROUTING_POLICY_NONE)
1929     {
1930 #if DEBUG_FS
1931       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1932                   "Dropping query from `%s', network saturated.\n",
1933                   GNUNET_h2s (other));
1934 #endif
1935       if (NULL != pgc->bf)
1936         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1937       GNUNET_free (pgc);
1938       return GNUNET_OK;     /* drop */
1939     }
1940   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
1941     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
1942   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
1943   /* decrement ttl (always) */
1944   ttl_decrement = 2 * TTL_DECREMENT +
1945     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1946                               TTL_DECREMENT);
1947   if ( (pgc->ttl < 0) &&
1948        (pgc->ttl - ttl_decrement > 0) )
1949     {
1950 #if DEBUG_FS
1951       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1952                   "Dropping query from `%s' due to TTL underflow.\n",
1953                   GNUNET_h2s (other));
1954 #endif
1955       /* integer underflow => drop (should be very rare)! */
1956       if (NULL != pgc->bf)
1957         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1958       GNUNET_free (pgc);
1959       return GNUNET_OK;
1960     }
1961   pgc->ttl -= ttl_decrement;
1962   pgc->start_time = GNUNET_TIME_absolute_get ();
1963   preference = (double) pgc->priority;
1964   if (preference < QUERY_BANDWIDTH_VALUE)
1965     preference = QUERY_BANDWIDTH_VALUE;
1966   // FIXME: also reserve bandwidth for reply?
1967   GNUNET_CORE_peer_configure (core,
1968                               other,
1969                               GNUNET_TIME_UNIT_FOREVER_REL,
1970                               0, 0, preference, NULL, NULL);
1971   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
1972     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
1973                                  &ds_get_request,
1974                                  pgc);
1975   else
1976     GNUNET_SCHEDULER_add_continuation (sched,
1977                                        GNUNET_NO,
1978                                        &forward_get_request,
1979                                        pgc,
1980                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1981   return GNUNET_OK;
1982 }
1983
1984
1985 /**
1986  * Iterator over pending requests.
1987  *
1988  * @param cls response (struct ProcessReplyClosure)
1989  * @param key our query
1990  * @param value value in the hash map (meta-info about the query)
1991  * @return GNUNET_YES (we should continue to iterate)
1992  */
1993 static int
1994 process_reply (void *cls,
1995                const GNUNET_HashCode * key,
1996                void *value)
1997 {
1998   struct ProcessReplyClosure *prq = cls;
1999   struct PendingRequest *pr = value;
2000
2001   fprintf (stderr, "FIXME %p %p\n", prq, pr);
2002   // FIXME: forward reply to client
2003   // or other peers (depending on pr...)
2004   return GNUNET_YES;
2005 }
2006
2007
2008 /**
2009  * Handle P2P "PUT" request.
2010  *
2011  * @param cls closure, always NULL
2012  * @param peer the other peer involved (sender or receiver, NULL
2013  *        for loopback messages where we are both sender and receiver)
2014  * @param message the actual message
2015  * @return GNUNET_OK to keep the connection open,
2016  *         GNUNET_SYSERR to close it (signal serious error)
2017  */
2018 static int
2019 handle_p2p_put (void *cls,
2020                 const struct GNUNET_PeerIdentity *other,
2021                 const struct GNUNET_MessageHeader *message)
2022 {
2023   const struct PutMessage *put;
2024   uint16_t msize;
2025   size_t dsize;
2026   uint32_t type;
2027   struct GNUNET_TIME_Absolute expiration;
2028   GNUNET_HashCode query;
2029   const struct KBlock *kb;
2030   struct ProcessReplyClosure prq;
2031
2032   msize = ntohs (message->size);
2033   if (msize < sizeof (struct PutMessage))
2034     {
2035       GNUNET_break_op(0);
2036       return GNUNET_SYSERR;
2037     }
2038   put = (const struct PutMessage*) message;
2039   dsize = msize - sizeof (struct PutMessage);
2040   type = ntohl (put->type);
2041   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
2042
2043   /* first, validate! */
2044   switch (type)
2045     {
2046     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2047     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2048       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
2049       break;
2050     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2051       if (dsize < sizeof (struct KBlock))
2052         {
2053           GNUNET_break_op (0);
2054           return GNUNET_SYSERR;
2055         }
2056       kb = (const struct KBlock*) &put[1];
2057       // FIXME -- validation code below broken...
2058       if ( (dsize != ntohs (kb->purpose.size) + 42) ||
2059            (GNUNET_OK !=
2060             GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
2061                                       &kb->purpose,
2062                                       &kb->signature,
2063                                       &kb->keyspace)) )
2064         {
2065           GNUNET_break_op (0);
2066           return GNUNET_SYSERR;
2067         }
2068       GNUNET_CRYPTO_hash (&kb->keyspace,
2069                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2070                           &query);
2071       break;
2072     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2073       // FIXME -- validate SBLOCK!
2074       GNUNET_break (0);
2075       return GNUNET_OK;
2076     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2077       // FIXME -- validate SKBLOCK!
2078       GNUNET_break (0);
2079       return GNUNET_OK;
2080     default:
2081       /* unknown block type */
2082       GNUNET_break_op (0);
2083       return GNUNET_SYSERR;
2084     }
2085
2086   /* now, lookup 'query' */
2087   prq.data = (const void*) &put[1];
2088   prq.size = dsize;
2089   prq.type = type;
2090   prq.expiration = expiration;
2091   prq.priority = 0;
2092   GNUNET_CONTAINER_multihashmap_get_multiple (request_map,
2093                                               &query,
2094                                               &process_reply,
2095                                               &prq);
2096   // FIXME: if migration is on and load is low,
2097   // queue to store data in datastore;
2098   // use "prq.priority" for that!
2099   return GNUNET_OK;
2100 }
2101
2102
2103 /**
2104  * List of handlers for P2P messages
2105  * that we care about.
2106  */
2107 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2108   {
2109     { &handle_p2p_get, 
2110       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2111     { &handle_p2p_put, 
2112       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2113     { NULL, 0, 0 }
2114   };
2115
2116
2117 /**
2118  * Task that will try to initiate a connection with the
2119  * core service.
2120  * 
2121  * @param cls unused
2122  * @param tc unused
2123  */
2124 static void
2125 core_connect_task (void *cls,
2126                    const struct GNUNET_SCHEDULER_TaskContext *tc);
2127
2128
2129 /**
2130  * Function called by the core after we've
2131  * connected.
2132  */
2133 static void
2134 core_start_cb (void *cls,
2135                struct GNUNET_CORE_Handle * server,
2136                const struct GNUNET_PeerIdentity *
2137                my_identity,
2138                const struct
2139                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
2140                publicKey)
2141 {
2142   if (server == NULL)
2143     {
2144       GNUNET_SCHEDULER_add_delayed (sched,
2145                                     GNUNET_NO,
2146                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
2147                                     GNUNET_SCHEDULER_NO_TASK,
2148                                     GNUNET_TIME_UNIT_SECONDS,
2149                                     &core_connect_task,
2150                                     NULL);
2151       return;
2152     }
2153   core = server;
2154 }
2155
2156
2157 /**
2158  * Task that will try to initiate a connection with the
2159  * core service.
2160  * 
2161  * @param cls unused
2162  * @param tc unused
2163  */
2164 static void
2165 core_connect_task (void *cls,
2166                    const struct GNUNET_SCHEDULER_TaskContext *tc)
2167 {
2168   GNUNET_CORE_connect (sched,
2169                        cfg,
2170                        GNUNET_TIME_UNIT_FOREVER_REL,
2171                        NULL,
2172                        &core_start_cb,
2173                        NULL,
2174                        &peer_disconnect_handler,
2175                        NULL, 
2176                        NULL, GNUNET_NO,
2177                        NULL, GNUNET_NO,
2178                        p2p_handlers);
2179 }
2180
2181
2182 /**
2183  * Process fs requests.
2184  *
2185  * @param cls closure
2186  * @param sched scheduler to use
2187  * @param server the initialized server
2188  * @param cfg configuration to use
2189  */
2190 static void
2191 run (void *cls,
2192      struct GNUNET_SCHEDULER_Handle *s,
2193      struct GNUNET_SERVER_Handle *server,
2194      const struct GNUNET_CONFIGURATION_Handle *c)
2195 {
2196   sched = s;
2197   cfg = c;
2198
2199   ifm = GNUNET_CONTAINER_multihashmap_create (128);
2200   request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2201   read_index_list ();
2202   dsh = GNUNET_DATASTORE_connect (cfg,
2203                                   sched);
2204   if (NULL == dsh)
2205     {
2206       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2207                   _("Failed to connect to datastore service.\n"));
2208       return;
2209     }
2210   GNUNET_SERVER_disconnect_notify (server, 
2211                                    &handle_client_disconnect,
2212                                    NULL);
2213   GNUNET_SERVER_add_handlers (server, handlers);
2214   core_connect_task (NULL, NULL);
2215   GNUNET_SCHEDULER_add_delayed (sched,
2216                                 GNUNET_YES,
2217                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
2218                                 GNUNET_SCHEDULER_NO_TASK,
2219                                 GNUNET_TIME_UNIT_FOREVER_REL,
2220                                 &shutdown_task,
2221                                 NULL);
2222 }
2223
2224
2225 /**
2226  * The main function for the fs service.
2227  *
2228  * @param argc number of arguments from the command line
2229  * @param argv command line arguments
2230  * @return 0 ok, 1 on error
2231  */
2232 int
2233 main (int argc, char *const *argv)
2234 {
2235   return (GNUNET_OK ==
2236           GNUNET_SERVICE_run (argc,
2237                               argv,
2238                               "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
2239 }
2240
2241 /* end of gnunet-service-fs.c */