stuff
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - tracking of PendingRequests
28  * - setup P2P search on CS request
29  * - setup P2P search on P2P GET
30  * - forward replies based on tracked requests
31  * - validation of KBLOCKS (almost done)
32  * - validation of SBLOCKS
33  * - validation of KSBLOCKS
34  * - content migration (put in local DS)
35  * - possible major issue: we may
36  *   queue "gazillions" of (K|S)Blocks for the
37  *   core to transmit to another peer; need
38  *   to make sure this is bounded overall...
39  * - various load-based actions (can wait)
40  * - remove on-demand blocks if they keep failing (can wait)
41  */
42 #include "platform.h"
43 #include "gnunet_core_service.h"
44 #include "gnunet_datastore_service.h"
45 #include "gnunet_peer_lib.h"
46 #include "gnunet_protocols.h"
47 #include "gnunet_signatures.h"
48 #include "gnunet_util_lib.h"
49 #include "fs.h"
50
51
52 /**
53  * In-memory information about indexed files (also available
54  * on-disk).
55  */
56 struct IndexInfo
57 {
58   
59   /**
60    * This is a linked list.
61    */
62   struct IndexInfo *next;
63
64   /**
65    * Name of the indexed file.  Memory allocated
66    * at the end of this struct (do not free).
67    */
68   const char *filename;
69
70   /**
71    * Context for transmitting confirmation to client,
72    * NULL if we've done this already.
73    */
74   struct GNUNET_SERVER_TransmitContext *tc;
75   
76   /**
77    * Hash of the contents of the file.
78    */
79   GNUNET_HashCode file_id;
80
81 };
82
83
84 /**
85  * Signature of a function that is called whenever a datastore
86  * request can be processed (or an entry put on the queue times out).
87  *
88  * @param cls closure
89  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
90  */
91 typedef void (*RequestFunction)(void *cls,
92                                 int ok);
93
94
95 /**
96  * Doubly-linked list of our requests for the datastore.
97  */
98 struct DatastoreRequestQueue
99 {
100
101   /**
102    * This is a doubly-linked list.
103    */
104   struct DatastoreRequestQueue *next;
105
106   /**
107    * This is a doubly-linked list.
108    */
109   struct DatastoreRequestQueue *prev;
110
111   /**
112    * Function to call (will issue the request).
113    */
114   RequestFunction req;
115
116   /**
117    * Closure for req.
118    */
119   void *req_cls;
120
121   /**
122    * When should this request time-out because we don't care anymore?
123    */
124   struct GNUNET_TIME_Absolute timeout;
125     
126   /**
127    * ID of task used for signaling timeout.
128    */
129   GNUNET_SCHEDULER_TaskIdentifier task;
130
131 };
132
133
134 /**
135  * Closure for processing START_SEARCH messages from a client.
136  */
137 struct LocalGetContext
138 {
139
140   /**
141    * This is a doubly-linked list.
142    */
143   struct LocalGetContext *next;
144
145   /**
146    * This is a doubly-linked list.
147    */
148   struct LocalGetContext *prev;
149
150   /**
151    * Client that initiated the search.
152    */
153   struct GNUNET_SERVER_Client *client;
154
155   /**
156    * Array of results that we've already received 
157    * (can be NULL).
158    */
159   GNUNET_HashCode *results; 
160
161   /**
162    * Bloomfilter over all results (for fast query construction);
163    * NULL if we don't have any results.
164    */
165   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
166
167   /**
168    * DS request associated with this operation.
169    */
170   struct DatastoreRequestQueue *req;
171
172   /**
173    * Current result message to transmit to client (or NULL).
174    */
175   struct ContentMessage *result;
176   
177   /**
178    * Type of the content that we're looking for.
179    * 0 for any.
180    */
181   uint32_t type;
182
183   /**
184    * Desired anonymity level.
185    */
186   uint32_t anonymity_level;
187
188   /**
189    * Number of results actually stored in the results array.
190    */
191   unsigned int results_used;
192   
193   /**
194    * Size of the results array in memory.
195    */
196   unsigned int results_size;
197
198   /**
199    * If the request is for a DBLOCK or IBLOCK, this is the identity of
200    * the peer that is known to have a response.  Set to all-zeros if
201    * such a target is not known (note that even if OUR anonymity
202    * level is >0 we may happen to know the responder's identity;
203    * nevertheless, we should probably not use it for a DHT-lookup
204    * or similar blunt actions in order to avoid exposing ourselves).
205    * <p>
206    * If the request is for an SBLOCK, this is the identity of the
207    * pseudonym to which the SBLOCK belongs. 
208    * <p>
209    * If the request is for a KBLOCK, "target" must be all zeros.
210    */
211   GNUNET_HashCode target;
212
213   /**
214    * Hash of the keyword (aka query) for KBLOCKs; Hash of
215    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
216    * and hash of the identifier XORed with the target for
217    * SBLOCKS (aka query).
218    */
219   GNUNET_HashCode query;
220
221 };
222
223
224 /**
225  * Possible routing policies for an FS-GET request.
226  */
227 enum RoutingPolicy
228   {
229     /**
230      * Simply drop the request.
231      */
232     ROUTING_POLICY_NONE = 0,
233     
234     /**
235      * Answer it if we can from local datastore.
236      */
237     ROUTING_POLICY_ANSWER = 1,
238
239     /**
240      * Forward the request to other peers (if possible).
241      */
242     ROUTING_POLICY_FORWARD = 2,
243
244     /**
245      * Forward to other peers, and ask them to route
246      * the response via ourselves.
247      */
248     ROUTING_POLICY_INDIRECT = 6,
249     
250     /**
251      * Do everything we could possibly do (that would
252      * make sense).
253      */
254     ROUTING_POLICY_ALL = 7
255   };
256
257
258 /**
259  * Internal context we use for our initial processing
260  * of a GET request.
261  */
262 struct ProcessGetContext
263 {
264   /**
265    * The search query (used for datastore lookup).
266    */
267   GNUNET_HashCode query;
268   
269   /**
270    * Which peer we should forward the response to.
271    */
272   struct GNUNET_PeerIdentity reply_to;
273
274   /**
275    * Namespace for the result (only set for SKS requests)
276    */
277   GNUNET_HashCode namespace;
278
279   /**
280    * Peer that we should forward the query to if possible
281    * (since that peer likely has the content).
282    */
283   struct GNUNET_PeerIdentity prime_target;
284
285   /**
286    * When did we receive this request?
287    */
288   struct GNUNET_TIME_Absolute start_time;
289
290   /**
291    * Our entry in the DRQ (non-NULL while we wait for our
292    * turn to interact with the local database).
293    */
294   struct DatastoreRequestQueue *drq;
295
296   /**
297    * Filter used to eliminate duplicate
298    * results.   Can be NULL if we are
299    * not yet filtering any results.
300    */
301   struct GNUNET_CONTAINER_BloomFilter *bf;
302
303   /**
304    * Bitmap describing which of the optional
305    * hash codes / peer identities were given to us.
306    */
307   uint32_t bm;
308
309   /**
310    * Desired block type.
311    */
312   uint32_t type;
313
314   /**
315    * Priority of the request.
316    */
317   uint32_t priority;
318
319   /**
320    * In what ways are we going to process
321    * the request?
322    */
323   enum RoutingPolicy policy;
324
325   /**
326    * Time-to-live for the request (value
327    * we use).
328    */
329   int32_t ttl;
330
331   /**
332    * Number to mingle hashes for bloom-filter
333    * tests with.
334    */
335   int32_t mingle;
336
337   /**
338    * Number of results that were found so far.
339    */
340   unsigned int results_found;
341 };
342
343
344 /**
345  * All requests from a client are 
346  * kept in a doubly-linked list.
347  */
348 struct ClientRequestList;
349
350
351 /**
352  * Information we keep for each pending request.  We should try to
353  * keep this struct as small as possible since its memory consumption
354  * is key to how many requests we can have pending at once.
355  */
356 struct PendingRequest
357 {
358
359   /**
360    * ID of a client making a request, NULL if this entry is for a
361    * peer.
362    */
363   struct GNUNET_SERVER_Client *client;
364
365   /**
366    * If this request was made by a client,
367    * this is our entry in the client request
368    * list; otherwise NULL.
369    */
370   struct ClientRequestList *crl_entry;
371
372   /**
373    * If this is a namespace query, pointer to the hash of the public
374    * key of the namespace; otherwise NULL.
375    */
376   GNUNET_HashCode *namespace;
377
378   /**
379    * Bloomfilter we use to filter out replies that we don't care about
380    * (anymore).  NULL as long as we are interested in all replies.
381    */
382   struct GNUNET_CONTAINER_BloomFilter *bf;
383
384   /**
385    * Hash code of all replies that we have seen so far (only valid
386    * if client is not NULL since we only track replies like this for
387    * our own clients).
388    */
389   GNUNET_HashCode *replies_seen;
390
391   /**
392    * When did we first see this request (form this peer), or, if our
393    * client is initiating, when did we last initiate a search?
394    */
395   struct GNUNET_TIME_Absolute start_time;
396
397   /**
398    * The query that this request is for.
399    */
400   GNUNET_HashCode query;
401
402   /**
403    * (Interned) Peer identifier (only valid if "client" is NULL)
404    * that identifies a peer that gave us this request.
405    */
406   GNUNET_PEER_Id source_pid;
407
408   /**
409    * (Interned) Peer identifier that identifies a preferred target
410    * for requests.
411    */
412   GNUNET_PEER_Id target_pid;
413
414   /**
415    * (Interned) Peer identifiers of peers that have already
416    * received our query for this content.
417    */
418   GNUNET_PEER_Id *used_pids;
419
420   /**
421    * How many entries in "used_pids" are actually valid?
422    */
423   unsigned int used_pids_off;
424
425   /**
426    * How long is the "used_pids" array?
427    */
428   unsigned int used_pids_size;
429
430   /**
431    * How many entries in "replies_seen" are actually valid?
432    */
433   unsigned int replies_seen_off;
434
435   /**
436    * How long is the "replies_seen" array?
437    */
438   unsigned int replies_seen_size;
439   
440   /**
441    * Priority with which this request was made.  If one of our clients
442    * made the request, then this is the current priority that we are
443    * using when initiating the request.  This value is used when
444    * we decide to reward other peers with trust for providing a reply.
445    */
446   uint32_t priority;
447
448   /**
449    * Priority points left for us to spend when forwarding this request
450    * to other peers.
451    */
452   uint32_t remaining_priority;
453
454   /**
455    * TTL with which we saw this request (or, if we initiated, TTL that
456    * we used for the request).
457    */
458   int32_t ttl;
459   
460   /**
461    * Type of the content that this request is for.
462    */
463   uint32_t type;
464
465 };
466
467
468 /**
469  * All requests from a client are 
470  * kept in a doubly-linked list.
471  */
472 struct ClientRequestList
473 {
474   /**
475    * This is a doubly-linked list.
476    */
477   struct ClientRequestList *next;
478
479   /**
480    * This is a doubly-linked list.
481    */ 
482   struct ClientRequestList *prev;
483
484   /**
485    * A request from this client.
486    */
487   struct PendingRequest *req;
488
489 };
490
491
492 /**
493  * Linked list of all clients that we are 
494  * currently processing requests for.
495  */
496 struct ClientList
497 {
498
499   /**
500    * This is a linked list.
501    */
502   struct ClientList *next;
503
504   /**
505    * What client is this entry for?
506    */
507   struct GNUNET_SERVER_Client* client;
508
509   /**
510    * Head of the DLL of requests from this client.
511    */
512   struct ClientRequestList *head;
513
514   /**
515    * Tail of the DLL of requests from this client.
516    */
517   struct ClientRequestList *tail;
518
519 };
520
521
522 /**
523  * Closure for "process_reply" function.
524  */
525 struct ProcessReplyClosure
526 {
527   /**
528    * The data for the reply.
529    */
530   const void *data;
531
532   /**
533    * When the reply expires.
534    */
535   struct GNUNET_TIME_Absolute expiration;
536
537   /**
538    * Size of data.
539    */
540   size_t size;
541
542   /**
543    * Type of the block.
544    */
545   uint32_t type;
546
547   /**
548    * How much was this reply worth to us?
549    */
550   uint32_t priority;
551 };
552
553
554 /**
555  * Map from queries to pending requests ("struct PendingRequest") for
556  * this query.
557  */
558 static struct GNUNET_CONTAINER_MultiHashMap *request_map;
559
560 /**
561  * Our connection to the datastore.
562  */
563 static struct GNUNET_DATASTORE_Handle *dsh;
564
565 /**
566  * Our scheduler.
567  */
568 static struct GNUNET_SCHEDULER_Handle *sched;
569
570 /**
571  * Our configuration.
572  */
573 const struct GNUNET_CONFIGURATION_Handle *cfg;
574
575 /**
576  * Handle to the core service (NULL until we've connected to it).
577  */
578 struct GNUNET_CORE_Handle *core;
579
580 /**
581  * Head of doubly-linked LGC list.
582  */
583 static struct LocalGetContext *lgc_head;
584
585 /**
586  * Tail of doubly-linked LGC list.
587  */
588 static struct LocalGetContext *lgc_tail;
589
590 /**
591  * Head of request queue for the datastore, sorted by timeout.
592  */
593 static struct DatastoreRequestQueue *drq_head;
594
595 /**
596  * Tail of request queue for the datastore.
597  */
598 static struct DatastoreRequestQueue *drq_tail;
599
600 /**
601  * Linked list of indexed files.
602  */
603 static struct IndexInfo *indexed_files;
604
605 /**
606  * Maps hash over content of indexed files
607  * to the respective filename.  The filenames
608  * are pointers into the indexed_files linked
609  * list and do not need to be freed.
610  */
611 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
612
613 /**
614  * Map of query hash codes to requests.
615  */
616 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
617
618 /**
619  * Map of peer IDs to requests (for those requests coming
620  * from other peers).
621  */
622 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
623
624 /**
625  * Linked list of all of our clients and their requests.
626  */
627 static struct ClientList *clients;
628
629 /**
630  * Heap with the request that will expire next at the top.
631  */
632 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
633
634
635 /**
636  * Write the current index information list to disk.
637  */ 
638 static void
639 write_index_list ()
640 {
641   struct GNUNET_BIO_WriteHandle *wh;
642   char *fn;
643   struct IndexInfo *pos;  
644
645   if (GNUNET_OK !=
646       GNUNET_CONFIGURATION_get_value_filename (cfg,
647                                                "FS",
648                                                "INDEXDB",
649                                                &fn))
650     {
651       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
652                   _("Configuration option `%s' in section `%s' missing.\n"),
653                   "INDEXDB",
654                   "FS");
655       return;
656     }
657   wh = GNUNET_BIO_write_open (fn);
658   if (NULL == wh)
659     {
660       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
661                   _("Could not open `%s'.\n"),
662                   fn);
663       GNUNET_free (fn);
664       return;
665     }
666   pos = indexed_files;
667   while (pos != NULL)
668     {
669       if ( (GNUNET_OK !=
670             GNUNET_BIO_write (wh,
671                               &pos->file_id,
672                               sizeof (GNUNET_HashCode))) ||
673            (GNUNET_OK !=
674             GNUNET_BIO_write_string (wh,
675                                      pos->filename)) )
676         break;
677       pos = pos->next;
678     }
679   if (GNUNET_OK != 
680       GNUNET_BIO_write_close (wh))
681     {
682       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
683                   _("Error writing `%s'.\n"),
684                   fn);
685       GNUNET_free (fn);
686       return;
687     }
688   GNUNET_free (fn);
689 }
690
691
692 /**
693  * Read index information from disk.
694  */
695 static void
696 read_index_list ()
697 {
698   struct GNUNET_BIO_ReadHandle *rh;
699   char *fn;
700   struct IndexInfo *pos;  
701   char *fname;
702   GNUNET_HashCode hc;
703   size_t slen;
704   char *emsg;
705
706   if (GNUNET_OK !=
707       GNUNET_CONFIGURATION_get_value_filename (cfg,
708                                                "FS",
709                                                "INDEXDB",
710                                                &fn))
711     {
712       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
713                   _("Configuration option `%s' in section `%s' missing.\n"),
714                   "INDEXDB",
715                   "FS");
716       return;
717     }
718   rh = GNUNET_BIO_read_open (fn);
719   if (NULL == rh)
720     {
721       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
722                   _("Could not open `%s'.\n"),
723                   fn);
724       GNUNET_free (fn);
725       return;
726     }
727
728   while ( (GNUNET_OK ==
729            GNUNET_BIO_read (rh,
730                             "Hash of indexed file",
731                             &hc,
732                             sizeof (GNUNET_HashCode))) &&
733           (GNUNET_OK ==
734            GNUNET_BIO_read_string (rh, 
735                                    "Name of indexed file",
736                                    &fname,
737                                    1024 * 16)) )
738     {
739       slen = strlen (fname) + 1;
740       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
741       pos->file_id = hc;
742       pos->filename = (const char *) &pos[1];
743       memcpy (&pos[1], fname, slen);
744       if (GNUNET_SYSERR ==
745           GNUNET_CONTAINER_multihashmap_put (ifm,
746                                              &hc,
747                                              (void*) pos->filename,
748                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
749         {
750           GNUNET_free (pos);
751         }
752       else
753         {
754           pos->next = indexed_files;
755           indexed_files = pos;
756         }
757     }
758   if (GNUNET_OK != 
759       GNUNET_BIO_read_close (rh, &emsg))
760     GNUNET_free (emsg);
761   GNUNET_free (fn);
762 }
763
764
765 /**
766  * We've validated the hash of the file we're about to
767  * index.  Signal success to the client and update
768  * our internal data structures.
769  *
770  * @param ii the index info entry for the request
771  */
772 static void
773 signal_index_ok (struct IndexInfo *ii)
774 {
775   if (GNUNET_SYSERR ==
776       GNUNET_CONTAINER_multihashmap_put (ifm,
777                                          &ii->file_id,
778                                          (void*) ii->filename,
779                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
780     {
781       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
782                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
783                   ii->filename,
784                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
785                                                                    &ii->file_id));
786       GNUNET_SERVER_transmit_context_append (ii->tc,
787                                              NULL, 0,
788                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
789       GNUNET_SERVER_transmit_context_run (ii->tc,
790                                           GNUNET_TIME_UNIT_MINUTES);
791       GNUNET_free (ii);
792       return;
793     }
794   ii->next = indexed_files;
795   indexed_files = ii;
796   write_index_list ();
797   GNUNET_SERVER_transmit_context_append (ii->tc,
798                                          NULL, 0,
799                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
800   GNUNET_SERVER_transmit_context_run (ii->tc,
801                                       GNUNET_TIME_UNIT_MINUTES);
802   ii->tc = NULL;
803 }
804
805
806 /**
807  * Function called once the hash computation over an
808  * indexed file has completed.
809  *
810  * @param cls closure, our publishing context
811  * @param res resulting hash, NULL on error
812  */
813 static void 
814 hash_for_index_val (void *cls,
815                     const GNUNET_HashCode *
816                     res)
817 {
818   struct IndexInfo *ii = cls;
819   
820   if ( (res == NULL) ||
821        (0 != memcmp (res,
822                      &ii->file_id,
823                      sizeof(GNUNET_HashCode))) )
824     {
825       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
826                   _("Hash mismatch trying to index file `%s'\n"),
827                   ii->filename);
828       GNUNET_SERVER_transmit_context_append (ii->tc,
829                                              NULL, 0,
830                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
831       GNUNET_SERVER_transmit_context_run (ii->tc,
832                                           GNUNET_TIME_UNIT_MINUTES);
833       GNUNET_free (ii);
834       return;
835     }
836   signal_index_ok (ii);
837 }
838
839
840 /**
841  * Handle INDEX_START-message.
842  *
843  * @param cls closure
844  * @param client identification of the client
845  * @param message the actual message
846  */
847 static void
848 handle_index_start (void *cls,
849                     struct GNUNET_SERVER_Client *client,
850                     const struct GNUNET_MessageHeader *message)
851 {
852   const struct IndexStartMessage *ism;
853   const char *fn;
854   uint16_t msize;
855   struct IndexInfo *ii;
856   size_t slen;
857   uint32_t dev;
858   uint64_t ino;
859   uint32_t mydev;
860   uint64_t myino;
861
862   msize = ntohs(message->size);
863   if ( (msize <= sizeof (struct IndexStartMessage)) ||
864        ( ((const char *)message)[msize-1] != '\0') )
865     {
866       GNUNET_break (0);
867       GNUNET_SERVER_receive_done (client,
868                                   GNUNET_SYSERR);
869       return;
870     }
871   ism = (const struct IndexStartMessage*) message;
872   fn = (const char*) &ism[1];
873   dev = ntohl (ism->device);
874   ino = GNUNET_ntohll (ism->inode);
875   ism = (const struct IndexStartMessage*) message;
876   slen = strlen (fn) + 1;
877   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
878   ii->filename = (const char*) &ii[1];
879   memcpy (&ii[1], fn, slen);
880   ii->file_id = ism->file_id;  
881   ii->tc = GNUNET_SERVER_transmit_context_create (client);
882   if ( ( (dev != 0) ||
883          (ino != 0) ) &&
884        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
885                                                        &mydev,
886                                                        &myino)) &&
887        ( (dev == mydev) &&
888          (ino == myino) ) )
889     {      
890       /* fast validation OK! */
891       signal_index_ok (ii);
892       return;
893     }
894   /* slow validation, need to hash full file (again) */
895   GNUNET_CRYPTO_hash_file (sched,
896                            GNUNET_SCHEDULER_PRIORITY_IDLE,
897                            GNUNET_NO,
898                            fn,
899                            HASHING_BLOCKSIZE,
900                            &hash_for_index_val,
901                            ii);
902 }
903
904
905 /**
906  * Handle INDEX_LIST_GET-message.
907  *
908  * @param cls closure
909  * @param client identification of the client
910  * @param message the actual message
911  */
912 static void
913 handle_index_list_get (void *cls,
914                        struct GNUNET_SERVER_Client *client,
915                        const struct GNUNET_MessageHeader *message)
916 {
917   struct GNUNET_SERVER_TransmitContext *tc;
918   struct IndexInfoMessage *iim;
919   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
920   size_t slen;
921   const char *fn;
922   struct GNUNET_MessageHeader *msg;
923   struct IndexInfo *pos;
924
925   tc = GNUNET_SERVER_transmit_context_create (client);
926   iim = (struct IndexInfoMessage*) buf;
927   msg = &iim->header;
928   pos = indexed_files;
929   while (NULL != pos)
930     {
931       iim->reserved = 0;
932       iim->file_id = pos->file_id;
933       fn = pos->filename;
934       slen = strlen (fn) + 1;
935       if (slen + sizeof (struct IndexInfoMessage) > 
936           GNUNET_SERVER_MAX_MESSAGE_SIZE)
937         {
938           GNUNET_break (0);
939           break;
940         }
941       memcpy (&iim[1], fn, slen);
942       GNUNET_SERVER_transmit_context_append
943         (tc,
944          &msg[1],
945          sizeof (struct IndexInfoMessage) 
946          - sizeof (struct GNUNET_MessageHeader) + slen,
947          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
948       pos = pos->next;
949     }
950   GNUNET_SERVER_transmit_context_append (tc,
951                                          NULL, 0,
952                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
953   GNUNET_SERVER_transmit_context_run (tc,
954                                       GNUNET_TIME_UNIT_MINUTES);
955 }
956
957
958 /**
959  * Handle UNINDEX-message.
960  *
961  * @param cls closure
962  * @param client identification of the client
963  * @param message the actual message
964  */
965 static void
966 handle_unindex (void *cls,
967                 struct GNUNET_SERVER_Client *client,
968                 const struct GNUNET_MessageHeader *message)
969 {
970   const struct UnindexMessage *um;
971   struct IndexInfo *pos;
972   struct IndexInfo *prev;
973   struct IndexInfo *next;
974   struct GNUNET_SERVER_TransmitContext *tc;
975   int found;
976   
977   um = (const struct UnindexMessage*) message;
978   found = GNUNET_NO;
979   prev = NULL;
980   pos = indexed_files;
981   while (NULL != pos)
982     {
983       next = pos->next;
984       if (0 == memcmp (&pos->file_id,
985                        &um->file_id,
986                        sizeof (GNUNET_HashCode)))
987         {
988           if (prev == NULL)
989             indexed_files = pos->next;
990           else
991             prev->next = pos->next;
992           GNUNET_free (pos);
993           found = GNUNET_YES;
994         }
995       else
996         {
997           prev = pos;
998         }
999       pos = next;
1000     }
1001   if (GNUNET_YES == found)
1002     write_index_list ();
1003   tc = GNUNET_SERVER_transmit_context_create (client);
1004   GNUNET_SERVER_transmit_context_append (tc,
1005                                          NULL, 0,
1006                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1007   GNUNET_SERVER_transmit_context_run (tc,
1008                                       GNUNET_TIME_UNIT_MINUTES);
1009 }
1010
1011
1012 /**
1013  * Run the next DS request in our
1014  * queue, we're done with the current one.
1015  */
1016 static void
1017 next_ds_request ()
1018 {
1019   struct DatastoreRequestQueue *e;
1020   
1021   while (NULL != (e = drq_head))
1022     {
1023       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1024         break;
1025       if (e->task != GNUNET_SCHEDULER_NO_TASK)
1026         GNUNET_SCHEDULER_cancel (sched, e->task);
1027       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1028       e->req (e->req_cls, GNUNET_NO);
1029       GNUNET_free (e);  
1030     }
1031   if (e == NULL)
1032     return;
1033   if (e->task != GNUNET_SCHEDULER_NO_TASK)
1034     GNUNET_SCHEDULER_cancel (sched, e->task);
1035   e->task = GNUNET_SCHEDULER_NO_TASK;
1036   e->req (e->req_cls, GNUNET_YES);
1037   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1038   GNUNET_free (e);  
1039 }
1040
1041
1042 /**
1043  * A datastore request had to be timed out. 
1044  *
1045  * @param cls closure (of type "struct DatastoreRequestQueue*")
1046  * @param tc task context, unused
1047  */
1048 static void
1049 timeout_ds_request (void *cls,
1050                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1051 {
1052   struct DatastoreRequestQueue *e = cls;
1053
1054   e->task = GNUNET_SCHEDULER_NO_TASK;
1055   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1056   e->req (e->req_cls, GNUNET_NO);
1057   GNUNET_free (e);  
1058 }
1059
1060
1061 /**
1062  * Queue a request for the datastore.
1063  *
1064  * @param deadline by when the request should run
1065  * @param fun function to call once the request can be run
1066  * @param fun_cls closure for fun
1067  */
1068 static struct DatastoreRequestQueue *
1069 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1070                   RequestFunction fun,
1071                   void *fun_cls)
1072 {
1073   struct DatastoreRequestQueue *e;
1074   struct DatastoreRequestQueue *bef;
1075
1076   if (drq_head == NULL)
1077     {
1078       /* no other requests pending, run immediately */
1079       fun (fun_cls, GNUNET_OK);
1080       return NULL;
1081     }
1082   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1083   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1084   e->req = fun;
1085   e->req_cls = fun_cls;
1086   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1087     {
1088       /* local request, highest prio, put at head of queue
1089          regardless of deadline */
1090       bef = NULL;
1091     }
1092   else
1093     {
1094       bef = drq_tail;
1095       while ( (NULL != bef) &&
1096               (e->timeout.value < bef->timeout.value) )
1097         bef = bef->prev;
1098     }
1099   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1100   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1101     return e;
1102   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1103                                           GNUNET_NO,
1104                                           GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1105                                           GNUNET_SCHEDULER_NO_TASK,
1106                                           deadline,
1107                                           &timeout_ds_request,
1108                                           e);
1109   return e;                                    
1110 }
1111
1112
1113 /**
1114  * Free the state associated with a local get context.
1115  *
1116  * @param lgc the lgc to free
1117  */
1118 static void
1119 local_get_context_free (struct LocalGetContext *lgc) 
1120 {
1121   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1122   GNUNET_SERVER_client_drop (lgc->client); 
1123   GNUNET_free_non_null (lgc->results);
1124   if (lgc->results_bf != NULL)
1125     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1126   if (lgc->req != NULL)
1127     {
1128       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1129         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1130       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1131       GNUNET_free (lgc->req);
1132     }
1133   GNUNET_free (lgc);
1134 }
1135
1136
1137 /**
1138  * We're able to transmit the next (local) result to the client.
1139  * Do it and ask the datastore for more.  Or, on error, tell
1140  * the datastore to stop giving us more.
1141  *
1142  * @param cls our closure (struct LocalGetContext)
1143  * @param max maximum number of bytes we can transmit
1144  * @param buf where to copy our message
1145  * @return number of bytes copied to buf
1146  */
1147 static size_t
1148 transmit_local_result (void *cls,
1149                        size_t max,
1150                        void *buf)
1151 {
1152   struct LocalGetContext *lgc = cls;  
1153   uint16_t msize;
1154
1155   if (NULL == buf)
1156     {
1157       /* error, abort! */
1158       GNUNET_free (lgc->result);
1159       lgc->result = NULL;
1160       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1161       return 0;
1162     }
1163   msize = ntohs (lgc->result->header.size);
1164   GNUNET_assert (max >= msize);
1165   memcpy (buf, lgc->result, msize);
1166   GNUNET_free (lgc->result);
1167   lgc->result = NULL;
1168   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1169   return msize;
1170 }
1171
1172
1173 /**
1174  * Continuation called from datastore's remove
1175  * function.
1176  *
1177  * @param cls unused
1178  * @param success did the deletion work?
1179  * @param msg error message
1180  */
1181 static void
1182 remove_cont (void *cls,
1183              int success,
1184              const char *msg)
1185 {
1186   if (GNUNET_OK != success)
1187     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1188                 _("Failed to delete bogus block: %s\n"),
1189                 msg);
1190   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1191 }
1192
1193
1194 /**
1195  * Mingle hash with the mingle_number to
1196  * produce different bits.
1197  */
1198 static void
1199 mingle_hash (const GNUNET_HashCode * in,
1200              int32_t mingle_number, 
1201              GNUNET_HashCode * hc)
1202 {
1203   GNUNET_HashCode m;
1204
1205   GNUNET_CRYPTO_hash (&mingle_number, 
1206                       sizeof (int32_t), 
1207                       &m);
1208   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1209 }
1210
1211
1212 /**
1213  * We've received an on-demand encoded block
1214  * from the datastore.  Attempt to do on-demand
1215  * encoding and (if successful), call the 
1216  * continuation with the resulting block.  On
1217  * error, clean up and ask the datastore for
1218  * more results.
1219  *
1220  * @param key key for the content
1221  * @param size number of bytes in data
1222  * @param data content stored
1223  * @param type type of the content
1224  * @param priority priority of the content
1225  * @param anonymity anonymity-level for the content
1226  * @param expiration expiration time for the content
1227  * @param uid unique identifier for the datum;
1228  *        maybe 0 if no unique identifier is available
1229  * @param cont function to call with the actual block
1230  * @param cont_cls closure for cont
1231  */
1232 static void
1233 handle_on_demand_block (const GNUNET_HashCode * key,
1234                         uint32_t size,
1235                         const void *data,
1236                         uint32_t type,
1237                         uint32_t priority,
1238                         uint32_t anonymity,
1239                         struct GNUNET_TIME_Absolute
1240                         expiration, uint64_t uid,
1241                         GNUNET_DATASTORE_Iterator cont,
1242                         void *cont_cls)
1243 {
1244   const struct OnDemandBlock *odb;
1245   GNUNET_HashCode nkey;
1246   struct GNUNET_CRYPTO_AesSessionKey skey;
1247   struct GNUNET_CRYPTO_AesInitializationVector iv;
1248   GNUNET_HashCode query;
1249   ssize_t nsize;
1250   char ndata[DBLOCK_SIZE];
1251   char edata[DBLOCK_SIZE];
1252   const char *fn;
1253   struct GNUNET_DISK_FileHandle *fh;
1254   uint64_t off;
1255
1256   if (size != sizeof (struct OnDemandBlock))
1257     {
1258       GNUNET_break (0);
1259       GNUNET_DATASTORE_remove (dsh, 
1260                                key,
1261                                size,
1262                                data,
1263                                &remove_cont,
1264                                NULL,
1265                                GNUNET_TIME_UNIT_FOREVER_REL);     
1266       return;
1267     }
1268   odb = (const struct OnDemandBlock*) data;
1269   off = GNUNET_ntohll (odb->offset);
1270   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1271                                                         &odb->file_id);
1272   fh = NULL;
1273   if ( (NULL == fn) ||
1274        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1275                                              GNUNET_DISK_OPEN_READ))) ||
1276        (off !=
1277         GNUNET_DISK_file_seek (fh,
1278                                off,
1279                                GNUNET_DISK_SEEK_SET)) ||
1280        (-1 ==
1281         (nsize = GNUNET_DISK_file_read (fh,
1282                                         ndata,
1283                                         sizeof (ndata)))) )
1284     {
1285       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1286                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1287                   GNUNET_h2s (&odb->file_id),
1288                   (unsigned long long) off,
1289                   STRERROR (errno));
1290       if (fh != NULL)
1291         GNUNET_DISK_file_close (fh);
1292       /* FIXME: if this happens often, we need
1293          to remove the OnDemand block from the DS! */
1294       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1295       return;
1296     }
1297   GNUNET_DISK_file_close (fh);
1298   GNUNET_CRYPTO_hash (ndata,
1299                       nsize,
1300                       &nkey);
1301   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1302   GNUNET_CRYPTO_aes_encrypt (ndata,
1303                              nsize,
1304                              &skey,
1305                              &iv,
1306                              edata);
1307   GNUNET_CRYPTO_hash (edata,
1308                       nsize,
1309                       &query);
1310   if (0 != memcmp (&query, 
1311                    key,
1312                    sizeof (GNUNET_HashCode)))
1313     {
1314       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1315                   _("Indexed file `%s' changed at offset %llu\n"),
1316                   fn,
1317                   (unsigned long long) off);
1318       /* FIXME: if this happens often, we need
1319          to remove the OnDemand block from the DS! */
1320       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1321       return;
1322     }
1323   cont (cont_cls,
1324         key,
1325         nsize,
1326         edata,
1327         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1328         priority,
1329         anonymity,
1330         expiration,
1331         uid);
1332 }
1333
1334
1335 /**
1336  * We're processing (local) results for a search request
1337  * from a (local) client.  Pass applicable results to the
1338  * client and if we are done either clean up (operation
1339  * complete) or switch to P2P search (more results possible).
1340  *
1341  * @param cls our closure (struct LocalGetContext)
1342  * @param key key for the content
1343  * @param size number of bytes in data
1344  * @param data content stored
1345  * @param type type of the content
1346  * @param priority priority of the content
1347  * @param anonymity anonymity-level for the content
1348  * @param expiration expiration time for the content
1349  * @param uid unique identifier for the datum;
1350  *        maybe 0 if no unique identifier is available
1351  */
1352 static void
1353 process_local_get_result (void *cls,
1354                           const GNUNET_HashCode * key,
1355                           uint32_t size,
1356                           const void *data,
1357                           uint32_t type,
1358                           uint32_t priority,
1359                           uint32_t anonymity,
1360                           struct GNUNET_TIME_Absolute
1361                           expiration, 
1362                           uint64_t uid)
1363 {
1364   struct LocalGetContext *lgc = cls;
1365   size_t msize;
1366   unsigned int i;
1367
1368   if (key == NULL)
1369     {
1370       /* no further results from datastore; continue
1371          processing further requests from the client and
1372          allow the next task to use the datastore; also,
1373          switch to P2P requests or clean up our state. */
1374       next_ds_request ();
1375       GNUNET_SERVER_receive_done (lgc->client,
1376                                   GNUNET_OK);
1377       if ( (lgc->results_used == 0) ||
1378            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1379            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1380            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1381         {
1382           // FIXME: initiate P2P search
1383           return;
1384         }
1385       /* got all possible results, clean up! */
1386       local_get_context_free (lgc);
1387       return;
1388     }
1389   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1390     {
1391       handle_on_demand_block (key, size, data, type, priority, 
1392                               anonymity, expiration, uid,
1393                               &process_local_get_result,
1394                               lgc);
1395       return;
1396     }
1397   if (type != lgc->type)
1398     {
1399       /* this should be virtually impossible to reach (DBLOCK 
1400          query hash being identical to KBLOCK/SBLOCK query hash);
1401          nevertheless, if it happens, the correct thing is to
1402          simply skip the result. */
1403       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1404       return;
1405     }
1406   /* check if this is a result we've alredy
1407      received */
1408   for (i=0;i<lgc->results_used;i++)
1409     if (0 == memcmp (key,
1410                      &lgc->results[i],
1411                      sizeof (GNUNET_HashCode)))
1412       {
1413         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1414         return; 
1415       }
1416   if (lgc->results_used == lgc->results_size)
1417     GNUNET_array_grow (lgc->results,
1418                        lgc->results_size,
1419                        lgc->results_size * 2 + 2);
1420   GNUNET_CRYPTO_hash (data, 
1421                       size, 
1422                       &lgc->results[lgc->results_used++]);    
1423   msize = size + sizeof (struct ContentMessage);
1424   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1425   lgc->result = GNUNET_malloc (msize);
1426   lgc->result->header.size = htons (msize);
1427   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1428   lgc->result->type = htonl (type);
1429   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1430   memcpy (&lgc->result[1],
1431           data,
1432           size);
1433   GNUNET_SERVER_notify_transmit_ready (lgc->client,
1434                                        msize,
1435                                        GNUNET_TIME_UNIT_FOREVER_REL,
1436                                        &transmit_local_result,
1437                                        lgc);
1438 }
1439
1440
1441 /**
1442  * We're processing a search request from a local
1443  * client.  Now it is our turn to query the datastore.
1444  * 
1445  * @param cls our closure (struct LocalGetContext)
1446  * @param tc unused
1447  */
1448 static void
1449 transmit_local_get (void *cls,
1450                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1451 {
1452   struct LocalGetContext *lgc = cls;
1453   uint32_t type;
1454   
1455   type = lgc->type;
1456   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1457     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1458   GNUNET_DATASTORE_get (dsh,
1459                         &lgc->query,
1460                         type,
1461                         &process_local_get_result,
1462                         lgc,
1463                         GNUNET_TIME_UNIT_FOREVER_REL);
1464 }
1465
1466
1467 /**
1468  * We're processing a search request from a local
1469  * client.  Now it is our turn to query the datastore.
1470  * 
1471  * @param cls our closure (struct LocalGetContext)
1472  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
1473  */
1474 static void 
1475 transmit_local_get_ready (void *cls,
1476                           int ok)
1477 {
1478   struct LocalGetContext *lgc = cls;
1479
1480   GNUNET_assert (GNUNET_OK == ok);
1481   GNUNET_SCHEDULER_add_continuation (sched,
1482                                      GNUNET_NO,
1483                                      &transmit_local_get,
1484                                      lgc,
1485                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1486 }
1487
1488
1489 /**
1490  * Handle START_SEARCH-message (search request from client).
1491  *
1492  * @param cls closure
1493  * @param client identification of the client
1494  * @param message the actual message
1495  */
1496 static void
1497 handle_start_search (void *cls,
1498                      struct GNUNET_SERVER_Client *client,
1499                      const struct GNUNET_MessageHeader *message)
1500 {
1501   const struct SearchMessage *sm;
1502   struct LocalGetContext *lgc;
1503   uint16_t msize;
1504   unsigned int sc;
1505   
1506   msize = ntohs (message->size);
1507   if ( (msize < sizeof (struct SearchMessage)) ||
1508        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
1509     {
1510       GNUNET_break (0);
1511       GNUNET_SERVER_receive_done (client,
1512                                   GNUNET_SYSERR);
1513       return;
1514     }
1515   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
1516   sm = (const struct SearchMessage*) message;
1517   GNUNET_SERVER_client_keep (client);
1518   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
1519   if  (sc > 0)
1520     {
1521       lgc->results_used = sc;
1522       GNUNET_array_grow (lgc->results,
1523                          lgc->results_size,
1524                          sc * 2);
1525       memcpy (lgc->results,
1526               &sm[1],
1527               sc * sizeof (GNUNET_HashCode));
1528     }
1529   lgc->client = client;
1530   lgc->type = ntohl (sm->type);
1531   lgc->anonymity_level = ntohl (sm->anonymity_level);
1532   lgc->target = sm->target;
1533   lgc->query = sm->query;
1534   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
1535   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
1536                                &transmit_local_get_ready,
1537                                lgc);
1538 }
1539
1540
1541 /**
1542  * List of handlers for the messages understood by this
1543  * service.
1544  */
1545 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1546   {&handle_index_start, NULL, 
1547    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
1548   {&handle_index_list_get, NULL, 
1549    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
1550   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
1551    sizeof (struct UnindexMessage) },
1552   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
1553    0 },
1554   {NULL, NULL, 0, 0}
1555 };
1556
1557
1558 /**
1559  * Clean up the memory used by the PendingRequest
1560  * structure (except for the client or peer list
1561  * that the request may be part of).
1562  *
1563  * @param pr request to clean up
1564  */
1565 static void
1566 destroy_pending_request (struct PendingRequest *pr)
1567 {
1568   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
1569                                         &pr->query,
1570                                         pr);
1571   // FIXME: not sure how this can work (efficiently)
1572   // also, what does the return value mean?
1573   GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
1574                                      pr);
1575   if (NULL != pr->bf)
1576     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1577   GNUNET_PEER_change_rc (pr->source_pid, -1);
1578   GNUNET_PEER_change_rc (pr->target_pid, -1);
1579   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1580   GNUNET_free_non_null (pr->used_pids);
1581   GNUNET_free_non_null (pr->replies_seen);
1582   GNUNET_free_non_null (pr->namespace);
1583   GNUNET_free (pr);
1584 }
1585
1586
1587 /**
1588  * A client disconnected.  Remove all of its pending queries.
1589  *
1590  * @param cls closure, NULL
1591  * @param client identification of the client
1592  */
1593 static void
1594 handle_client_disconnect (void *cls,
1595                           struct GNUNET_SERVER_Client
1596                           * client)
1597 {
1598   struct LocalGetContext *lgc;
1599   struct ClientList *cpos;
1600   struct ClientList *cprev;
1601   struct ClientRequestList *rl;
1602
1603   lgc = lgc_head;
1604   while ( (NULL != lgc) &&
1605           (lgc->client != client) )
1606     lgc = lgc->next;
1607   if (lgc != NULL)
1608     local_get_context_free (lgc);
1609   cprev = NULL;
1610   cpos = clients;
1611   while ( (NULL != cpos) &&
1612           (clients->client != client) )
1613     {
1614       cprev = cpos;
1615       cpos = cpos->next;
1616     }
1617   if (cpos != NULL)
1618     {
1619       if (cprev == NULL)
1620         clients = cpos->next;
1621       else
1622         cprev->next = cpos->next;
1623       while (NULL != (rl = cpos->head))
1624         {
1625           cpos->head = rl->next;
1626           destroy_pending_request (rl->req);
1627           GNUNET_free (rl);
1628         }
1629       GNUNET_free (cpos);
1630     }
1631 }
1632
1633
1634 /**
1635  * Task run during shutdown.
1636  *
1637  * @param cls unused
1638  * @param tc unused
1639  */
1640 static void
1641 shutdown_task (void *cls,
1642                const struct GNUNET_SCHEDULER_TaskContext *tc)
1643 {
1644   struct IndexInfo *pos;  
1645
1646   if (NULL != core)
1647     GNUNET_CORE_disconnect (core);
1648   GNUNET_DATASTORE_disconnect (dsh,
1649                                GNUNET_NO);
1650   dsh = NULL;
1651   // FIXME: iterate over 'request_map' to free entries!
1652   GNUNET_CONTAINER_multihashmap_destroy (request_map);
1653   request_map = NULL;
1654   GNUNET_CONTAINER_multihashmap_destroy (ifm);
1655   ifm = NULL;
1656   while (NULL != (pos = indexed_files))
1657     {
1658       indexed_files = pos->next;
1659       GNUNET_free (pos);
1660     }
1661 }
1662
1663
1664 /**
1665  * Free (each) request made by the peer.
1666  *
1667  * @param cls closure, points to peer that the request belongs to
1668  * @param key current key code
1669  * @param value value in the hash map
1670  * @return GNUNET_YES (we should continue to iterate)
1671  */
1672 static int
1673 destroy_request (void *cls,
1674                  const GNUNET_HashCode * key,
1675                  void *value)
1676 {
1677   const struct GNUNET_PeerIdentity * peer = cls;
1678   struct PendingRequest *pr = value;
1679   
1680   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1681                                         &peer->hashPubKey,
1682                                         pr);
1683   destroy_pending_request (pr);
1684   return GNUNET_YES;
1685 }
1686
1687
1688 /**
1689  * Method called whenever a peer disconnects.
1690  *
1691  * @param cls closure, not used
1692  * @param peer peer identity this notification is about
1693  */
1694 static void
1695 peer_disconnect_handler (void *cls,
1696                          const struct
1697                          GNUNET_PeerIdentity * peer)
1698 {
1699   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
1700                                               &peer->hashPubKey,
1701                                               &destroy_request,
1702                                               (void*) peer);
1703 }
1704
1705
1706 /**
1707  * We're processing a GET request from
1708  * another peer and have decided to forward
1709  * it to other peers.
1710  *
1711  * @param cls our "struct ProcessGetContext *"
1712  * @param tc unused
1713  */
1714 static void
1715 forward_get_request (void *cls,
1716                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1717 {
1718   struct ProcessGetContext *pgc = cls;
1719
1720   // FIXME: install entry in
1721   // 'request_map' and do actual
1722   // forwarding...
1723   if (pgc->bf != NULL)
1724     GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1725   GNUNET_free (pgc); 
1726 }
1727
1728
1729 /**
1730  * Transmit the given message by copying it to
1731  * the target buffer "buf".  "buf" will be
1732  * NULL and "size" zero if the socket was closed for
1733  * writing in the meantime.  In that case, only
1734
1735  * free the message
1736  *
1737  * @param cls closure, pointer to the message
1738  * @param size number of bytes available in buf
1739  * @param buf where the callee should write the message
1740  * @return number of bytes written to buf
1741  */
1742 static size_t
1743 transmit_message (void *cls,
1744                   size_t size, void *buf)
1745 {
1746   struct GNUNET_MessageHeader *msg = cls;
1747   uint16_t msize;
1748   
1749   if (NULL == buf)
1750     {
1751 #if DEBUG_FS
1752       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1753                   "Dropping reply, core too busy.\n");
1754 #endif
1755       GNUNET_free (msg);
1756       return 0;
1757     }
1758   msize = ntohs (msg->size);
1759   GNUNET_assert (size >= msize);
1760   memcpy (buf, msg, msize);
1761   GNUNET_free (msg);
1762   return msize;
1763 }
1764
1765
1766 /**
1767  * Test if the load on this peer is too high
1768  * to even consider processing the query at
1769  * all.
1770  * 
1771  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1772  */
1773 static int
1774 test_load_too_high ()
1775 {
1776   return GNUNET_NO; // FIXME
1777 }
1778
1779
1780 /**
1781  * We're processing (local) results for a search request
1782  * from another peer.  Pass applicable results to the
1783  * peer and if we are done either clean up (operation
1784  * complete) or forward to other peers (more results possible).
1785  *
1786  * @param cls our closure (struct LocalGetContext)
1787  * @param key key for the content
1788  * @param size number of bytes in data
1789  * @param data content stored
1790  * @param type type of the content
1791  * @param priority priority of the content
1792  * @param anonymity anonymity-level for the content
1793  * @param expiration expiration time for the content
1794  * @param uid unique identifier for the datum;
1795  *        maybe 0 if no unique identifier is available
1796  */
1797 static void
1798 process_p2p_get_result (void *cls,
1799                         const GNUNET_HashCode * key,
1800                         uint32_t size,
1801                         const void *data,
1802                         uint32_t type,
1803                         uint32_t priority,
1804                         uint32_t anonymity,
1805                         struct GNUNET_TIME_Absolute
1806                         expiration, 
1807                         uint64_t uid)
1808 {
1809   struct ProcessGetContext *pgc = cls;
1810   GNUNET_HashCode dhash;
1811   GNUNET_HashCode mhash;
1812   struct PutMessage *reply;
1813   
1814   if (NULL == key)
1815     {
1816       /* no more results */
1817       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
1818            ( (0 == pgc->results_found) ||
1819              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1820              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1821              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
1822         {
1823           GNUNET_SCHEDULER_add_continuation (sched,
1824                                              GNUNET_NO,
1825                                              &forward_get_request,
1826                                              pgc,
1827                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1828         }
1829       else
1830         {
1831           if (pgc->bf != NULL)
1832             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
1833           GNUNET_free (pgc); 
1834         }
1835       next_ds_request ();
1836       return;
1837     }
1838   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1839     {
1840       handle_on_demand_block (key, size, data, type, priority, 
1841                               anonymity, expiration, uid,
1842                               &process_p2p_get_result,
1843                               pgc);
1844       return;
1845     }
1846   /* check for duplicates */
1847   GNUNET_CRYPTO_hash (data, size, &dhash);
1848   mingle_hash (&dhash, 
1849                pgc->mingle,
1850                &mhash);
1851   if ( (pgc->bf != NULL) &&
1852        (GNUNET_YES ==
1853         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
1854                                            &mhash)) )
1855     {      
1856 #if DEBUG_FS
1857       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1858                   "Result from datastore filtered by bloomfilter.\n");
1859 #endif
1860       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1861       return;
1862     }
1863   pgc->results_found++;
1864   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1865        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1866        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1867     {
1868       if (pgc->bf == NULL)
1869         pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
1870                                                      32, 
1871                                                      BLOOMFILTER_K);
1872       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
1873                                         &mhash);
1874     }
1875
1876   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
1877   reply->header.size = htons (sizeof (struct PutMessage) + size);
1878   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1879   reply->type = htonl (type);
1880   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
1881   memcpy (&reply[1], data, size);
1882   GNUNET_CORE_notify_transmit_ready (core,
1883                                      pgc->priority,
1884                                      ACCEPTABLE_REPLY_DELAY,
1885                                      &pgc->reply_to,
1886                                      sizeof (struct PutMessage) + size,
1887                                      &transmit_message,
1888                                      reply);
1889   if ( (GNUNET_YES == test_load_too_high()) ||
1890        (pgc->results_found > 5 + 2 * pgc->priority) )
1891     {
1892       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1893       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
1894       return;
1895     }
1896   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1897 }
1898   
1899
1900 /**
1901  * We're processing a GET request from
1902  * another peer.  Give it to our local
1903  * datastore.
1904  *
1905  * @param cls our "struct ProcessGetContext"
1906  * @param ok did we get a datastore slice or not?
1907  */
1908 static void
1909 ds_get_request (void *cls, 
1910                 int ok)
1911 {
1912   struct ProcessGetContext *pgc = cls;
1913   uint32_t type;
1914   struct GNUNET_TIME_Relative timeout;
1915
1916   if (GNUNET_OK != ok)
1917     {
1918       /* no point in doing P2P stuff if we can't even do local */
1919       GNUNET_free (dsh);
1920       return;
1921     }
1922   type = pgc->type;
1923   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1924     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1925   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
1926                                            (pgc->priority + 1));
1927   GNUNET_DATASTORE_get (dsh,
1928                         &pgc->query,
1929                         type,
1930                         &process_p2p_get_result,
1931                         pgc,
1932                         timeout);
1933 }
1934
1935
1936 /**
1937  * The priority level imposes a bound on the maximum
1938  * value for the ttl that can be requested.
1939  *
1940  * @param ttl_in requested ttl
1941  * @param priority given priority
1942  * @return ttl_in if ttl_in is below the limit,
1943  *         otherwise the ttl-limit for the given priority
1944  */
1945 static int32_t
1946 bound_ttl (int32_t ttl_in, uint32_t prio)
1947 {
1948   unsigned long long allowed;
1949
1950   if (ttl_in <= 0)
1951     return ttl_in;
1952   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
1953   if (ttl_in > allowed)      
1954     {
1955       if (allowed >= (1 << 30))
1956         return 1 << 30;
1957       return allowed;
1958     }
1959   return ttl_in;
1960 }
1961
1962
1963 /**
1964  * We've received a request with the specified
1965  * priority.  Bound it according to how much
1966  * we trust the given peer.
1967  * 
1968  * @param prio_in requested priority
1969  * @param peer the peer making the request
1970  * @return effective priority
1971  */
1972 static uint32_t
1973 bound_priority (uint32_t prio_in,
1974                 const struct GNUNET_PeerIdentity *peer)
1975 {
1976   return 0; // FIXME!
1977 }
1978
1979
1980 /**
1981  * Handle P2P "GET" request.
1982  *
1983  * @param cls closure, always NULL
1984  * @param peer the other peer involved (sender or receiver, NULL
1985  *        for loopback messages where we are both sender and receiver)
1986  * @param message the actual message
1987  * @return GNUNET_OK to keep the connection open,
1988  *         GNUNET_SYSERR to close it (signal serious error)
1989  */
1990 static int
1991 handle_p2p_get (void *cls,
1992                 const struct GNUNET_PeerIdentity *other,
1993                 const struct GNUNET_MessageHeader *message)
1994 {
1995   uint16_t msize;
1996   const struct GetMessage *gm;
1997   unsigned int bits;
1998   const GNUNET_HashCode *opt;
1999   struct ProcessGetContext *pgc;
2000   uint32_t bm;
2001   size_t bfsize;
2002   uint32_t ttl_decrement;
2003   double preference;
2004   int net_load_up;
2005   int net_load_down;
2006
2007   msize = ntohs(message->size);
2008   if (msize < sizeof (struct GetMessage))
2009     {
2010       GNUNET_break_op (0);
2011       return GNUNET_SYSERR;
2012     }
2013   gm = (const struct GetMessage*) message;
2014   bm = ntohl (gm->hash_bitmap);
2015   bits = 0;
2016   while (bm > 0)
2017     {
2018       if (1 == (bm & 1))
2019         bits++;
2020       bm >>= 1;
2021     }
2022   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2023     {
2024       GNUNET_break_op (0);
2025       return GNUNET_SYSERR;
2026     }  
2027   opt = (const GNUNET_HashCode*) &gm[1];
2028   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2029   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2030   if (bfsize > 0)
2031     pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2032                                                  bfsize,
2033                                                  BLOOMFILTER_K);
2034   pgc->type = ntohl (gm->type);
2035   pgc->bm = ntohl (gm->hash_bitmap);
2036   pgc->mingle = gm->filter_mutator;
2037   bits = 0;
2038   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2039     pgc->reply_to.hashPubKey = opt[bits++];
2040   else
2041     pgc->reply_to = *other;
2042   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2043     pgc->namespace = opt[bits++];
2044   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2045     {
2046       GNUNET_break_op (0);
2047       GNUNET_free (pgc);
2048       return GNUNET_SYSERR;
2049     }
2050   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2051     pgc->prime_target.hashPubKey = opt[bits++];
2052   /* note that we can really only check load here since otherwise
2053      peers could find out that we are overloaded by being disconnected
2054      after sending us a malformed query... */
2055   if (GNUNET_YES == test_load_too_high ())
2056     {
2057       if (NULL != pgc->bf)
2058         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2059       GNUNET_free (pgc);
2060 #if DEBUG_FS
2061       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062                   "Dropping query from `%s', this peer is too busy.\n",
2063                   GNUNET_h2s (other));
2064 #endif
2065       return GNUNET_OK;
2066     }
2067   net_load_up = 50; // FIXME
2068   net_load_down = 50; // FIXME
2069   pgc->policy = ROUTING_POLICY_NONE;
2070   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2071        (net_load_down < IDLE_LOAD_THRESHOLD) )
2072     {
2073       pgc->policy |= ROUTING_POLICY_ALL;
2074       pgc->priority = 0; /* no charge */
2075     }
2076   else
2077     {
2078       pgc->priority = bound_priority (ntohl (gm->priority), other);
2079       if ( (net_load_up < 
2080             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2081            (net_load_down < 
2082             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2083         {
2084           pgc->policy |= ROUTING_POLICY_ALL;
2085         }
2086       else
2087         {
2088           // FIXME: is this sound?
2089           if (net_load_up < 90 + 10 * pgc->priority)
2090             pgc->policy |= ROUTING_POLICY_FORWARD;
2091           if (net_load_down < 90 + 10 * pgc->priority)
2092             pgc->policy |= ROUTING_POLICY_ANSWER;
2093         }
2094     }
2095   if (pgc->policy == ROUTING_POLICY_NONE)
2096     {
2097 #if DEBUG_FS
2098       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2099                   "Dropping query from `%s', network saturated.\n",
2100                   GNUNET_h2s (other));
2101 #endif
2102       if (NULL != pgc->bf)
2103         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2104       GNUNET_free (pgc);
2105       return GNUNET_OK;     /* drop */
2106     }
2107   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2108     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2109   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2110   /* decrement ttl (always) */
2111   ttl_decrement = 2 * TTL_DECREMENT +
2112     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2113                               TTL_DECREMENT);
2114   if ( (pgc->ttl < 0) &&
2115        (pgc->ttl - ttl_decrement > 0) )
2116     {
2117 #if DEBUG_FS
2118       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2119                   "Dropping query from `%s' due to TTL underflow.\n",
2120                   GNUNET_h2s (other));
2121 #endif
2122       /* integer underflow => drop (should be very rare)! */
2123       if (NULL != pgc->bf)
2124         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2125       GNUNET_free (pgc);
2126       return GNUNET_OK;
2127     }
2128   pgc->ttl -= ttl_decrement;
2129   pgc->start_time = GNUNET_TIME_absolute_get ();
2130   preference = (double) pgc->priority;
2131   if (preference < QUERY_BANDWIDTH_VALUE)
2132     preference = QUERY_BANDWIDTH_VALUE;
2133   // FIXME: also reserve bandwidth for reply?
2134   GNUNET_CORE_peer_configure (core,
2135                               other,
2136                               GNUNET_TIME_UNIT_FOREVER_REL,
2137                               0, 0, preference, NULL, NULL);
2138   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2139     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2140                                  &ds_get_request,
2141                                  pgc);
2142   else
2143     GNUNET_SCHEDULER_add_continuation (sched,
2144                                        GNUNET_NO,
2145                                        &forward_get_request,
2146                                        pgc,
2147                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2148   return GNUNET_OK;
2149 }
2150
2151
2152 /**
2153  * Iterator over pending requests.
2154  *
2155  * @param cls response (struct ProcessReplyClosure)
2156  * @param key our query
2157  * @param value value in the hash map (meta-info about the query)
2158  * @return GNUNET_YES (we should continue to iterate)
2159  */
2160 static int
2161 process_reply (void *cls,
2162                const GNUNET_HashCode * key,
2163                void *value)
2164 {
2165   struct ProcessReplyClosure *prq = cls;
2166   struct PendingRequest *pr = value;
2167
2168   fprintf (stderr, "FIXME %p %p\n", prq, pr);
2169   // FIXME: forward reply to client
2170   // or other peers (depending on pr...)
2171   return GNUNET_YES;
2172 }
2173
2174
2175 /**
2176  * Handle P2P "PUT" request.
2177  *
2178  * @param cls closure, always NULL
2179  * @param peer the other peer involved (sender or receiver, NULL
2180  *        for loopback messages where we are both sender and receiver)
2181  * @param message the actual message
2182  * @return GNUNET_OK to keep the connection open,
2183  *         GNUNET_SYSERR to close it (signal serious error)
2184  */
2185 static int
2186 handle_p2p_put (void *cls,
2187                 const struct GNUNET_PeerIdentity *other,
2188                 const struct GNUNET_MessageHeader *message)
2189 {
2190   const struct PutMessage *put;
2191   uint16_t msize;
2192   size_t dsize;
2193   uint32_t type;
2194   struct GNUNET_TIME_Absolute expiration;
2195   GNUNET_HashCode query;
2196   const struct KBlock *kb;
2197   struct ProcessReplyClosure prq;
2198
2199   msize = ntohs (message->size);
2200   if (msize < sizeof (struct PutMessage))
2201     {
2202       GNUNET_break_op(0);
2203       return GNUNET_SYSERR;
2204     }
2205   put = (const struct PutMessage*) message;
2206   dsize = msize - sizeof (struct PutMessage);
2207   type = ntohl (put->type);
2208   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
2209
2210   /* first, validate! */
2211   switch (type)
2212     {
2213     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2214     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2215       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
2216       break;
2217     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2218       if (dsize < sizeof (struct KBlock))
2219         {
2220           GNUNET_break_op (0);
2221           return GNUNET_SYSERR;
2222         }
2223       kb = (const struct KBlock*) &put[1];
2224       // FIXME -- validation code below broken...
2225       if ( (dsize != ntohs (kb->purpose.size) + 42) ||
2226            (GNUNET_OK !=
2227             GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
2228                                       &kb->purpose,
2229                                       &kb->signature,
2230                                       &kb->keyspace)) )
2231         {
2232           GNUNET_break_op (0);
2233           return GNUNET_SYSERR;
2234         }
2235       GNUNET_CRYPTO_hash (&kb->keyspace,
2236                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2237                           &query);
2238       break;
2239     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2240       // FIXME -- validate SBLOCK!
2241       GNUNET_break (0);
2242       return GNUNET_OK;
2243     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2244       // FIXME -- validate SKBLOCK!
2245       GNUNET_break (0);
2246       return GNUNET_OK;
2247     default:
2248       /* unknown block type */
2249       GNUNET_break_op (0);
2250       return GNUNET_SYSERR;
2251     }
2252
2253   /* now, lookup 'query' */
2254   prq.data = (const void*) &put[1];
2255   prq.size = dsize;
2256   prq.type = type;
2257   prq.expiration = expiration;
2258   prq.priority = 0;
2259   GNUNET_CONTAINER_multihashmap_get_multiple (request_map,
2260                                               &query,
2261                                               &process_reply,
2262                                               &prq);
2263   // FIXME: if migration is on and load is low,
2264   // queue to store data in datastore;
2265   // use "prq.priority" for that!
2266   return GNUNET_OK;
2267 }
2268
2269
2270 /**
2271  * List of handlers for P2P messages
2272  * that we care about.
2273  */
2274 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2275   {
2276     { &handle_p2p_get, 
2277       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2278     { &handle_p2p_put, 
2279       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2280     { NULL, 0, 0 }
2281   };
2282
2283
2284 /**
2285  * Task that will try to initiate a connection with the
2286  * core service.
2287  * 
2288  * @param cls unused
2289  * @param tc unused
2290  */
2291 static void
2292 core_connect_task (void *cls,
2293                    const struct GNUNET_SCHEDULER_TaskContext *tc);
2294
2295
2296 /**
2297  * Function called by the core after we've
2298  * connected.
2299  */
2300 static void
2301 core_start_cb (void *cls,
2302                struct GNUNET_CORE_Handle * server,
2303                const struct GNUNET_PeerIdentity *
2304                my_identity,
2305                const struct
2306                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
2307                publicKey)
2308 {
2309   if (server == NULL)
2310     {
2311       GNUNET_SCHEDULER_add_delayed (sched,
2312                                     GNUNET_NO,
2313                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
2314                                     GNUNET_SCHEDULER_NO_TASK,
2315                                     GNUNET_TIME_UNIT_SECONDS,
2316                                     &core_connect_task,
2317                                     NULL);
2318       return;
2319     }
2320   core = server;
2321 }
2322
2323
2324 /**
2325  * Task that will try to initiate a connection with the
2326  * core service.
2327  * 
2328  * @param cls unused
2329  * @param tc unused
2330  */
2331 static void
2332 core_connect_task (void *cls,
2333                    const struct GNUNET_SCHEDULER_TaskContext *tc)
2334 {
2335   GNUNET_CORE_connect (sched,
2336                        cfg,
2337                        GNUNET_TIME_UNIT_FOREVER_REL,
2338                        NULL,
2339                        &core_start_cb,
2340                        NULL,
2341                        &peer_disconnect_handler,
2342                        NULL, 
2343                        NULL, GNUNET_NO,
2344                        NULL, GNUNET_NO,
2345                        p2p_handlers);
2346 }
2347
2348
2349 /**
2350  * Process fs requests.
2351  *
2352  * @param cls closure
2353  * @param sched scheduler to use
2354  * @param server the initialized server
2355  * @param cfg configuration to use
2356  */
2357 static void
2358 run (void *cls,
2359      struct GNUNET_SCHEDULER_Handle *s,
2360      struct GNUNET_SERVER_Handle *server,
2361      const struct GNUNET_CONFIGURATION_Handle *c)
2362 {
2363   sched = s;
2364   cfg = c;
2365
2366   ifm = GNUNET_CONTAINER_multihashmap_create (128);
2367   request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2368   read_index_list ();
2369   dsh = GNUNET_DATASTORE_connect (cfg,
2370                                   sched);
2371   if (NULL == dsh)
2372     {
2373       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2374                   _("Failed to connect to datastore service.\n"));
2375       return;
2376     }
2377   GNUNET_SERVER_disconnect_notify (server, 
2378                                    &handle_client_disconnect,
2379                                    NULL);
2380   GNUNET_SERVER_add_handlers (server, handlers);
2381   core_connect_task (NULL, NULL);
2382   GNUNET_SCHEDULER_add_delayed (sched,
2383                                 GNUNET_YES,
2384                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
2385                                 GNUNET_SCHEDULER_NO_TASK,
2386                                 GNUNET_TIME_UNIT_FOREVER_REL,
2387                                 &shutdown_task,
2388                                 NULL);
2389 }
2390
2391
2392 /**
2393  * The main function for the fs service.
2394  *
2395  * @param argc number of arguments from the command line
2396  * @param argv command line arguments
2397  * @return 0 ok, 1 on error
2398  */
2399 int
2400 main (int argc, char *const *argv)
2401 {
2402   return (GNUNET_OK ==
2403           GNUNET_SERVICE_run (argc,
2404                               argv,
2405                               "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
2406 }
2407
2408 /* end of gnunet-service-fs.c */