fs hacking
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - tracking of PendingRequests (and defining that struct...)
28  * - setup P2P search on CS request
29  * - setup P2P search on P2P GET
30  * - forward replies based on tracked requests
31  * - validation of KBLOCKS (almost done)
32  * - validation of SBLOCKS
33  * - validation of KSBLOCKS
34  * - content migration (put in local DS)
35  * - possible major issue: we may
36  *   queue "gazillions" of (K|S)Blocks for the
37  *   core to transmit to another peer; need
38  *   to make sure this is bounded overall...
39  * - various load-based actions (can wait)
40  * - remove on-demand blocks if they keep failing (can wait)
41  */
42 #include "platform.h"
43 #include "gnunet_core_service.h"
44 #include "gnunet_datastore_service.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "fs.h"
49
50
51 /**
52  * In-memory information about indexed files (also available
53  * on-disk).
54  */
55 struct IndexInfo
56 {
57   
58   /**
59    * This is a linked list.
60    */
61   struct IndexInfo *next;
62
63   /**
64    * Name of the indexed file.  Memory allocated
65    * at the end of this struct (do not free).
66    */
67   const char *filename;
68
69   /**
70    * Context for transmitting confirmation to client,
71    * NULL if we've done this already.
72    */
73   struct GNUNET_SERVER_TransmitContext *tc;
74   
75   /**
76    * Hash of the contents of the file.
77    */
78   GNUNET_HashCode file_id;
79
80 };
81
82
83 /**
84  * Signature of a function that is called whenever a datastore
85  * request can be processed (or an entry put on the queue times out).
86  *
87  * @param cls closure
88  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
89  */
90 typedef void (*RequestFunction)(void *cls,
91                                 int ok);
92
93
94 /**
95  * Doubly-linked list of our requests for the datastore.
96  */
97 struct DatastoreRequestQueue
98 {
99
100   /**
101    * This is a doubly-linked list.
102    */
103   struct DatastoreRequestQueue *next;
104
105   /**
106    * This is a doubly-linked list.
107    */
108   struct DatastoreRequestQueue *prev;
109
110   /**
111    * Function to call (will issue the request).
112    */
113   RequestFunction req;
114
115   /**
116    * Closure for req.
117    */
118   void *req_cls;
119
120   /**
121    * When should this request time-out because we don't care anymore?
122    */
123   struct GNUNET_TIME_Absolute timeout;
124     
125   /**
126    * ID of task used for signaling timeout.
127    */
128   GNUNET_SCHEDULER_TaskIdentifier task;
129
130 };
131
132
133 /**
134  * Closure for processing START_SEARCH messages from a client.
135  */
136 struct LocalGetContext
137 {
138
139   /**
140    * This is a doubly-linked list.
141    */
142   struct LocalGetContext *next;
143
144   /**
145    * This is a doubly-linked list.
146    */
147   struct LocalGetContext *prev;
148
149   /**
150    * Client that initiated the search.
151    */
152   struct GNUNET_SERVER_Client *client;
153
154   /**
155    * Array of results that we've already received 
156    * (can be NULL).
157    */
158   GNUNET_HashCode *results; 
159
160   /**
161    * Bloomfilter over all results (for fast query construction);
162    * NULL if we don't have any results.
163    */
164   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
165
166   /**
167    * DS request associated with this operation.
168    */
169   struct DatastoreRequestQueue *req;
170
171   /**
172    * Current result message to transmit to client (or NULL).
173    */
174   struct ContentMessage *result;
175   
176   /**
177    * Type of the content that we're looking for.
178    * 0 for any.
179    */
180   uint32_t type;
181
182   /**
183    * Desired anonymity level.
184    */
185   uint32_t anonymity_level;
186
187   /**
188    * Number of results actually stored in the results array.
189    */
190   unsigned int results_used;
191   
192   /**
193    * Size of the results array in memory.
194    */
195   unsigned int results_size;
196
197   /**
198    * If the request is for a DBLOCK or IBLOCK, this is the identity of
199    * the peer that is known to have a response.  Set to all-zeros if
200    * such a target is not known (note that even if OUR anonymity
201    * level is >0 we may happen to know the responder's identity;
202    * nevertheless, we should probably not use it for a DHT-lookup
203    * or similar blunt actions in order to avoid exposing ourselves).
204    * <p>
205    * If the request is for an SBLOCK, this is the identity of the
206    * pseudonym to which the SBLOCK belongs. 
207    * <p>
208    * If the request is for a KBLOCK, "target" must be all zeros.
209    */
210   GNUNET_HashCode target;
211
212   /**
213    * Hash of the keyword (aka query) for KBLOCKs; Hash of
214    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
215    * and hash of the identifier XORed with the target for
216    * SBLOCKS (aka query).
217    */
218   GNUNET_HashCode query;
219
220 };
221
222
223 /**
224  * Possible routing policies for an FS-GET request.
225  */
226 enum RoutingPolicy
227   {
228     /**
229      * Simply drop the request.
230      */
231     ROUTING_POLICY_NONE = 0,
232     
233     /**
234      * Answer it if we can from local datastore.
235      */
236     ROUTING_POLICY_ANSWER = 1,
237
238     /**
239      * Forward the request to other peers (if possible).
240      */
241     ROUTING_POLICY_FORWARD = 2,
242
243     /**
244      * Forward to other peers, and ask them to route
245      * the response via ourselves.
246      */
247     ROUTING_POLICY_INDIRECT = 6,
248     
249     /**
250      * Do everything we could possibly do (that would
251      * make sense).
252      */
253     ROUTING_POLICY_ALL = 7
254   };
255
256
257 /**
258  * Internal context we use for our initial processing
259  * of a GET request.
260  */
261 struct ProcessGetContext
262 {
263   /**
264    * The search query (used for datastore lookup).
265    */
266   GNUNET_HashCode query;
267   
268   /**
269    * Which peer we should forward the response to.
270    */
271   struct GNUNET_PeerIdentity reply_to;
272
273   /**
274    * Namespace for the result (only set for SKS requests)
275    */
276   GNUNET_HashCode namespace;
277
278   /**
279    * Peer that we should forward the query to if possible
280    * (since that peer likely has the content).
281    */
282   struct GNUNET_PeerIdentity prime_target;
283
284   /**
285    * When did we receive this request?
286    */
287   struct GNUNET_TIME_Absolute start_time;
288
289   /**
290    * Our entry in the DRQ (non-NULL while we wait for our
291    * turn to interact with the local database).
292    */
293   struct DatastoreRequestQueue *drq;
294
295   /**
296    * Filter used to eliminate duplicate
297    * results.   Can be NULL if we are
298    * not yet filtering any results.
299    */
300   struct GNUNET_CONTAINER_BloomFilter *bf;
301
302   /**
303    * Bitmap describing which of the optional
304    * hash codes / peer identities were given to us.
305    */
306   uint32_t bm;
307
308   /**
309    * Desired block type.
310    */
311   uint32_t type;
312
313   /**
314    * Priority of the request.
315    */
316   uint32_t priority;
317
318   /**
319    * In what ways are we going to process
320    * the request?
321    */
322   enum RoutingPolicy policy;
323
324   /**
325    * Time-to-live for the request (value
326    * we use).
327    */
328   int32_t ttl;
329
330   /**
331    * Number to mingle hashes for bloom-filter
332    * tests with.
333    */
334   int32_t mingle;
335
336   /**
337    * Number of results that were found so far.
338    */
339   unsigned int results_found;
340 };
341
342
343 /**
344  * Information we keep for each pending
345  * request.
346  */
347 struct PendingRequest
348 {
349   // FIXME
350 };
351
352
353 /**
354  * Closure for "process_reply" function.
355  */
356 struct ProcessReplyClosure
357 {
358   /**
359    * The data for the reply.
360    */
361   const void *data;
362
363   /**
364    * When the reply expires.
365    */
366   struct GNUNET_TIME_Absolute expiration;
367
368   /**
369    * Size of data.
370    */
371   size_t size;
372
373   /**
374    * Type of the block.
375    */
376   uint32_t type;
377
378   /**
379    * How much was this reply worth to us?
380    */
381   uint32_t priority;
382 };
383
384
385 /**
386  * Map from queries to pending requests ("struct PendingRequest") for
387  * this query.
388  */
389 static struct GNUNET_CONTAINER_MultiHashMap *request_map;
390
391 /**
392  * Our connection to the datastore.
393  */
394 static struct GNUNET_DATASTORE_Handle *dsh;
395
396 /**
397  * Our scheduler.
398  */
399 static struct GNUNET_SCHEDULER_Handle *sched;
400
401 /**
402  * Our configuration.
403  */
404 const struct GNUNET_CONFIGURATION_Handle *cfg;
405
406 /**
407  * Handle to the core service (NULL until we've
408  * connected to it).
409  */
410 struct GNUNET_CORE_Handle *core;
411
412 /**
413  * Head of doubly-linked LGC list.
414  */
415 static struct LocalGetContext *lgc_head;
416
417 /**
418  * Tail of doubly-linked LGC list.
419  */
420 static struct LocalGetContext *lgc_tail;
421
422 /**
423  * Head of request queue for the datastore, sorted by timeout.
424  */
425 static struct DatastoreRequestQueue *drq_head;
426
427 /**
428  * Tail of request queue for the datastore.
429  */
430 static struct DatastoreRequestQueue *drq_tail;
431
432 /**
433  * Linked list of indexed files.
434  */
435 static struct IndexInfo *indexed_files;
436
437 /**
438  * Maps hash over content of indexed files
439  * to the respective filename.  The filenames
440  * are pointers into the indexed_files linked
441  * list and do not need to be freed.
442  */
443 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
444
445
446 /**
447  * Write the current index information list to disk.
448  */ 
449 static void
450 write_index_list ()
451 {
452   struct GNUNET_BIO_WriteHandle *wh;
453   char *fn;
454   struct IndexInfo *pos;  
455
456   if (GNUNET_OK !=
457       GNUNET_CONFIGURATION_get_value_filename (cfg,
458                                                "FS",
459                                                "INDEXDB",
460                                                &fn))
461     {
462       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
463                   _("Configuration option `%s' in section `%s' missing.\n"),
464                   "INDEXDB",
465                   "FS");
466       return;
467     }
468   wh = GNUNET_BIO_write_open (fn);
469   if (NULL == wh)
470     {
471       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
472                   _("Could not open `%s'.\n"),
473                   fn);
474       GNUNET_free (fn);
475       return;
476     }
477   pos = indexed_files;
478   while (pos != NULL)
479     {
480       if ( (GNUNET_OK !=
481             GNUNET_BIO_write (wh,
482                               &pos->file_id,
483                               sizeof (GNUNET_HashCode))) ||
484            (GNUNET_OK !=
485             GNUNET_BIO_write_string (wh,
486                                      pos->filename)) )
487         break;
488       pos = pos->next;
489     }
490   if (GNUNET_OK != 
491       GNUNET_BIO_write_close (wh))
492     {
493       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
494                   _("Error writing `%s'.\n"),
495                   fn);
496       GNUNET_free (fn);
497       return;
498     }
499   GNUNET_free (fn);
500 }
501
502
503 /**
504  * Read index information from disk.
505  */
506 static void
507 read_index_list ()
508 {
509   struct GNUNET_BIO_ReadHandle *rh;
510   char *fn;
511   struct IndexInfo *pos;  
512   char *fname;
513   GNUNET_HashCode hc;
514   size_t slen;
515   char *emsg;
516
517   if (GNUNET_OK !=
518       GNUNET_CONFIGURATION_get_value_filename (cfg,
519                                                "FS",
520                                                "INDEXDB",
521                                                &fn))
522     {
523       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
524                   _("Configuration option `%s' in section `%s' missing.\n"),
525                   "INDEXDB",
526                   "FS");
527       return;
528     }
529   rh = GNUNET_BIO_read_open (fn);
530   if (NULL == rh)
531     {
532       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
533                   _("Could not open `%s'.\n"),
534                   fn);
535       GNUNET_free (fn);
536       return;
537     }
538
539   while ( (GNUNET_OK ==
540            GNUNET_BIO_read (rh,
541                             "Hash of indexed file",
542                             &hc,
543                             sizeof (GNUNET_HashCode))) &&
544           (GNUNET_OK ==
545            GNUNET_BIO_read_string (rh, 
546                                    "Name of indexed file",
547                                    &fname,
548                                    1024 * 16)) )
549     {
550       slen = strlen (fname) + 1;
551       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
552       pos->file_id = hc;
553       pos->filename = (const char *) &pos[1];
554       memcpy (&pos[1], fname, slen);
555       if (GNUNET_SYSERR ==
556           GNUNET_CONTAINER_multihashmap_put (ifm,
557                                              &hc,
558                                              (void*) pos->filename,
559                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
560         {
561           GNUNET_free (pos);
562         }
563       else
564         {
565           pos->next = indexed_files;
566           indexed_files = pos;
567         }
568     }
569   if (GNUNET_OK != 
570       GNUNET_BIO_read_close (rh, &emsg))
571     GNUNET_free (emsg);
572   GNUNET_free (fn);
573 }
574
575
576 /**
577  * We've validated the hash of the file we're about to
578  * index.  Signal success to the client and update
579  * our internal data structures.
580  *
581  * @param ii the index info entry for the request
582  */
583 static void
584 signal_index_ok (struct IndexInfo *ii)
585 {
586   if (GNUNET_SYSERR ==
587       GNUNET_CONTAINER_multihashmap_put (ifm,
588                                          &ii->file_id,
589                                          (void*) ii->filename,
590                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
591     {
592       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
593                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
594                   ii->filename,
595                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
596                                                                    &ii->file_id));
597       GNUNET_SERVER_transmit_context_append (ii->tc,
598                                              NULL, 0,
599                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
600       GNUNET_SERVER_transmit_context_run (ii->tc,
601                                           GNUNET_TIME_UNIT_MINUTES);
602       GNUNET_free (ii);
603       return;
604     }
605   ii->next = indexed_files;
606   indexed_files = ii;
607   write_index_list ();
608   GNUNET_SERVER_transmit_context_append (ii->tc,
609                                          NULL, 0,
610                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
611   GNUNET_SERVER_transmit_context_run (ii->tc,
612                                       GNUNET_TIME_UNIT_MINUTES);
613   ii->tc = NULL;
614 }
615
616
617 /**
618  * Function called once the hash computation over an
619  * indexed file has completed.
620  *
621  * @param cls closure, our publishing context
622  * @param res resulting hash, NULL on error
623  */
624 static void 
625 hash_for_index_val (void *cls,
626                     const GNUNET_HashCode *
627                     res)
628 {
629   struct IndexInfo *ii = cls;
630   
631   if ( (res == NULL) ||
632        (0 != memcmp (res,
633                      &ii->file_id,
634                      sizeof(GNUNET_HashCode))) )
635     {
636       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
637                   _("Hash mismatch trying to index file `%s'\n"),
638                   ii->filename);
639       GNUNET_SERVER_transmit_context_append (ii->tc,
640                                              NULL, 0,
641                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
642       GNUNET_SERVER_transmit_context_run (ii->tc,
643                                           GNUNET_TIME_UNIT_MINUTES);
644       GNUNET_free (ii);
645       return;
646     }
647   signal_index_ok (ii);
648 }
649
650
651 /**
652  * Handle INDEX_START-message.
653  *
654  * @param cls closure
655  * @param client identification of the client
656  * @param message the actual message
657  */
658 static void
659 handle_index_start (void *cls,
660                     struct GNUNET_SERVER_Client *client,
661                     const struct GNUNET_MessageHeader *message)
662 {
663   const struct IndexStartMessage *ism;
664   const char *fn;
665   uint16_t msize;
666   struct IndexInfo *ii;
667   size_t slen;
668   uint32_t dev;
669   uint64_t ino;
670   uint32_t mydev;
671   uint64_t myino;
672
673   msize = ntohs(message->size);
674   if ( (msize <= sizeof (struct IndexStartMessage)) ||
675        ( ((const char *)message)[msize-1] != '\0') )
676     {
677       GNUNET_break (0);
678       GNUNET_SERVER_receive_done (client,
679                                   GNUNET_SYSERR);
680       return;
681     }
682   ism = (const struct IndexStartMessage*) message;
683   fn = (const char*) &ism[1];
684   dev = ntohl (ism->device);
685   ino = GNUNET_ntohll (ism->inode);
686   ism = (const struct IndexStartMessage*) message;
687   slen = strlen (fn) + 1;
688   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
689   ii->filename = (const char*) &ii[1];
690   memcpy (&ii[1], fn, slen);
691   ii->file_id = ism->file_id;  
692   ii->tc = GNUNET_SERVER_transmit_context_create (client);
693   if ( ( (dev != 0) ||
694          (ino != 0) ) &&
695        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
696                                                        &mydev,
697                                                        &myino)) &&
698        ( (dev == mydev) &&
699          (ino == myino) ) )
700     {      
701       /* fast validation OK! */
702       signal_index_ok (ii);
703       return;
704     }
705   /* slow validation, need to hash full file (again) */
706   GNUNET_CRYPTO_hash_file (sched,
707                            GNUNET_SCHEDULER_PRIORITY_IDLE,
708                            GNUNET_NO,
709                            fn,
710                            HASHING_BLOCKSIZE,
711                            &hash_for_index_val,
712                            ii);
713 }
714
715
716 /**
717  * Handle INDEX_LIST_GET-message.
718  *
719  * @param cls closure
720  * @param client identification of the client
721  * @param message the actual message
722  */
723 static void
724 handle_index_list_get (void *cls,
725                        struct GNUNET_SERVER_Client *client,
726                        const struct GNUNET_MessageHeader *message)
727 {
728   struct GNUNET_SERVER_TransmitContext *tc;
729   struct IndexInfoMessage *iim;
730   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
731   size_t slen;
732   const char *fn;
733   struct GNUNET_MessageHeader *msg;
734   struct IndexInfo *pos;
735
736   tc = GNUNET_SERVER_transmit_context_create (client);
737   iim = (struct IndexInfoMessage*) buf;
738   msg = &iim->header;
739   pos = indexed_files;
740   while (NULL != pos)
741     {
742       iim->reserved = 0;
743       iim->file_id = pos->file_id;
744       fn = pos->filename;
745       slen = strlen (fn) + 1;
746       if (slen + sizeof (struct IndexInfoMessage) > 
747           GNUNET_SERVER_MAX_MESSAGE_SIZE)
748         {
749           GNUNET_break (0);
750           break;
751         }
752       memcpy (&iim[1], fn, slen);
753       GNUNET_SERVER_transmit_context_append
754         (tc,
755          &msg[1],
756          sizeof (struct IndexInfoMessage) 
757          - sizeof (struct GNUNET_MessageHeader) + slen,
758          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
759       pos = pos->next;
760     }
761   GNUNET_SERVER_transmit_context_append (tc,
762                                          NULL, 0,
763                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
764   GNUNET_SERVER_transmit_context_run (tc,
765                                       GNUNET_TIME_UNIT_MINUTES);
766 }
767
768
769 /**
770  * Handle UNINDEX-message.
771  *
772  * @param cls closure
773  * @param client identification of the client
774  * @param message the actual message
775  */
776 static void
777 handle_unindex (void *cls,
778                 struct GNUNET_SERVER_Client *client,
779                 const struct GNUNET_MessageHeader *message)
780 {
781   const struct UnindexMessage *um;
782   struct IndexInfo *pos;
783   struct IndexInfo *prev;
784   struct IndexInfo *next;
785   struct GNUNET_SERVER_TransmitContext *tc;
786   int found;
787   
788   um = (const struct UnindexMessage*) message;
789   found = GNUNET_NO;
790   prev = NULL;
791   pos = indexed_files;
792   while (NULL != pos)
793     {
794       next = pos->next;
795       if (0 == memcmp (&pos->file_id,
796                        &um->file_id,
797                        sizeof (GNUNET_HashCode)))
798         {
799           if (prev == NULL)
800             indexed_files = pos->next;
801           else
802             prev->next = pos->next;
803           GNUNET_free (pos);
804           found = GNUNET_YES;
805         }
806       else
807         {
808           prev = pos;
809         }
810       pos = next;
811     }
812   if (GNUNET_YES == found)
813     write_index_list ();
814   tc = GNUNET_SERVER_transmit_context_create (client);
815   GNUNET_SERVER_transmit_context_append (tc,
816                                          NULL, 0,
817                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
818   GNUNET_SERVER_transmit_context_run (tc,
819                                       GNUNET_TIME_UNIT_MINUTES);
820 }
821
822
823 /**
824  * Run the next DS request in our
825  * queue, we're done with the current one.
826  */
827 static void
828 next_ds_request ()
829 {
830   struct DatastoreRequestQueue *e;
831   
832   while (NULL != (e = drq_head))
833     {
834       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
835         break;
836       if (e->task != GNUNET_SCHEDULER_NO_TASK)
837         GNUNET_SCHEDULER_cancel (sched, e->task);
838       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
839       e->req (e->req_cls, GNUNET_NO);
840       GNUNET_free (e);  
841     }
842   if (e == NULL)
843     return;
844   if (e->task != GNUNET_SCHEDULER_NO_TASK)
845     GNUNET_SCHEDULER_cancel (sched, e->task);
846   e->task = GNUNET_SCHEDULER_NO_TASK;
847   e->req (e->req_cls, GNUNET_YES);
848   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
849   GNUNET_free (e);  
850 }
851
852
853 /**
854  * A datastore request had to be timed out. 
855  *
856  * @param cls closure (of type "struct DatastoreRequestQueue*")
857  * @param tc task context, unused
858  */
859 static void
860 timeout_ds_request (void *cls,
861                     const struct GNUNET_SCHEDULER_TaskContext *tc)
862 {
863   struct DatastoreRequestQueue *e = cls;
864
865   e->task = GNUNET_SCHEDULER_NO_TASK;
866   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
867   e->req (e->req_cls, GNUNET_NO);
868   GNUNET_free (e);  
869 }
870
871
872 /**
873  * Queue a request for the datastore.
874  *
875  * @param deadline by when the request should run
876  * @param fun function to call once the request can be run
877  * @param fun_cls closure for fun
878  */
879 static struct DatastoreRequestQueue *
880 queue_ds_request (struct GNUNET_TIME_Relative deadline,
881                   RequestFunction fun,
882                   void *fun_cls)
883 {
884   struct DatastoreRequestQueue *e;
885   struct DatastoreRequestQueue *bef;
886
887   if (drq_head == NULL)
888     {
889       /* no other requests pending, run immediately */
890       fun (fun_cls, GNUNET_OK);
891       return NULL;
892     }
893   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
894   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
895   e->req = fun;
896   e->req_cls = fun_cls;
897   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
898     {
899       /* local request, highest prio, put at head of queue
900          regardless of deadline */
901       bef = NULL;
902     }
903   else
904     {
905       bef = drq_tail;
906       while ( (NULL != bef) &&
907               (e->timeout.value < bef->timeout.value) )
908         bef = bef->prev;
909     }
910   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
911   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
912     return e;
913   e->task = GNUNET_SCHEDULER_add_delayed (sched,
914                                           GNUNET_NO,
915                                           GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
916                                           GNUNET_SCHEDULER_NO_TASK,
917                                           deadline,
918                                           &timeout_ds_request,
919                                           e);
920   return e;                                    
921 }
922
923
924 /**
925  * Free the state associated with a local get context.
926  *
927  * @param lgc the lgc to free
928  */
929 static void
930 local_get_context_free (struct LocalGetContext *lgc) 
931 {
932   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
933   GNUNET_SERVER_client_drop (lgc->client); 
934   GNUNET_free_non_null (lgc->results);
935   if (lgc->results_bf != NULL)
936     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
937   if (lgc->req != NULL)
938     {
939       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
940         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
941       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
942       GNUNET_free (lgc->req);
943     }
944   GNUNET_free (lgc);
945 }
946
947
948 /**
949  * We're able to transmit the next (local) result to the client.
950  * Do it and ask the datastore for more.  Or, on error, tell
951  * the datastore to stop giving us more.
952  *
953  * @param cls our closure (struct LocalGetContext)
954  * @param max maximum number of bytes we can transmit
955  * @param buf where to copy our message
956  * @return number of bytes copied to buf
957  */
958 static size_t
959 transmit_local_result (void *cls,
960                        size_t max,
961                        void *buf)
962 {
963   struct LocalGetContext *lgc = cls;  
964   uint16_t msize;
965
966   if (NULL == buf)
967     {
968       /* error, abort! */
969       GNUNET_free (lgc->result);
970       lgc->result = NULL;
971       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
972       return 0;
973     }
974   msize = ntohs (lgc->result->header.size);
975   GNUNET_assert (max >= msize);
976   memcpy (buf, lgc->result, msize);
977   GNUNET_free (lgc->result);
978   lgc->result = NULL;
979   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
980   return msize;
981 }
982
983
984 /**
985  * Continuation called from datastore's remove
986  * function.
987  *
988  * @param cls unused
989  * @param success did the deletion work?
990  * @param msg error message
991  */
992 static void
993 remove_cont (void *cls,
994              int success,
995              const char *msg)
996 {
997   if (GNUNET_OK != success)
998     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
999                 _("Failed to delete bogus block: %s\n"),
1000                 msg);
1001   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1002 }
1003
1004
1005 /**
1006  * Mingle hash with the mingle_number to
1007  * produce different bits.
1008  */
1009 static void
1010 mingle_hash (const GNUNET_HashCode * in,
1011              int32_t mingle_number, 
1012              GNUNET_HashCode * hc)
1013 {
1014   GNUNET_HashCode m;
1015
1016   GNUNET_CRYPTO_hash (&mingle_number, 
1017                       sizeof (int32_t), 
1018                       &m);
1019   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1020 }
1021
1022
1023 /**
1024  * We've received an on-demand encoded block
1025  * from the datastore.  Attempt to do on-demand
1026  * encoding and (if successful), call the 
1027  * continuation with the resulting block.  On
1028  * error, clean up and ask the datastore for
1029  * more results.
1030  *
1031  * @param key key for the content
1032  * @param size number of bytes in data
1033  * @param data content stored
1034  * @param type type of the content
1035  * @param priority priority of the content
1036  * @param anonymity anonymity-level for the content
1037  * @param expiration expiration time for the content
1038  * @param uid unique identifier for the datum;
1039  *        maybe 0 if no unique identifier is available
1040  * @param cont function to call with the actual block
1041  * @param cont_cls closure for cont
1042  */
1043 static void
1044 handle_on_demand_block (const GNUNET_HashCode * key,
1045                         uint32_t size,
1046                         const void *data,
1047                         uint32_t type,
1048                         uint32_t priority,
1049                         uint32_t anonymity,
1050                         struct GNUNET_TIME_Absolute
1051                         expiration, uint64_t uid,
1052                         GNUNET_DATASTORE_Iterator cont,
1053                         void *cont_cls)
1054 {
1055   const struct OnDemandBlock *odb;
1056   GNUNET_HashCode nkey;
1057   struct GNUNET_CRYPTO_AesSessionKey skey;
1058   struct GNUNET_CRYPTO_AesInitializationVector iv;
1059   GNUNET_HashCode query;
1060   ssize_t nsize;
1061   char ndata[DBLOCK_SIZE];
1062   char edata[DBLOCK_SIZE];
1063   const char *fn;
1064   struct GNUNET_DISK_FileHandle *fh;
1065   uint64_t off;
1066
1067   if (size != sizeof (struct OnDemandBlock))
1068     {
1069       GNUNET_break (0);
1070       GNUNET_DATASTORE_remove (dsh, 
1071                                key,
1072                                size,
1073                                data,
1074                                &remove_cont,
1075                                NULL,
1076                                GNUNET_TIME_UNIT_FOREVER_REL);     
1077       return;
1078     }
1079   odb = (const struct OnDemandBlock*) data;
1080   off = GNUNET_ntohll (odb->offset);
1081   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1082                                                         &odb->file_id);
1083   if ( (NULL == fn) ||
1084        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1085                                              GNUNET_DISK_OPEN_READ))) ||
1086        (off !=
1087         GNUNET_DISK_file_seek (fh,
1088                                off,
1089                                GNUNET_DISK_SEEK_SET)) ||
1090        (-1 ==
1091         (nsize = GNUNET_DISK_file_read (fh,
1092                                         ndata,
1093                                         sizeof (ndata)))) )
1094     {
1095       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1096                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1097                   GNUNET_h2s (&odb->file_id),
1098                   (unsigned long long) off,
1099                   STRERROR (errno));
1100       if (fh != NULL)
1101         GNUNET_DISK_file_close (fh);
1102       /* FIXME: if this happens often, we need
1103          to remove the OnDemand block from the DS! */
1104       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1105       return;
1106     }
1107   GNUNET_DISK_file_close (fh);
1108   GNUNET_CRYPTO_hash (ndata,
1109                       nsize,
1110                       &nkey);
1111   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1112   GNUNET_CRYPTO_aes_encrypt (ndata,
1113                              nsize,
1114                              &skey,
1115                              &iv,
1116                              edata);
1117   GNUNET_CRYPTO_hash (edata,
1118                       nsize,
1119                       &query);
1120   if (0 != memcmp (&query, 
1121                    key,
1122                    sizeof (GNUNET_HashCode)))
1123     {
1124       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1125                   _("Indexed file `%s' changed at offset %llu\n"),
1126                   fn,
1127                   (unsigned long long) off);
1128       /* FIXME: if this happens often, we need
1129          to remove the OnDemand block from the DS! */
1130       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1131       return;
1132     }
1133   cont (cont_cls,
1134         key,
1135         nsize,
1136         edata,
1137         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1138         priority,
1139         anonymity,
1140         expiration,
1141         uid);
1142 }
1143
1144
1145 /**
1146  * We're processing (local) results for a search request
1147  * from a (local) client.  Pass applicable results to the
1148  * client and if we are done either clean up (operation
1149  * complete) or switch to P2P search (more results possible).
1150  *
1151  * @param cls our closure (struct LocalGetContext)
1152  * @param key key for the content
1153  * @param size number of bytes in data
1154  * @param data content stored
1155  * @param type type of the content
1156  * @param priority priority of the content
1157  * @param anonymity anonymity-level for the content
1158  * @param expiration expiration time for the content
1159  * @param uid unique identifier for the datum;
1160  *        maybe 0 if no unique identifier is available
1161  */
1162 static void
1163 process_local_get_result (void *cls,
1164                           const GNUNET_HashCode * key,
1165                           uint32_t size,
1166                           const void *data,
1167                           uint32_t type,
1168                           uint32_t priority,
1169                           uint32_t anonymity,
1170                           struct GNUNET_TIME_Absolute
1171                           expiration, 
1172                           uint64_t uid)
1173 {
1174   struct LocalGetContext *lgc = cls;
1175   size_t msize;
1176   unsigned int i;
1177
1178   if (key == NULL)
1179     {
1180       /* no further results from datastore; continue
1181          processing further requests from the client and
1182          allow the next task to use the datastore; also,
1183          switch to P2P requests or clean up our state. */
1184       next_ds_request ();
1185       GNUNET_SERVER_receive_done (lgc->client,
1186                                   GNUNET_OK);
1187       if ( (lgc->results_used == 0) ||
1188            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1189            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1190            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1191         {
1192           // FIXME: initiate P2P search
1193           return;
1194         }
1195       /* got all possible results, clean up! */
1196       local_get_context_free (lgc);
1197       return;
1198     }
1199   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1200     {
1201       handle_on_demand_block (key, size, data, type, priority, 
1202                               anonymity, expiration, uid,
1203                               &process_local_get_result,
1204                               lgc);
1205       return;
1206     }
1207   if (type != lgc->type)
1208     {
1209       /* this should be virtually impossible to reach (DBLOCK 
1210          query hash being identical to KBLOCK/SBLOCK query hash);
1211          nevertheless, if it happens, the correct thing is to
1212          simply skip the result. */
1213       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1214       return;
1215     }
1216   /* check if this is a result we've alredy
1217      received */
1218   for (i=0;i<lgc->results_used;i++)
1219     if (0 == memcmp (key,
1220                      &lgc->results[i],
1221                      sizeof (GNUNET_HashCode)))
1222       {
1223         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1224         return; 
1225       }
1226   if (lgc->results_used == lgc->results_size)
1227     GNUNET_array_grow (lgc->results,
1228                        lgc->results_size,
1229                        lgc->results_size * 2 + 2);
1230   GNUNET_CRYPTO_hash (data, 
1231                       size, 
1232                       &lgc->results[lgc->results_used++]);    
1233   msize = size + sizeof (struct ContentMessage);
1234   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1235   lgc->result = GNUNET_malloc (msize);
1236   lgc->result->header.size = htons (msize);
1237   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1238   lgc->result->type = htonl (type);
1239   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1240   memcpy (&lgc->result[1],
1241           data,
1242           size);
1243   GNUNET_SERVER_notify_transmit_ready (lgc->client,
1244                                        msize,
1245                                        GNUNET_TIME_UNIT_FOREVER_REL,
1246                                        &transmit_local_result,
1247                                        lgc);
1248 }
1249
1250
1251 /**
1252  * We're processing a search request from a local
1253  * client.  Now it is our turn to query the datastore.
1254  * 
1255  * @param cls our closure (struct LocalGetContext)
1256  * @param tc unused
1257  */
1258 static void
1259 transmit_local_get (void *cls,
1260                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1261 {
1262   struct LocalGetContext *lgc = cls;
1263   uint32_t type;
1264   
1265   type = lgc->type;
1266   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1267     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1268   GNUNET_DATASTORE_get (dsh,
1269                         &lgc->query,
1270                         type,
1271                         &process_local_get_result,
1272                         lgc,
1273                         GNUNET_TIME_UNIT_FOREVER_REL);
1274 }
1275
1276
1277 /**
1278  * We're processing a search request from a local
1279  * client.  Now it is our turn to query the datastore.
1280  * 
1281  * @param cls our closure (struct LocalGetContext)
1282  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
1283  */
1284 static void 
1285 transmit_local_get_ready (void *cls,
1286                           int ok)
1287 {
1288   struct LocalGetContext *lgc = cls;
1289
1290   GNUNET_assert (GNUNET_OK == ok);
1291   GNUNET_SCHEDULER_add_continuation (sched,
1292                                      GNUNET_NO,
1293                                      &transmit_local_get,
1294                                      lgc,
1295                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1296 }
1297
1298
1299 /**
1300  * Handle START_SEARCH-message (search request from client).
1301  *
1302  * @param cls closure
1303  * @param client identification of the client
1304  * @param message the actual message
1305  */
1306 static void
1307 handle_start_search (void *cls,
1308                      struct GNUNET_SERVER_Client *client,
1309                      const struct GNUNET_MessageHeader *message)
1310 {
1311   const struct SearchMessage *sm;
1312   struct LocalGetContext *lgc;
1313   uint16_t msize;
1314   unsigned int sc;
1315   
1316   msize = ntohs (message->size);
1317   if ( (msize < sizeof (struct SearchMessage)) ||
1318        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
1319     {
1320       GNUNET_break (0);
1321       GNUNET_SERVER_receive_done (client,
1322                                   GNUNET_SYSERR);
1323       return;
1324     }
1325   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
1326   sm = (const struct SearchMessage*) message;
1327   GNUNET_SERVER_client_keep (client);
1328   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
1329   if  (sc > 0)
1330     {
1331       lgc->results_used = sc;
1332       GNUNET_array_grow (lgc->results,
1333                          lgc->results_size,
1334                          sc * 2);
1335       memcpy (lgc->results,
1336               &sm[1],
1337               sc * sizeof (GNUNET_HashCode));
1338     }
1339   lgc->client = client;
1340   lgc->type = ntohl (sm->type);
1341   lgc->anonymity_level = ntohl (sm->anonymity_level);
1342   lgc->target = sm->target;
1343   lgc->query = sm->query;
1344   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
1345   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
1346                                &transmit_local_get_ready,
1347                                lgc);
1348 }
1349
1350
1351 /**
1352  * List of handlers for the messages understood by this
1353  * service.
1354  */
1355 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1356   {&handle_index_start, NULL, 
1357    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
1358   {&handle_index_list_get, NULL, 
1359    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
1360   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
1361    sizeof (struct UnindexMessage) },
1362   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
1363    0 },
1364   {NULL, NULL, 0, 0}
1365 };
1366
1367
1368 /**
1369  * A client disconnected.  Remove all of its pending queries.
1370  *
1371  * @param cls closure, NULL
1372  * @param client identification of the client
1373  */
1374 static void
1375 handle_client_disconnect (void *cls,
1376                           struct GNUNET_SERVER_Client
1377                           * client)
1378 {
1379   struct LocalGetContext *lgc;
1380
1381   lgc = lgc_head;
1382   while ( (NULL != lgc) &&
1383           (lgc->client != client) )
1384     lgc = lgc->next;
1385   if (lgc == NULL)
1386     return; /* not one of our clients */
1387   local_get_context_free (lgc);
1388 }
1389
1390
1391 /**
1392  * Task run during shutdown.
1393  *
1394  * @param cls unused
1395  * @param tc unused
1396  */
1397 static void
1398 shutdown_task (void *cls,
1399                const struct GNUNET_SCHEDULER_TaskContext *tc)
1400 {
1401   struct IndexInfo *pos;  
1402
1403   if (NULL != core)
1404     GNUNET_CORE_disconnect (core);
1405   GNUNET_DATASTORE_disconnect (dsh,
1406                                GNUNET_NO);
1407   dsh = NULL;
1408   // FIXME: iterate over 'request_map' to free entries!
1409   GNUNET_CONTAINER_multihashmap_destroy (request_map);
1410   request_map = NULL;
1411   GNUNET_CONTAINER_multihashmap_destroy (ifm);
1412   ifm = NULL;
1413   while (NULL != (pos = indexed_files))
1414     {
1415       indexed_files = pos->next;
1416       GNUNET_free (pos);
1417     }
1418 }
1419
1420
1421 /**
1422  * Method called whenever a peer disconnects.
1423  *
1424  * @param cls closure, not used
1425  * @param peer peer identity this notification is about
1426  */
1427 static void
1428 peer_disconnect_handler (void *cls,
1429                          const struct
1430                          GNUNET_PeerIdentity * peer)
1431 {
1432   // FIXME: remove all pending requests from this
1433   // peer from our memory
1434   // (iterate over request_map)
1435 }
1436
1437
1438 /**
1439  * We're processing a GET request from
1440  * another peer and have decided to forward
1441  * it to other peers.
1442  *
1443  * @param cls our "struct ProcessGetContext *"
1444  * @param tc unused
1445  */
1446 static void
1447 forward_get_request (void *cls,
1448                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1449 {
1450   struct ProcessGetContext *pgc = cls;
1451
1452   // FIXME: install entry in
1453   // 'request_map' and do actual
1454   // forwarding...
1455   if (pgc->bf != NULL)
1456     GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1457   GNUNET_free (pgc); 
1458 }
1459
1460
1461 /**
1462  * Transmit the given message by copying it to
1463  * the target buffer "buf".  "buf" will be
1464  * NULL and "size" zero if the socket was closed for
1465  * writing in the meantime.  In that case, only
1466  * free the message
1467  *
1468  * @param cls closure, pointer to the message
1469  * @param size number of bytes available in buf
1470  * @param buf where the callee should write the message
1471  * @return number of bytes written to buf
1472  */
1473 static size_t
1474 transmit_message (void *cls,
1475                   size_t size, void *buf)
1476 {
1477   struct GNUNET_MessageHeader *msg = cls;
1478   uint16_t msize;
1479   
1480   if (NULL == buf)
1481     {
1482 #if DEBUG_FS
1483       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1484                   "Dropping reply, core too busy.\n");
1485 #endif
1486       GNUNET_free (msg);
1487       return 0;
1488     }
1489   msize = ntohs (msg->size);
1490   GNUNET_assert (size >= msize);
1491   memcpy (buf, msg, msize);
1492   GNUNET_free (msg);
1493   return msize;
1494 }
1495
1496
1497 /**
1498  * Test if the load on this peer is too high
1499  * to even consider processing the query at
1500  * all.
1501  * 
1502  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1503  */
1504 static int
1505 test_load_too_high ()
1506 {
1507   return GNUNET_NO; // FIXME
1508 }
1509
1510
1511 /**
1512  * We're processing (local) results for a search request
1513  * from another peer.  Pass applicable results to the
1514  * peer and if we are done either clean up (operation
1515  * complete) or forward to other peers (more results possible).
1516  *
1517  * @param cls our closure (struct LocalGetContext)
1518  * @param key key for the content
1519  * @param size number of bytes in data
1520  * @param data content stored
1521  * @param type type of the content
1522  * @param priority priority of the content
1523  * @param anonymity anonymity-level for the content
1524  * @param expiration expiration time for the content
1525  * @param uid unique identifier for the datum;
1526  *        maybe 0 if no unique identifier is available
1527  */
1528 static void
1529 process_p2p_get_result (void *cls,
1530                         const GNUNET_HashCode * key,
1531                         uint32_t size,
1532                         const void *data,
1533                         uint32_t type,
1534                         uint32_t priority,
1535                         uint32_t anonymity,
1536                         struct GNUNET_TIME_Absolute
1537                         expiration, 
1538                         uint64_t uid)
1539 {
1540   struct ProcessGetContext *pgc = cls;
1541   GNUNET_HashCode dhash;
1542   GNUNET_HashCode mhash;
1543   struct PutMessage *reply;
1544   
1545   if (NULL == key)
1546     {
1547       /* no more results */
1548       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
1549            ( (0 == pgc->results_found) ||
1550              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1551              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1552              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
1553         {
1554           GNUNET_SCHEDULER_add_continuation (sched,
1555                                              GNUNET_NO,
1556                                              &forward_get_request,
1557                                              pgc,
1558                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1559         }
1560       else
1561         {
1562           if (pgc->bf != NULL)
1563             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1564           GNUNET_free (pgc); 
1565         }
1566       next_ds_request ();
1567       return;
1568     }
1569   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1570     {
1571       handle_on_demand_block (key, size, data, type, priority, 
1572                               anonymity, expiration, uid,
1573                               &process_p2p_get_result,
1574                               pgc);
1575       return;
1576     }
1577   /* check for duplicates */
1578   GNUNET_CRYPTO_hash (data, size, &dhash);
1579   mingle_hash (&dhash, 
1580                pgc->mingle,
1581                &mhash);
1582   if ( (pgc->bf != NULL) &&
1583        (GNUNET_YES ==
1584         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
1585                                            &mhash)) )
1586     {      
1587 #if DEBUG_FS
1588       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1589                   "Result from datastore filtered by bloomfilter.\n");
1590 #endif
1591       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1592       return;
1593     }
1594   pgc->results_found++;
1595   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1596        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1597        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1598     {
1599       if (pgc->bf == NULL)
1600         pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
1601                                                      32, 
1602                                                      BLOOMFILTER_K);
1603       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
1604                                         &mhash);
1605     }
1606
1607   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
1608   reply->header.size = htons (sizeof (struct PutMessage) + size);
1609   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1610   reply->type = htonl (type);
1611   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
1612   memcpy (&reply[1], data, size);
1613   GNUNET_CORE_notify_transmit_ready (core,
1614                                      pgc->priority,
1615                                      ACCEPTABLE_REPLY_DELAY,
1616                                      &pgc->reply_to,
1617                                      sizeof (struct PutMessage) + size,
1618                                      &transmit_message,
1619                                      reply);
1620   if ( (GNUNET_YES == test_load_too_high()) ||
1621        (pgc->results_found > 5 + 2 * pgc->priority) )
1622     {
1623       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1624       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
1625       return;
1626     }
1627   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1628 }
1629   
1630
1631 /**
1632  * We're processing a GET request from
1633  * another peer.  Give it to our local
1634  * datastore.
1635  *
1636  * @param cls our "struct ProcessGetContext"
1637  * @param ok did we get a datastore slice or not?
1638  */
1639 static void
1640 ds_get_request (void *cls, 
1641                 int ok)
1642 {
1643   struct ProcessGetContext *pgc = cls;
1644   uint32_t type;
1645   struct GNUNET_TIME_Relative timeout;
1646
1647   if (GNUNET_OK != ok)
1648     {
1649       /* no point in doing P2P stuff if we can't even do local */
1650       GNUNET_free (dsh);
1651       return;
1652     }
1653   type = pgc->type;
1654   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1655     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1656   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
1657                                            (pgc->priority + 1));
1658   GNUNET_DATASTORE_get (dsh,
1659                         &pgc->query,
1660                         type,
1661                         &process_p2p_get_result,
1662                         pgc,
1663                         timeout);
1664 }
1665
1666
1667 /**
1668  * The priority level imposes a bound on the maximum
1669  * value for the ttl that can be requested.
1670  *
1671  * @param ttl_in requested ttl
1672  * @param priority given priority
1673  * @return ttl_in if ttl_in is below the limit,
1674  *         otherwise the ttl-limit for the given priority
1675  */
1676 static int32_t
1677 bound_ttl (int32_t ttl_in, uint32_t prio)
1678 {
1679   unsigned long long allowed;
1680
1681   if (ttl_in <= 0)
1682     return ttl_in;
1683   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
1684   if (ttl_in > allowed)      
1685     {
1686       if (allowed >= (1 << 30))
1687         return 1 << 30;
1688       return allowed;
1689     }
1690   return ttl_in;
1691 }
1692
1693
1694 /**
1695  * We've received a request with the specified
1696  * priority.  Bound it according to how much
1697  * we trust the given peer.
1698  * 
1699  * @param prio_in requested priority
1700  * @param peer the peer making the request
1701  * @return effective priority
1702  */
1703 static uint32_t
1704 bound_priority (uint32_t prio_in,
1705                 const struct GNUNET_PeerIdentity *peer)
1706 {
1707   return 0; // FIXME!
1708 }
1709
1710
1711 /**
1712  * Handle P2P "GET" request.
1713  *
1714  * @param cls closure, always NULL
1715  * @param peer the other peer involved (sender or receiver, NULL
1716  *        for loopback messages where we are both sender and receiver)
1717  * @param message the actual message
1718  * @return GNUNET_OK to keep the connection open,
1719  *         GNUNET_SYSERR to close it (signal serious error)
1720  */
1721 static int
1722 handle_p2p_get (void *cls,
1723                 const struct GNUNET_PeerIdentity *other,
1724                 const struct GNUNET_MessageHeader *message)
1725 {
1726   uint16_t msize;
1727   const struct GetMessage *gm;
1728   unsigned int bits;
1729   const GNUNET_HashCode *opt;
1730   struct ProcessGetContext *pgc;
1731   uint32_t bm;
1732   size_t bfsize;
1733   uint32_t ttl_decrement;
1734   double preference;
1735   int net_load_up;
1736   int net_load_down;
1737
1738   msize = ntohs(message->size);
1739   if (msize < sizeof (struct GetMessage))
1740     {
1741       GNUNET_break_op (0);
1742       return GNUNET_SYSERR;
1743     }
1744   gm = (const struct GetMessage*) message;
1745   bm = ntohl (gm->hash_bitmap);
1746   bits = 0;
1747   while (bm > 0)
1748     {
1749       if (1 == (bm & 1))
1750         bits++;
1751       bm >>= 1;
1752     }
1753   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
1754     {
1755       GNUNET_break_op (0);
1756       return GNUNET_SYSERR;
1757     }  
1758   opt = (const GNUNET_HashCode*) &gm[1];
1759   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
1760   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
1761   if (bfsize > 0)
1762     pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
1763                                                  bfsize,
1764                                                  BLOOMFILTER_K);
1765   pgc->type = ntohl (gm->type);
1766   pgc->bm = ntohl (gm->hash_bitmap);
1767   pgc->mingle = gm->filter_mutator;
1768   bits = 0;
1769   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
1770     pgc->reply_to.hashPubKey = opt[bits++];
1771   else
1772     pgc->reply_to = *other;
1773   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
1774     pgc->namespace = opt[bits++];
1775   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1776     {
1777       GNUNET_break_op (0);
1778       GNUNET_free (pgc);
1779       return GNUNET_SYSERR;
1780     }
1781   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
1782     pgc->prime_target.hashPubKey = opt[bits++];
1783   /* note that we can really only check load here since otherwise
1784      peers could find out that we are overloaded by being disconnected
1785      after sending us a malformed query... */
1786   if (GNUNET_YES == test_load_too_high ())
1787     {
1788       if (NULL != pgc->bf)
1789         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1790       GNUNET_free (pgc);
1791 #if DEBUG_FS
1792       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1793                   "Dropping query from `%s', this peer is too busy.\n",
1794                   GNUNET_h2s (other));
1795 #endif
1796       return GNUNET_OK;
1797     }
1798   net_load_up = 50; // FIXME
1799   net_load_down = 50; // FIXME
1800   pgc->policy = ROUTING_POLICY_NONE;
1801   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
1802        (net_load_down < IDLE_LOAD_THRESHOLD) )
1803     {
1804       pgc->policy |= ROUTING_POLICY_ALL;
1805       pgc->priority = 0; /* no charge */
1806     }
1807   else
1808     {
1809       pgc->priority = bound_priority (ntohl (gm->priority), other);
1810       if ( (net_load_up < 
1811             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
1812            (net_load_down < 
1813             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
1814         {
1815           pgc->policy |= ROUTING_POLICY_ALL;
1816         }
1817       else
1818         {
1819           // FIXME: is this sound?
1820           if (net_load_up < 90 + 10 * pgc->priority)
1821             pgc->policy |= ROUTING_POLICY_FORWARD;
1822           if (net_load_down < 90 + 10 * pgc->priority)
1823             pgc->policy |= ROUTING_POLICY_ANSWER;
1824         }
1825     }
1826   if (pgc->policy == ROUTING_POLICY_NONE)
1827     {
1828 #if DEBUG_FS
1829       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1830                   "Dropping query from `%s', network saturated.\n",
1831                   GNUNET_h2s (other));
1832 #endif
1833       if (NULL != pgc->bf)
1834         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1835       GNUNET_free (pgc);
1836       return GNUNET_OK;     /* drop */
1837     }
1838   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
1839     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
1840   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
1841   /* decrement ttl (always) */
1842   ttl_decrement = 2 * TTL_DECREMENT +
1843     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1844                               TTL_DECREMENT);
1845   if ( (pgc->ttl < 0) &&
1846        (pgc->ttl - ttl_decrement > 0) )
1847     {
1848 #if DEBUG_FS
1849       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1850                   "Dropping query from `%s' due to TTL underflow.\n",
1851                   GNUNET_h2s (other));
1852 #endif
1853       /* integer underflow => drop (should be very rare)! */
1854       if (NULL != pgc->bf)
1855         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1856       GNUNET_free (pgc);
1857       return GNUNET_OK;
1858     }
1859   pgc->ttl -= ttl_decrement;
1860   pgc->start_time = GNUNET_TIME_absolute_get ();
1861   preference = (double) pgc->priority;
1862   if (preference < QUERY_BANDWIDTH_VALUE)
1863     preference = QUERY_BANDWIDTH_VALUE;
1864   // FIXME: also reserve bandwidth for reply?
1865   GNUNET_CORE_peer_configure (core,
1866                               other,
1867                               GNUNET_TIME_UNIT_FOREVER_REL,
1868                               0, 0, preference, NULL, NULL);
1869   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
1870     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
1871                                  &ds_get_request,
1872                                  pgc);
1873   else
1874     GNUNET_SCHEDULER_add_continuation (sched,
1875                                        GNUNET_NO,
1876                                        &forward_get_request,
1877                                        pgc,
1878                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1879   return GNUNET_OK;
1880 }
1881
1882
1883 /**
1884  * Iterator over pending requests.
1885  *
1886  * @param cls response (struct ProcessReplyClosure)
1887  * @param key our query
1888  * @param value value in the hash map (meta-info about the query)
1889  * @return GNUNET_YES (we should continue to iterate)
1890  */
1891 static int
1892 process_reply (void *cls,
1893                const GNUNET_HashCode * key,
1894                void *value)
1895 {
1896   struct ProcessReplyClosure *prq = cls;
1897   struct PendingRequest *pr = value;
1898
1899   fprintf (stderr, "FIXME %p %p\n", prq, pr);
1900   // FIXME: forward reply to client
1901   // or other peers (depending on pr...)
1902   return GNUNET_YES;
1903 }
1904
1905
1906 /**
1907  * Handle P2P "PUT" request.
1908  *
1909  * @param cls closure, always NULL
1910  * @param peer the other peer involved (sender or receiver, NULL
1911  *        for loopback messages where we are both sender and receiver)
1912  * @param message the actual message
1913  * @return GNUNET_OK to keep the connection open,
1914  *         GNUNET_SYSERR to close it (signal serious error)
1915  */
1916 static int
1917 handle_p2p_put (void *cls,
1918                 const struct GNUNET_PeerIdentity *other,
1919                 const struct GNUNET_MessageHeader *message)
1920 {
1921   const struct PutMessage *put;
1922   uint16_t msize;
1923   size_t dsize;
1924   uint32_t type;
1925   struct GNUNET_TIME_Absolute expiration;
1926   GNUNET_HashCode query;
1927   const struct KBlock *kb;
1928   struct ProcessReplyClosure prq;
1929
1930   msize = ntohs (message->size);
1931   if (msize < sizeof (struct PutMessage))
1932     {
1933       GNUNET_break_op(0);
1934       return GNUNET_SYSERR;
1935     }
1936   put = (const struct PutMessage*) message;
1937   dsize = msize - sizeof (struct PutMessage);
1938   type = ntohl (put->type);
1939   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
1940
1941   /* first, validate! */
1942   switch (type)
1943     {
1944     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1945     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1946       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
1947       break;
1948     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
1949       if (dsize < sizeof (struct KBlock))
1950         {
1951           GNUNET_break_op (0);
1952           return GNUNET_SYSERR;
1953         }
1954       kb = (const struct KBlock*) &put[1];
1955       // FIXME -- validation code below broken...
1956       if ( (dsize != ntohs (kb->purpose.size) + 42) ||
1957            (GNUNET_OK !=
1958             GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
1959                                       &kb->purpose,
1960                                       &kb->signature,
1961                                       &kb->keyspace)) )
1962         {
1963           GNUNET_break_op (0);
1964           return GNUNET_SYSERR;
1965         }
1966       GNUNET_CRYPTO_hash (&kb->keyspace,
1967                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1968                           &query);
1969       break;
1970     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1971       // FIXME -- validate SBLOCK!
1972       GNUNET_break (0);
1973       return GNUNET_OK;
1974     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
1975       // FIXME -- validate SKBLOCK!
1976       GNUNET_break (0);
1977       return GNUNET_OK;
1978     default:
1979       /* unknown block type */
1980       GNUNET_break_op (0);
1981       return GNUNET_SYSERR;
1982     }
1983
1984   /* now, lookup 'query' */
1985   prq.data = (const void*) &put[1];
1986   prq.size = dsize;
1987   prq.type = type;
1988   prq.expiration = expiration;
1989   prq.priority = 0;
1990   GNUNET_CONTAINER_multihashmap_get_multiple (request_map,
1991                                               &query,
1992                                               &process_reply,
1993                                               &prq);
1994   // FIXME: if migration is on and load is low,
1995   // queue to store data in datastore;
1996   // use "prq.priority" for that!
1997   return GNUNET_OK;
1998 }
1999
2000
2001 /**
2002  * List of handlers for P2P messages
2003  * that we care about.
2004  */
2005 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2006   {
2007     { &handle_p2p_get, 
2008       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2009     { &handle_p2p_put, 
2010       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2011     { NULL, 0, 0 }
2012   };
2013
2014
2015 /**
2016  * Task that will try to initiate a connection with the
2017  * core service.
2018  * 
2019  * @param cls unused
2020  * @param tc unused
2021  */
2022 static void
2023 core_connect_task (void *cls,
2024                    const struct GNUNET_SCHEDULER_TaskContext *tc);
2025
2026
2027 /**
2028  * Function called by the core after we've
2029  * connected.
2030  */
2031 static void
2032 core_start_cb (void *cls,
2033                struct GNUNET_CORE_Handle * server,
2034                const struct GNUNET_PeerIdentity *
2035                my_identity,
2036                const struct
2037                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
2038                publicKey)
2039 {
2040   if (server == NULL)
2041     {
2042       GNUNET_SCHEDULER_add_delayed (sched,
2043                                     GNUNET_NO,
2044                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
2045                                     GNUNET_SCHEDULER_NO_TASK,
2046                                     GNUNET_TIME_UNIT_SECONDS,
2047                                     &core_connect_task,
2048                                     NULL);
2049       return;
2050     }
2051   core = server;
2052 }
2053
2054
2055 /**
2056  * Task that will try to initiate a connection with the
2057  * core service.
2058  * 
2059  * @param cls unused
2060  * @param tc unused
2061  */
2062 static void
2063 core_connect_task (void *cls,
2064                    const struct GNUNET_SCHEDULER_TaskContext *tc)
2065 {
2066   GNUNET_CORE_connect (sched,
2067                        cfg,
2068                        GNUNET_TIME_UNIT_FOREVER_REL,
2069                        NULL,
2070                        &core_start_cb,
2071                        NULL,
2072                        &peer_disconnect_handler,
2073                        NULL, 
2074                        NULL, GNUNET_NO,
2075                        NULL, GNUNET_NO,
2076                        p2p_handlers);
2077 }
2078
2079
2080 /**
2081  * Process fs requests.
2082  *
2083  * @param cls closure
2084  * @param sched scheduler to use
2085  * @param server the initialized server
2086  * @param cfg configuration to use
2087  */
2088 static void
2089 run (void *cls,
2090      struct GNUNET_SCHEDULER_Handle *s,
2091      struct GNUNET_SERVER_Handle *server,
2092      const struct GNUNET_CONFIGURATION_Handle *c)
2093 {
2094   sched = s;
2095   cfg = c;
2096
2097   ifm = GNUNET_CONTAINER_multihashmap_create (128);
2098   request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2099   read_index_list ();
2100   dsh = GNUNET_DATASTORE_connect (cfg,
2101                                   sched);
2102   if (NULL == dsh)
2103     {
2104       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2105                   _("Failed to connect to datastore service.\n"));
2106       return;
2107     }
2108   GNUNET_SERVER_disconnect_notify (server, 
2109                                    &handle_client_disconnect,
2110                                    NULL);
2111   GNUNET_SERVER_add_handlers (server, handlers);
2112   core_connect_task (NULL, NULL);
2113   GNUNET_SCHEDULER_add_delayed (sched,
2114                                 GNUNET_YES,
2115                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
2116                                 GNUNET_SCHEDULER_NO_TASK,
2117                                 GNUNET_TIME_UNIT_FOREVER_REL,
2118                                 &shutdown_task,
2119                                 NULL);
2120 }
2121
2122
2123 /**
2124  * The main function for the fs service.
2125  *
2126  * @param argc number of arguments from the command line
2127  * @param argv command line arguments
2128  * @return 0 ok, 1 on error
2129  */
2130 int
2131 main (int argc, char *const *argv)
2132 {
2133   return (GNUNET_OK ==
2134           GNUNET_SERVICE_run (argc,
2135                               argv,
2136                               "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
2137 }
2138
2139 /* end of gnunet-service-fs.c */