fixing indexing and unindexing issues
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix gazillion of minor FIXME's
28  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29  *   core to transmit to another peer; need to make sure this is bounded overall...
30  * - randomly delay processing for improved anonymity (can wait)
31  * - content migration (put in local DS) (can wait)
32  * - handle some special cases when forwarding replies based on tracked requests (can wait)
33  * - tracking of success correlations for hot-path routing (can wait)
34  * - various load-based actions (can wait)
35  * - validation of KSBLOCKS (can wait)
36  * - remove on-demand blocks if they keep failing (can wait)
37  * - check that we decrement PIDs always where necessary (can wait)
38  * - find out how to use core-pulling instead of pushing (at least for some cases)
39  */
40 #include "platform.h"
41 #include <float.h>
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "fs.h"
49
50 #define DEBUG_FS GNUNET_NO
51
52
53 /**
54  * In-memory information about indexed files (also available
55  * on-disk).
56  */
57 struct IndexInfo
58 {
59   
60   /**
61    * This is a linked list.
62    */
63   struct IndexInfo *next;
64
65   /**
66    * Name of the indexed file.  Memory allocated
67    * at the end of this struct (do not free).
68    */
69   const char *filename;
70
71   /**
72    * Context for transmitting confirmation to client,
73    * NULL if we've done this already.
74    */
75   struct GNUNET_SERVER_TransmitContext *tc;
76   
77   /**
78    * Hash of the contents of the file.
79    */
80   GNUNET_HashCode file_id;
81
82 };
83
84
85 /**
86  * Signature of a function that is called whenever a datastore
87  * request can be processed (or an entry put on the queue times out).
88  *
89  * @param cls closure
90  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
91  */
92 typedef void (*RequestFunction)(void *cls,
93                                 int ok);
94
95
96 /**
97  * Doubly-linked list of our requests for the datastore.
98  */
99 struct DatastoreRequestQueue
100 {
101
102   /**
103    * This is a doubly-linked list.
104    */
105   struct DatastoreRequestQueue *next;
106
107   /**
108    * This is a doubly-linked list.
109    */
110   struct DatastoreRequestQueue *prev;
111
112   /**
113    * Function to call (will issue the request).
114    */
115   RequestFunction req;
116
117   /**
118    * Closure for req.
119    */
120   void *req_cls;
121
122   /**
123    * When should this request time-out because we don't care anymore?
124    */
125   struct GNUNET_TIME_Absolute timeout;
126     
127   /**
128    * ID of task used for signaling timeout.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132 };
133
134
135 /**
136  * Closure for processing START_SEARCH messages from a client.
137  */
138 struct LocalGetContext
139 {
140
141   /**
142    * This is a doubly-linked list.
143    */
144   struct LocalGetContext *next;
145
146   /**
147    * This is a doubly-linked list.
148    */
149   struct LocalGetContext *prev;
150
151   /**
152    * Client that initiated the search.
153    */
154   struct GNUNET_SERVER_Client *client;
155
156   /**
157    * Array of results that we've already received 
158    * (can be NULL).
159    */
160   GNUNET_HashCode *results; 
161
162   /**
163    * Bloomfilter over all results (for fast query construction);
164    * NULL if we don't have any results.
165    *
166    * FIXME: this member is not used, is that OK? If so, it should
167    * be removed!
168    */
169   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
170
171   /**
172    * DS request associated with this operation.
173    */
174   struct DatastoreRequestQueue *req;
175
176   /**
177    * Current result message to transmit to client (or NULL).
178    */
179   struct ContentMessage *result;
180   
181   /**
182    * Type of the content that we're looking for.
183    * 0 for any.
184    */
185   uint32_t type;
186
187   /**
188    * Desired anonymity level.
189    */
190   uint32_t anonymity_level;
191
192   /**
193    * Number of results actually stored in the results array.
194    */
195   unsigned int results_used;
196   
197   /**
198    * Size of the results array in memory.
199    */
200   unsigned int results_size;
201   
202   /**
203    * Size (in bytes) of the 'results_bf' bloomfilter.
204    *
205    * FIXME: this member is not used, is that OK? If so, it should
206    * be removed!
207    */
208   size_t results_bf_size;
209
210   /**
211    * If the request is for a DBLOCK or IBLOCK, this is the identity of
212    * the peer that is known to have a response.  Set to all-zeros if
213    * such a target is not known (note that even if OUR anonymity
214    * level is >0 we may happen to know the responder's identity;
215    * nevertheless, we should probably not use it for a DHT-lookup
216    * or similar blunt actions in order to avoid exposing ourselves).
217    */
218   struct GNUNET_PeerIdentity target;
219
220   /**
221    * If the request is for an SBLOCK, this is the identity of the
222    * pseudonym to which the SBLOCK belongs. 
223    */
224   GNUNET_HashCode namespace;
225
226   /**
227    * Hash of the keyword (aka query) for KBLOCKs; Hash of
228    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
229    * and hash of the identifier XORed with the target for
230    * SBLOCKS (aka query).
231    */
232   GNUNET_HashCode query;
233
234 };
235
236
237 /**
238  * Possible routing policies for an FS-GET request.
239  */
240 enum RoutingPolicy
241   {
242     /**
243      * Simply drop the request.
244      */
245     ROUTING_POLICY_NONE = 0,
246     
247     /**
248      * Answer it if we can from local datastore.
249      */
250     ROUTING_POLICY_ANSWER = 1,
251
252     /**
253      * Forward the request to other peers (if possible).
254      */
255     ROUTING_POLICY_FORWARD = 2,
256
257     /**
258      * Forward to other peers, and ask them to route
259      * the response via ourselves.
260      */
261     ROUTING_POLICY_INDIRECT = 6,
262     
263     /**
264      * Do everything we could possibly do (that would
265      * make sense).
266      */
267     ROUTING_POLICY_ALL = 7
268   };
269
270
271 /**
272  * Internal context we use for our initial processing
273  * of a GET request.
274  */
275 struct ProcessGetContext
276 {
277   /**
278    * The search query (used for datastore lookup).
279    */
280   GNUNET_HashCode query;
281   
282   /**
283    * Which peer we should forward the response to.
284    */
285   struct GNUNET_PeerIdentity reply_to;
286
287   /**
288    * Namespace for the result (only set for SKS requests)
289    */
290   GNUNET_HashCode namespace;
291
292   /**
293    * Peer that we should forward the query to if possible
294    * (since that peer likely has the content).
295    */
296   struct GNUNET_PeerIdentity prime_target;
297
298   /**
299    * When did we receive this request?
300    */
301   struct GNUNET_TIME_Absolute start_time;
302
303   /**
304    * Our entry in the DRQ (non-NULL while we wait for our
305    * turn to interact with the local database).
306    */
307   struct DatastoreRequestQueue *drq;
308
309   /**
310    * Filter used to eliminate duplicate results.  Can be NULL if we
311    * are not yet filtering any results.
312    */
313   struct GNUNET_CONTAINER_BloomFilter *bf;
314
315   /**
316    * Bitmap describing which of the optional
317    * hash codes / peer identities were given to us.
318    */
319   uint32_t bm;
320
321   /**
322    * Desired block type.
323    */
324   uint32_t type;
325
326   /**
327    * Priority of the request.
328    */
329   uint32_t priority;
330
331   /**
332    * Size of the 'bf' (in bytes).
333    */
334   size_t bf_size;
335
336   /**
337    * In what ways are we going to process
338    * the request?
339    */
340   enum RoutingPolicy policy;
341
342   /**
343    * Time-to-live for the request (value
344    * we use).
345    */
346   int32_t ttl;
347
348   /**
349    * Number to mingle hashes for bloom-filter
350    * tests with.
351    */
352   int32_t mingle;
353
354   /**
355    * Number of results that were found so far.
356    */
357   unsigned int results_found;
358 };
359
360
361 /**
362  * Information we keep for each pending reply.
363  */
364 struct PendingReply
365 {
366   /**
367    * This is a linked list.
368    */
369   struct PendingReply *next;
370
371   /**
372    * Size of the reply; actual reply message follows
373    * at the end of this struct.
374    */
375   size_t msize;
376
377 };
378
379
380 /**
381  * All requests from a client are kept in a doubly-linked list.
382  */
383 struct ClientRequestList;
384
385
386 /**
387  * Information we keep for each pending request.  We should try to
388  * keep this struct as small as possible since its memory consumption
389  * is key to how many requests we can have pending at once.
390  */
391 struct PendingRequest
392 {
393
394   /**
395    * ID of a client making a request, NULL if this entry is for a
396    * peer.
397    */
398   struct GNUNET_SERVER_Client *client;
399
400   /**
401    * If this request was made by a client,
402    * this is our entry in the client request
403    * list; otherwise NULL.
404    */
405   struct ClientRequestList *crl_entry;
406
407   /**
408    * If this is a namespace query, pointer to the hash of the public
409    * key of the namespace; otherwise NULL.
410    */
411   GNUNET_HashCode *namespace;
412
413   /**
414    * Bloomfilter we use to filter out replies that we don't care about
415    * (anymore).  NULL as long as we are interested in all replies.
416    */
417   struct GNUNET_CONTAINER_BloomFilter *bf;
418
419   /**
420    * Replies that we have received but were unable to forward yet
421    * (typically non-null only if we have a pending transmission
422    * request with the client or the respective peer).
423    */
424   struct PendingReply *replies_pending;
425
426   /**
427    * Pending transmission request with the core service for the target
428    * peer (for processing of 'replies_pending') or Handle for a
429    * pending query-request for P2P-transmission with the core service.
430    * If non-NULL, this request must be cancelled should this struct be
431    * destroyed!
432    */
433   struct GNUNET_CORE_TransmitHandle *cth;
434
435   /**
436    * Pending transmission request for the target client (for processing of
437    * 'replies_pending').
438    */
439   struct GNUNET_CONNECTION_TransmitHandle *th;
440
441   /**
442    * Hash code of all replies that we have seen so far (only valid
443    * if client is not NULL since we only track replies like this for
444    * our own clients).
445    */
446   GNUNET_HashCode *replies_seen;
447
448   /**
449    * When did we first see this request (form this peer), or, if our
450    * client is initiating, when did we last initiate a search?
451    */
452   struct GNUNET_TIME_Absolute start_time;
453
454   /**
455    * The query that this request is for.
456    */
457   GNUNET_HashCode query;
458
459   /**
460    * The task responsible for transmitting queries
461    * for this request.
462    */
463   GNUNET_SCHEDULER_TaskIdentifier task;
464
465   /**
466    * (Interned) Peer identifier (only valid if "client" is NULL)
467    * that identifies a peer that gave us this request.
468    */
469   GNUNET_PEER_Id source_pid;
470
471   /**
472    * (Interned) Peer identifier that identifies a preferred target
473    * for requests.
474    */
475   GNUNET_PEER_Id target_pid;
476
477   /**
478    * (Interned) Peer identifiers of peers that have already
479    * received our query for this content.
480    */
481   GNUNET_PEER_Id *used_pids;
482
483   /**
484    * Size of the 'bf' (in bytes).
485    */
486   size_t bf_size;
487
488   /**
489    * Desired anonymity level; only valid for requests from a local client.
490    */
491   uint32_t anonymity_level;
492
493   /**
494    * How many entries in "used_pids" are actually valid?
495    */
496   unsigned int used_pids_off;
497
498   /**
499    * How long is the "used_pids" array?
500    */
501   unsigned int used_pids_size;
502
503   /**
504    * How many entries in "replies_seen" are actually valid?
505    */
506   unsigned int replies_seen_off;
507
508   /**
509    * How long is the "replies_seen" array?
510    */
511   unsigned int replies_seen_size;
512   
513   /**
514    * Priority with which this request was made.  If one of our clients
515    * made the request, then this is the current priority that we are
516    * using when initiating the request.  This value is used when
517    * we decide to reward other peers with trust for providing a reply.
518    */
519   uint32_t priority;
520
521   /**
522    * Priority points left for us to spend when forwarding this request
523    * to other peers.
524    */
525   uint32_t remaining_priority;
526
527   /**
528    * Number to mingle hashes for bloom-filter
529    * tests with.
530    */
531   int32_t mingle;
532
533   /**
534    * TTL with which we saw this request (or, if we initiated, TTL that
535    * we used for the request).
536    */
537   int32_t ttl;
538   
539   /**
540    * Type of the content that this request is for.
541    */
542   uint32_t type;
543
544 };
545
546
547 /**
548  * All requests from a client are kept in a doubly-linked list.
549  */
550 struct ClientRequestList
551 {
552   /**
553    * This is a doubly-linked list.
554    */
555   struct ClientRequestList *next;
556
557   /**
558    * This is a doubly-linked list.
559    */ 
560   struct ClientRequestList *prev;
561
562   /**
563    * A request from this client.
564    */
565   struct PendingRequest *req;
566
567   /**
568    * Client list with the head and tail of this DLL.
569    */
570   struct ClientList *cl;
571 };
572
573
574 /**
575  * Linked list of all clients that we are 
576  * currently processing requests for.
577  */
578 struct ClientList
579 {
580
581   /**
582    * This is a linked list.
583    */
584   struct ClientList *next;
585
586   /**
587    * What client is this entry for?
588    */
589   struct GNUNET_SERVER_Client* client;
590
591   /**
592    * Head of the DLL of requests from this client.
593    */
594   struct ClientRequestList *head;
595
596   /**
597    * Tail of the DLL of requests from this client.
598    */
599   struct ClientRequestList *tail;
600
601 };
602
603
604 /**
605  * Closure for "process_reply" function.
606  */
607 struct ProcessReplyClosure
608 {
609   /**
610    * The data for the reply.
611    */
612   const void *data;
613
614   /**
615    * When the reply expires.
616    */
617   struct GNUNET_TIME_Absolute expiration;
618
619   /**
620    * Size of data.
621    */
622   size_t size;
623
624   /**
625    * Namespace that this reply belongs to
626    * (if it is of type SBLOCK).
627    */
628   GNUNET_HashCode namespace;
629
630   /**
631    * Type of the block.
632    */
633   uint32_t type;
634
635   /**
636    * How much was this reply worth to us?
637    */
638   uint32_t priority;
639 };
640
641
642 /**
643  * Information about a peer that we are connected to.
644  * We track data that is useful for determining which
645  * peers should receive our requests.
646  */
647 struct ConnectedPeer
648 {
649
650   /**
651    * List of the last clients for which this peer
652    * successfully answered a query. 
653    */
654   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
655
656   /**
657    * List of the last PIDs for which
658    * this peer successfully answered a query;
659    * We use 0 to indicate no successful reply.
660    */
661   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
662
663   /**
664    * Average delay between sending the peer a request and
665    * getting a reply (only calculated over the requests for
666    * which we actually got a reply).   Calculated
667    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
668    */ 
669   struct GNUNET_TIME_Relative avg_delay;
670
671   /**
672    * Average priority of successful replies.  Calculated
673    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
674    */
675   double avg_priority;
676
677   /**
678    * The peer's identity.
679    */
680   GNUNET_PEER_Id pid;  
681
682   /**
683    * Number of requests we have currently pending
684    * with this peer (that is, requests that were
685    * transmitted so recently that we would not retransmit
686    * them right now).
687    */
688   unsigned int pending_requests;
689
690   /**
691    * Which offset in "last_p2p_replies" will be updated next?
692    * (we go round-robin).
693    */
694   unsigned int last_p2p_replies_woff;
695
696   /**
697    * Which offset in "last_client_replies" will be updated next?
698    * (we go round-robin).
699    */
700   unsigned int last_client_replies_woff;
701
702 };
703
704
705 /**
706  * Our connection to the datastore.
707  */
708 static struct GNUNET_DATASTORE_Handle *dsh;
709
710 /**
711  * Our scheduler.
712  */
713 static struct GNUNET_SCHEDULER_Handle *sched;
714
715 /**
716  * Our configuration.
717  */
718 const struct GNUNET_CONFIGURATION_Handle *cfg;
719
720 /**
721  * Handle to the core service (NULL until we've connected to it).
722  */
723 struct GNUNET_CORE_Handle *core;
724
725 /**
726  * Head of doubly-linked LGC list.
727  */
728 static struct LocalGetContext *lgc_head;
729
730 /**
731  * Tail of doubly-linked LGC list.
732  */
733 static struct LocalGetContext *lgc_tail;
734
735 /**
736  * Head of request queue for the datastore, sorted by timeout.
737  */
738 static struct DatastoreRequestQueue *drq_head;
739
740 /**
741  * Tail of request queue for the datastore.
742  */
743 static struct DatastoreRequestQueue *drq_tail;
744
745 /**
746  * Linked list of indexed files.
747  */
748 static struct IndexInfo *indexed_files;
749
750 /**
751  * Maps hash over content of indexed files to the respective filename.
752  * The filenames are pointers into the indexed_files linked list and
753  * do not need to be freed.
754  */
755 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
756
757 /**
758  * Map of query hash codes to requests.
759  */
760 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
761
762 /**
763  * Map of peer IDs to requests (for those requests coming
764  * from other peers).
765  */
766 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
767
768 /**
769  * Linked list of all of our clients and their requests.
770  */
771 static struct ClientList *clients;
772
773 /**
774  * Heap with the request that will expire next at the top.  Contains
775  * pointers of type "struct PendingRequest*"; these will *also* be
776  * aliased from the "requests_by_peer" data structures and the
777  * "requests_by_query" table.  Note that requests from our clients
778  * don't expire and are thus NOT in the "requests_by_expiration"
779  * (or the "requests_by_peer" tables).
780  */
781 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
782
783 /**
784  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
785  */
786 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
787
788 /**
789  * Maximum number of requests (from other peers) that we're
790  * willing to have pending at any given point in time.
791  * FIXME: set from configuration (and 32 is a tiny value for testing only).
792  */
793 static uint64_t max_pending_requests = 32;
794
795 /**
796  * Write the current index information list to disk.
797  */ 
798 static void
799 write_index_list ()
800 {
801   struct GNUNET_BIO_WriteHandle *wh;
802   char *fn;
803   struct IndexInfo *pos;  
804
805   if (GNUNET_OK !=
806       GNUNET_CONFIGURATION_get_value_filename (cfg,
807                                                "FS",
808                                                "INDEXDB",
809                                                &fn))
810     {
811       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
812                   _("Configuration option `%s' in section `%s' missing.\n"),
813                   "INDEXDB",
814                   "FS");
815       return;
816     }
817   wh = GNUNET_BIO_write_open (fn);
818   if (NULL == wh)
819     {
820       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
821                   _("Could not open `%s'.\n"),
822                   fn);
823       GNUNET_free (fn);
824       return;
825     }
826   pos = indexed_files;
827   while (pos != NULL)
828     {
829       if ( (GNUNET_OK !=
830             GNUNET_BIO_write (wh,
831                               &pos->file_id,
832                               sizeof (GNUNET_HashCode))) ||
833            (GNUNET_OK !=
834             GNUNET_BIO_write_string (wh,
835                                      pos->filename)) )
836         break;
837       pos = pos->next;
838     }
839   if (GNUNET_OK != 
840       GNUNET_BIO_write_close (wh))
841     {
842       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
843                   _("Error writing `%s'.\n"),
844                   fn);
845       GNUNET_free (fn);
846       return;
847     }
848   GNUNET_free (fn);
849 }
850
851
852 /**
853  * Read index information from disk.
854  */
855 static void
856 read_index_list ()
857 {
858   struct GNUNET_BIO_ReadHandle *rh;
859   char *fn;
860   struct IndexInfo *pos;  
861   char *fname;
862   GNUNET_HashCode hc;
863   size_t slen;
864   char *emsg;
865
866   if (GNUNET_OK !=
867       GNUNET_CONFIGURATION_get_value_filename (cfg,
868                                                "FS",
869                                                "INDEXDB",
870                                                &fn))
871     {
872       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
873                   _("Configuration option `%s' in section `%s' missing.\n"),
874                   "INDEXDB",
875                   "FS");
876       return;
877     }
878   if (GNUNET_NO == GNUNET_DISK_file_test (fn))
879     {
880       /* no index info  yet */
881       GNUNET_free (fn);
882       return;
883     }
884   rh = GNUNET_BIO_read_open (fn);
885   if (NULL == rh)
886     {
887       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
888                   _("Could not open `%s'.\n"),
889                   fn);
890       GNUNET_free (fn);
891       return;
892     }
893
894   while ( (GNUNET_OK ==
895            GNUNET_BIO_read (rh,
896                             "Hash of indexed file",
897                             &hc,
898                             sizeof (GNUNET_HashCode))) &&
899           (GNUNET_OK ==
900            GNUNET_BIO_read_string (rh, 
901                                    "Name of indexed file",
902                                    &fname,
903                                    1024 * 16)) )
904     {
905       slen = strlen (fname) + 1;
906       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
907       pos->file_id = hc;
908       pos->filename = (const char *) &pos[1];
909       memcpy (&pos[1], fname, slen);
910       if (GNUNET_SYSERR ==
911           GNUNET_CONTAINER_multihashmap_put (ifm,
912                                              &hc,
913                                              (void*) pos->filename,
914                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
915         {
916           GNUNET_free (pos);
917         }
918       else
919         {
920           pos->next = indexed_files;
921           indexed_files = pos;
922         }
923       GNUNET_free (fname);
924     }
925   if (GNUNET_OK != 
926       GNUNET_BIO_read_close (rh, &emsg))
927     GNUNET_free (emsg);
928   GNUNET_free (fn);
929 }
930
931
932 /**
933  * We've validated the hash of the file we're about to
934  * index.  Signal success to the client and update
935  * our internal data structures.
936  *
937  * @param ii the index info entry for the request
938  */
939 static void
940 signal_index_ok (struct IndexInfo *ii)
941 {
942   if (GNUNET_SYSERR ==
943       GNUNET_CONTAINER_multihashmap_put (ifm,
944                                          &ii->file_id,
945                                          (void*) ii->filename,
946                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
947     {
948       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
949                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
950                   ii->filename,
951                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
952                                                                    &ii->file_id));
953       GNUNET_SERVER_transmit_context_append (ii->tc,
954                                              NULL, 0,
955                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
956       GNUNET_SERVER_transmit_context_run (ii->tc,
957                                           GNUNET_TIME_UNIT_MINUTES);
958       GNUNET_free (ii);
959       return;
960     }
961   ii->next = indexed_files;
962   indexed_files = ii;
963   write_index_list ();
964   GNUNET_SERVER_transmit_context_append (ii->tc,
965                                          NULL, 0,
966                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
967   GNUNET_SERVER_transmit_context_run (ii->tc,
968                                       GNUNET_TIME_UNIT_MINUTES);
969   ii->tc = NULL;
970 }
971
972
973 /**
974  * Function called once the hash computation over an
975  * indexed file has completed.
976  *
977  * @param cls closure, our publishing context
978  * @param res resulting hash, NULL on error
979  */
980 static void 
981 hash_for_index_val (void *cls,
982                     const GNUNET_HashCode *
983                     res)
984 {
985   struct IndexInfo *ii = cls;
986   
987   if ( (res == NULL) ||
988        (0 != memcmp (res,
989                      &ii->file_id,
990                      sizeof(GNUNET_HashCode))) )
991     {
992       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
993                   _("Hash mismatch trying to index file `%s' which has hash `%s'\n"),
994                   ii->filename,
995                   GNUNET_h2s (res));
996 #if DEBUG_FS
997       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
998                   "Wanted `%s'\n",
999                   GNUNET_h2s (&ii->file_id));
1000 #endif
1001       GNUNET_SERVER_transmit_context_append (ii->tc,
1002                                              NULL, 0,
1003                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
1004       GNUNET_SERVER_transmit_context_run (ii->tc,
1005                                           GNUNET_TIME_UNIT_MINUTES);
1006       GNUNET_free (ii);
1007       return;
1008     }
1009   signal_index_ok (ii);
1010 }
1011
1012
1013 /**
1014  * Handle INDEX_START-message.
1015  *
1016  * @param cls closure
1017  * @param client identification of the client
1018  * @param message the actual message
1019  */
1020 static void
1021 handle_index_start (void *cls,
1022                     struct GNUNET_SERVER_Client *client,
1023                     const struct GNUNET_MessageHeader *message)
1024 {
1025   const struct IndexStartMessage *ism;
1026   const char *fn;
1027   uint16_t msize;
1028   struct IndexInfo *ii;
1029   size_t slen;
1030   uint32_t dev;
1031   uint64_t ino;
1032   uint32_t mydev;
1033   uint64_t myino;
1034
1035   msize = ntohs(message->size);
1036   if ( (msize <= sizeof (struct IndexStartMessage)) ||
1037        ( ((const char *)message)[msize-1] != '\0') )
1038     {
1039       GNUNET_break (0);
1040       GNUNET_SERVER_receive_done (client,
1041                                   GNUNET_SYSERR);
1042       return;
1043     }
1044   ism = (const struct IndexStartMessage*) message;
1045   fn = (const char*) &ism[1];
1046   dev = ntohl (ism->device);
1047   ino = GNUNET_ntohll (ism->inode);
1048   ism = (const struct IndexStartMessage*) message;
1049   slen = strlen (fn) + 1;
1050   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
1051   ii->filename = (const char*) &ii[1];
1052   memcpy (&ii[1], fn, slen);
1053   ii->file_id = ism->file_id;  
1054   ii->tc = GNUNET_SERVER_transmit_context_create (client);
1055   if ( ( (dev != 0) ||
1056          (ino != 0) ) &&
1057        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
1058                                                        &mydev,
1059                                                        &myino)) &&
1060        ( (dev == mydev) &&
1061          (ino == myino) ) )
1062     {      
1063       /* fast validation OK! */
1064       signal_index_ok (ii);
1065       return;
1066     }
1067 #if DEBUG_FS
1068   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069               "Mismatch in file identifiers (%llu != %llu or %u != %u), need to hash.\n",
1070               (unsigned long long) ino,
1071               (unsigned long long) myino,
1072               (unsigned int) dev,
1073               (unsigned int) mydev);
1074 #endif
1075   /* slow validation, need to hash full file (again) */
1076   GNUNET_CRYPTO_hash_file (sched,
1077                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1078                            GNUNET_NO,
1079                            fn,
1080                            HASHING_BLOCKSIZE,
1081                            &hash_for_index_val,
1082                            ii);
1083 }
1084
1085
1086 /**
1087  * Handle INDEX_LIST_GET-message.
1088  *
1089  * @param cls closure
1090  * @param client identification of the client
1091  * @param message the actual message
1092  */
1093 static void
1094 handle_index_list_get (void *cls,
1095                        struct GNUNET_SERVER_Client *client,
1096                        const struct GNUNET_MessageHeader *message)
1097 {
1098   struct GNUNET_SERVER_TransmitContext *tc;
1099   struct IndexInfoMessage *iim;
1100   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1101   size_t slen;
1102   const char *fn;
1103   struct GNUNET_MessageHeader *msg;
1104   struct IndexInfo *pos;
1105
1106   tc = GNUNET_SERVER_transmit_context_create (client);
1107   iim = (struct IndexInfoMessage*) buf;
1108   msg = &iim->header;
1109   pos = indexed_files;
1110   while (NULL != pos)
1111     {
1112       iim->reserved = 0;
1113       iim->file_id = pos->file_id;
1114       fn = pos->filename;
1115       slen = strlen (fn) + 1;
1116       if (slen + sizeof (struct IndexInfoMessage) > 
1117           GNUNET_SERVER_MAX_MESSAGE_SIZE)
1118         {
1119           GNUNET_break (0);
1120           break;
1121         }
1122       memcpy (&iim[1], fn, slen);
1123       GNUNET_SERVER_transmit_context_append
1124         (tc,
1125          &msg[1],
1126          sizeof (struct IndexInfoMessage) 
1127          - sizeof (struct GNUNET_MessageHeader) + slen,
1128          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1129       pos = pos->next;
1130     }
1131   GNUNET_SERVER_transmit_context_append (tc,
1132                                          NULL, 0,
1133                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1134   GNUNET_SERVER_transmit_context_run (tc,
1135                                       GNUNET_TIME_UNIT_MINUTES);
1136 }
1137
1138
1139 /**
1140  * Handle UNINDEX-message.
1141  *
1142  * @param cls closure
1143  * @param client identification of the client
1144  * @param message the actual message
1145  */
1146 static void
1147 handle_unindex (void *cls,
1148                 struct GNUNET_SERVER_Client *client,
1149                 const struct GNUNET_MessageHeader *message)
1150 {
1151   const struct UnindexMessage *um;
1152   struct IndexInfo *pos;
1153   struct IndexInfo *prev;
1154   struct IndexInfo *next;
1155   struct GNUNET_SERVER_TransmitContext *tc;
1156   int found;
1157   
1158   um = (const struct UnindexMessage*) message;
1159   found = GNUNET_NO;
1160   prev = NULL;
1161   pos = indexed_files;
1162   while (NULL != pos)
1163     {
1164       next = pos->next;
1165       if (0 == memcmp (&pos->file_id,
1166                        &um->file_id,
1167                        sizeof (GNUNET_HashCode)))
1168         {
1169           if (prev == NULL)
1170             indexed_files = pos->next;
1171           else
1172             prev->next = pos->next;
1173           GNUNET_free (pos);
1174           found = GNUNET_YES;
1175         }
1176       else
1177         {
1178           prev = pos;
1179         }
1180       pos = next;
1181     }
1182 #if DEBUG_FS
1183   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1184               "Client requested unindexing of file `%s': %s\n",
1185               GNUNET_h2s (&um->file_id),
1186               found ? "found" : "not found");
1187 #endif
1188   if (GNUNET_YES == found)    
1189     write_index_list ();
1190   tc = GNUNET_SERVER_transmit_context_create (client);
1191   GNUNET_SERVER_transmit_context_append (tc,
1192                                          NULL, 0,
1193                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1194   GNUNET_SERVER_transmit_context_run (tc,
1195                                       GNUNET_TIME_UNIT_MINUTES);
1196 }
1197
1198
1199 /**
1200  * Run the next DS request in our
1201  * queue, we're done with the current one.
1202  */
1203 static void
1204 next_ds_request ()
1205 {
1206   struct DatastoreRequestQueue *e;
1207   
1208   while (NULL != (e = drq_head))
1209     {
1210       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1211         break;
1212       if (e->task != GNUNET_SCHEDULER_NO_TASK)
1213         GNUNET_SCHEDULER_cancel (sched, e->task);
1214       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1215       e->req (e->req_cls, GNUNET_NO);
1216       GNUNET_free (e);  
1217     }
1218   if (e == NULL)
1219     return;
1220   if (e->task != GNUNET_SCHEDULER_NO_TASK)
1221     GNUNET_SCHEDULER_cancel (sched, e->task);
1222   e->task = GNUNET_SCHEDULER_NO_TASK;
1223   e->req (e->req_cls, GNUNET_YES);
1224   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1225   GNUNET_free (e);  
1226 }
1227
1228
1229 /**
1230  * A datastore request had to be timed out. 
1231  *
1232  * @param cls closure (of type "struct DatastoreRequestQueue*")
1233  * @param tc task context, unused
1234  */
1235 static void
1236 timeout_ds_request (void *cls,
1237                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1238 {
1239   struct DatastoreRequestQueue *e = cls;
1240
1241   e->task = GNUNET_SCHEDULER_NO_TASK;
1242   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1243   e->req (e->req_cls, GNUNET_NO);
1244   GNUNET_free (e);  
1245 }
1246
1247
1248 /**
1249  * Queue a request for the datastore.
1250  *
1251  * @param deadline by when the request should run
1252  * @param fun function to call once the request can be run
1253  * @param fun_cls closure for fun
1254  */
1255 static struct DatastoreRequestQueue *
1256 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1257                   RequestFunction fun,
1258                   void *fun_cls)
1259 {
1260   struct DatastoreRequestQueue *e;
1261   struct DatastoreRequestQueue *bef;
1262
1263   if (drq_head == NULL)
1264     {
1265       /* no other requests pending, run immediately */
1266       fun (fun_cls, GNUNET_OK);
1267       return NULL;
1268     }
1269   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1270   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1271   e->req = fun;
1272   e->req_cls = fun_cls;
1273   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1274     {
1275       /* local request, highest prio, put at head of queue
1276          regardless of deadline */
1277       bef = NULL;
1278     }
1279   else
1280     {
1281       bef = drq_tail;
1282       while ( (NULL != bef) &&
1283               (e->timeout.value < bef->timeout.value) )
1284         bef = bef->prev;
1285     }
1286   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1287   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1288     return e;
1289   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1290                                           GNUNET_NO,
1291                                           GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1292                                           GNUNET_SCHEDULER_NO_TASK,
1293                                           deadline,
1294                                           &timeout_ds_request,
1295                                           e);
1296   return e;                                    
1297 }
1298
1299
1300 /**
1301  * Free the state associated with a local get context.
1302  *
1303  * @param lgc the lgc to free
1304  */
1305 static void
1306 local_get_context_free (struct LocalGetContext *lgc) 
1307 {
1308   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1309   GNUNET_SERVER_client_drop (lgc->client); 
1310   GNUNET_free_non_null (lgc->results);
1311   if (lgc->results_bf != NULL)
1312     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1313   if (lgc->req != NULL)
1314     {
1315       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1316         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1317       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1318       GNUNET_free (lgc->req);
1319     }
1320   GNUNET_free (lgc);
1321 }
1322
1323
1324 /**
1325  * We're able to transmit the next (local) result to the client.
1326  * Do it and ask the datastore for more.  Or, on error, tell
1327  * the datastore to stop giving us more.
1328  *
1329  * @param cls our closure (struct LocalGetContext)
1330  * @param max maximum number of bytes we can transmit
1331  * @param buf where to copy our message
1332  * @return number of bytes copied to buf
1333  */
1334 static size_t
1335 transmit_local_result (void *cls,
1336                        size_t max,
1337                        void *buf)
1338 {
1339   struct LocalGetContext *lgc = cls;  
1340   uint16_t msize;
1341
1342   if (NULL == buf)
1343     {
1344 #if DEBUG_FS
1345       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1346                   "Failed to transmit result to local client, aborting datastore iteration.\n");
1347 #endif
1348       /* error, abort! */
1349       GNUNET_free (lgc->result);
1350       lgc->result = NULL;
1351       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1352       return 0;
1353     }
1354   msize = ntohs (lgc->result->header.size);
1355 #if DEBUG_FS
1356   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1357               "Transmitting %u bytes of result to local client.\n",
1358               msize);
1359 #endif
1360   GNUNET_assert (max >= msize);
1361   memcpy (buf, lgc->result, msize);
1362   GNUNET_free (lgc->result);
1363   lgc->result = NULL;
1364   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1365   return msize;
1366 }
1367
1368
1369 /**
1370  * Continuation called from datastore's remove
1371  * function.
1372  *
1373  * @param cls unused
1374  * @param success did the deletion work?
1375  * @param msg error message
1376  */
1377 static void
1378 remove_cont (void *cls,
1379              int success,
1380              const char *msg)
1381 {
1382   if (GNUNET_OK != success)
1383     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1384                 _("Failed to delete bogus block: %s\n"),
1385                 msg);
1386   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1387 }
1388
1389
1390 /**
1391  * Mingle hash with the mingle_number to
1392  * produce different bits.
1393  */
1394 static void
1395 mingle_hash (const GNUNET_HashCode * in,
1396              int32_t mingle_number, 
1397              GNUNET_HashCode * hc)
1398 {
1399   GNUNET_HashCode m;
1400
1401   GNUNET_CRYPTO_hash (&mingle_number, 
1402                       sizeof (int32_t), 
1403                       &m);
1404   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1405 }
1406
1407
1408 /**
1409  * We've received an on-demand encoded block
1410  * from the datastore.  Attempt to do on-demand
1411  * encoding and (if successful), call the 
1412  * continuation with the resulting block.  On
1413  * error, clean up and ask the datastore for
1414  * more results.
1415  *
1416  * @param key key for the content
1417  * @param size number of bytes in data
1418  * @param data content stored
1419  * @param type type of the content
1420  * @param priority priority of the content
1421  * @param anonymity anonymity-level for the content
1422  * @param expiration expiration time for the content
1423  * @param uid unique identifier for the datum;
1424  *        maybe 0 if no unique identifier is available
1425  * @param cont function to call with the actual block
1426  * @param cont_cls closure for cont
1427  */
1428 static void
1429 handle_on_demand_block (const GNUNET_HashCode * key,
1430                         uint32_t size,
1431                         const void *data,
1432                         uint32_t type,
1433                         uint32_t priority,
1434                         uint32_t anonymity,
1435                         struct GNUNET_TIME_Absolute
1436                         expiration, uint64_t uid,
1437                         GNUNET_DATASTORE_Iterator cont,
1438                         void *cont_cls)
1439 {
1440   const struct OnDemandBlock *odb;
1441   GNUNET_HashCode nkey;
1442   struct GNUNET_CRYPTO_AesSessionKey skey;
1443   struct GNUNET_CRYPTO_AesInitializationVector iv;
1444   GNUNET_HashCode query;
1445   ssize_t nsize;
1446   char ndata[DBLOCK_SIZE];
1447   char edata[DBLOCK_SIZE];
1448   const char *fn;
1449   struct GNUNET_DISK_FileHandle *fh;
1450   uint64_t off;
1451
1452   if (size != sizeof (struct OnDemandBlock))
1453     {
1454       GNUNET_break (0);
1455       GNUNET_DATASTORE_remove (dsh, 
1456                                key,
1457                                size,
1458                                data,
1459                                &remove_cont,
1460                                NULL,
1461                                GNUNET_TIME_UNIT_FOREVER_REL);     
1462       return;
1463     }
1464   odb = (const struct OnDemandBlock*) data;
1465   off = GNUNET_ntohll (odb->offset);
1466   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1467                                                         &odb->file_id);
1468   fh = NULL;
1469   if ( (NULL == fn) ||
1470        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1471                                              GNUNET_DISK_OPEN_READ,
1472                                              GNUNET_DISK_PERM_NONE))) ||
1473        (off !=
1474         GNUNET_DISK_file_seek (fh,
1475                                off,
1476                                GNUNET_DISK_SEEK_SET)) ||
1477        (-1 ==
1478         (nsize = GNUNET_DISK_file_read (fh,
1479                                         ndata,
1480                                         sizeof (ndata)))) )
1481     {
1482       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1483                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1484                   GNUNET_h2s (&odb->file_id),
1485                   (unsigned long long) off,
1486                   STRERROR (errno));
1487       if (fh != NULL)
1488         GNUNET_DISK_file_close (fh);
1489       /* FIXME: if this happens often, we need
1490          to remove the OnDemand block from the DS! */
1491       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1492       return;
1493     }
1494   GNUNET_DISK_file_close (fh);
1495   GNUNET_CRYPTO_hash (ndata,
1496                       nsize,
1497                       &nkey);
1498   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1499   GNUNET_CRYPTO_aes_encrypt (ndata,
1500                              nsize,
1501                              &skey,
1502                              &iv,
1503                              edata);
1504   GNUNET_CRYPTO_hash (edata,
1505                       nsize,
1506                       &query);
1507   if (0 != memcmp (&query, 
1508                    key,
1509                    sizeof (GNUNET_HashCode)))
1510     {
1511       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1512                   _("Indexed file `%s' changed at offset %llu\n"),
1513                   fn,
1514                   (unsigned long long) off);
1515       /* FIXME: if this happens often, we need
1516          to remove the OnDemand block from the DS! */
1517       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1518       return;
1519     }
1520   cont (cont_cls,
1521         key,
1522         nsize,
1523         edata,
1524         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1525         priority,
1526         anonymity,
1527         expiration,
1528         uid);
1529 }
1530
1531
1532 /**
1533  * How many bytes should a bloomfilter be if we have already seen
1534  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1535  * of bits set per entry.  Furthermore, we should not re-size the
1536  * filter too often (to keep it cheap).
1537  *
1538  * Since other peers will also add entries but not resize the filter,
1539  * we should generally pick a slightly larger size than what the
1540  * strict math would suggest.
1541  *
1542  * @return must be a power of two and smaller or equal to 2^15.
1543  */
1544 static size_t
1545 compute_bloomfilter_size (unsigned int entry_count)
1546 {
1547   size_t size;
1548   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1549   uint16_t max = 1 << 15;
1550
1551   if (entry_count > max)
1552     return max;
1553   size = 8;
1554   while ((size < max) && (size < ideal))
1555     size *= 2;
1556   if (size > max)
1557     return max;
1558   return size;
1559 }
1560
1561
1562 /**
1563  * Recalculate our bloom filter for filtering replies.
1564  *
1565  * @param count number of entries we are filtering right now
1566  * @param mingle set to our new mingling value
1567  * @param bf_size set to the size of the bloomfilter
1568  * @param entries the entries to filter
1569  * @return updated bloomfilter, NULL for none
1570  */
1571 static struct GNUNET_CONTAINER_BloomFilter *
1572 refresh_bloomfilter (unsigned int count,
1573                      int32_t * mingle,
1574                      size_t *bf_size,
1575                      const GNUNET_HashCode *entries)
1576 {
1577   struct GNUNET_CONTAINER_BloomFilter *bf;
1578   size_t nsize;
1579   unsigned int i;
1580   GNUNET_HashCode mhash;
1581
1582   if (0 == count)
1583     return NULL;
1584   nsize = compute_bloomfilter_size (count);
1585   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1586   *bf_size = nsize;
1587   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1588                                           nsize,
1589                                           BLOOMFILTER_K);
1590   for (i=0;i<count;i++)
1591     {
1592       mingle_hash (&entries[i], *mingle, &mhash);
1593       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1594     }
1595   return bf;
1596 }
1597
1598
1599 /**
1600  * Closure used for "target_peer_select_cb".
1601  */
1602 struct PeerSelectionContext 
1603 {
1604   /**
1605    * The request for which we are selecting
1606    * peers.
1607    */
1608   struct PendingRequest *pr;
1609
1610   /**
1611    * Current "prime" target.
1612    */
1613   struct GNUNET_PeerIdentity target;
1614
1615   /**
1616    * How much do we like this target?
1617    */
1618   double target_score;
1619
1620 };
1621
1622
1623 /**
1624  * Function called for each connected peer to determine
1625  * which one(s) would make good targets for forwarding.
1626  *
1627  * @param cls closure (struct PeerSelectionContext)
1628  * @param key current key code (peer identity)
1629  * @param value value in the hash map (struct ConnectedPeer)
1630  * @return GNUNET_YES if we should continue to
1631  *         iterate,
1632  *         GNUNET_NO if not.
1633  */
1634 static int
1635 target_peer_select_cb (void *cls,
1636                        const GNUNET_HashCode * key,
1637                        void *value)
1638 {
1639   struct PeerSelectionContext *psc = cls;
1640   struct ConnectedPeer *cp = value;
1641   struct PendingRequest *pr = psc->pr;
1642   double score;
1643   unsigned int i;
1644
1645   /* 1) check if we have already (recently) forwarded to this peer */
1646   for (i=0;i<pr->used_pids_off;i++)
1647     if (pr->used_pids[i] == cp->pid)
1648       return GNUNET_YES; /* skip */
1649   // 2) calculate how much we'd like to forward to this peer
1650   score = 0; // FIXME!
1651   
1652   /* store best-fit in closure */
1653   if (score > psc->target_score)
1654     {
1655       psc->target_score = score;
1656       psc->target.hashPubKey = *key; 
1657     }
1658   return GNUNET_YES;
1659 }
1660
1661
1662 /**
1663  * We use a random delay to make the timing of requests
1664  * less predictable.  This function returns such a random
1665  * delay.
1666  *
1667  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1668  */
1669 static struct GNUNET_TIME_Relative
1670 get_processing_delay ()
1671 {
1672   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1673                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1674                                                                   TTL_DECREMENT));
1675 }
1676
1677
1678 /**
1679  * Task that is run for each request with the
1680  * goal of forwarding the associated query to
1681  * other peers.  The task should re-schedule
1682  * itself to be re-run once the TTL has expired.
1683  * (or at a later time if more peers should
1684  * be queried earlier).
1685  *
1686  * @param cls the requests "struct PendingRequest*"
1687  * @param tc task context (unused)
1688  */
1689 static void
1690 forward_request_task (void *cls,
1691                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1692
1693
1694 /**
1695  * We've selected a peer for forwarding of a query.
1696  * Construct the message and then re-schedule the
1697  * task to forward again to (other) peers.
1698  *
1699  * @param cls closure
1700  * @param size number of bytes available in buf
1701  * @param buf where the callee should write the message
1702  * @return number of bytes written to buf
1703  */
1704 static size_t
1705 transmit_request_cb (void *cls,
1706                      size_t size, 
1707                      void *buf)
1708 {
1709   struct PendingRequest *pr = cls;
1710   struct GetMessage *gm;
1711   GNUNET_HashCode *ext;
1712   char *bfdata;
1713   size_t msize;
1714   unsigned int k;
1715
1716   pr->cth = NULL;
1717   /* (1) check for timeout */
1718   if (NULL == buf)
1719     {
1720       /* timeout, try another peer immediately again */
1721       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1722                                                GNUNET_NO,
1723                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1724                                                GNUNET_SCHEDULER_NO_TASK,
1725                                                GNUNET_TIME_UNIT_ZERO,
1726                                                &forward_request_task,
1727                                                pr);
1728       return 0;
1729     }
1730   /* (2) build query message */
1731   k = 0; // FIXME: count hash codes!
1732   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1733   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1734   gm = (struct GetMessage*) buf;
1735   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1736   gm->header.size = htons (msize);
1737   gm->type = htonl (pr->type);
1738   pr->remaining_priority /= 2;
1739   gm->priority = htonl (pr->remaining_priority);
1740   gm->ttl = htonl (pr->ttl);
1741   gm->filter_mutator = htonl(pr->mingle);
1742   gm->hash_bitmap = htonl (42);
1743   gm->query = pr->query;
1744   ext = (GNUNET_HashCode*) &gm[1];
1745   // FIXME: setup "ext[0]..[k-1]"
1746   bfdata = (char *) &ext[k];
1747   if (pr->bf != NULL)
1748     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1749                                                bfdata,
1750                                                pr->bf_size);
1751   
1752   /* (3) schedule job to do it again (or another peer, etc.) */
1753   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1754                                            GNUNET_NO,
1755                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1756                                            GNUNET_SCHEDULER_NO_TASK,
1757                                            get_processing_delay (), // FIXME!
1758                                            &forward_request_task,
1759                                            pr);
1760
1761   return msize;
1762 }
1763
1764
1765 /**
1766  * Function called after we've tried to reserve
1767  * a certain amount of bandwidth for a reply.
1768  * Check if we succeeded and if so send our query.
1769  *
1770  * @param cls the requests "struct PendingRequest*"
1771  * @param peer identifies the peer
1772  * @param latency current latency estimate, "FOREVER" if we have been
1773  *                disconnected
1774  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1775  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1776  * @param amount set to the amount that was actually reserved or unreserved
1777  * @param preference current traffic preference for the given peer
1778  */
1779 static void
1780 target_reservation_cb (void *cls,
1781                        const struct
1782                        GNUNET_PeerIdentity * peer,
1783                        unsigned int bpm_in,
1784                        unsigned int bpm_out,
1785                        struct GNUNET_TIME_Relative
1786                        latency, int amount,
1787                        unsigned long long preference)
1788 {
1789   struct PendingRequest *pr = cls;
1790   uint32_t priority;
1791   uint16_t size;
1792   struct GNUNET_TIME_Relative maxdelay;
1793
1794   GNUNET_assert (peer != NULL);
1795   if ( (amount != DBLOCK_SIZE) ||
1796        (pr->cth != NULL) )
1797     {
1798       /* try again later; FIXME: we may need to un-reserve "amount"? */
1799       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1800                                                GNUNET_NO,
1801                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1802                                                GNUNET_SCHEDULER_NO_TASK,
1803                                                get_processing_delay (), // FIXME: longer?
1804                                                &forward_request_task,
1805                                                pr);
1806       return;
1807     }
1808   // (2) transmit, update ttl/priority
1809   // FIXME: calculate priority, maxdelay, size properly!
1810   priority = 0;
1811   size = 60000;
1812   maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
1813   pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1814                                                priority,
1815                                                maxdelay,
1816                                                peer,
1817                                                size,
1818                                                &transmit_request_cb,
1819                                                pr);
1820   if (pr->cth == NULL)
1821     {
1822       /* try again later */
1823       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1824                                                GNUNET_NO,
1825                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1826                                                GNUNET_SCHEDULER_NO_TASK,
1827                                                get_processing_delay (), // FIXME: longer?
1828                                                &forward_request_task,
1829                                                pr);
1830     }
1831 }
1832
1833
1834 /**
1835  * Task that is run for each request with the
1836  * goal of forwarding the associated query to
1837  * other peers.  The task should re-schedule
1838  * itself to be re-run once the TTL has expired.
1839  * (or at a later time if more peers should
1840  * be queried earlier).
1841  *
1842  * @param cls the requests "struct PendingRequest*"
1843  * @param tc task context (unused)
1844  */
1845 static void
1846 forward_request_task (void *cls,
1847                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1848 {
1849   struct PendingRequest *pr = cls;
1850   struct PeerSelectionContext psc;
1851
1852   pr->task = GNUNET_SCHEDULER_NO_TASK;
1853   if (pr->cth != NULL) 
1854     {
1855       /* we're busy transmitting a result, wait a bit */
1856       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1857                                                GNUNET_NO,
1858                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1859                                                GNUNET_SCHEDULER_NO_TASK,
1860                                                get_processing_delay (), 
1861                                                &forward_request_task,
1862                                                pr);
1863       return;
1864     }
1865   /* (1) select target */
1866   psc.pr = pr;
1867   psc.target_score = DBL_MIN;
1868   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1869                                          &target_peer_select_cb,
1870                                          &psc);
1871   if (psc.target_score == DBL_MIN)
1872     {
1873       /* no possible target found, wait some time */
1874       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1875                                                GNUNET_NO,
1876                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1877                                                GNUNET_SCHEDULER_NO_TASK,
1878                                                get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1879                                                &forward_request_task,
1880                                                pr);
1881       return;
1882     }
1883   /* (2) reserve reply bandwidth */
1884   // FIXME: need a way to cancel; this
1885   // async operation is problematic (segv-problematic)
1886   // if "pr" is destroyed while it happens!
1887   GNUNET_CORE_peer_configure (core,
1888                               &psc.target,
1889                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1890                               -1,
1891                               DBLOCK_SIZE, // FIXME: make dependent on type?
1892                               0,
1893                               &target_reservation_cb,
1894                               pr);
1895 }
1896
1897
1898 /**
1899  * We're processing (local) results for a search request
1900  * from a (local) client.  Pass applicable results to the
1901  * client and if we are done either clean up (operation
1902  * complete) or switch to P2P search (more results possible).
1903  *
1904  * @param cls our closure (struct LocalGetContext)
1905  * @param key key for the content
1906  * @param size number of bytes in data
1907  * @param data content stored
1908  * @param type type of the content
1909  * @param priority priority of the content
1910  * @param anonymity anonymity-level for the content
1911  * @param expiration expiration time for the content
1912  * @param uid unique identifier for the datum;
1913  *        maybe 0 if no unique identifier is available
1914  */
1915 static void
1916 process_local_get_result (void *cls,
1917                           const GNUNET_HashCode * key,
1918                           uint32_t size,
1919                           const void *data,
1920                           uint32_t type,
1921                           uint32_t priority,
1922                           uint32_t anonymity,
1923                           struct GNUNET_TIME_Absolute
1924                           expiration, 
1925                           uint64_t uid)
1926 {
1927   struct LocalGetContext *lgc = cls;
1928   struct PendingRequest *pr;
1929   struct ClientRequestList *crl;
1930   struct ClientList *cl;
1931   size_t msize;
1932   unsigned int i;
1933
1934   if (key == NULL)
1935     {
1936 #if DEBUG_FS
1937       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1938                   "Received last result for `%s' from local datastore, deciding what to do next.\n",
1939                   GNUNET_h2s (&lgc->query));
1940 #endif
1941       /* no further results from datastore; continue
1942          processing further requests from the client and
1943          allow the next task to use the datastore; also,
1944          switch to P2P requests or clean up our state. */
1945       next_ds_request ();
1946       GNUNET_SERVER_receive_done (lgc->client,
1947                                   GNUNET_OK);
1948       if ( (lgc->results_used == 0) ||
1949            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1950            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1951            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1952         {
1953 #if DEBUG_FS
1954           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1955                       "Forwarding query for `%s' to network.\n",
1956                       GNUNET_h2s (&lgc->query));
1957 #endif
1958           cl = clients;
1959           while ( (NULL != cl) &&
1960                   (cl->client != lgc->client) )
1961             cl = cl->next;
1962           if (cl == NULL)
1963             {
1964               cl = GNUNET_malloc (sizeof (struct ClientList));
1965               cl->client = lgc->client;
1966               cl->next = clients;
1967               clients = cl;
1968             }
1969           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1970           crl->cl = cl;
1971           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1972           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1973           pr->client = lgc->client;
1974           GNUNET_SERVER_client_keep (pr->client);
1975           pr->crl_entry = crl;
1976           crl->req = pr;
1977           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1978             {
1979               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1980               *pr->namespace = lgc->namespace;
1981             }
1982           pr->replies_seen = lgc->results;
1983           lgc->results = NULL;
1984           pr->start_time = GNUNET_TIME_absolute_get ();
1985           pr->query = lgc->query;
1986           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1987           pr->replies_seen_off = lgc->results_used;
1988           pr->replies_seen_size = lgc->results_size;
1989           lgc->results_size = 0;
1990           pr->type = lgc->type;
1991           pr->anonymity_level = lgc->anonymity_level;
1992           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1993                                         &pr->mingle,
1994                                         &pr->bf_size,
1995                                         pr->replies_seen);
1996           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1997                                              &pr->query,
1998                                              pr,
1999                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2000           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2001                                                    GNUNET_NO,
2002                                                    GNUNET_SCHEDULER_PRIORITY_IDLE,
2003                                                    GNUNET_SCHEDULER_NO_TASK,
2004                                                    get_processing_delay (),
2005                                                    &forward_request_task,
2006                                                    pr);
2007           local_get_context_free (lgc);
2008           return;
2009         }
2010       /* got all possible results, clean up! */
2011 #if DEBUG_FS
2012       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2013                   "Found all possible results for query for `%s', done!\n",
2014                   GNUNET_h2s (&lgc->query));
2015 #endif
2016       local_get_context_free (lgc);
2017       return;
2018     }
2019   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2020     {
2021 #if DEBUG_FS
2022       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2023                   "Received on-demand block for `%s' from local datastore, fetching data.\n",
2024                   GNUNET_h2s (&lgc->query));
2025 #endif
2026       handle_on_demand_block (key, size, data, type, priority, 
2027                               anonymity, expiration, uid,
2028                               &process_local_get_result,
2029                               lgc);
2030       return;
2031     }
2032   if ( (type != lgc->type) &&
2033        (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
2034     {
2035       /* this should be virtually impossible to reach (DBLOCK 
2036          query hash being identical to KBLOCK/SBLOCK query hash);
2037          nevertheless, if it happens, the correct thing is to
2038          simply skip the result. */
2039 #if DEBUG_FS
2040       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2041                   "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
2042                   type,
2043                   lgc->type,
2044                   GNUNET_h2s (&lgc->query));
2045 #endif
2046       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
2047       return;
2048     }
2049   /* check if this is a result we've alredy
2050      received */
2051   for (i=0;i<lgc->results_used;i++)
2052     if (0 == memcmp (key,
2053                      &lgc->results[i],
2054                      sizeof (GNUNET_HashCode)))
2055       {
2056 #if DEBUG_FS
2057         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2058                     "Received duplicate result for `%s' from local datastore, ignoring.\n",
2059                     GNUNET_h2s (&lgc->query));
2060 #endif
2061         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2062         return; 
2063       }
2064   if (lgc->results_used == lgc->results_size)
2065     GNUNET_array_grow (lgc->results,
2066                        lgc->results_size,
2067                        lgc->results_size * 2 + 2);
2068   GNUNET_CRYPTO_hash (data, 
2069                       size, 
2070                       &lgc->results[lgc->results_used++]);    
2071   msize = size + sizeof (struct ContentMessage);
2072   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2073   lgc->result = GNUNET_malloc (msize);
2074   lgc->result->header.size = htons (msize);
2075   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2076   lgc->result->type = htonl (type);
2077   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
2078   memcpy (&lgc->result[1],
2079           data,
2080           size);
2081 #if DEBUG_FS
2082   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2083               "Received new result for `%s' from local datastore, passing to client.\n",
2084               GNUNET_h2s (&lgc->query));
2085 #endif
2086   GNUNET_SERVER_notify_transmit_ready (lgc->client,
2087                                        msize,
2088                                        GNUNET_TIME_UNIT_FOREVER_REL,
2089                                        &transmit_local_result,
2090                                        lgc);
2091 }
2092
2093
2094 /**
2095  * We're processing a search request from a local
2096  * client.  Now it is our turn to query the datastore.
2097  * 
2098  * @param cls our closure (struct LocalGetContext)
2099  * @param tc unused
2100  */
2101 static void
2102 transmit_local_get (void *cls,
2103                     const struct GNUNET_SCHEDULER_TaskContext *tc)
2104 {
2105   struct LocalGetContext *lgc = cls;
2106   uint32_t type;
2107   
2108   type = lgc->type;
2109   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2110     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2111   GNUNET_DATASTORE_get (dsh,
2112                         &lgc->query,
2113                         type,
2114                         &process_local_get_result,
2115                         lgc,
2116                         GNUNET_TIME_UNIT_FOREVER_REL);
2117 }
2118
2119
2120 /**
2121  * We're processing a search request from a local
2122  * client.  Now it is our turn to query the datastore.
2123  * 
2124  * @param cls our closure (struct LocalGetContext)
2125  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
2126  */
2127 static void 
2128 transmit_local_get_ready (void *cls,
2129                           int ok)
2130 {
2131   struct LocalGetContext *lgc = cls;
2132
2133   GNUNET_assert (GNUNET_OK == ok);
2134   GNUNET_SCHEDULER_add_continuation (sched,
2135                                      GNUNET_NO,
2136                                      &transmit_local_get,
2137                                      lgc,
2138                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2139 }
2140
2141
2142 /**
2143  * Handle START_SEARCH-message (search request from client).
2144  *
2145  * @param cls closure
2146  * @param client identification of the client
2147  * @param message the actual message
2148  */
2149 static void
2150 handle_start_search (void *cls,
2151                      struct GNUNET_SERVER_Client *client,
2152                      const struct GNUNET_MessageHeader *message)
2153 {
2154   const struct SearchMessage *sm;
2155   struct LocalGetContext *lgc;
2156   uint16_t msize;
2157   unsigned int sc;
2158   
2159   msize = ntohs (message->size);
2160   if ( (msize < sizeof (struct SearchMessage)) ||
2161        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2162     {
2163       GNUNET_break (0);
2164       GNUNET_SERVER_receive_done (client,
2165                                   GNUNET_SYSERR);
2166       return;
2167     }
2168   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2169   sm = (const struct SearchMessage*) message;
2170   GNUNET_SERVER_client_keep (client);
2171   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
2172   if  (sc > 0)
2173     {
2174       lgc->results_used = sc;
2175       GNUNET_array_grow (lgc->results,
2176                          lgc->results_size,
2177                          sc * 2);
2178       memcpy (lgc->results,
2179               &sm[1],
2180               sc * sizeof (GNUNET_HashCode));
2181     }
2182   lgc->client = client;
2183   lgc->type = ntohl (sm->type);
2184   lgc->anonymity_level = ntohl (sm->anonymity_level);
2185   switch (lgc->type)
2186     {
2187     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2188     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2189       lgc->target.hashPubKey = sm->target;
2190       break;
2191     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2192       lgc->namespace = sm->target;
2193       break;
2194     default:
2195       break;
2196     }
2197   lgc->query = sm->query;
2198   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
2199   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
2200                                &transmit_local_get_ready,
2201                                lgc);
2202 }
2203
2204
2205 /**
2206  * List of handlers for the messages understood by this
2207  * service.
2208  */
2209 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2210   {&handle_index_start, NULL, 
2211    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2212   {&handle_index_list_get, NULL, 
2213    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2214   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2215    sizeof (struct UnindexMessage) },
2216   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2217    0 },
2218   {NULL, NULL, 0, 0}
2219 };
2220
2221
2222 /**
2223  * Clean up the memory used by the PendingRequest structure (except
2224  * for the client or peer list that the request may be part of).
2225  *
2226  * @param pr request to clean up
2227  */
2228 static void
2229 destroy_pending_request (struct PendingRequest *pr)
2230 {
2231   struct PendingReply *reply;
2232   struct ClientList *cl;
2233
2234   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
2235                                         &pr->query,
2236                                         pr);
2237   // FIXME: not sure how this can work (efficiently)
2238   // also, what does the return value mean?
2239   if (pr->client == NULL)
2240     {
2241       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
2242                                          pr);
2243     }
2244   else
2245     {
2246       cl = pr->crl_entry->cl;
2247       GNUNET_CONTAINER_DLL_remove (cl->head,
2248                                    cl->tail,
2249                                    pr->crl_entry);
2250     }
2251   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
2252     GNUNET_SCHEDULER_cancel (sched, pr->task);
2253   if (NULL != pr->cth)
2254     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
2255   if (NULL != pr->bf)
2256     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2257   if (NULL != pr->th)
2258     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
2259   while (NULL != (reply = pr->replies_pending))
2260     {
2261       pr->replies_pending = reply->next;
2262       GNUNET_free (reply);
2263     }
2264   GNUNET_PEER_change_rc (pr->source_pid, -1);
2265   GNUNET_PEER_change_rc (pr->target_pid, -1);
2266   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
2267   GNUNET_free_non_null (pr->used_pids);
2268   GNUNET_free_non_null (pr->replies_seen);
2269   GNUNET_free_non_null (pr->namespace);
2270   GNUNET_free (pr);
2271 }
2272
2273
2274 /**
2275  * A client disconnected.  Remove all of its pending queries.
2276  *
2277  * @param cls closure, NULL
2278  * @param client identification of the client
2279  */
2280 static void
2281 handle_client_disconnect (void *cls,
2282                           struct GNUNET_SERVER_Client
2283                           * client)
2284 {
2285   struct LocalGetContext *lgc;
2286   struct ClientList *cpos;
2287   struct ClientList *cprev;
2288   struct ClientRequestList *rl;
2289
2290   lgc = lgc_head;
2291   while ( (NULL != lgc) &&
2292           (lgc->client != client) )
2293     lgc = lgc->next;
2294   if (lgc != NULL)
2295     local_get_context_free (lgc);
2296   cprev = NULL;
2297   cpos = clients;
2298   while ( (NULL != cpos) &&
2299           (clients->client != client) )
2300     {
2301       cprev = cpos;
2302       cpos = cpos->next;
2303     }
2304   if (cpos != NULL)
2305     {
2306       if (cprev == NULL)
2307         clients = cpos->next;
2308       else
2309         cprev->next = cpos->next;
2310       while (NULL != (rl = cpos->head))
2311         {
2312           cpos->head = rl->next;
2313           destroy_pending_request (rl->req);
2314           GNUNET_free (rl);
2315         }
2316       GNUNET_free (cpos);
2317     }
2318 }
2319
2320
2321 /**
2322  * Iterator over entries in the "requests_by_query" map
2323  * that frees all the entries.
2324  *
2325  * @param cls closure, NULL
2326  * @param key current key code (the query, unused) 
2327  * @param value value in the hash map, of type "struct PendingRequest*"
2328  * @return GNUNET_YES (we should continue to  iterate)
2329  */
2330 static int 
2331 destroy_pending_request_cb (void *cls,
2332                             const GNUNET_HashCode * key,
2333                             void *value)
2334 {
2335   struct PendingRequest *pr = value;
2336
2337   destroy_pending_request (pr);
2338   return GNUNET_YES;
2339 }
2340
2341
2342 /**
2343  * Task run during shutdown.
2344  *
2345  * @param cls unused
2346  * @param tc unused
2347  */
2348 static void
2349 shutdown_task (void *cls,
2350                const struct GNUNET_SCHEDULER_TaskContext *tc)
2351 {
2352   struct IndexInfo *pos;  
2353
2354   if (NULL != core)
2355     GNUNET_CORE_disconnect (core);
2356   GNUNET_DATASTORE_disconnect (dsh,
2357                                GNUNET_NO);
2358   dsh = NULL;
2359   GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
2360                                          &destroy_pending_request_cb,
2361                                          NULL);
2362   while (clients != NULL)
2363     handle_client_disconnect (NULL,
2364                               clients->client);
2365   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
2366   requests_by_query = NULL;
2367   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
2368   requests_by_peer = NULL;
2369   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
2370   requests_by_expiration = NULL;
2371   // FIXME: iterate over entries and free individually?
2372   // (or do we get disconnect notifications?)
2373   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2374   connected_peers = NULL;
2375   GNUNET_CONTAINER_multihashmap_destroy (ifm);
2376   ifm = NULL;
2377   while (NULL != (pos = indexed_files))
2378     {
2379       indexed_files = pos->next;
2380       GNUNET_free (pos);
2381     }
2382 }
2383
2384
2385 /**
2386  * Free (each) request made by the peer.
2387  *
2388  * @param cls closure, points to peer that the request belongs to
2389  * @param key current key code
2390  * @param value value in the hash map
2391  * @return GNUNET_YES (we should continue to iterate)
2392  */
2393 static int
2394 destroy_request (void *cls,
2395                  const GNUNET_HashCode * key,
2396                  void *value)
2397 {
2398   const struct GNUNET_PeerIdentity * peer = cls;
2399   struct PendingRequest *pr = value;
2400   
2401   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2402                                         &peer->hashPubKey,
2403                                         pr);
2404   destroy_pending_request (pr);
2405   return GNUNET_YES;
2406 }
2407
2408
2409
2410 /**
2411  * Method called whenever a given peer connects.
2412  *
2413  * @param cls closure, not used
2414  * @param peer peer identity this notification is about
2415  */
2416 static void 
2417 peer_connect_handler (void *cls,
2418                       const struct
2419                       GNUNET_PeerIdentity * peer)
2420 {
2421   struct ConnectedPeer *cp;
2422
2423   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
2424   cp->pid = GNUNET_PEER_intern (peer);
2425   GNUNET_CONTAINER_multihashmap_put (connected_peers,
2426                                      &peer->hashPubKey,
2427                                      cp,
2428                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2429 }
2430
2431
2432 /**
2433  * Method called whenever a peer disconnects.
2434  *
2435  * @param cls closure, not used
2436  * @param peer peer identity this notification is about
2437  */
2438 static void
2439 peer_disconnect_handler (void *cls,
2440                          const struct
2441                          GNUNET_PeerIdentity * peer)
2442 {
2443   struct ConnectedPeer *cp;
2444
2445   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2446                                           &peer->hashPubKey);
2447   GNUNET_PEER_change_rc (cp->pid, -1);
2448   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
2449   GNUNET_free (cp);
2450   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
2451                                               &peer->hashPubKey,
2452                                               &destroy_request,
2453                                               (void*) peer);
2454 }
2455
2456
2457 /**
2458  * We're processing a GET request from
2459  * another peer and have decided to forward
2460  * it to other peers.
2461  *
2462  * @param cls our "struct ProcessGetContext *"
2463  * @param tc unused
2464  */
2465 static void
2466 forward_get_request (void *cls,
2467                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2468 {
2469   struct ProcessGetContext *pgc = cls;
2470   struct PendingRequest *pr;
2471   struct PendingRequest *eer;
2472   struct GNUNET_PeerIdentity target;
2473
2474   pr = GNUNET_malloc (sizeof (struct PendingRequest));
2475   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
2476     {
2477       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
2478       *pr->namespace = pgc->namespace;
2479     }
2480   pr->bf = pgc->bf;
2481   pr->bf_size = pgc->bf_size;
2482   pgc->bf = NULL;
2483   pr->start_time = pgc->start_time;
2484   pr->query = pgc->query;
2485   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
2486   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
2487     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
2488   pr->anonymity_level = 1; /* default */
2489   pr->priority = pgc->priority;
2490   pr->remaining_priority = pr->priority;
2491   pr->mingle = pgc->mingle;
2492   pr->ttl = pgc->ttl; 
2493   pr->type = pgc->type;
2494   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
2495                                      &pr->query,
2496                                      pr,
2497                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2498   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
2499                                      &pgc->reply_to.hashPubKey,
2500                                      pr,
2501                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2502   GNUNET_CONTAINER_heap_insert (requests_by_expiration,
2503                                 pr,
2504                                 pr->start_time.value + pr->ttl);
2505   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
2506     {
2507       /* expire oldest request! */
2508       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2509       GNUNET_PEER_resolve (eer->source_pid,
2510                            &target);    
2511       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2512                                             &target.hashPubKey,
2513                                             eer);
2514       destroy_pending_request (eer);     
2515     }
2516   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2517                                            GNUNET_NO,
2518                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
2519                                            GNUNET_SCHEDULER_NO_TASK,
2520                                            get_processing_delay (),
2521                                            &forward_request_task,
2522                                            pr);
2523   GNUNET_free (pgc); 
2524 }
2525 /**
2526  * Transmit the given message by copying it to
2527  * the target buffer "buf".  "buf" will be
2528  * NULL and "size" zero if the socket was closed for
2529  * writing in the meantime.  In that case, only
2530
2531  * free the message
2532  *
2533  * @param cls closure, pointer to the message
2534  * @param size number of bytes available in buf
2535  * @param buf where the callee should write the message
2536  * @return number of bytes written to buf
2537  */
2538 static size_t
2539 transmit_message (void *cls,
2540                   size_t size, void *buf)
2541 {
2542   struct GNUNET_MessageHeader *msg = cls;
2543   uint16_t msize;
2544   
2545   if (NULL == buf)
2546     {
2547 #if DEBUG_FS
2548       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2549                   "Dropping reply, core too busy.\n");
2550 #endif
2551       GNUNET_free (msg);
2552       return 0;
2553     }
2554   msize = ntohs (msg->size);
2555   GNUNET_assert (size >= msize);
2556   memcpy (buf, msg, msize);
2557   GNUNET_free (msg);
2558   return msize;
2559 }
2560
2561
2562 /**
2563  * Test if the load on this peer is too high
2564  * to even consider processing the query at
2565  * all.
2566  * 
2567  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2568  */
2569 static int
2570 test_load_too_high ()
2571 {
2572   return GNUNET_NO; // FIXME
2573 }
2574
2575
2576 /**
2577  * We're processing (local) results for a search request
2578  * from another peer.  Pass applicable results to the
2579  * peer and if we are done either clean up (operation
2580  * complete) or forward to other peers (more results possible).
2581  *
2582  * @param cls our closure (struct LocalGetContext)
2583  * @param key key for the content
2584  * @param size number of bytes in data
2585  * @param data content stored
2586  * @param type type of the content
2587  * @param priority priority of the content
2588  * @param anonymity anonymity-level for the content
2589  * @param expiration expiration time for the content
2590  * @param uid unique identifier for the datum;
2591  *        maybe 0 if no unique identifier is available
2592  */
2593 static void
2594 process_p2p_get_result (void *cls,
2595                         const GNUNET_HashCode * key,
2596                         uint32_t size,
2597                         const void *data,
2598                         uint32_t type,
2599                         uint32_t priority,
2600                         uint32_t anonymity,
2601                         struct GNUNET_TIME_Absolute
2602                         expiration, 
2603                         uint64_t uid)
2604 {
2605   struct ProcessGetContext *pgc = cls;
2606   GNUNET_HashCode dhash;
2607   GNUNET_HashCode mhash;
2608   struct PutMessage *reply;
2609   
2610   if (NULL == key)
2611     {
2612       /* no more results */
2613       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2614            ( (0 == pgc->results_found) ||
2615              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2616              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2617              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2618         {
2619           GNUNET_SCHEDULER_add_continuation (sched,
2620                                              GNUNET_NO,
2621                                              &forward_get_request,
2622                                              pgc,
2623                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2624         }
2625       else
2626         {
2627           if (pgc->bf != NULL)
2628             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2629           GNUNET_free (pgc); 
2630         }
2631       next_ds_request ();
2632       return;
2633     }
2634   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2635     {
2636       handle_on_demand_block (key, size, data, type, priority, 
2637                               anonymity, expiration, uid,
2638                               &process_p2p_get_result,
2639                               pgc);
2640       return;
2641     }
2642   /* check for duplicates */
2643   GNUNET_CRYPTO_hash (data, size, &dhash);
2644   mingle_hash (&dhash, 
2645                pgc->mingle,
2646                &mhash);
2647   if ( (pgc->bf != NULL) &&
2648        (GNUNET_YES ==
2649         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2650                                            &mhash)) )
2651     {      
2652 #if DEBUG_FS
2653       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2654                   "Result from datastore filtered by bloomfilter.\n");
2655 #endif
2656       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2657       return;
2658     }
2659   pgc->results_found++;
2660   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2661        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2662        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2663     {
2664       if (pgc->bf == NULL)
2665         {
2666           pgc->bf_size = 32;
2667           pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2668                                                        pgc->bf_size, 
2669                                                        BLOOMFILTER_K);
2670         }
2671       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2672                                         &mhash);
2673     }
2674
2675   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2676   reply->header.size = htons (sizeof (struct PutMessage) + size);
2677   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2678   reply->type = htonl (type);
2679   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2680   memcpy (&reply[1], data, size);
2681   GNUNET_CORE_notify_transmit_ready (core,
2682                                      pgc->priority,
2683                                      ACCEPTABLE_REPLY_DELAY,
2684                                      &pgc->reply_to,
2685                                      sizeof (struct PutMessage) + size,
2686                                      &transmit_message,
2687                                      reply);
2688   if ( (GNUNET_YES == test_load_too_high()) ||
2689        (pgc->results_found > 5 + 2 * pgc->priority) )
2690     {
2691       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2692       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2693       return;
2694     }
2695   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2696 }
2697   
2698
2699 /**
2700  * We're processing a GET request from another peer.  Give it to our
2701  * local datastore.
2702  *
2703  * @param cls our "struct ProcessGetContext"
2704  * @param ok did we get a datastore slice or not?
2705  */
2706 static void
2707 ds_get_request (void *cls, 
2708                 int ok)
2709 {
2710   struct ProcessGetContext *pgc = cls;
2711   uint32_t type;
2712   struct GNUNET_TIME_Relative timeout;
2713
2714   if (GNUNET_OK != ok)
2715     {
2716       /* no point in doing P2P stuff if we can't even do local */
2717       GNUNET_free (dsh);
2718       return;
2719     }
2720   type = pgc->type;
2721   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2722     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2723   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2724                                            (pgc->priority + 1));
2725   GNUNET_DATASTORE_get (dsh,
2726                         &pgc->query,
2727                         type,
2728                         &process_p2p_get_result,
2729                         pgc,
2730                         timeout);
2731 }
2732
2733
2734 /**
2735  * The priority level imposes a bound on the maximum
2736  * value for the ttl that can be requested.
2737  *
2738  * @param ttl_in requested ttl
2739  * @param prio given priority
2740  * @return ttl_in if ttl_in is below the limit,
2741  *         otherwise the ttl-limit for the given priority
2742  */
2743 static int32_t
2744 bound_ttl (int32_t ttl_in, uint32_t prio)
2745 {
2746   unsigned long long allowed;
2747
2748   if (ttl_in <= 0)
2749     return ttl_in;
2750   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2751   if (ttl_in > allowed)      
2752     {
2753       if (allowed >= (1 << 30))
2754         return 1 << 30;
2755       return allowed;
2756     }
2757   return ttl_in;
2758 }
2759
2760
2761 /**
2762  * We've received a request with the specified
2763  * priority.  Bound it according to how much
2764  * we trust the given peer.
2765  * 
2766  * @param prio_in requested priority
2767  * @param peer the peer making the request
2768  * @return effective priority
2769  */
2770 static uint32_t
2771 bound_priority (uint32_t prio_in,
2772                 const struct GNUNET_PeerIdentity *peer)
2773 {
2774   return 0; // FIXME!
2775 }
2776
2777
2778 /**
2779  * Handle P2P "GET" request.
2780  *
2781  * @param cls closure, always NULL
2782  * @param other the other peer involved (sender or receiver, NULL
2783  *        for loopback messages where we are both sender and receiver)
2784  * @param message the actual message
2785  * @return GNUNET_OK to keep the connection open,
2786  *         GNUNET_SYSERR to close it (signal serious error)
2787  */
2788 static int
2789 handle_p2p_get (void *cls,
2790                 const struct GNUNET_PeerIdentity *other,
2791                 const struct GNUNET_MessageHeader *message)
2792 {
2793   uint16_t msize;
2794   const struct GetMessage *gm;
2795   unsigned int bits;
2796   const GNUNET_HashCode *opt;
2797   struct ProcessGetContext *pgc;
2798   uint32_t bm;
2799   size_t bfsize;
2800   uint32_t ttl_decrement;
2801   double preference;
2802   int net_load_up;
2803   int net_load_down;
2804
2805   msize = ntohs(message->size);
2806   if (msize < sizeof (struct GetMessage))
2807     {
2808       GNUNET_break_op (0);
2809       return GNUNET_SYSERR;
2810     }
2811   gm = (const struct GetMessage*) message;
2812   bm = ntohl (gm->hash_bitmap);
2813   bits = 0;
2814   while (bm > 0)
2815     {
2816       if (1 == (bm & 1))
2817         bits++;
2818       bm >>= 1;
2819     }
2820   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2821     {
2822       GNUNET_break_op (0);
2823       return GNUNET_SYSERR;
2824     }  
2825   opt = (const GNUNET_HashCode*) &gm[1];
2826   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2827   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2828   if (bfsize > 0)
2829     {
2830       pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2831                                                    bfsize,
2832                                                    BLOOMFILTER_K);
2833       pgc->bf_size = bfsize;
2834     }
2835   pgc->type = ntohl (gm->type);
2836   pgc->bm = ntohl (gm->hash_bitmap);
2837   pgc->mingle = gm->filter_mutator;
2838   bits = 0;
2839   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2840     pgc->reply_to.hashPubKey = opt[bits++];
2841   else
2842     pgc->reply_to = *other;
2843   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2844     pgc->namespace = opt[bits++];
2845   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2846     {
2847       GNUNET_break_op (0);
2848       GNUNET_free (pgc);
2849       return GNUNET_SYSERR;
2850     }
2851   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2852     pgc->prime_target.hashPubKey = opt[bits++];
2853   /* note that we can really only check load here since otherwise
2854      peers could find out that we are overloaded by being disconnected
2855      after sending us a malformed query... */
2856   if (GNUNET_YES == test_load_too_high ())
2857     {
2858       if (NULL != pgc->bf)
2859         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2860       GNUNET_free (pgc);
2861 #if DEBUG_FS
2862       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2863                   "Dropping query from `%s', this peer is too busy.\n",
2864                   GNUNET_i2s (other));
2865 #endif
2866       return GNUNET_OK;
2867     }
2868   net_load_up = 50; // FIXME
2869   net_load_down = 50; // FIXME
2870   pgc->policy = ROUTING_POLICY_NONE;
2871   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2872        (net_load_down < IDLE_LOAD_THRESHOLD) )
2873     {
2874       pgc->policy |= ROUTING_POLICY_ALL;
2875       pgc->priority = 0; /* no charge */
2876     }
2877   else
2878     {
2879       pgc->priority = bound_priority (ntohl (gm->priority), other);
2880       if ( (net_load_up < 
2881             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2882            (net_load_down < 
2883             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2884         {
2885           pgc->policy |= ROUTING_POLICY_ALL;
2886         }
2887       else
2888         {
2889           // FIXME: is this sound?
2890           if (net_load_up < 90 + 10 * pgc->priority)
2891             pgc->policy |= ROUTING_POLICY_FORWARD;
2892           if (net_load_down < 90 + 10 * pgc->priority)
2893             pgc->policy |= ROUTING_POLICY_ANSWER;
2894         }
2895     }
2896   if (pgc->policy == ROUTING_POLICY_NONE)
2897     {
2898 #if DEBUG_FS
2899       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2900                   "Dropping query from `%s', network saturated.\n",
2901                   GNUNET_i2s (other));
2902 #endif
2903       if (NULL != pgc->bf)
2904         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2905       GNUNET_free (pgc);
2906       return GNUNET_OK;     /* drop */
2907     }
2908   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2909     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2910   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2911   /* decrement ttl (always) */
2912   ttl_decrement = 2 * TTL_DECREMENT +
2913     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2914                               TTL_DECREMENT);
2915   if ( (pgc->ttl < 0) &&
2916        (pgc->ttl - ttl_decrement > 0) )
2917     {
2918 #if DEBUG_FS
2919       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2920                   "Dropping query from `%s' due to TTL underflow.\n",
2921                   GNUNET_i2s (other));
2922 #endif
2923       /* integer underflow => drop (should be very rare)! */
2924       if (NULL != pgc->bf)
2925         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2926       GNUNET_free (pgc);
2927       return GNUNET_OK;
2928     }
2929   pgc->ttl -= ttl_decrement;
2930   pgc->start_time = GNUNET_TIME_absolute_get ();
2931   preference = (double) pgc->priority;
2932   if (preference < QUERY_BANDWIDTH_VALUE)
2933     preference = QUERY_BANDWIDTH_VALUE;
2934   // FIXME: also reserve bandwidth for reply?
2935   GNUNET_CORE_peer_configure (core,
2936                               other,
2937                               GNUNET_TIME_UNIT_FOREVER_REL,
2938                               0, 0, preference, NULL, NULL);
2939   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2940     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2941                                  &ds_get_request,
2942                                  pgc);
2943   else
2944     GNUNET_SCHEDULER_add_continuation (sched,
2945                                        GNUNET_NO,
2946                                        &forward_get_request,
2947                                        pgc,
2948                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2949   return GNUNET_OK;
2950 }
2951
2952
2953 /**
2954  * Function called to notify us that we can now transmit a reply to a
2955  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2956  * closed for writing in the meantime.
2957  *
2958  * @param cls closure, points to a "struct PendingRequest*" with
2959  *            one or more pending replies
2960  * @param size number of bytes available in buf
2961  * @param buf where the callee should write the message
2962  * @return number of bytes written to buf
2963  */
2964 static size_t
2965 transmit_result (void *cls,
2966                  size_t size, 
2967                  void *buf)
2968 {
2969   struct PendingRequest *pr = cls;
2970   char *cbuf = buf;
2971   struct PendingReply *reply;
2972   size_t ret;
2973
2974   ret = 0;
2975   while (NULL != (reply = pr->replies_pending))
2976     {
2977       if ( (reply->msize + ret < ret) ||
2978            (reply->msize + ret > size) )
2979         break;
2980       pr->replies_pending = reply->next;
2981       memcpy (&cbuf[ret], &reply[1], reply->msize);
2982       ret += reply->msize;
2983       GNUNET_free (reply);
2984     }
2985   return ret;
2986 }
2987
2988
2989 /**
2990  * Iterator over pending requests.
2991  *
2992  * @param cls response (struct ProcessReplyClosure)
2993  * @param key our query
2994  * @param value value in the hash map (meta-info about the query)
2995  * @return GNUNET_YES (we should continue to iterate)
2996  */
2997 static int
2998 process_reply (void *cls,
2999                const GNUNET_HashCode * key,
3000                void *value)
3001 {
3002   struct ProcessReplyClosure *prq = cls;
3003   struct PendingRequest *pr = value;
3004   struct PendingRequest *eer;
3005   struct PendingReply *reply;
3006   struct PutMessage *pm;
3007   struct ContentMessage *cm;
3008   GNUNET_HashCode chash;
3009   GNUNET_HashCode mhash;
3010   struct GNUNET_PeerIdentity target;
3011   size_t msize;
3012   uint32_t prio;
3013   struct GNUNET_TIME_Relative max_delay;
3014   
3015   GNUNET_CRYPTO_hash (prq->data,
3016                       prq->size,
3017                       &chash);
3018   switch (prq->type)
3019     {
3020     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3021     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3022       break;
3023     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3024       /* FIXME: does prq->namespace match our expectations? */
3025       /* then: fall-through??? */
3026     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3027       if (pr->bf != NULL) 
3028         {
3029           mingle_hash (&chash, pr->mingle, &mhash);
3030           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
3031                                                                &mhash))
3032             return GNUNET_YES; /* duplicate */
3033           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
3034                                             &mhash);
3035         }
3036       break;
3037     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3038       // FIXME: any checks against duplicates for SKBlocks?
3039       break;
3040     }
3041   prio = pr->priority;
3042   prq->priority += pr->remaining_priority;
3043   pr->remaining_priority = 0;
3044   if (pr->client != NULL)
3045     {
3046       if (pr->replies_seen_size == pr->replies_seen_off)
3047         GNUNET_array_grow (pr->replies_seen,
3048                            pr->replies_seen_size,
3049                            pr->replies_seen_size * 2 + 4);
3050       pr->replies_seen[pr->replies_seen_off++] = chash;
3051       // FIXME: possibly recalculate BF!
3052     }
3053   if (pr->client == NULL)
3054     {
3055       msize = sizeof (struct ContentMessage) + prq->size;
3056       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3057       reply->msize = msize;
3058       cm = (struct ContentMessage*) &reply[1];
3059       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
3060       cm->header.size = htons (msize);
3061       cm->type = htonl (prq->type);
3062       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3063       reply->next = pr->replies_pending;
3064       pr->replies_pending = reply;
3065       memcpy (&reply[1], prq->data, prq->size);
3066       if (pr->cth != NULL)
3067         return GNUNET_YES;
3068       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
3069       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
3070         {
3071           /* estimate expiration time from time difference between
3072              first request that will be discarded and this request */
3073           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
3074           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
3075                                                            eer->start_time);
3076         }
3077       GNUNET_PEER_resolve (pr->source_pid,
3078                            &target);
3079       pr->cth = GNUNET_CORE_notify_transmit_ready (core,
3080                                                    prio,
3081                                                    max_delay,
3082                                                    &target,
3083                                                    msize,
3084                                                    &transmit_result,
3085                                                    pr);
3086       if (NULL == pr->cth)
3087         {
3088           // FIXME: now what? discard?
3089         }
3090     }
3091   else
3092     {
3093       msize = sizeof (struct PutMessage) + prq->size;
3094       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3095       reply->msize = msize;
3096       reply->next = pr->replies_pending;
3097       pm = (struct PutMessage*) &reply[1];
3098       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3099       pm->header.size = htons (msize);
3100       pm->type = htonl (prq->type);
3101       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
3102       pr->replies_pending = reply;
3103       memcpy (&reply[1], prq->data, prq->size);
3104       if (pr->th != NULL)
3105         return GNUNET_YES;
3106       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
3107                                                     msize,
3108                                                     GNUNET_TIME_UNIT_FOREVER_REL,
3109                                                     &transmit_result,
3110                                                     pr);
3111       if (pr->th == NULL)
3112         {
3113           // FIXME: need to try again later (not much
3114           // to do here specifically, but we need to
3115           // check somewhere else to handle this case!)
3116         }
3117     }
3118   // FIXME: implement hot-path routing statistics keeping!
3119   return GNUNET_YES;
3120 }
3121
3122
3123 /**
3124  * Check if the given KBlock is well-formed.
3125  *
3126  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
3127  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
3128  * @param query where to store the query that this block answers
3129  * @return GNUNET_OK if this is actually a well-formed KBlock
3130  */
3131 static int
3132 check_kblock (const struct KBlock *kb,
3133               size_t dsize,
3134               GNUNET_HashCode *query)
3135 {
3136   if (dsize < sizeof (struct KBlock))
3137     {
3138       GNUNET_break_op (0);
3139       return GNUNET_SYSERR;
3140     }
3141   if (dsize - sizeof (struct KBlock) !=
3142       ntohs (kb->purpose.size) 
3143       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
3144       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
3145     {
3146       GNUNET_break_op (0);
3147       return GNUNET_SYSERR;
3148     }
3149   if (GNUNET_OK !=
3150       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
3151                                 &kb->purpose,
3152                                 &kb->signature,
3153                                 &kb->keyspace)) 
3154     {
3155       GNUNET_break_op (0);
3156       return GNUNET_SYSERR;
3157     }
3158   if (query != NULL)
3159     GNUNET_CRYPTO_hash (&kb->keyspace,
3160                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3161                         query);
3162   return GNUNET_OK;
3163 }
3164
3165
3166 /**
3167  * Check if the given SBlock is well-formed.
3168  *
3169  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
3170  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
3171  * @param query where to store the query that this block answers
3172  * @param namespace where to store the namespace that this block belongs to
3173  * @return GNUNET_OK if this is actually a well-formed SBlock
3174  */
3175 static int
3176 check_sblock (const struct SBlock *sb,
3177               size_t dsize,
3178               GNUNET_HashCode *query,   
3179               GNUNET_HashCode *namespace)
3180 {
3181   if (dsize < sizeof (struct SBlock))
3182     {
3183       GNUNET_break_op (0);
3184       return GNUNET_SYSERR;
3185     }
3186   if (dsize !=
3187       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
3188     {
3189       GNUNET_break_op (0);
3190       return GNUNET_SYSERR;
3191     }
3192   if (GNUNET_OK !=
3193       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
3194                                 &sb->purpose,
3195                                 &sb->signature,
3196                                 &sb->subspace)) 
3197     {
3198       GNUNET_break_op (0);
3199       return GNUNET_SYSERR;
3200     }
3201   if (query != NULL)
3202     *query = sb->identifier;
3203   if (namespace != NULL)
3204     GNUNET_CRYPTO_hash (&sb->subspace,
3205                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3206                         namespace);
3207   return GNUNET_OK;
3208 }
3209
3210
3211 /**
3212  * Handle P2P "PUT" request.
3213  *
3214  * @param cls closure, always NULL
3215  * @param other the other peer involved (sender or receiver, NULL
3216  *        for loopback messages where we are both sender and receiver)
3217  * @param message the actual message
3218  * @return GNUNET_OK to keep the connection open,
3219  *         GNUNET_SYSERR to close it (signal serious error)
3220  */
3221 static int
3222 handle_p2p_put (void *cls,
3223                 const struct GNUNET_PeerIdentity *other,
3224                 const struct GNUNET_MessageHeader *message)
3225 {
3226   const struct PutMessage *put;
3227   uint16_t msize;
3228   size_t dsize;
3229   uint32_t type;
3230   struct GNUNET_TIME_Absolute expiration;
3231   GNUNET_HashCode query;
3232   struct ProcessReplyClosure prq;
3233
3234   msize = ntohs (message->size);
3235   if (msize < sizeof (struct PutMessage))
3236     {
3237       GNUNET_break_op(0);
3238       return GNUNET_SYSERR;
3239     }
3240   put = (const struct PutMessage*) message;
3241   dsize = msize - sizeof (struct PutMessage);
3242   type = ntohl (put->type);
3243   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
3244
3245   /* first, validate! */
3246   switch (type)
3247     {
3248     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3249     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3250       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
3251       break;
3252     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3253       if (GNUNET_OK !=
3254           check_kblock ((const struct KBlock*) &put[1],
3255                         dsize,
3256                         &query))
3257         return GNUNET_SYSERR;
3258       break;
3259     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3260       if (GNUNET_OK !=
3261           check_sblock ((const struct SBlock*) &put[1],
3262                         dsize,
3263                         &query,
3264                         &prq.namespace))
3265         return GNUNET_SYSERR;
3266       break;
3267     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3268       // FIXME -- validate SKBLOCK!
3269       GNUNET_break (0);
3270       return GNUNET_OK;
3271     default:
3272       /* unknown block type */
3273       GNUNET_break_op (0);
3274       return GNUNET_SYSERR;
3275     }
3276
3277   /* now, lookup 'query' */
3278   prq.data = (const void*) &put[1];
3279   prq.size = dsize;
3280   prq.type = type;
3281   prq.expiration = expiration;
3282   prq.priority = 0;
3283   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
3284                                               &query,
3285                                               &process_reply,
3286                                               &prq);
3287   // FIXME: if migration is on and load is low,
3288   // queue to store data in datastore;
3289   // use "prq.priority" for that!
3290   return GNUNET_OK;
3291 }
3292
3293
3294 /**
3295  * List of handlers for P2P messages
3296  * that we care about.
3297  */
3298 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3299   {
3300     { &handle_p2p_get, 
3301       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3302     { &handle_p2p_put, 
3303       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3304     { NULL, 0, 0 }
3305   };
3306
3307
3308 /**
3309  * Task that will try to initiate a connection with the
3310  * core service.
3311  * 
3312  * @param cls unused
3313  * @param tc unused
3314  */
3315 static void
3316 core_connect_task (void *cls,
3317                    const struct GNUNET_SCHEDULER_TaskContext *tc);
3318
3319
3320 /**
3321  * Function called by the core after we've
3322  * connected.
3323  *
3324  * @param cls closure, unused
3325  * @param server handle to the core service
3326  * @param my_identity our peer identity (unused)
3327  * @param publicKey our public key (unused)
3328  */
3329 static void
3330 core_start_cb (void *cls,
3331                struct GNUNET_CORE_Handle * server,
3332                const struct GNUNET_PeerIdentity *
3333                my_identity,
3334                const struct
3335                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
3336                publicKey)
3337 {
3338   if (server == NULL)
3339     {
3340       GNUNET_SCHEDULER_add_delayed (sched,
3341                                     GNUNET_NO,
3342                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
3343                                     GNUNET_SCHEDULER_NO_TASK,
3344                                     GNUNET_TIME_UNIT_SECONDS,
3345                                     &core_connect_task,
3346                                     NULL);
3347       return;
3348     }
3349   core = server;
3350 }
3351
3352
3353 /**
3354  * Task that will try to initiate a connection with the
3355  * core service.
3356  * 
3357  * @param cls unused
3358  * @param tc unused
3359  */
3360 static void
3361 core_connect_task (void *cls,
3362                    const struct GNUNET_SCHEDULER_TaskContext *tc)
3363 {
3364   GNUNET_CORE_connect (sched,
3365                        cfg,
3366                        GNUNET_TIME_UNIT_FOREVER_REL,
3367                        NULL,
3368                        &core_start_cb,
3369                        &peer_connect_handler,
3370                        &peer_disconnect_handler,
3371                        NULL, 
3372                        NULL, GNUNET_NO,
3373                        NULL, GNUNET_NO,
3374                        p2p_handlers);
3375 }
3376
3377
3378 /**
3379  * Process fs requests.
3380  *
3381  * @param cls closure
3382  * @param s scheduler to use
3383  * @param server the initialized server
3384  * @param c configuration to use
3385  */
3386 static void
3387 run (void *cls,
3388      struct GNUNET_SCHEDULER_Handle *s,
3389      struct GNUNET_SERVER_Handle *server,
3390      const struct GNUNET_CONFIGURATION_Handle *c)
3391 {
3392   sched = s;
3393   cfg = c;
3394
3395   ifm = GNUNET_CONTAINER_multihashmap_create (128);
3396   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3397   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3398   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
3399   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3400   read_index_list ();
3401   dsh = GNUNET_DATASTORE_connect (cfg,
3402                                   sched);
3403   if (NULL == dsh)
3404     {
3405       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3406                   _("Failed to connect to datastore service.\n"));
3407       return;
3408     }
3409   GNUNET_SERVER_disconnect_notify (server, 
3410                                    &handle_client_disconnect,
3411                                    NULL);
3412   GNUNET_SERVER_add_handlers (server, handlers);
3413   core_connect_task (NULL, NULL);
3414   GNUNET_SCHEDULER_add_delayed (sched,
3415                                 GNUNET_YES,
3416                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3417                                 GNUNET_SCHEDULER_NO_TASK,
3418                                 GNUNET_TIME_UNIT_FOREVER_REL,
3419                                 &shutdown_task,
3420                                 NULL);
3421 }
3422
3423
3424 /**
3425  * The main function for the fs service.
3426  *
3427  * @param argc number of arguments from the command line
3428  * @param argv command line arguments
3429  * @return 0 ok, 1 on error
3430  */
3431 int
3432 main (int argc, char *const *argv)
3433 {
3434   return (GNUNET_OK ==
3435           GNUNET_SERVICE_run (argc,
3436                               argv,
3437                               "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
3438 }
3439
3440 /* end of gnunet-service-fs.c */