types
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - validation of KBLOCKS (almost done)
28  * - validation of SBLOCKS
29  * - validation of KSBLOCKS
30  * - actually DO P2P search upon P2P/CS requests (pass appropriate handler to core
31  *   and work through request list there; also update ttl/priority for our client's requests)
32  * - randomly delay processing for improved anonymity (can wait)
33  * - content migration (put in local DS) (can wait)
34  * - check that we decrement PIDs always where necessary (can wait)
35  * - handle some special cases when forwarding replies based on tracked requests (can wait)
36  * - tracking of success correlations for hot-path routing (can wait)
37  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
38  *   core to transmit to another peer; need to make sure this is bounded overall...
39  * - various load-based actions (can wait)
40  * - remove on-demand blocks if they keep failing (can wait)
41  */
42 #include "platform.h"
43 #include "gnunet_core_service.h"
44 #include "gnunet_datastore_service.h"
45 #include "gnunet_peer_lib.h"
46 #include "gnunet_protocols.h"
47 #include "gnunet_signatures.h"
48 #include "gnunet_util_lib.h"
49 #include "fs.h"
50
51
52 /**
53  * In-memory information about indexed files (also available
54  * on-disk).
55  */
56 struct IndexInfo
57 {
58   
59   /**
60    * This is a linked list.
61    */
62   struct IndexInfo *next;
63
64   /**
65    * Name of the indexed file.  Memory allocated
66    * at the end of this struct (do not free).
67    */
68   const char *filename;
69
70   /**
71    * Context for transmitting confirmation to client,
72    * NULL if we've done this already.
73    */
74   struct GNUNET_SERVER_TransmitContext *tc;
75   
76   /**
77    * Hash of the contents of the file.
78    */
79   GNUNET_HashCode file_id;
80
81 };
82
83
84 /**
85  * Signature of a function that is called whenever a datastore
86  * request can be processed (or an entry put on the queue times out).
87  *
88  * @param cls closure
89  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
90  */
91 typedef void (*RequestFunction)(void *cls,
92                                 int ok);
93
94
95 /**
96  * Doubly-linked list of our requests for the datastore.
97  */
98 struct DatastoreRequestQueue
99 {
100
101   /**
102    * This is a doubly-linked list.
103    */
104   struct DatastoreRequestQueue *next;
105
106   /**
107    * This is a doubly-linked list.
108    */
109   struct DatastoreRequestQueue *prev;
110
111   /**
112    * Function to call (will issue the request).
113    */
114   RequestFunction req;
115
116   /**
117    * Closure for req.
118    */
119   void *req_cls;
120
121   /**
122    * When should this request time-out because we don't care anymore?
123    */
124   struct GNUNET_TIME_Absolute timeout;
125     
126   /**
127    * ID of task used for signaling timeout.
128    */
129   GNUNET_SCHEDULER_TaskIdentifier task;
130
131 };
132
133
134 /**
135  * Closure for processing START_SEARCH messages from a client.
136  */
137 struct LocalGetContext
138 {
139
140   /**
141    * This is a doubly-linked list.
142    */
143   struct LocalGetContext *next;
144
145   /**
146    * This is a doubly-linked list.
147    */
148   struct LocalGetContext *prev;
149
150   /**
151    * Client that initiated the search.
152    */
153   struct GNUNET_SERVER_Client *client;
154
155   /**
156    * Array of results that we've already received 
157    * (can be NULL).
158    */
159   GNUNET_HashCode *results; 
160
161   /**
162    * Bloomfilter over all results (for fast query construction);
163    * NULL if we don't have any results.
164    */
165   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
166
167   /**
168    * DS request associated with this operation.
169    */
170   struct DatastoreRequestQueue *req;
171
172   /**
173    * Current result message to transmit to client (or NULL).
174    */
175   struct ContentMessage *result;
176   
177   /**
178    * Type of the content that we're looking for.
179    * 0 for any.
180    */
181   uint32_t type;
182
183   /**
184    * Desired anonymity level.
185    */
186   uint32_t anonymity_level;
187
188   /**
189    * Number of results actually stored in the results array.
190    */
191   unsigned int results_used;
192   
193   /**
194    * Size of the results array in memory.
195    */
196   unsigned int results_size;
197
198   /**
199    * If the request is for a DBLOCK or IBLOCK, this is the identity of
200    * the peer that is known to have a response.  Set to all-zeros if
201    * such a target is not known (note that even if OUR anonymity
202    * level is >0 we may happen to know the responder's identity;
203    * nevertheless, we should probably not use it for a DHT-lookup
204    * or similar blunt actions in order to avoid exposing ourselves).
205    */
206   struct GNUNET_PeerIdentity target;
207
208   /**
209    * If the request is for an SBLOCK, this is the identity of the
210    * pseudonym to which the SBLOCK belongs. 
211    */
212   GNUNET_HashCode namespace;
213
214   /**
215    * Hash of the keyword (aka query) for KBLOCKs; Hash of
216    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
217    * and hash of the identifier XORed with the target for
218    * SBLOCKS (aka query).
219    */
220   GNUNET_HashCode query;
221
222 };
223
224
225 /**
226  * Possible routing policies for an FS-GET request.
227  */
228 enum RoutingPolicy
229   {
230     /**
231      * Simply drop the request.
232      */
233     ROUTING_POLICY_NONE = 0,
234     
235     /**
236      * Answer it if we can from local datastore.
237      */
238     ROUTING_POLICY_ANSWER = 1,
239
240     /**
241      * Forward the request to other peers (if possible).
242      */
243     ROUTING_POLICY_FORWARD = 2,
244
245     /**
246      * Forward to other peers, and ask them to route
247      * the response via ourselves.
248      */
249     ROUTING_POLICY_INDIRECT = 6,
250     
251     /**
252      * Do everything we could possibly do (that would
253      * make sense).
254      */
255     ROUTING_POLICY_ALL = 7
256   };
257
258
259 /**
260  * Internal context we use for our initial processing
261  * of a GET request.
262  */
263 struct ProcessGetContext
264 {
265   /**
266    * The search query (used for datastore lookup).
267    */
268   GNUNET_HashCode query;
269   
270   /**
271    * Which peer we should forward the response to.
272    */
273   struct GNUNET_PeerIdentity reply_to;
274
275   /**
276    * Namespace for the result (only set for SKS requests)
277    */
278   GNUNET_HashCode namespace;
279
280   /**
281    * Peer that we should forward the query to if possible
282    * (since that peer likely has the content).
283    */
284   struct GNUNET_PeerIdentity prime_target;
285
286   /**
287    * When did we receive this request?
288    */
289   struct GNUNET_TIME_Absolute start_time;
290
291   /**
292    * Our entry in the DRQ (non-NULL while we wait for our
293    * turn to interact with the local database).
294    */
295   struct DatastoreRequestQueue *drq;
296
297   /**
298    * Filter used to eliminate duplicate
299    * results.   Can be NULL if we are
300    * not yet filtering any results.
301    */
302   struct GNUNET_CONTAINER_BloomFilter *bf;
303
304   /**
305    * Bitmap describing which of the optional
306    * hash codes / peer identities were given to us.
307    */
308   uint32_t bm;
309
310   /**
311    * Desired block type.
312    */
313   uint32_t type;
314
315   /**
316    * Priority of the request.
317    */
318   uint32_t priority;
319
320   /**
321    * In what ways are we going to process
322    * the request?
323    */
324   enum RoutingPolicy policy;
325
326   /**
327    * Time-to-live for the request (value
328    * we use).
329    */
330   int32_t ttl;
331
332   /**
333    * Number to mingle hashes for bloom-filter
334    * tests with.
335    */
336   int32_t mingle;
337
338   /**
339    * Number of results that were found so far.
340    */
341   unsigned int results_found;
342 };
343
344
345 /**
346  * Information we keep for each pending reply.
347  */
348 struct PendingReply
349 {
350   /**
351    * This is a linked list.
352    */
353   struct PendingReply *next;
354
355   /**
356    * Size of the reply; actual reply message follows
357    * at the end of this struct.
358    */
359   size_t msize;
360
361 };
362
363
364 /**
365  * All requests from a client are 
366  * kept in a doubly-linked list.
367  */
368 struct ClientRequestList;
369
370
371 /**
372  * Information we keep for each pending request.  We should try to
373  * keep this struct as small as possible since its memory consumption
374  * is key to how many requests we can have pending at once.
375  */
376 struct PendingRequest
377 {
378
379   /**
380    * ID of a client making a request, NULL if this entry is for a
381    * peer.
382    */
383   struct GNUNET_SERVER_Client *client;
384
385   /**
386    * If this request was made by a client,
387    * this is our entry in the client request
388    * list; otherwise NULL.
389    */
390   struct ClientRequestList *crl_entry;
391
392   /**
393    * If this is a namespace query, pointer to the hash of the public
394    * key of the namespace; otherwise NULL.
395    */
396   GNUNET_HashCode *namespace;
397
398   /**
399    * Bloomfilter we use to filter out replies that we don't care about
400    * (anymore).  NULL as long as we are interested in all replies.
401    */
402   struct GNUNET_CONTAINER_BloomFilter *bf;
403
404   /**
405    * Replies that we have received but were unable to forward yet
406    * (typically non-null only if we have a pending transmission
407    * request with the client or the respective peer).
408    */
409   struct PendingReply *replies_pending;
410
411   /**
412    * Pending transmission request with the core service for the target
413    * peer (for processing of 'replies_pending').
414    */
415   struct GNUNET_CORE_TransmitHandle *cth;
416
417   /**
418    * Pending transmission request for the target client (for processing of
419    * 'replies_pending').
420    */
421   struct GNUNET_CONNECTION_TransmitHandle *th;
422
423   /**
424    * Hash code of all replies that we have seen so far (only valid
425    * if client is not NULL since we only track replies like this for
426    * our own clients).
427    */
428   GNUNET_HashCode *replies_seen;
429
430   /**
431    * When did we first see this request (form this peer), or, if our
432    * client is initiating, when did we last initiate a search?
433    */
434   struct GNUNET_TIME_Absolute start_time;
435
436   /**
437    * The query that this request is for.
438    */
439   GNUNET_HashCode query;
440
441   /**
442    * (Interned) Peer identifier (only valid if "client" is NULL)
443    * that identifies a peer that gave us this request.
444    */
445   GNUNET_PEER_Id source_pid;
446
447   /**
448    * (Interned) Peer identifier that identifies a preferred target
449    * for requests.
450    */
451   GNUNET_PEER_Id target_pid;
452
453   /**
454    * (Interned) Peer identifiers of peers that have already
455    * received our query for this content.
456    */
457   GNUNET_PEER_Id *used_pids;
458
459   /**
460    * Desired anonymity level; only valid for requests from a local client.
461    */
462   uint32_t anonymity_level;
463
464   /**
465    * How many entries in "used_pids" are actually valid?
466    */
467   unsigned int used_pids_off;
468
469   /**
470    * How long is the "used_pids" array?
471    */
472   unsigned int used_pids_size;
473
474   /**
475    * How many entries in "replies_seen" are actually valid?
476    */
477   unsigned int replies_seen_off;
478
479   /**
480    * How long is the "replies_seen" array?
481    */
482   unsigned int replies_seen_size;
483   
484   /**
485    * Priority with which this request was made.  If one of our clients
486    * made the request, then this is the current priority that we are
487    * using when initiating the request.  This value is used when
488    * we decide to reward other peers with trust for providing a reply.
489    */
490   uint32_t priority;
491
492   /**
493    * Priority points left for us to spend when forwarding this request
494    * to other peers.
495    */
496   uint32_t remaining_priority;
497
498   /**
499    * Number to mingle hashes for bloom-filter
500    * tests with.
501    */
502   int32_t mingle;
503
504   /**
505    * TTL with which we saw this request (or, if we initiated, TTL that
506    * we used for the request).
507    */
508   int32_t ttl;
509   
510   /**
511    * Type of the content that this request is for.
512    */
513   uint32_t type;
514
515 };
516
517
518 /**
519  * All requests from a client are 
520  * kept in a doubly-linked list.
521  */
522 struct ClientRequestList
523 {
524   /**
525    * This is a doubly-linked list.
526    */
527   struct ClientRequestList *next;
528
529   /**
530    * This is a doubly-linked list.
531    */ 
532   struct ClientRequestList *prev;
533
534   /**
535    * A request from this client.
536    */
537   struct PendingRequest *req;
538
539 };
540
541
542 /**
543  * Linked list of all clients that we are 
544  * currently processing requests for.
545  */
546 struct ClientList
547 {
548
549   /**
550    * This is a linked list.
551    */
552   struct ClientList *next;
553
554   /**
555    * What client is this entry for?
556    */
557   struct GNUNET_SERVER_Client* client;
558
559   /**
560    * Head of the DLL of requests from this client.
561    */
562   struct ClientRequestList *head;
563
564   /**
565    * Tail of the DLL of requests from this client.
566    */
567   struct ClientRequestList *tail;
568
569 };
570
571
572 /**
573  * Closure for "process_reply" function.
574  */
575 struct ProcessReplyClosure
576 {
577   /**
578    * The data for the reply.
579    */
580   const void *data;
581
582   /**
583    * When the reply expires.
584    */
585   struct GNUNET_TIME_Absolute expiration;
586
587   /**
588    * Size of data.
589    */
590   size_t size;
591
592   /**
593    * Type of the block.
594    */
595   uint32_t type;
596
597   /**
598    * How much was this reply worth to us?
599    */
600   uint32_t priority;
601 };
602
603
604 /**
605  * Our connection to the datastore.
606  */
607 static struct GNUNET_DATASTORE_Handle *dsh;
608
609 /**
610  * Our scheduler.
611  */
612 static struct GNUNET_SCHEDULER_Handle *sched;
613
614 /**
615  * Our configuration.
616  */
617 const struct GNUNET_CONFIGURATION_Handle *cfg;
618
619 /**
620  * Handle to the core service (NULL until we've connected to it).
621  */
622 struct GNUNET_CORE_Handle *core;
623
624 /**
625  * Head of doubly-linked LGC list.
626  */
627 static struct LocalGetContext *lgc_head;
628
629 /**
630  * Tail of doubly-linked LGC list.
631  */
632 static struct LocalGetContext *lgc_tail;
633
634 /**
635  * Head of request queue for the datastore, sorted by timeout.
636  */
637 static struct DatastoreRequestQueue *drq_head;
638
639 /**
640  * Tail of request queue for the datastore.
641  */
642 static struct DatastoreRequestQueue *drq_tail;
643
644 /**
645  * Linked list of indexed files.
646  */
647 static struct IndexInfo *indexed_files;
648
649 /**
650  * Maps hash over content of indexed files to the respective filename.
651  * The filenames are pointers into the indexed_files linked list and
652  * do not need to be freed.
653  */
654 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
655
656 /**
657  * Map of query hash codes to requests.
658  */
659 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
660
661 /**
662  * Map of peer IDs to requests (for those requests coming
663  * from other peers).
664  */
665 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
666
667 /**
668  * Linked list of all of our clients and their requests.
669  */
670 static struct ClientList *clients;
671
672 /**
673  * Heap with the request that will expire next at the top.  Contains
674  * pointers of type "struct PendingRequest*"; these will *also* be
675  * aliased from the "requests_by_peer" data structures and the
676  * "requests_by_query" table.  Note that requests from our clients
677  * don't expire and are thus NOT in the "requests_by_expiration"
678  * (or the "requests_by_peer" tables).
679  */
680 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
681
682 /**
683  * FIXME: set from configuration.
684  */
685 static uint64_t max_pending_requests = 32;
686
687 /**
688  * Write the current index information list to disk.
689  */ 
690 static void
691 write_index_list ()
692 {
693   struct GNUNET_BIO_WriteHandle *wh;
694   char *fn;
695   struct IndexInfo *pos;  
696
697   if (GNUNET_OK !=
698       GNUNET_CONFIGURATION_get_value_filename (cfg,
699                                                "FS",
700                                                "INDEXDB",
701                                                &fn))
702     {
703       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
704                   _("Configuration option `%s' in section `%s' missing.\n"),
705                   "INDEXDB",
706                   "FS");
707       return;
708     }
709   wh = GNUNET_BIO_write_open (fn);
710   if (NULL == wh)
711     {
712       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
713                   _("Could not open `%s'.\n"),
714                   fn);
715       GNUNET_free (fn);
716       return;
717     }
718   pos = indexed_files;
719   while (pos != NULL)
720     {
721       if ( (GNUNET_OK !=
722             GNUNET_BIO_write (wh,
723                               &pos->file_id,
724                               sizeof (GNUNET_HashCode))) ||
725            (GNUNET_OK !=
726             GNUNET_BIO_write_string (wh,
727                                      pos->filename)) )
728         break;
729       pos = pos->next;
730     }
731   if (GNUNET_OK != 
732       GNUNET_BIO_write_close (wh))
733     {
734       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
735                   _("Error writing `%s'.\n"),
736                   fn);
737       GNUNET_free (fn);
738       return;
739     }
740   GNUNET_free (fn);
741 }
742
743
744 /**
745  * Read index information from disk.
746  */
747 static void
748 read_index_list ()
749 {
750   struct GNUNET_BIO_ReadHandle *rh;
751   char *fn;
752   struct IndexInfo *pos;  
753   char *fname;
754   GNUNET_HashCode hc;
755   size_t slen;
756   char *emsg;
757
758   if (GNUNET_OK !=
759       GNUNET_CONFIGURATION_get_value_filename (cfg,
760                                                "FS",
761                                                "INDEXDB",
762                                                &fn))
763     {
764       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
765                   _("Configuration option `%s' in section `%s' missing.\n"),
766                   "INDEXDB",
767                   "FS");
768       return;
769     }
770   rh = GNUNET_BIO_read_open (fn);
771   if (NULL == rh)
772     {
773       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
774                   _("Could not open `%s'.\n"),
775                   fn);
776       GNUNET_free (fn);
777       return;
778     }
779
780   while ( (GNUNET_OK ==
781            GNUNET_BIO_read (rh,
782                             "Hash of indexed file",
783                             &hc,
784                             sizeof (GNUNET_HashCode))) &&
785           (GNUNET_OK ==
786            GNUNET_BIO_read_string (rh, 
787                                    "Name of indexed file",
788                                    &fname,
789                                    1024 * 16)) )
790     {
791       slen = strlen (fname) + 1;
792       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
793       pos->file_id = hc;
794       pos->filename = (const char *) &pos[1];
795       memcpy (&pos[1], fname, slen);
796       if (GNUNET_SYSERR ==
797           GNUNET_CONTAINER_multihashmap_put (ifm,
798                                              &hc,
799                                              (void*) pos->filename,
800                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
801         {
802           GNUNET_free (pos);
803         }
804       else
805         {
806           pos->next = indexed_files;
807           indexed_files = pos;
808         }
809     }
810   if (GNUNET_OK != 
811       GNUNET_BIO_read_close (rh, &emsg))
812     GNUNET_free (emsg);
813   GNUNET_free (fn);
814 }
815
816
817 /**
818  * We've validated the hash of the file we're about to
819  * index.  Signal success to the client and update
820  * our internal data structures.
821  *
822  * @param ii the index info entry for the request
823  */
824 static void
825 signal_index_ok (struct IndexInfo *ii)
826 {
827   if (GNUNET_SYSERR ==
828       GNUNET_CONTAINER_multihashmap_put (ifm,
829                                          &ii->file_id,
830                                          (void*) ii->filename,
831                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
832     {
833       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
834                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
835                   ii->filename,
836                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
837                                                                    &ii->file_id));
838       GNUNET_SERVER_transmit_context_append (ii->tc,
839                                              NULL, 0,
840                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
841       GNUNET_SERVER_transmit_context_run (ii->tc,
842                                           GNUNET_TIME_UNIT_MINUTES);
843       GNUNET_free (ii);
844       return;
845     }
846   ii->next = indexed_files;
847   indexed_files = ii;
848   write_index_list ();
849   GNUNET_SERVER_transmit_context_append (ii->tc,
850                                          NULL, 0,
851                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
852   GNUNET_SERVER_transmit_context_run (ii->tc,
853                                       GNUNET_TIME_UNIT_MINUTES);
854   ii->tc = NULL;
855 }
856
857
858 /**
859  * Function called once the hash computation over an
860  * indexed file has completed.
861  *
862  * @param cls closure, our publishing context
863  * @param res resulting hash, NULL on error
864  */
865 static void 
866 hash_for_index_val (void *cls,
867                     const GNUNET_HashCode *
868                     res)
869 {
870   struct IndexInfo *ii = cls;
871   
872   if ( (res == NULL) ||
873        (0 != memcmp (res,
874                      &ii->file_id,
875                      sizeof(GNUNET_HashCode))) )
876     {
877       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
878                   _("Hash mismatch trying to index file `%s'\n"),
879                   ii->filename);
880       GNUNET_SERVER_transmit_context_append (ii->tc,
881                                              NULL, 0,
882                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
883       GNUNET_SERVER_transmit_context_run (ii->tc,
884                                           GNUNET_TIME_UNIT_MINUTES);
885       GNUNET_free (ii);
886       return;
887     }
888   signal_index_ok (ii);
889 }
890
891
892 /**
893  * Handle INDEX_START-message.
894  *
895  * @param cls closure
896  * @param client identification of the client
897  * @param message the actual message
898  */
899 static void
900 handle_index_start (void *cls,
901                     struct GNUNET_SERVER_Client *client,
902                     const struct GNUNET_MessageHeader *message)
903 {
904   const struct IndexStartMessage *ism;
905   const char *fn;
906   uint16_t msize;
907   struct IndexInfo *ii;
908   size_t slen;
909   uint32_t dev;
910   uint64_t ino;
911   uint32_t mydev;
912   uint64_t myino;
913
914   msize = ntohs(message->size);
915   if ( (msize <= sizeof (struct IndexStartMessage)) ||
916        ( ((const char *)message)[msize-1] != '\0') )
917     {
918       GNUNET_break (0);
919       GNUNET_SERVER_receive_done (client,
920                                   GNUNET_SYSERR);
921       return;
922     }
923   ism = (const struct IndexStartMessage*) message;
924   fn = (const char*) &ism[1];
925   dev = ntohl (ism->device);
926   ino = GNUNET_ntohll (ism->inode);
927   ism = (const struct IndexStartMessage*) message;
928   slen = strlen (fn) + 1;
929   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
930   ii->filename = (const char*) &ii[1];
931   memcpy (&ii[1], fn, slen);
932   ii->file_id = ism->file_id;  
933   ii->tc = GNUNET_SERVER_transmit_context_create (client);
934   if ( ( (dev != 0) ||
935          (ino != 0) ) &&
936        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
937                                                        &mydev,
938                                                        &myino)) &&
939        ( (dev == mydev) &&
940          (ino == myino) ) )
941     {      
942       /* fast validation OK! */
943       signal_index_ok (ii);
944       return;
945     }
946   /* slow validation, need to hash full file (again) */
947   GNUNET_CRYPTO_hash_file (sched,
948                            GNUNET_SCHEDULER_PRIORITY_IDLE,
949                            GNUNET_NO,
950                            fn,
951                            HASHING_BLOCKSIZE,
952                            &hash_for_index_val,
953                            ii);
954 }
955
956
957 /**
958  * Handle INDEX_LIST_GET-message.
959  *
960  * @param cls closure
961  * @param client identification of the client
962  * @param message the actual message
963  */
964 static void
965 handle_index_list_get (void *cls,
966                        struct GNUNET_SERVER_Client *client,
967                        const struct GNUNET_MessageHeader *message)
968 {
969   struct GNUNET_SERVER_TransmitContext *tc;
970   struct IndexInfoMessage *iim;
971   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
972   size_t slen;
973   const char *fn;
974   struct GNUNET_MessageHeader *msg;
975   struct IndexInfo *pos;
976
977   tc = GNUNET_SERVER_transmit_context_create (client);
978   iim = (struct IndexInfoMessage*) buf;
979   msg = &iim->header;
980   pos = indexed_files;
981   while (NULL != pos)
982     {
983       iim->reserved = 0;
984       iim->file_id = pos->file_id;
985       fn = pos->filename;
986       slen = strlen (fn) + 1;
987       if (slen + sizeof (struct IndexInfoMessage) > 
988           GNUNET_SERVER_MAX_MESSAGE_SIZE)
989         {
990           GNUNET_break (0);
991           break;
992         }
993       memcpy (&iim[1], fn, slen);
994       GNUNET_SERVER_transmit_context_append
995         (tc,
996          &msg[1],
997          sizeof (struct IndexInfoMessage) 
998          - sizeof (struct GNUNET_MessageHeader) + slen,
999          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1000       pos = pos->next;
1001     }
1002   GNUNET_SERVER_transmit_context_append (tc,
1003                                          NULL, 0,
1004                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1005   GNUNET_SERVER_transmit_context_run (tc,
1006                                       GNUNET_TIME_UNIT_MINUTES);
1007 }
1008
1009
1010 /**
1011  * Handle UNINDEX-message.
1012  *
1013  * @param cls closure
1014  * @param client identification of the client
1015  * @param message the actual message
1016  */
1017 static void
1018 handle_unindex (void *cls,
1019                 struct GNUNET_SERVER_Client *client,
1020                 const struct GNUNET_MessageHeader *message)
1021 {
1022   const struct UnindexMessage *um;
1023   struct IndexInfo *pos;
1024   struct IndexInfo *prev;
1025   struct IndexInfo *next;
1026   struct GNUNET_SERVER_TransmitContext *tc;
1027   int found;
1028   
1029   um = (const struct UnindexMessage*) message;
1030   found = GNUNET_NO;
1031   prev = NULL;
1032   pos = indexed_files;
1033   while (NULL != pos)
1034     {
1035       next = pos->next;
1036       if (0 == memcmp (&pos->file_id,
1037                        &um->file_id,
1038                        sizeof (GNUNET_HashCode)))
1039         {
1040           if (prev == NULL)
1041             indexed_files = pos->next;
1042           else
1043             prev->next = pos->next;
1044           GNUNET_free (pos);
1045           found = GNUNET_YES;
1046         }
1047       else
1048         {
1049           prev = pos;
1050         }
1051       pos = next;
1052     }
1053   if (GNUNET_YES == found)
1054     write_index_list ();
1055   tc = GNUNET_SERVER_transmit_context_create (client);
1056   GNUNET_SERVER_transmit_context_append (tc,
1057                                          NULL, 0,
1058                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1059   GNUNET_SERVER_transmit_context_run (tc,
1060                                       GNUNET_TIME_UNIT_MINUTES);
1061 }
1062
1063
1064 /**
1065  * Run the next DS request in our
1066  * queue, we're done with the current one.
1067  */
1068 static void
1069 next_ds_request ()
1070 {
1071   struct DatastoreRequestQueue *e;
1072   
1073   while (NULL != (e = drq_head))
1074     {
1075       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1076         break;
1077       if (e->task != GNUNET_SCHEDULER_NO_TASK)
1078         GNUNET_SCHEDULER_cancel (sched, e->task);
1079       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1080       e->req (e->req_cls, GNUNET_NO);
1081       GNUNET_free (e);  
1082     }
1083   if (e == NULL)
1084     return;
1085   if (e->task != GNUNET_SCHEDULER_NO_TASK)
1086     GNUNET_SCHEDULER_cancel (sched, e->task);
1087   e->task = GNUNET_SCHEDULER_NO_TASK;
1088   e->req (e->req_cls, GNUNET_YES);
1089   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1090   GNUNET_free (e);  
1091 }
1092
1093
1094 /**
1095  * A datastore request had to be timed out. 
1096  *
1097  * @param cls closure (of type "struct DatastoreRequestQueue*")
1098  * @param tc task context, unused
1099  */
1100 static void
1101 timeout_ds_request (void *cls,
1102                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1103 {
1104   struct DatastoreRequestQueue *e = cls;
1105
1106   e->task = GNUNET_SCHEDULER_NO_TASK;
1107   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1108   e->req (e->req_cls, GNUNET_NO);
1109   GNUNET_free (e);  
1110 }
1111
1112
1113 /**
1114  * Queue a request for the datastore.
1115  *
1116  * @param deadline by when the request should run
1117  * @param fun function to call once the request can be run
1118  * @param fun_cls closure for fun
1119  */
1120 static struct DatastoreRequestQueue *
1121 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1122                   RequestFunction fun,
1123                   void *fun_cls)
1124 {
1125   struct DatastoreRequestQueue *e;
1126   struct DatastoreRequestQueue *bef;
1127
1128   if (drq_head == NULL)
1129     {
1130       /* no other requests pending, run immediately */
1131       fun (fun_cls, GNUNET_OK);
1132       return NULL;
1133     }
1134   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1135   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1136   e->req = fun;
1137   e->req_cls = fun_cls;
1138   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1139     {
1140       /* local request, highest prio, put at head of queue
1141          regardless of deadline */
1142       bef = NULL;
1143     }
1144   else
1145     {
1146       bef = drq_tail;
1147       while ( (NULL != bef) &&
1148               (e->timeout.value < bef->timeout.value) )
1149         bef = bef->prev;
1150     }
1151   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1152   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1153     return e;
1154   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1155                                           GNUNET_NO,
1156                                           GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1157                                           GNUNET_SCHEDULER_NO_TASK,
1158                                           deadline,
1159                                           &timeout_ds_request,
1160                                           e);
1161   return e;                                    
1162 }
1163
1164
1165 /**
1166  * Free the state associated with a local get context.
1167  *
1168  * @param lgc the lgc to free
1169  */
1170 static void
1171 local_get_context_free (struct LocalGetContext *lgc) 
1172 {
1173   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1174   GNUNET_SERVER_client_drop (lgc->client); 
1175   GNUNET_free_non_null (lgc->results);
1176   if (lgc->results_bf != NULL)
1177     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1178   if (lgc->req != NULL)
1179     {
1180       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1181         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1182       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1183       GNUNET_free (lgc->req);
1184     }
1185   GNUNET_free (lgc);
1186 }
1187
1188
1189 /**
1190  * We're able to transmit the next (local) result to the client.
1191  * Do it and ask the datastore for more.  Or, on error, tell
1192  * the datastore to stop giving us more.
1193  *
1194  * @param cls our closure (struct LocalGetContext)
1195  * @param max maximum number of bytes we can transmit
1196  * @param buf where to copy our message
1197  * @return number of bytes copied to buf
1198  */
1199 static size_t
1200 transmit_local_result (void *cls,
1201                        size_t max,
1202                        void *buf)
1203 {
1204   struct LocalGetContext *lgc = cls;  
1205   uint16_t msize;
1206
1207   if (NULL == buf)
1208     {
1209       /* error, abort! */
1210       GNUNET_free (lgc->result);
1211       lgc->result = NULL;
1212       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1213       return 0;
1214     }
1215   msize = ntohs (lgc->result->header.size);
1216   GNUNET_assert (max >= msize);
1217   memcpy (buf, lgc->result, msize);
1218   GNUNET_free (lgc->result);
1219   lgc->result = NULL;
1220   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1221   return msize;
1222 }
1223
1224
1225 /**
1226  * Continuation called from datastore's remove
1227  * function.
1228  *
1229  * @param cls unused
1230  * @param success did the deletion work?
1231  * @param msg error message
1232  */
1233 static void
1234 remove_cont (void *cls,
1235              int success,
1236              const char *msg)
1237 {
1238   if (GNUNET_OK != success)
1239     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1240                 _("Failed to delete bogus block: %s\n"),
1241                 msg);
1242   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1243 }
1244
1245
1246 /**
1247  * Mingle hash with the mingle_number to
1248  * produce different bits.
1249  */
1250 static void
1251 mingle_hash (const GNUNET_HashCode * in,
1252              int32_t mingle_number, 
1253              GNUNET_HashCode * hc)
1254 {
1255   GNUNET_HashCode m;
1256
1257   GNUNET_CRYPTO_hash (&mingle_number, 
1258                       sizeof (int32_t), 
1259                       &m);
1260   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1261 }
1262
1263
1264 /**
1265  * We've received an on-demand encoded block
1266  * from the datastore.  Attempt to do on-demand
1267  * encoding and (if successful), call the 
1268  * continuation with the resulting block.  On
1269  * error, clean up and ask the datastore for
1270  * more results.
1271  *
1272  * @param key key for the content
1273  * @param size number of bytes in data
1274  * @param data content stored
1275  * @param type type of the content
1276  * @param priority priority of the content
1277  * @param anonymity anonymity-level for the content
1278  * @param expiration expiration time for the content
1279  * @param uid unique identifier for the datum;
1280  *        maybe 0 if no unique identifier is available
1281  * @param cont function to call with the actual block
1282  * @param cont_cls closure for cont
1283  */
1284 static void
1285 handle_on_demand_block (const GNUNET_HashCode * key,
1286                         uint32_t size,
1287                         const void *data,
1288                         uint32_t type,
1289                         uint32_t priority,
1290                         uint32_t anonymity,
1291                         struct GNUNET_TIME_Absolute
1292                         expiration, uint64_t uid,
1293                         GNUNET_DATASTORE_Iterator cont,
1294                         void *cont_cls)
1295 {
1296   const struct OnDemandBlock *odb;
1297   GNUNET_HashCode nkey;
1298   struct GNUNET_CRYPTO_AesSessionKey skey;
1299   struct GNUNET_CRYPTO_AesInitializationVector iv;
1300   GNUNET_HashCode query;
1301   ssize_t nsize;
1302   char ndata[DBLOCK_SIZE];
1303   char edata[DBLOCK_SIZE];
1304   const char *fn;
1305   struct GNUNET_DISK_FileHandle *fh;
1306   uint64_t off;
1307
1308   if (size != sizeof (struct OnDemandBlock))
1309     {
1310       GNUNET_break (0);
1311       GNUNET_DATASTORE_remove (dsh, 
1312                                key,
1313                                size,
1314                                data,
1315                                &remove_cont,
1316                                NULL,
1317                                GNUNET_TIME_UNIT_FOREVER_REL);     
1318       return;
1319     }
1320   odb = (const struct OnDemandBlock*) data;
1321   off = GNUNET_ntohll (odb->offset);
1322   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1323                                                         &odb->file_id);
1324   fh = NULL;
1325   if ( (NULL == fn) ||
1326        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1327                                              GNUNET_DISK_OPEN_READ))) ||
1328        (off !=
1329         GNUNET_DISK_file_seek (fh,
1330                                off,
1331                                GNUNET_DISK_SEEK_SET)) ||
1332        (-1 ==
1333         (nsize = GNUNET_DISK_file_read (fh,
1334                                         ndata,
1335                                         sizeof (ndata)))) )
1336     {
1337       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1338                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1339                   GNUNET_h2s (&odb->file_id),
1340                   (unsigned long long) off,
1341                   STRERROR (errno));
1342       if (fh != NULL)
1343         GNUNET_DISK_file_close (fh);
1344       /* FIXME: if this happens often, we need
1345          to remove the OnDemand block from the DS! */
1346       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1347       return;
1348     }
1349   GNUNET_DISK_file_close (fh);
1350   GNUNET_CRYPTO_hash (ndata,
1351                       nsize,
1352                       &nkey);
1353   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1354   GNUNET_CRYPTO_aes_encrypt (ndata,
1355                              nsize,
1356                              &skey,
1357                              &iv,
1358                              edata);
1359   GNUNET_CRYPTO_hash (edata,
1360                       nsize,
1361                       &query);
1362   if (0 != memcmp (&query, 
1363                    key,
1364                    sizeof (GNUNET_HashCode)))
1365     {
1366       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1367                   _("Indexed file `%s' changed at offset %llu\n"),
1368                   fn,
1369                   (unsigned long long) off);
1370       /* FIXME: if this happens often, we need
1371          to remove the OnDemand block from the DS! */
1372       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1373       return;
1374     }
1375   cont (cont_cls,
1376         key,
1377         nsize,
1378         edata,
1379         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1380         priority,
1381         anonymity,
1382         expiration,
1383         uid);
1384 }
1385
1386
1387 /**
1388  * How many bytes should a bloomfilter be if we have already seen
1389  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1390  * of bits set per entry.  Furthermore, we should not re-size the
1391  * filter too often (to keep it cheap).
1392  *
1393  * Since other peers will also add entries but not resize the filter,
1394  * we should generally pick a slightly larger size than what the
1395  * strict math would suggest.
1396  *
1397  * @return must be a power of two and smaller or equal to 2^15.
1398  */
1399 static unsigned int
1400 compute_bloomfilter_size (unsigned int entry_count)
1401 {
1402   unsigned int size;
1403   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1404   uint16_t max = 1 << 15;
1405
1406   if (entry_count > max)
1407     return max;
1408   size = 8;
1409   while ((size < max) && (size < ideal))
1410     size *= 2;
1411   if (size > max)
1412     return max;
1413   return size;
1414 }
1415
1416
1417 /**
1418  * Recalculate our bloom filter for filtering replies.
1419  *
1420  * @param count number of entries we are filtering right now
1421  * @param mingle set to our new mingling value
1422  * @param entries the entries to filter
1423  * @return updated bloomfilter, NULL for none
1424  */
1425 static struct GNUNET_CONTAINER_BloomFilter *
1426 refresh_bloomfilter (unsigned int count,
1427                      int32_t * mingle,
1428                      const GNUNET_HashCode *entries)
1429 {
1430   struct GNUNET_CONTAINER_BloomFilter *bf;
1431   unsigned int nsize;
1432   unsigned int i;
1433   GNUNET_HashCode mhash;
1434
1435   if (0 == count)
1436     return NULL;
1437   nsize = compute_bloomfilter_size (count);
1438   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1439   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1440                                           nsize,
1441                                           BLOOMFILTER_K);
1442   for (i=0;i<count;i++)
1443     {
1444       mingle_hash (&entries[i], *mingle, &mhash);
1445       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1446     }
1447   return bf;
1448 }
1449
1450
1451 /**
1452  * We're processing (local) results for a search request
1453  * from a (local) client.  Pass applicable results to the
1454  * client and if we are done either clean up (operation
1455  * complete) or switch to P2P search (more results possible).
1456  *
1457  * @param cls our closure (struct LocalGetContext)
1458  * @param key key for the content
1459  * @param size number of bytes in data
1460  * @param data content stored
1461  * @param type type of the content
1462  * @param priority priority of the content
1463  * @param anonymity anonymity-level for the content
1464  * @param expiration expiration time for the content
1465  * @param uid unique identifier for the datum;
1466  *        maybe 0 if no unique identifier is available
1467  */
1468 static void
1469 process_local_get_result (void *cls,
1470                           const GNUNET_HashCode * key,
1471                           uint32_t size,
1472                           const void *data,
1473                           uint32_t type,
1474                           uint32_t priority,
1475                           uint32_t anonymity,
1476                           struct GNUNET_TIME_Absolute
1477                           expiration, 
1478                           uint64_t uid)
1479 {
1480   struct LocalGetContext *lgc = cls;
1481   struct PendingRequest *pr;
1482   struct ClientRequestList *crl;
1483   struct ClientList *cl;
1484   size_t msize;
1485   unsigned int i;
1486
1487   if (key == NULL)
1488     {
1489       /* no further results from datastore; continue
1490          processing further requests from the client and
1491          allow the next task to use the datastore; also,
1492          switch to P2P requests or clean up our state. */
1493       next_ds_request ();
1494       GNUNET_SERVER_receive_done (lgc->client,
1495                                   GNUNET_OK);
1496       if ( (lgc->results_used == 0) ||
1497            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1498            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1499            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1500         {
1501           cl = clients;
1502           while ( (NULL != cl) &&
1503                   (cl->client != lgc->client) )
1504             cl = cl->next;
1505           if (cl == NULL)
1506             {
1507               cl = GNUNET_malloc (sizeof (struct ClientList));
1508               cl->client = lgc->client;
1509               cl->next = clients;
1510               clients = cl;
1511             }
1512           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1513           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1514           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1515           pr->client = lgc->client;
1516           GNUNET_SERVER_client_keep (pr->client);
1517           pr->crl_entry = crl;
1518           crl->req = pr;
1519           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1520             {
1521               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1522               *pr->namespace = lgc->namespace;
1523             }
1524           pr->replies_seen = lgc->results;
1525           lgc->results = NULL;
1526           pr->start_time = GNUNET_TIME_absolute_get ();
1527           pr->query = lgc->query;
1528           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1529           pr->replies_seen_off = lgc->results_used;
1530           pr->replies_seen_size = lgc->results_size;
1531           lgc->results_size = 0;
1532           pr->type = lgc->type;
1533           pr->anonymity_level = lgc->anonymity_level;
1534           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1535                                         &pr->mingle,
1536                                         pr->replies_seen);
1537           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1538                                              &pr->query,
1539                                              pr,
1540                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1541           
1542           // FIXME: trigger some processing NOW!
1543           local_get_context_free (lgc);
1544           return;
1545         }
1546       /* got all possible results, clean up! */
1547       local_get_context_free (lgc);
1548       return;
1549     }
1550   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1551     {
1552       handle_on_demand_block (key, size, data, type, priority, 
1553                               anonymity, expiration, uid,
1554                               &process_local_get_result,
1555                               lgc);
1556       return;
1557     }
1558   if (type != lgc->type)
1559     {
1560       /* this should be virtually impossible to reach (DBLOCK 
1561          query hash being identical to KBLOCK/SBLOCK query hash);
1562          nevertheless, if it happens, the correct thing is to
1563          simply skip the result. */
1564       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1565       return;
1566     }
1567   /* check if this is a result we've alredy
1568      received */
1569   for (i=0;i<lgc->results_used;i++)
1570     if (0 == memcmp (key,
1571                      &lgc->results[i],
1572                      sizeof (GNUNET_HashCode)))
1573       {
1574         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1575         return; 
1576       }
1577   if (lgc->results_used == lgc->results_size)
1578     GNUNET_array_grow (lgc->results,
1579                        lgc->results_size,
1580                        lgc->results_size * 2 + 2);
1581   GNUNET_CRYPTO_hash (data, 
1582                       size, 
1583                       &lgc->results[lgc->results_used++]);    
1584   msize = size + sizeof (struct ContentMessage);
1585   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1586   lgc->result = GNUNET_malloc (msize);
1587   lgc->result->header.size = htons (msize);
1588   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1589   lgc->result->type = htonl (type);
1590   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1591   memcpy (&lgc->result[1],
1592           data,
1593           size);
1594   GNUNET_SERVER_notify_transmit_ready (lgc->client,
1595                                        msize,
1596                                        GNUNET_TIME_UNIT_FOREVER_REL,
1597                                        &transmit_local_result,
1598                                        lgc);
1599 }
1600
1601
1602 /**
1603  * We're processing a search request from a local
1604  * client.  Now it is our turn to query the datastore.
1605  * 
1606  * @param cls our closure (struct LocalGetContext)
1607  * @param tc unused
1608  */
1609 static void
1610 transmit_local_get (void *cls,
1611                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1612 {
1613   struct LocalGetContext *lgc = cls;
1614   uint32_t type;
1615   
1616   type = lgc->type;
1617   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1618     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1619   GNUNET_DATASTORE_get (dsh,
1620                         &lgc->query,
1621                         type,
1622                         &process_local_get_result,
1623                         lgc,
1624                         GNUNET_TIME_UNIT_FOREVER_REL);
1625 }
1626
1627
1628 /**
1629  * We're processing a search request from a local
1630  * client.  Now it is our turn to query the datastore.
1631  * 
1632  * @param cls our closure (struct LocalGetContext)
1633  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
1634  */
1635 static void 
1636 transmit_local_get_ready (void *cls,
1637                           int ok)
1638 {
1639   struct LocalGetContext *lgc = cls;
1640
1641   GNUNET_assert (GNUNET_OK == ok);
1642   GNUNET_SCHEDULER_add_continuation (sched,
1643                                      GNUNET_NO,
1644                                      &transmit_local_get,
1645                                      lgc,
1646                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
1647 }
1648
1649
1650 /**
1651  * Handle START_SEARCH-message (search request from client).
1652  *
1653  * @param cls closure
1654  * @param client identification of the client
1655  * @param message the actual message
1656  */
1657 static void
1658 handle_start_search (void *cls,
1659                      struct GNUNET_SERVER_Client *client,
1660                      const struct GNUNET_MessageHeader *message)
1661 {
1662   const struct SearchMessage *sm;
1663   struct LocalGetContext *lgc;
1664   uint16_t msize;
1665   unsigned int sc;
1666   
1667   msize = ntohs (message->size);
1668   if ( (msize < sizeof (struct SearchMessage)) ||
1669        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
1670     {
1671       GNUNET_break (0);
1672       GNUNET_SERVER_receive_done (client,
1673                                   GNUNET_SYSERR);
1674       return;
1675     }
1676   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
1677   sm = (const struct SearchMessage*) message;
1678   GNUNET_SERVER_client_keep (client);
1679   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
1680   if  (sc > 0)
1681     {
1682       lgc->results_used = sc;
1683       GNUNET_array_grow (lgc->results,
1684                          lgc->results_size,
1685                          sc * 2);
1686       memcpy (lgc->results,
1687               &sm[1],
1688               sc * sizeof (GNUNET_HashCode));
1689     }
1690   lgc->client = client;
1691   lgc->type = ntohl (sm->type);
1692   lgc->anonymity_level = ntohl (sm->anonymity_level);
1693   switch (lgc->type)
1694     {
1695     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1696     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1697       lgc->target.hashPubKey = sm->target;
1698       break;
1699     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1700       lgc->namespace = sm->target;
1701       break;
1702     default:
1703       break;
1704     }
1705   lgc->query = sm->query;
1706   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
1707   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
1708                                &transmit_local_get_ready,
1709                                lgc);
1710 }
1711
1712
1713 /**
1714  * List of handlers for the messages understood by this
1715  * service.
1716  */
1717 static struct GNUNET_SERVER_MessageHandler handlers[] = {
1718   {&handle_index_start, NULL, 
1719    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
1720   {&handle_index_list_get, NULL, 
1721    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
1722   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
1723    sizeof (struct UnindexMessage) },
1724   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
1725    0 },
1726   {NULL, NULL, 0, 0}
1727 };
1728
1729
1730 /**
1731  * Clean up the memory used by the PendingRequest
1732  * structure (except for the client or peer list
1733  * that the request may be part of).
1734  *
1735  * @param pr request to clean up
1736  */
1737 static void
1738 destroy_pending_request (struct PendingRequest *pr)
1739 {
1740   struct PendingReply *reply;
1741
1742   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
1743                                         &pr->query,
1744                                         pr);
1745   // FIXME: not sure how this can work (efficiently)
1746   // also, what does the return value mean?
1747   if (pr->client != NULL)
1748     GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
1749                                        pr);
1750   if (NULL != pr->bf)
1751     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1752   if (NULL != pr->cth)
1753     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
1754   if (NULL != pr->th)
1755     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
1756   while (NULL != (reply = pr->replies_pending))
1757     {
1758       pr->replies_pending = reply->next;
1759       GNUNET_free (reply);
1760     }
1761   GNUNET_PEER_change_rc (pr->source_pid, -1);
1762   GNUNET_PEER_change_rc (pr->target_pid, -1);
1763   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1764   GNUNET_free_non_null (pr->used_pids);
1765   GNUNET_free_non_null (pr->replies_seen);
1766   GNUNET_free_non_null (pr->namespace);
1767   GNUNET_free (pr);
1768 }
1769
1770
1771 /**
1772  * A client disconnected.  Remove all of its pending queries.
1773  *
1774  * @param cls closure, NULL
1775  * @param client identification of the client
1776  */
1777 static void
1778 handle_client_disconnect (void *cls,
1779                           struct GNUNET_SERVER_Client
1780                           * client)
1781 {
1782   struct LocalGetContext *lgc;
1783   struct ClientList *cpos;
1784   struct ClientList *cprev;
1785   struct ClientRequestList *rl;
1786
1787   lgc = lgc_head;
1788   while ( (NULL != lgc) &&
1789           (lgc->client != client) )
1790     lgc = lgc->next;
1791   if (lgc != NULL)
1792     local_get_context_free (lgc);
1793   cprev = NULL;
1794   cpos = clients;
1795   while ( (NULL != cpos) &&
1796           (clients->client != client) )
1797     {
1798       cprev = cpos;
1799       cpos = cpos->next;
1800     }
1801   if (cpos != NULL)
1802     {
1803       if (cprev == NULL)
1804         clients = cpos->next;
1805       else
1806         cprev->next = cpos->next;
1807       while (NULL != (rl = cpos->head))
1808         {
1809           cpos->head = rl->next;
1810           destroy_pending_request (rl->req);
1811           GNUNET_free (rl);
1812         }
1813       GNUNET_free (cpos);
1814     }
1815 }
1816
1817
1818 /**
1819  * Task run during shutdown.
1820  *
1821  * @param cls unused
1822  * @param tc unused
1823  */
1824 static void
1825 shutdown_task (void *cls,
1826                const struct GNUNET_SCHEDULER_TaskContext *tc)
1827 {
1828   struct IndexInfo *pos;  
1829
1830   if (NULL != core)
1831     GNUNET_CORE_disconnect (core);
1832   GNUNET_DATASTORE_disconnect (dsh,
1833                                GNUNET_NO);
1834   dsh = NULL;
1835   // FIXME: iterate over maps to free entries!
1836   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
1837   requests_by_query = NULL;
1838   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
1839   requests_by_peer = NULL;
1840   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
1841   requests_by_expiration = NULL;
1842   GNUNET_CONTAINER_multihashmap_destroy (ifm);
1843   ifm = NULL;
1844   while (NULL != (pos = indexed_files))
1845     {
1846       indexed_files = pos->next;
1847       GNUNET_free (pos);
1848     }
1849 }
1850
1851
1852 /**
1853  * Free (each) request made by the peer.
1854  *
1855  * @param cls closure, points to peer that the request belongs to
1856  * @param key current key code
1857  * @param value value in the hash map
1858  * @return GNUNET_YES (we should continue to iterate)
1859  */
1860 static int
1861 destroy_request (void *cls,
1862                  const GNUNET_HashCode * key,
1863                  void *value)
1864 {
1865   const struct GNUNET_PeerIdentity * peer = cls;
1866   struct PendingRequest *pr = value;
1867   
1868   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1869                                         &peer->hashPubKey,
1870                                         pr);
1871   destroy_pending_request (pr);
1872   return GNUNET_YES;
1873 }
1874
1875
1876 /**
1877  * Method called whenever a peer disconnects.
1878  *
1879  * @param cls closure, not used
1880  * @param peer peer identity this notification is about
1881  */
1882 static void
1883 peer_disconnect_handler (void *cls,
1884                          const struct
1885                          GNUNET_PeerIdentity * peer)
1886 {
1887   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
1888                                               &peer->hashPubKey,
1889                                               &destroy_request,
1890                                               (void*) peer);
1891 }
1892
1893
1894 /**
1895  * We're processing a GET request from
1896  * another peer and have decided to forward
1897  * it to other peers.
1898  *
1899  * @param cls our "struct ProcessGetContext *"
1900  * @param tc unused
1901  */
1902 static void
1903 forward_get_request (void *cls,
1904                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1905 {
1906   struct ProcessGetContext *pgc = cls;
1907   struct PendingRequest *pr;
1908   struct PendingRequest *eer;
1909   struct GNUNET_PeerIdentity target;
1910
1911   pr = GNUNET_malloc (sizeof (struct PendingRequest));
1912   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
1913     {
1914       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
1915       *pr->namespace = pgc->namespace;
1916     }
1917   pr->bf = pgc->bf;
1918   pgc->bf = NULL;
1919   pr->start_time = pgc->start_time;
1920   pr->query = pgc->query;
1921   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
1922   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
1923     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
1924   pr->anonymity_level = 1; /* default */
1925   pr->priority = pgc->priority;
1926   pr->remaining_priority = pr->priority;
1927   pr->mingle = pgc->mingle;
1928   pr->ttl = pgc->ttl; 
1929   pr->type = pgc->type;
1930   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1931                                      &pr->query,
1932                                      pr,
1933                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1934   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
1935                                      &pgc->reply_to.hashPubKey,
1936                                      pr,
1937                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1938   GNUNET_CONTAINER_heap_insert (requests_by_expiration,
1939                                 pr,
1940                                 pr->start_time.value + pr->ttl);
1941   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
1942     {
1943       /* expire oldest request! */
1944       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
1945       GNUNET_PEER_resolve (eer->source_pid,
1946                            &target);    
1947       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
1948                                             &target.hashPubKey,
1949                                             eer);
1950       destroy_pending_request (eer);     
1951     }
1952   // FIXME: trigger actual forwarding NOW!
1953   GNUNET_free (pgc); 
1954 }
1955
1956
1957 /**
1958  * Transmit the given message by copying it to
1959  * the target buffer "buf".  "buf" will be
1960  * NULL and "size" zero if the socket was closed for
1961  * writing in the meantime.  In that case, only
1962
1963  * free the message
1964  *
1965  * @param cls closure, pointer to the message
1966  * @param size number of bytes available in buf
1967  * @param buf where the callee should write the message
1968  * @return number of bytes written to buf
1969  */
1970 static size_t
1971 transmit_message (void *cls,
1972                   size_t size, void *buf)
1973 {
1974   struct GNUNET_MessageHeader *msg = cls;
1975   uint16_t msize;
1976   
1977   if (NULL == buf)
1978     {
1979 #if DEBUG_FS
1980       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981                   "Dropping reply, core too busy.\n");
1982 #endif
1983       GNUNET_free (msg);
1984       return 0;
1985     }
1986   msize = ntohs (msg->size);
1987   GNUNET_assert (size >= msize);
1988   memcpy (buf, msg, msize);
1989   GNUNET_free (msg);
1990   return msize;
1991 }
1992
1993
1994 /**
1995  * Test if the load on this peer is too high
1996  * to even consider processing the query at
1997  * all.
1998  * 
1999  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2000  */
2001 static int
2002 test_load_too_high ()
2003 {
2004   return GNUNET_NO; // FIXME
2005 }
2006
2007
2008 /**
2009  * We're processing (local) results for a search request
2010  * from another peer.  Pass applicable results to the
2011  * peer and if we are done either clean up (operation
2012  * complete) or forward to other peers (more results possible).
2013  *
2014  * @param cls our closure (struct LocalGetContext)
2015  * @param key key for the content
2016  * @param size number of bytes in data
2017  * @param data content stored
2018  * @param type type of the content
2019  * @param priority priority of the content
2020  * @param anonymity anonymity-level for the content
2021  * @param expiration expiration time for the content
2022  * @param uid unique identifier for the datum;
2023  *        maybe 0 if no unique identifier is available
2024  */
2025 static void
2026 process_p2p_get_result (void *cls,
2027                         const GNUNET_HashCode * key,
2028                         uint32_t size,
2029                         const void *data,
2030                         uint32_t type,
2031                         uint32_t priority,
2032                         uint32_t anonymity,
2033                         struct GNUNET_TIME_Absolute
2034                         expiration, 
2035                         uint64_t uid)
2036 {
2037   struct ProcessGetContext *pgc = cls;
2038   GNUNET_HashCode dhash;
2039   GNUNET_HashCode mhash;
2040   struct PutMessage *reply;
2041   
2042   if (NULL == key)
2043     {
2044       /* no more results */
2045       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2046            ( (0 == pgc->results_found) ||
2047              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2048              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2049              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2050         {
2051           GNUNET_SCHEDULER_add_continuation (sched,
2052                                              GNUNET_NO,
2053                                              &forward_get_request,
2054                                              pgc,
2055                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2056         }
2057       else
2058         {
2059           if (pgc->bf != NULL)
2060             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2061           GNUNET_free (pgc); 
2062         }
2063       next_ds_request ();
2064       return;
2065     }
2066   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2067     {
2068       handle_on_demand_block (key, size, data, type, priority, 
2069                               anonymity, expiration, uid,
2070                               &process_p2p_get_result,
2071                               pgc);
2072       return;
2073     }
2074   /* check for duplicates */
2075   GNUNET_CRYPTO_hash (data, size, &dhash);
2076   mingle_hash (&dhash, 
2077                pgc->mingle,
2078                &mhash);
2079   if ( (pgc->bf != NULL) &&
2080        (GNUNET_YES ==
2081         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2082                                            &mhash)) )
2083     {      
2084 #if DEBUG_FS
2085       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2086                   "Result from datastore filtered by bloomfilter.\n");
2087 #endif
2088       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2089       return;
2090     }
2091   pgc->results_found++;
2092   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2093        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2094        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2095     {
2096       if (pgc->bf == NULL)
2097         pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2098                                                      32, 
2099                                                      BLOOMFILTER_K);
2100       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2101                                         &mhash);
2102     }
2103
2104   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2105   reply->header.size = htons (sizeof (struct PutMessage) + size);
2106   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2107   reply->type = htonl (type);
2108   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2109   memcpy (&reply[1], data, size);
2110   GNUNET_CORE_notify_transmit_ready (core,
2111                                      pgc->priority,
2112                                      ACCEPTABLE_REPLY_DELAY,
2113                                      &pgc->reply_to,
2114                                      sizeof (struct PutMessage) + size,
2115                                      &transmit_message,
2116                                      reply);
2117   if ( (GNUNET_YES == test_load_too_high()) ||
2118        (pgc->results_found > 5 + 2 * pgc->priority) )
2119     {
2120       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2121       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2122       return;
2123     }
2124   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2125 }
2126   
2127
2128 /**
2129  * We're processing a GET request from another peer.  Give it to our
2130  * local datastore.
2131  *
2132  * @param cls our "struct ProcessGetContext"
2133  * @param ok did we get a datastore slice or not?
2134  */
2135 static void
2136 ds_get_request (void *cls, 
2137                 int ok)
2138 {
2139   struct ProcessGetContext *pgc = cls;
2140   uint32_t type;
2141   struct GNUNET_TIME_Relative timeout;
2142
2143   if (GNUNET_OK != ok)
2144     {
2145       /* no point in doing P2P stuff if we can't even do local */
2146       GNUNET_free (dsh);
2147       return;
2148     }
2149   type = pgc->type;
2150   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2151     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2152   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2153                                            (pgc->priority + 1));
2154   GNUNET_DATASTORE_get (dsh,
2155                         &pgc->query,
2156                         type,
2157                         &process_p2p_get_result,
2158                         pgc,
2159                         timeout);
2160 }
2161
2162
2163 /**
2164  * The priority level imposes a bound on the maximum
2165  * value for the ttl that can be requested.
2166  *
2167  * @param ttl_in requested ttl
2168  * @param priority given priority
2169  * @return ttl_in if ttl_in is below the limit,
2170  *         otherwise the ttl-limit for the given priority
2171  */
2172 static int32_t
2173 bound_ttl (int32_t ttl_in, uint32_t prio)
2174 {
2175   unsigned long long allowed;
2176
2177   if (ttl_in <= 0)
2178     return ttl_in;
2179   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2180   if (ttl_in > allowed)      
2181     {
2182       if (allowed >= (1 << 30))
2183         return 1 << 30;
2184       return allowed;
2185     }
2186   return ttl_in;
2187 }
2188
2189
2190 /**
2191  * We've received a request with the specified
2192  * priority.  Bound it according to how much
2193  * we trust the given peer.
2194  * 
2195  * @param prio_in requested priority
2196  * @param peer the peer making the request
2197  * @return effective priority
2198  */
2199 static uint32_t
2200 bound_priority (uint32_t prio_in,
2201                 const struct GNUNET_PeerIdentity *peer)
2202 {
2203   return 0; // FIXME!
2204 }
2205
2206
2207 /**
2208  * Handle P2P "GET" request.
2209  *
2210  * @param cls closure, always NULL
2211  * @param peer the other peer involved (sender or receiver, NULL
2212  *        for loopback messages where we are both sender and receiver)
2213  * @param message the actual message
2214  * @return GNUNET_OK to keep the connection open,
2215  *         GNUNET_SYSERR to close it (signal serious error)
2216  */
2217 static int
2218 handle_p2p_get (void *cls,
2219                 const struct GNUNET_PeerIdentity *other,
2220                 const struct GNUNET_MessageHeader *message)
2221 {
2222   uint16_t msize;
2223   const struct GetMessage *gm;
2224   unsigned int bits;
2225   const GNUNET_HashCode *opt;
2226   struct ProcessGetContext *pgc;
2227   uint32_t bm;
2228   size_t bfsize;
2229   uint32_t ttl_decrement;
2230   double preference;
2231   int net_load_up;
2232   int net_load_down;
2233
2234   msize = ntohs(message->size);
2235   if (msize < sizeof (struct GetMessage))
2236     {
2237       GNUNET_break_op (0);
2238       return GNUNET_SYSERR;
2239     }
2240   gm = (const struct GetMessage*) message;
2241   bm = ntohl (gm->hash_bitmap);
2242   bits = 0;
2243   while (bm > 0)
2244     {
2245       if (1 == (bm & 1))
2246         bits++;
2247       bm >>= 1;
2248     }
2249   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2250     {
2251       GNUNET_break_op (0);
2252       return GNUNET_SYSERR;
2253     }  
2254   opt = (const GNUNET_HashCode*) &gm[1];
2255   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2256   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2257   if (bfsize > 0)
2258     pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2259                                                  bfsize,
2260                                                  BLOOMFILTER_K);
2261   pgc->type = ntohl (gm->type);
2262   pgc->bm = ntohl (gm->hash_bitmap);
2263   pgc->mingle = gm->filter_mutator;
2264   bits = 0;
2265   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2266     pgc->reply_to.hashPubKey = opt[bits++];
2267   else
2268     pgc->reply_to = *other;
2269   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2270     pgc->namespace = opt[bits++];
2271   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2272     {
2273       GNUNET_break_op (0);
2274       GNUNET_free (pgc);
2275       return GNUNET_SYSERR;
2276     }
2277   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2278     pgc->prime_target.hashPubKey = opt[bits++];
2279   /* note that we can really only check load here since otherwise
2280      peers could find out that we are overloaded by being disconnected
2281      after sending us a malformed query... */
2282   if (GNUNET_YES == test_load_too_high ())
2283     {
2284       if (NULL != pgc->bf)
2285         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2286       GNUNET_free (pgc);
2287 #if DEBUG_FS
2288       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2289                   "Dropping query from `%s', this peer is too busy.\n",
2290                   GNUNET_h2s (other));
2291 #endif
2292       return GNUNET_OK;
2293     }
2294   net_load_up = 50; // FIXME
2295   net_load_down = 50; // FIXME
2296   pgc->policy = ROUTING_POLICY_NONE;
2297   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2298        (net_load_down < IDLE_LOAD_THRESHOLD) )
2299     {
2300       pgc->policy |= ROUTING_POLICY_ALL;
2301       pgc->priority = 0; /* no charge */
2302     }
2303   else
2304     {
2305       pgc->priority = bound_priority (ntohl (gm->priority), other);
2306       if ( (net_load_up < 
2307             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2308            (net_load_down < 
2309             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2310         {
2311           pgc->policy |= ROUTING_POLICY_ALL;
2312         }
2313       else
2314         {
2315           // FIXME: is this sound?
2316           if (net_load_up < 90 + 10 * pgc->priority)
2317             pgc->policy |= ROUTING_POLICY_FORWARD;
2318           if (net_load_down < 90 + 10 * pgc->priority)
2319             pgc->policy |= ROUTING_POLICY_ANSWER;
2320         }
2321     }
2322   if (pgc->policy == ROUTING_POLICY_NONE)
2323     {
2324 #if DEBUG_FS
2325       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2326                   "Dropping query from `%s', network saturated.\n",
2327                   GNUNET_h2s (other));
2328 #endif
2329       if (NULL != pgc->bf)
2330         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2331       GNUNET_free (pgc);
2332       return GNUNET_OK;     /* drop */
2333     }
2334   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2335     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2336   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2337   /* decrement ttl (always) */
2338   ttl_decrement = 2 * TTL_DECREMENT +
2339     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2340                               TTL_DECREMENT);
2341   if ( (pgc->ttl < 0) &&
2342        (pgc->ttl - ttl_decrement > 0) )
2343     {
2344 #if DEBUG_FS
2345       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2346                   "Dropping query from `%s' due to TTL underflow.\n",
2347                   GNUNET_h2s (other));
2348 #endif
2349       /* integer underflow => drop (should be very rare)! */
2350       if (NULL != pgc->bf)
2351         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2352       GNUNET_free (pgc);
2353       return GNUNET_OK;
2354     }
2355   pgc->ttl -= ttl_decrement;
2356   pgc->start_time = GNUNET_TIME_absolute_get ();
2357   preference = (double) pgc->priority;
2358   if (preference < QUERY_BANDWIDTH_VALUE)
2359     preference = QUERY_BANDWIDTH_VALUE;
2360   // FIXME: also reserve bandwidth for reply?
2361   GNUNET_CORE_peer_configure (core,
2362                               other,
2363                               GNUNET_TIME_UNIT_FOREVER_REL,
2364                               0, 0, preference, NULL, NULL);
2365   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2366     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2367                                  &ds_get_request,
2368                                  pgc);
2369   else
2370     GNUNET_SCHEDULER_add_continuation (sched,
2371                                        GNUNET_NO,
2372                                        &forward_get_request,
2373                                        pgc,
2374                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2375   return GNUNET_OK;
2376 }
2377
2378
2379 /**
2380  * Function called to notify us that we can now transmit a reply to a
2381  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2382  * closed for writing in the meantime.
2383  *
2384  * @param cls closure, points to a "struct PendingRequest*" with
2385  *            one or more pending replies
2386  * @param size number of bytes available in buf
2387  * @param buf where the callee should write the message
2388  * @return number of bytes written to buf
2389  */
2390 static size_t
2391 transmit_result (void *cls,
2392                  size_t size, 
2393                  void *buf)
2394 {
2395   struct PendingRequest *pr = cls;
2396   char *cbuf = buf;
2397   struct PendingReply *reply;
2398   size_t ret;
2399
2400   ret = 0;
2401   while (NULL != (reply = pr->replies_pending))
2402     {
2403       if ( (reply->msize + ret < ret) ||
2404            (reply->msize + ret > size) )
2405         break;
2406       pr->replies_pending = reply->next;
2407       memcpy (&cbuf[ret], &reply[1], reply->msize);
2408       ret += reply->msize;
2409       GNUNET_free (pr);
2410     }
2411   return 0;
2412 }
2413
2414
2415 /**
2416  * Iterator over pending requests.
2417  *
2418  * @param cls response (struct ProcessReplyClosure)
2419  * @param key our query
2420  * @param value value in the hash map (meta-info about the query)
2421  * @return GNUNET_YES (we should continue to iterate)
2422  */
2423 static int
2424 process_reply (void *cls,
2425                const GNUNET_HashCode * key,
2426                void *value)
2427 {
2428   struct ProcessReplyClosure *prq = cls;
2429   struct PendingRequest *pr = value;
2430   struct PendingRequest *eer;
2431   struct PendingReply *reply;
2432   struct PutMessage *pm;
2433   struct ContentMessage *cm;
2434   GNUNET_HashCode chash;
2435   GNUNET_HashCode mhash;
2436   struct GNUNET_PeerIdentity target;
2437   size_t msize;
2438   uint32_t prio;
2439   struct GNUNET_TIME_Relative max_delay;
2440   
2441   GNUNET_CRYPTO_hash (prq->data,
2442                       prq->size,
2443                       &chash);
2444   switch (prq->type)
2445     {
2446     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2447     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2448       if (0 != memcmp (&chash,
2449                        key,
2450                        sizeof (GNUNET_HashCode)))
2451         {
2452           GNUNET_break_op (0);
2453           return GNUNET_YES;
2454         }
2455       break;
2456     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2457       // FIXME: validate KBlock!
2458       if (pr->bf != NULL) 
2459         {
2460           mingle_hash (&chash, pr->mingle, &mhash);
2461           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2462                                                                &mhash))
2463             return GNUNET_YES; /* duplicate */
2464           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2465                                             &mhash);
2466         }      
2467       break;
2468     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2469       // FIXME: validate SBlock!
2470       if (pr->bf != NULL) 
2471         {
2472           mingle_hash (&chash, pr->mingle, &mhash);
2473           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2474                                                                &mhash))
2475             return GNUNET_YES; /* duplicate */
2476           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2477                                             &mhash);
2478         }
2479       break;
2480     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2481       // FIXME: validate SKBlock!
2482       break;
2483     }
2484   prio = pr->priority;
2485   prq->priority += pr->remaining_priority;
2486   pr->remaining_priority = 0;
2487   if (pr->client != NULL)
2488     {
2489       if (pr->replies_seen_size == pr->replies_seen_off)
2490         GNUNET_array_grow (pr->replies_seen,
2491                            pr->replies_seen_size,
2492                            pr->replies_seen_size * 2 + 4);
2493       pr->replies_seen[pr->replies_seen_off++] = chash;
2494       // FIXME: possibly recalculate BF!
2495     }
2496   if (pr->client == NULL)
2497     {
2498       msize = sizeof (struct ContentMessage) + prq->size;
2499       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
2500       reply->msize = msize;
2501       cm = (struct ContentMessage*) &reply[1];
2502       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2503       cm->header.size = htons (msize);
2504       cm->type = htonl (prq->type);
2505       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2506       reply->next = pr->replies_pending;
2507       pr->replies_pending = reply;
2508       memcpy (&reply[1], prq->data, prq->size);
2509       if (pr->cth != NULL)
2510         return GNUNET_YES;
2511       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2512       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
2513         {
2514           /* estimate expiration time from time difference between
2515              first request that will be discarded and this request */
2516           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2517           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
2518                                                            eer->start_time);
2519         }
2520       GNUNET_PEER_resolve (pr->source_pid,
2521                            &target);
2522       pr->cth = GNUNET_CORE_notify_transmit_ready (core,
2523                                                    prio,
2524                                                    max_delay,
2525                                                    &target,
2526                                                    msize,
2527                                                    &transmit_result,
2528                                                    pr);
2529       if (NULL == pr->cth)
2530         {
2531           // FIXME: now what? discard?
2532         }
2533     }
2534   else
2535     {
2536       msize = sizeof (struct PutMessage) + prq->size;
2537       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
2538       reply->msize = msize;
2539       reply->next = pr->replies_pending;
2540       pm = (struct PutMessage*) &reply[1];
2541       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2542       pm->header.size = htons (msize);
2543       pm->type = htonl (prq->type);
2544       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
2545       pr->replies_pending = reply;
2546       memcpy (&reply[1], prq->data, prq->size);
2547       if (pr->th != NULL)
2548         return GNUNET_YES;
2549       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
2550                                                     msize,
2551                                                     GNUNET_TIME_UNIT_FOREVER_REL,
2552                                                     &transmit_result,
2553                                                     pr);
2554       if (pr->th == NULL)
2555         {
2556           // FIXME: need to try again later (not much
2557           // to do here specifically, but we need to
2558           // check somewhere else to handle this case!)
2559         }
2560     }
2561   // FIXME: implement hot-path routing statistics keeping!
2562   return GNUNET_YES;
2563 }
2564
2565
2566 /**
2567  * Handle P2P "PUT" request.
2568  *
2569  * @param cls closure, always NULL
2570  * @param peer the other peer involved (sender or receiver, NULL
2571  *        for loopback messages where we are both sender and receiver)
2572  * @param message the actual message
2573  * @return GNUNET_OK to keep the connection open,
2574  *         GNUNET_SYSERR to close it (signal serious error)
2575  */
2576 static int
2577 handle_p2p_put (void *cls,
2578                 const struct GNUNET_PeerIdentity *other,
2579                 const struct GNUNET_MessageHeader *message)
2580 {
2581   const struct PutMessage *put;
2582   uint16_t msize;
2583   size_t dsize;
2584   uint32_t type;
2585   struct GNUNET_TIME_Absolute expiration;
2586   GNUNET_HashCode query;
2587   const struct KBlock *kb;
2588   struct ProcessReplyClosure prq;
2589
2590   msize = ntohs (message->size);
2591   if (msize < sizeof (struct PutMessage))
2592     {
2593       GNUNET_break_op(0);
2594       return GNUNET_SYSERR;
2595     }
2596   put = (const struct PutMessage*) message;
2597   dsize = msize - sizeof (struct PutMessage);
2598   type = ntohl (put->type);
2599   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
2600
2601   /* first, validate! */
2602   switch (type)
2603     {
2604     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2605     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2606       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
2607       break;
2608     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2609       if (dsize < sizeof (struct KBlock))
2610         {
2611           GNUNET_break_op (0);
2612           return GNUNET_SYSERR;
2613         }
2614       kb = (const struct KBlock*) &put[1];
2615       // FIXME -- validation code below broken...
2616       if ( (dsize != ntohs (kb->purpose.size) + 42) ||
2617            (GNUNET_OK !=
2618             GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
2619                                       &kb->purpose,
2620                                       &kb->signature,
2621                                       &kb->keyspace)) )
2622         {
2623           GNUNET_break_op (0);
2624           return GNUNET_SYSERR;
2625         }
2626       GNUNET_CRYPTO_hash (&kb->keyspace,
2627                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2628                           &query);
2629       break;
2630     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2631       // FIXME -- validate SBLOCK!
2632       GNUNET_break (0);
2633       return GNUNET_OK;
2634     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2635       // FIXME -- validate SKBLOCK!
2636       GNUNET_break (0);
2637       return GNUNET_OK;
2638     default:
2639       /* unknown block type */
2640       GNUNET_break_op (0);
2641       return GNUNET_SYSERR;
2642     }
2643
2644   /* now, lookup 'query' */
2645   prq.data = (const void*) &put[1];
2646   prq.size = dsize;
2647   prq.type = type;
2648   prq.expiration = expiration;
2649   prq.priority = 0;
2650   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
2651                                               &query,
2652                                               &process_reply,
2653                                               &prq);
2654   // FIXME: if migration is on and load is low,
2655   // queue to store data in datastore;
2656   // use "prq.priority" for that!
2657   return GNUNET_OK;
2658 }
2659
2660
2661 /**
2662  * List of handlers for P2P messages
2663  * that we care about.
2664  */
2665 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2666   {
2667     { &handle_p2p_get, 
2668       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2669     { &handle_p2p_put, 
2670       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2671     { NULL, 0, 0 }
2672   };
2673
2674
2675 /**
2676  * Task that will try to initiate a connection with the
2677  * core service.
2678  * 
2679  * @param cls unused
2680  * @param tc unused
2681  */
2682 static void
2683 core_connect_task (void *cls,
2684                    const struct GNUNET_SCHEDULER_TaskContext *tc);
2685
2686
2687 /**
2688  * Function called by the core after we've
2689  * connected.
2690  */
2691 static void
2692 core_start_cb (void *cls,
2693                struct GNUNET_CORE_Handle * server,
2694                const struct GNUNET_PeerIdentity *
2695                my_identity,
2696                const struct
2697                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
2698                publicKey)
2699 {
2700   if (server == NULL)
2701     {
2702       GNUNET_SCHEDULER_add_delayed (sched,
2703                                     GNUNET_NO,
2704                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
2705                                     GNUNET_SCHEDULER_NO_TASK,
2706                                     GNUNET_TIME_UNIT_SECONDS,
2707                                     &core_connect_task,
2708                                     NULL);
2709       return;
2710     }
2711   core = server;
2712 }
2713
2714
2715 /**
2716  * Task that will try to initiate a connection with the
2717  * core service.
2718  * 
2719  * @param cls unused
2720  * @param tc unused
2721  */
2722 static void
2723 core_connect_task (void *cls,
2724                    const struct GNUNET_SCHEDULER_TaskContext *tc)
2725 {
2726   GNUNET_CORE_connect (sched,
2727                        cfg,
2728                        GNUNET_TIME_UNIT_FOREVER_REL,
2729                        NULL,
2730                        &core_start_cb,
2731                        NULL,
2732                        &peer_disconnect_handler,
2733                        NULL, 
2734                        NULL, GNUNET_NO,
2735                        NULL, GNUNET_NO,
2736                        p2p_handlers);
2737 }
2738
2739
2740 /**
2741  * Process fs requests.
2742  *
2743  * @param cls closure
2744  * @param sched scheduler to use
2745  * @param server the initialized server
2746  * @param cfg configuration to use
2747  */
2748 static void
2749 run (void *cls,
2750      struct GNUNET_SCHEDULER_Handle *s,
2751      struct GNUNET_SERVER_Handle *server,
2752      const struct GNUNET_CONFIGURATION_Handle *c)
2753 {
2754   sched = s;
2755   cfg = c;
2756
2757   ifm = GNUNET_CONTAINER_multihashmap_create (128);
2758   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2759   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2760   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
2761   read_index_list ();
2762   dsh = GNUNET_DATASTORE_connect (cfg,
2763                                   sched);
2764   if (NULL == dsh)
2765     {
2766       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2767                   _("Failed to connect to datastore service.\n"));
2768       return;
2769     }
2770   GNUNET_SERVER_disconnect_notify (server, 
2771                                    &handle_client_disconnect,
2772                                    NULL);
2773   GNUNET_SERVER_add_handlers (server, handlers);
2774   core_connect_task (NULL, NULL);
2775   GNUNET_SCHEDULER_add_delayed (sched,
2776                                 GNUNET_YES,
2777                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
2778                                 GNUNET_SCHEDULER_NO_TASK,
2779                                 GNUNET_TIME_UNIT_FOREVER_REL,
2780                                 &shutdown_task,
2781                                 NULL);
2782 }
2783
2784
2785 /**
2786  * The main function for the fs service.
2787  *
2788  * @param argc number of arguments from the command line
2789  * @param argv command line arguments
2790  * @return 0 ok, 1 on error
2791  */
2792 int
2793 main (int argc, char *const *argv)
2794 {
2795   return (GNUNET_OK ==
2796           GNUNET_SERVICE_run (argc,
2797                               argv,
2798                               "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
2799 }
2800
2801 /* end of gnunet-service-fs.c */