fixing bugs, diagnosis summary
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix gazillion of minor FIXME's
28  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29  *   core to transmit to another peer; need to make sure this is bounded overall...
30  * - randomly delay processing for improved anonymity (can wait)
31  * - content migration (put in local DS) (can wait)
32  * - handle some special cases when forwarding replies based on tracked requests (can wait)
33  * - tracking of success correlations for hot-path routing (can wait)
34  * - various load-based actions (can wait)
35  * - validation of KSBLOCKS (can wait)
36  * - remove on-demand blocks if they keep failing (can wait)
37  * - check that we decrement PIDs always where necessary (can wait)
38  * - find out how to use core-pulling instead of pushing (at least for some cases)
39  */
40 #include "platform.h"
41 #include <float.h>
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "fs.h"
49
50 #define DEBUG_FS GNUNET_YES
51
52
53 /**
54  * In-memory information about indexed files (also available
55  * on-disk).
56  */
57 struct IndexInfo
58 {
59   
60   /**
61    * This is a linked list.
62    */
63   struct IndexInfo *next;
64
65   /**
66    * Name of the indexed file.  Memory allocated
67    * at the end of this struct (do not free).
68    */
69   const char *filename;
70
71   /**
72    * Context for transmitting confirmation to client,
73    * NULL if we've done this already.
74    */
75   struct GNUNET_SERVER_TransmitContext *tc;
76   
77   /**
78    * Hash of the contents of the file.
79    */
80   GNUNET_HashCode file_id;
81
82 };
83
84
85 /**
86  * Signature of a function that is called whenever a datastore
87  * request can be processed (or an entry put on the queue times out).
88  *
89  * @param cls closure
90  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
91  */
92 typedef void (*RequestFunction)(void *cls,
93                                 int ok);
94
95
96 /**
97  * Doubly-linked list of our requests for the datastore.
98  */
99 struct DatastoreRequestQueue
100 {
101
102   /**
103    * This is a doubly-linked list.
104    */
105   struct DatastoreRequestQueue *next;
106
107   /**
108    * This is a doubly-linked list.
109    */
110   struct DatastoreRequestQueue *prev;
111
112   /**
113    * Function to call (will issue the request).
114    */
115   RequestFunction req;
116
117   /**
118    * Closure for req.
119    */
120   void *req_cls;
121
122   /**
123    * When should this request time-out because we don't care anymore?
124    */
125   struct GNUNET_TIME_Absolute timeout;
126     
127   /**
128    * ID of task used for signaling timeout.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132 };
133
134
135 /**
136  * Closure for processing START_SEARCH messages from a client.
137  */
138 struct LocalGetContext
139 {
140
141   /**
142    * This is a doubly-linked list.
143    */
144   struct LocalGetContext *next;
145
146   /**
147    * This is a doubly-linked list.
148    */
149   struct LocalGetContext *prev;
150
151   /**
152    * Client that initiated the search.
153    */
154   struct GNUNET_SERVER_Client *client;
155
156   /**
157    * Array of results that we've already received 
158    * (can be NULL).
159    */
160   GNUNET_HashCode *results; 
161
162   /**
163    * Bloomfilter over all results (for fast query construction);
164    * NULL if we don't have any results.
165    *
166    * FIXME: this member is not used, is that OK? If so, it should
167    * be removed!
168    */
169   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
170
171   /**
172    * DS request associated with this operation.
173    */
174   struct DatastoreRequestQueue *req;
175
176   /**
177    * Current result message to transmit to client (or NULL).
178    */
179   struct ContentMessage *result;
180   
181   /**
182    * Type of the content that we're looking for.
183    * 0 for any.
184    */
185   uint32_t type;
186
187   /**
188    * Desired anonymity level.
189    */
190   uint32_t anonymity_level;
191
192   /**
193    * Number of results actually stored in the results array.
194    */
195   unsigned int results_used;
196   
197   /**
198    * Size of the results array in memory.
199    */
200   unsigned int results_size;
201   
202   /**
203    * Size (in bytes) of the 'results_bf' bloomfilter.
204    *
205    * FIXME: this member is not used, is that OK? If so, it should
206    * be removed!
207    */
208   size_t results_bf_size;
209
210   /**
211    * If the request is for a DBLOCK or IBLOCK, this is the identity of
212    * the peer that is known to have a response.  Set to all-zeros if
213    * such a target is not known (note that even if OUR anonymity
214    * level is >0 we may happen to know the responder's identity;
215    * nevertheless, we should probably not use it for a DHT-lookup
216    * or similar blunt actions in order to avoid exposing ourselves).
217    */
218   struct GNUNET_PeerIdentity target;
219
220   /**
221    * If the request is for an SBLOCK, this is the identity of the
222    * pseudonym to which the SBLOCK belongs. 
223    */
224   GNUNET_HashCode namespace;
225
226   /**
227    * Hash of the keyword (aka query) for KBLOCKs; Hash of
228    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
229    * and hash of the identifier XORed with the target for
230    * SBLOCKS (aka query).
231    */
232   GNUNET_HashCode query;
233
234 };
235
236
237 /**
238  * Possible routing policies for an FS-GET request.
239  */
240 enum RoutingPolicy
241   {
242     /**
243      * Simply drop the request.
244      */
245     ROUTING_POLICY_NONE = 0,
246     
247     /**
248      * Answer it if we can from local datastore.
249      */
250     ROUTING_POLICY_ANSWER = 1,
251
252     /**
253      * Forward the request to other peers (if possible).
254      */
255     ROUTING_POLICY_FORWARD = 2,
256
257     /**
258      * Forward to other peers, and ask them to route
259      * the response via ourselves.
260      */
261     ROUTING_POLICY_INDIRECT = 6,
262     
263     /**
264      * Do everything we could possibly do (that would
265      * make sense).
266      */
267     ROUTING_POLICY_ALL = 7
268   };
269
270
271 /**
272  * Internal context we use for our initial processing
273  * of a GET request.
274  */
275 struct ProcessGetContext
276 {
277   /**
278    * The search query (used for datastore lookup).
279    */
280   GNUNET_HashCode query;
281   
282   /**
283    * Which peer we should forward the response to.
284    */
285   struct GNUNET_PeerIdentity reply_to;
286
287   /**
288    * Namespace for the result (only set for SKS requests)
289    */
290   GNUNET_HashCode namespace;
291
292   /**
293    * Peer that we should forward the query to if possible
294    * (since that peer likely has the content).
295    */
296   struct GNUNET_PeerIdentity prime_target;
297
298   /**
299    * When did we receive this request?
300    */
301   struct GNUNET_TIME_Absolute start_time;
302
303   /**
304    * Our entry in the DRQ (non-NULL while we wait for our
305    * turn to interact with the local database).
306    */
307   struct DatastoreRequestQueue *drq;
308
309   /**
310    * Filter used to eliminate duplicate results.  Can be NULL if we
311    * are not yet filtering any results.
312    */
313   struct GNUNET_CONTAINER_BloomFilter *bf;
314
315   /**
316    * Bitmap describing which of the optional
317    * hash codes / peer identities were given to us.
318    */
319   uint32_t bm;
320
321   /**
322    * Desired block type.
323    */
324   uint32_t type;
325
326   /**
327    * Priority of the request.
328    */
329   uint32_t priority;
330
331   /**
332    * Size of the 'bf' (in bytes).
333    */
334   size_t bf_size;
335
336   /**
337    * In what ways are we going to process
338    * the request?
339    */
340   enum RoutingPolicy policy;
341
342   /**
343    * Time-to-live for the request (value
344    * we use).
345    */
346   int32_t ttl;
347
348   /**
349    * Number to mingle hashes for bloom-filter
350    * tests with.
351    */
352   int32_t mingle;
353
354   /**
355    * Number of results that were found so far.
356    */
357   unsigned int results_found;
358 };
359
360
361 /**
362  * Information we keep for each pending reply.
363  */
364 struct PendingReply
365 {
366   /**
367    * This is a linked list.
368    */
369   struct PendingReply *next;
370
371   /**
372    * Size of the reply; actual reply message follows
373    * at the end of this struct.
374    */
375   size_t msize;
376
377 };
378
379
380 /**
381  * All requests from a client are kept in a doubly-linked list.
382  */
383 struct ClientRequestList;
384
385
386 /**
387  * Information we keep for each pending request.  We should try to
388  * keep this struct as small as possible since its memory consumption
389  * is key to how many requests we can have pending at once.
390  */
391 struct PendingRequest
392 {
393
394   /**
395    * ID of a client making a request, NULL if this entry is for a
396    * peer.
397    */
398   struct GNUNET_SERVER_Client *client;
399
400   /**
401    * If this request was made by a client,
402    * this is our entry in the client request
403    * list; otherwise NULL.
404    */
405   struct ClientRequestList *crl_entry;
406
407   /**
408    * If this is a namespace query, pointer to the hash of the public
409    * key of the namespace; otherwise NULL.
410    */
411   GNUNET_HashCode *namespace;
412
413   /**
414    * Bloomfilter we use to filter out replies that we don't care about
415    * (anymore).  NULL as long as we are interested in all replies.
416    */
417   struct GNUNET_CONTAINER_BloomFilter *bf;
418
419   /**
420    * Replies that we have received but were unable to forward yet
421    * (typically non-null only if we have a pending transmission
422    * request with the client or the respective peer).
423    */
424   struct PendingReply *replies_pending;
425
426   /**
427    * Pending transmission request with the core service for the target
428    * peer (for processing of 'replies_pending') or Handle for a
429    * pending query-request for P2P-transmission with the core service.
430    * If non-NULL, this request must be cancelled should this struct be
431    * destroyed!
432    */
433   struct GNUNET_CORE_TransmitHandle *cth;
434
435   /**
436    * Pending transmission request for the target client (for processing of
437    * 'replies_pending').
438    */
439   struct GNUNET_CONNECTION_TransmitHandle *th;
440
441   /**
442    * Hash code of all replies that we have seen so far (only valid
443    * if client is not NULL since we only track replies like this for
444    * our own clients).
445    */
446   GNUNET_HashCode *replies_seen;
447
448   /**
449    * When did we first see this request (form this peer), or, if our
450    * client is initiating, when did we last initiate a search?
451    */
452   struct GNUNET_TIME_Absolute start_time;
453
454   /**
455    * The query that this request is for.
456    */
457   GNUNET_HashCode query;
458
459   /**
460    * The task responsible for transmitting queries
461    * for this request.
462    */
463   GNUNET_SCHEDULER_TaskIdentifier task;
464
465   /**
466    * (Interned) Peer identifier (only valid if "client" is NULL)
467    * that identifies a peer that gave us this request.
468    */
469   GNUNET_PEER_Id source_pid;
470
471   /**
472    * (Interned) Peer identifier that identifies a preferred target
473    * for requests.
474    */
475   GNUNET_PEER_Id target_pid;
476
477   /**
478    * (Interned) Peer identifiers of peers that have already
479    * received our query for this content.
480    */
481   GNUNET_PEER_Id *used_pids;
482
483   /**
484    * Size of the 'bf' (in bytes).
485    */
486   size_t bf_size;
487
488   /**
489    * Desired anonymity level; only valid for requests from a local client.
490    */
491   uint32_t anonymity_level;
492
493   /**
494    * How many entries in "used_pids" are actually valid?
495    */
496   unsigned int used_pids_off;
497
498   /**
499    * How long is the "used_pids" array?
500    */
501   unsigned int used_pids_size;
502
503   /**
504    * How many entries in "replies_seen" are actually valid?
505    */
506   unsigned int replies_seen_off;
507
508   /**
509    * How long is the "replies_seen" array?
510    */
511   unsigned int replies_seen_size;
512   
513   /**
514    * Priority with which this request was made.  If one of our clients
515    * made the request, then this is the current priority that we are
516    * using when initiating the request.  This value is used when
517    * we decide to reward other peers with trust for providing a reply.
518    */
519   uint32_t priority;
520
521   /**
522    * Priority points left for us to spend when forwarding this request
523    * to other peers.
524    */
525   uint32_t remaining_priority;
526
527   /**
528    * Number to mingle hashes for bloom-filter
529    * tests with.
530    */
531   int32_t mingle;
532
533   /**
534    * TTL with which we saw this request (or, if we initiated, TTL that
535    * we used for the request).
536    */
537   int32_t ttl;
538   
539   /**
540    * Type of the content that this request is for.
541    */
542   uint32_t type;
543
544 };
545
546
547 /**
548  * All requests from a client are kept in a doubly-linked list.
549  */
550 struct ClientRequestList
551 {
552   /**
553    * This is a doubly-linked list.
554    */
555   struct ClientRequestList *next;
556
557   /**
558    * This is a doubly-linked list.
559    */ 
560   struct ClientRequestList *prev;
561
562   /**
563    * A request from this client.
564    */
565   struct PendingRequest *req;
566
567   /**
568    * Client list with the head and tail of this DLL.
569    */
570   struct ClientList *cl;
571 };
572
573
574 /**
575  * Linked list of all clients that we are 
576  * currently processing requests for.
577  */
578 struct ClientList
579 {
580
581   /**
582    * This is a linked list.
583    */
584   struct ClientList *next;
585
586   /**
587    * What client is this entry for?
588    */
589   struct GNUNET_SERVER_Client* client;
590
591   /**
592    * Head of the DLL of requests from this client.
593    */
594   struct ClientRequestList *head;
595
596   /**
597    * Tail of the DLL of requests from this client.
598    */
599   struct ClientRequestList *tail;
600
601 };
602
603
604 /**
605  * Closure for "process_reply" function.
606  */
607 struct ProcessReplyClosure
608 {
609   /**
610    * The data for the reply.
611    */
612   const void *data;
613
614   /**
615    * When the reply expires.
616    */
617   struct GNUNET_TIME_Absolute expiration;
618
619   /**
620    * Size of data.
621    */
622   size_t size;
623
624   /**
625    * Namespace that this reply belongs to
626    * (if it is of type SBLOCK).
627    */
628   GNUNET_HashCode namespace;
629
630   /**
631    * Type of the block.
632    */
633   uint32_t type;
634
635   /**
636    * How much was this reply worth to us?
637    */
638   uint32_t priority;
639 };
640
641
642 /**
643  * Information about a peer that we are connected to.
644  * We track data that is useful for determining which
645  * peers should receive our requests.
646  */
647 struct ConnectedPeer
648 {
649
650   /**
651    * List of the last clients for which this peer
652    * successfully answered a query. 
653    */
654   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
655
656   /**
657    * List of the last PIDs for which
658    * this peer successfully answered a query;
659    * We use 0 to indicate no successful reply.
660    */
661   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
662
663   /**
664    * Average delay between sending the peer a request and
665    * getting a reply (only calculated over the requests for
666    * which we actually got a reply).   Calculated
667    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
668    */ 
669   struct GNUNET_TIME_Relative avg_delay;
670
671   /**
672    * Average priority of successful replies.  Calculated
673    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
674    */
675   double avg_priority;
676
677   /**
678    * The peer's identity.
679    */
680   GNUNET_PEER_Id pid;  
681
682   /**
683    * Number of requests we have currently pending
684    * with this peer (that is, requests that were
685    * transmitted so recently that we would not retransmit
686    * them right now).
687    */
688   unsigned int pending_requests;
689
690   /**
691    * Which offset in "last_p2p_replies" will be updated next?
692    * (we go round-robin).
693    */
694   unsigned int last_p2p_replies_woff;
695
696   /**
697    * Which offset in "last_client_replies" will be updated next?
698    * (we go round-robin).
699    */
700   unsigned int last_client_replies_woff;
701
702 };
703
704
705 /**
706  * Our connection to the datastore.
707  */
708 static struct GNUNET_DATASTORE_Handle *dsh;
709
710 /**
711  * Our scheduler.
712  */
713 static struct GNUNET_SCHEDULER_Handle *sched;
714
715 /**
716  * Our configuration.
717  */
718 const struct GNUNET_CONFIGURATION_Handle *cfg;
719
720 /**
721  * Handle to the core service (NULL until we've connected to it).
722  */
723 struct GNUNET_CORE_Handle *core;
724
725 /**
726  * Head of doubly-linked LGC list.
727  */
728 static struct LocalGetContext *lgc_head;
729
730 /**
731  * Tail of doubly-linked LGC list.
732  */
733 static struct LocalGetContext *lgc_tail;
734
735 /**
736  * Head of request queue for the datastore, sorted by timeout.
737  */
738 static struct DatastoreRequestQueue *drq_head;
739
740 /**
741  * Tail of request queue for the datastore.
742  */
743 static struct DatastoreRequestQueue *drq_tail;
744
745 /**
746  * Linked list of indexed files.
747  */
748 static struct IndexInfo *indexed_files;
749
750 /**
751  * Maps hash over content of indexed files to the respective filename.
752  * The filenames are pointers into the indexed_files linked list and
753  * do not need to be freed.
754  */
755 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
756
757 /**
758  * Map of query hash codes to requests.
759  */
760 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
761
762 /**
763  * Map of peer IDs to requests (for those requests coming
764  * from other peers).
765  */
766 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
767
768 /**
769  * Linked list of all of our clients and their requests.
770  */
771 static struct ClientList *clients;
772
773 /**
774  * Heap with the request that will expire next at the top.  Contains
775  * pointers of type "struct PendingRequest*"; these will *also* be
776  * aliased from the "requests_by_peer" data structures and the
777  * "requests_by_query" table.  Note that requests from our clients
778  * don't expire and are thus NOT in the "requests_by_expiration"
779  * (or the "requests_by_peer" tables).
780  */
781 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
782
783 /**
784  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
785  */
786 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
787
788 /**
789  * Maximum number of requests (from other peers) that we're
790  * willing to have pending at any given point in time.
791  * FIXME: set from configuration (and 32 is a tiny value for testing only).
792  */
793 static uint64_t max_pending_requests = 32;
794
795 /**
796  * Write the current index information list to disk.
797  */ 
798 static void
799 write_index_list ()
800 {
801   struct GNUNET_BIO_WriteHandle *wh;
802   char *fn;
803   struct IndexInfo *pos;  
804
805   if (GNUNET_OK !=
806       GNUNET_CONFIGURATION_get_value_filename (cfg,
807                                                "FS",
808                                                "INDEXDB",
809                                                &fn))
810     {
811       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
812                   _("Configuration option `%s' in section `%s' missing.\n"),
813                   "INDEXDB",
814                   "FS");
815       return;
816     }
817   wh = GNUNET_BIO_write_open (fn);
818   if (NULL == wh)
819     {
820       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
821                   _("Could not open `%s'.\n"),
822                   fn);
823       GNUNET_free (fn);
824       return;
825     }
826   pos = indexed_files;
827   while (pos != NULL)
828     {
829       if ( (GNUNET_OK !=
830             GNUNET_BIO_write (wh,
831                               &pos->file_id,
832                               sizeof (GNUNET_HashCode))) ||
833            (GNUNET_OK !=
834             GNUNET_BIO_write_string (wh,
835                                      pos->filename)) )
836         break;
837       pos = pos->next;
838     }
839   if (GNUNET_OK != 
840       GNUNET_BIO_write_close (wh))
841     {
842       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
843                   _("Error writing `%s'.\n"),
844                   fn);
845       GNUNET_free (fn);
846       return;
847     }
848   GNUNET_free (fn);
849 }
850
851
852 /**
853  * Read index information from disk.
854  */
855 static void
856 read_index_list ()
857 {
858   struct GNUNET_BIO_ReadHandle *rh;
859   char *fn;
860   struct IndexInfo *pos;  
861   char *fname;
862   GNUNET_HashCode hc;
863   size_t slen;
864   char *emsg;
865
866   if (GNUNET_OK !=
867       GNUNET_CONFIGURATION_get_value_filename (cfg,
868                                                "FS",
869                                                "INDEXDB",
870                                                &fn))
871     {
872       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
873                   _("Configuration option `%s' in section `%s' missing.\n"),
874                   "INDEXDB",
875                   "FS");
876       return;
877     }
878   rh = GNUNET_BIO_read_open (fn);
879   if (NULL == rh)
880     {
881       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
882                   _("Could not open `%s'.\n"),
883                   fn);
884       GNUNET_free (fn);
885       return;
886     }
887
888   while ( (GNUNET_OK ==
889            GNUNET_BIO_read (rh,
890                             "Hash of indexed file",
891                             &hc,
892                             sizeof (GNUNET_HashCode))) &&
893           (GNUNET_OK ==
894            GNUNET_BIO_read_string (rh, 
895                                    "Name of indexed file",
896                                    &fname,
897                                    1024 * 16)) )
898     {
899       slen = strlen (fname) + 1;
900       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
901       pos->file_id = hc;
902       pos->filename = (const char *) &pos[1];
903       memcpy (&pos[1], fname, slen);
904       if (GNUNET_SYSERR ==
905           GNUNET_CONTAINER_multihashmap_put (ifm,
906                                              &hc,
907                                              (void*) pos->filename,
908                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
909         {
910           GNUNET_free (pos);
911         }
912       else
913         {
914           pos->next = indexed_files;
915           indexed_files = pos;
916         }
917     }
918   if (GNUNET_OK != 
919       GNUNET_BIO_read_close (rh, &emsg))
920     GNUNET_free (emsg);
921   GNUNET_free (fn);
922 }
923
924
925 /**
926  * We've validated the hash of the file we're about to
927  * index.  Signal success to the client and update
928  * our internal data structures.
929  *
930  * @param ii the index info entry for the request
931  */
932 static void
933 signal_index_ok (struct IndexInfo *ii)
934 {
935   if (GNUNET_SYSERR ==
936       GNUNET_CONTAINER_multihashmap_put (ifm,
937                                          &ii->file_id,
938                                          (void*) ii->filename,
939                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
940     {
941       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
942                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
943                   ii->filename,
944                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
945                                                                    &ii->file_id));
946       GNUNET_SERVER_transmit_context_append (ii->tc,
947                                              NULL, 0,
948                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
949       GNUNET_SERVER_transmit_context_run (ii->tc,
950                                           GNUNET_TIME_UNIT_MINUTES);
951       GNUNET_free (ii);
952       return;
953     }
954   ii->next = indexed_files;
955   indexed_files = ii;
956   write_index_list ();
957   GNUNET_SERVER_transmit_context_append (ii->tc,
958                                          NULL, 0,
959                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
960   GNUNET_SERVER_transmit_context_run (ii->tc,
961                                       GNUNET_TIME_UNIT_MINUTES);
962   ii->tc = NULL;
963 }
964
965
966 /**
967  * Function called once the hash computation over an
968  * indexed file has completed.
969  *
970  * @param cls closure, our publishing context
971  * @param res resulting hash, NULL on error
972  */
973 static void 
974 hash_for_index_val (void *cls,
975                     const GNUNET_HashCode *
976                     res)
977 {
978   struct IndexInfo *ii = cls;
979   
980   if ( (res == NULL) ||
981        (0 != memcmp (res,
982                      &ii->file_id,
983                      sizeof(GNUNET_HashCode))) )
984     {
985       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
986                   _("Hash mismatch trying to index file `%s'\n"),
987                   ii->filename);
988       GNUNET_SERVER_transmit_context_append (ii->tc,
989                                              NULL, 0,
990                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
991       GNUNET_SERVER_transmit_context_run (ii->tc,
992                                           GNUNET_TIME_UNIT_MINUTES);
993       GNUNET_free (ii);
994       return;
995     }
996   signal_index_ok (ii);
997 }
998
999
1000 /**
1001  * Handle INDEX_START-message.
1002  *
1003  * @param cls closure
1004  * @param client identification of the client
1005  * @param message the actual message
1006  */
1007 static void
1008 handle_index_start (void *cls,
1009                     struct GNUNET_SERVER_Client *client,
1010                     const struct GNUNET_MessageHeader *message)
1011 {
1012   const struct IndexStartMessage *ism;
1013   const char *fn;
1014   uint16_t msize;
1015   struct IndexInfo *ii;
1016   size_t slen;
1017   uint32_t dev;
1018   uint64_t ino;
1019   uint32_t mydev;
1020   uint64_t myino;
1021
1022   msize = ntohs(message->size);
1023   if ( (msize <= sizeof (struct IndexStartMessage)) ||
1024        ( ((const char *)message)[msize-1] != '\0') )
1025     {
1026       GNUNET_break (0);
1027       GNUNET_SERVER_receive_done (client,
1028                                   GNUNET_SYSERR);
1029       return;
1030     }
1031   ism = (const struct IndexStartMessage*) message;
1032   fn = (const char*) &ism[1];
1033   dev = ntohl (ism->device);
1034   ino = GNUNET_ntohll (ism->inode);
1035   ism = (const struct IndexStartMessage*) message;
1036   slen = strlen (fn) + 1;
1037   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
1038   ii->filename = (const char*) &ii[1];
1039   memcpy (&ii[1], fn, slen);
1040   ii->file_id = ism->file_id;  
1041   ii->tc = GNUNET_SERVER_transmit_context_create (client);
1042   if ( ( (dev != 0) ||
1043          (ino != 0) ) &&
1044        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
1045                                                        &mydev,
1046                                                        &myino)) &&
1047        ( (dev == mydev) &&
1048          (ino == myino) ) )
1049     {      
1050       /* fast validation OK! */
1051       signal_index_ok (ii);
1052       return;
1053     }
1054   /* slow validation, need to hash full file (again) */
1055   GNUNET_CRYPTO_hash_file (sched,
1056                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1057                            GNUNET_NO,
1058                            fn,
1059                            HASHING_BLOCKSIZE,
1060                            &hash_for_index_val,
1061                            ii);
1062 }
1063
1064
1065 /**
1066  * Handle INDEX_LIST_GET-message.
1067  *
1068  * @param cls closure
1069  * @param client identification of the client
1070  * @param message the actual message
1071  */
1072 static void
1073 handle_index_list_get (void *cls,
1074                        struct GNUNET_SERVER_Client *client,
1075                        const struct GNUNET_MessageHeader *message)
1076 {
1077   struct GNUNET_SERVER_TransmitContext *tc;
1078   struct IndexInfoMessage *iim;
1079   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1080   size_t slen;
1081   const char *fn;
1082   struct GNUNET_MessageHeader *msg;
1083   struct IndexInfo *pos;
1084
1085   tc = GNUNET_SERVER_transmit_context_create (client);
1086   iim = (struct IndexInfoMessage*) buf;
1087   msg = &iim->header;
1088   pos = indexed_files;
1089   while (NULL != pos)
1090     {
1091       iim->reserved = 0;
1092       iim->file_id = pos->file_id;
1093       fn = pos->filename;
1094       slen = strlen (fn) + 1;
1095       if (slen + sizeof (struct IndexInfoMessage) > 
1096           GNUNET_SERVER_MAX_MESSAGE_SIZE)
1097         {
1098           GNUNET_break (0);
1099           break;
1100         }
1101       memcpy (&iim[1], fn, slen);
1102       GNUNET_SERVER_transmit_context_append
1103         (tc,
1104          &msg[1],
1105          sizeof (struct IndexInfoMessage) 
1106          - sizeof (struct GNUNET_MessageHeader) + slen,
1107          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1108       pos = pos->next;
1109     }
1110   GNUNET_SERVER_transmit_context_append (tc,
1111                                          NULL, 0,
1112                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1113   GNUNET_SERVER_transmit_context_run (tc,
1114                                       GNUNET_TIME_UNIT_MINUTES);
1115 }
1116
1117
1118 /**
1119  * Handle UNINDEX-message.
1120  *
1121  * @param cls closure
1122  * @param client identification of the client
1123  * @param message the actual message
1124  */
1125 static void
1126 handle_unindex (void *cls,
1127                 struct GNUNET_SERVER_Client *client,
1128                 const struct GNUNET_MessageHeader *message)
1129 {
1130   const struct UnindexMessage *um;
1131   struct IndexInfo *pos;
1132   struct IndexInfo *prev;
1133   struct IndexInfo *next;
1134   struct GNUNET_SERVER_TransmitContext *tc;
1135   int found;
1136   
1137   um = (const struct UnindexMessage*) message;
1138   found = GNUNET_NO;
1139   prev = NULL;
1140   pos = indexed_files;
1141   while (NULL != pos)
1142     {
1143       next = pos->next;
1144       if (0 == memcmp (&pos->file_id,
1145                        &um->file_id,
1146                        sizeof (GNUNET_HashCode)))
1147         {
1148           if (prev == NULL)
1149             indexed_files = pos->next;
1150           else
1151             prev->next = pos->next;
1152           GNUNET_free (pos);
1153           found = GNUNET_YES;
1154         }
1155       else
1156         {
1157           prev = pos;
1158         }
1159       pos = next;
1160     }
1161   if (GNUNET_YES == found)
1162     write_index_list ();
1163   tc = GNUNET_SERVER_transmit_context_create (client);
1164   GNUNET_SERVER_transmit_context_append (tc,
1165                                          NULL, 0,
1166                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1167   GNUNET_SERVER_transmit_context_run (tc,
1168                                       GNUNET_TIME_UNIT_MINUTES);
1169 }
1170
1171
1172 /**
1173  * Run the next DS request in our
1174  * queue, we're done with the current one.
1175  */
1176 static void
1177 next_ds_request ()
1178 {
1179   struct DatastoreRequestQueue *e;
1180   
1181   while (NULL != (e = drq_head))
1182     {
1183       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1184         break;
1185       if (e->task != GNUNET_SCHEDULER_NO_TASK)
1186         GNUNET_SCHEDULER_cancel (sched, e->task);
1187       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1188       e->req (e->req_cls, GNUNET_NO);
1189       GNUNET_free (e);  
1190     }
1191   if (e == NULL)
1192     return;
1193   if (e->task != GNUNET_SCHEDULER_NO_TASK)
1194     GNUNET_SCHEDULER_cancel (sched, e->task);
1195   e->task = GNUNET_SCHEDULER_NO_TASK;
1196   e->req (e->req_cls, GNUNET_YES);
1197   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1198   GNUNET_free (e);  
1199 }
1200
1201
1202 /**
1203  * A datastore request had to be timed out. 
1204  *
1205  * @param cls closure (of type "struct DatastoreRequestQueue*")
1206  * @param tc task context, unused
1207  */
1208 static void
1209 timeout_ds_request (void *cls,
1210                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1211 {
1212   struct DatastoreRequestQueue *e = cls;
1213
1214   e->task = GNUNET_SCHEDULER_NO_TASK;
1215   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1216   e->req (e->req_cls, GNUNET_NO);
1217   GNUNET_free (e);  
1218 }
1219
1220
1221 /**
1222  * Queue a request for the datastore.
1223  *
1224  * @param deadline by when the request should run
1225  * @param fun function to call once the request can be run
1226  * @param fun_cls closure for fun
1227  */
1228 static struct DatastoreRequestQueue *
1229 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1230                   RequestFunction fun,
1231                   void *fun_cls)
1232 {
1233   struct DatastoreRequestQueue *e;
1234   struct DatastoreRequestQueue *bef;
1235
1236   if (drq_head == NULL)
1237     {
1238       /* no other requests pending, run immediately */
1239       fun (fun_cls, GNUNET_OK);
1240       return NULL;
1241     }
1242   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1243   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1244   e->req = fun;
1245   e->req_cls = fun_cls;
1246   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1247     {
1248       /* local request, highest prio, put at head of queue
1249          regardless of deadline */
1250       bef = NULL;
1251     }
1252   else
1253     {
1254       bef = drq_tail;
1255       while ( (NULL != bef) &&
1256               (e->timeout.value < bef->timeout.value) )
1257         bef = bef->prev;
1258     }
1259   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1260   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1261     return e;
1262   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1263                                           GNUNET_NO,
1264                                           GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1265                                           GNUNET_SCHEDULER_NO_TASK,
1266                                           deadline,
1267                                           &timeout_ds_request,
1268                                           e);
1269   return e;                                    
1270 }
1271
1272
1273 /**
1274  * Free the state associated with a local get context.
1275  *
1276  * @param lgc the lgc to free
1277  */
1278 static void
1279 local_get_context_free (struct LocalGetContext *lgc) 
1280 {
1281   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1282   GNUNET_SERVER_client_drop (lgc->client); 
1283   GNUNET_free_non_null (lgc->results);
1284   if (lgc->results_bf != NULL)
1285     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1286   if (lgc->req != NULL)
1287     {
1288       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1289         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1290       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1291       GNUNET_free (lgc->req);
1292     }
1293   GNUNET_free (lgc);
1294 }
1295
1296
1297 /**
1298  * We're able to transmit the next (local) result to the client.
1299  * Do it and ask the datastore for more.  Or, on error, tell
1300  * the datastore to stop giving us more.
1301  *
1302  * @param cls our closure (struct LocalGetContext)
1303  * @param max maximum number of bytes we can transmit
1304  * @param buf where to copy our message
1305  * @return number of bytes copied to buf
1306  */
1307 static size_t
1308 transmit_local_result (void *cls,
1309                        size_t max,
1310                        void *buf)
1311 {
1312   struct LocalGetContext *lgc = cls;  
1313   uint16_t msize;
1314
1315   if (NULL == buf)
1316     {
1317 #if DEBUG_FS
1318       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1319                   "Failed to transmit result to local client, aborting datastore iteration.\n");
1320 #endif
1321       /* error, abort! */
1322       GNUNET_free (lgc->result);
1323       lgc->result = NULL;
1324       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1325       return 0;
1326     }
1327   msize = ntohs (lgc->result->header.size);
1328 #if DEBUG_FS
1329   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1330               "Transmitting %u bytes of result to local client.\n",
1331               msize);
1332 #endif
1333   GNUNET_assert (max >= msize);
1334   memcpy (buf, lgc->result, msize);
1335   GNUNET_free (lgc->result);
1336   lgc->result = NULL;
1337   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1338   return msize;
1339 }
1340
1341
1342 /**
1343  * Continuation called from datastore's remove
1344  * function.
1345  *
1346  * @param cls unused
1347  * @param success did the deletion work?
1348  * @param msg error message
1349  */
1350 static void
1351 remove_cont (void *cls,
1352              int success,
1353              const char *msg)
1354 {
1355   if (GNUNET_OK != success)
1356     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1357                 _("Failed to delete bogus block: %s\n"),
1358                 msg);
1359   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1360 }
1361
1362
1363 /**
1364  * Mingle hash with the mingle_number to
1365  * produce different bits.
1366  */
1367 static void
1368 mingle_hash (const GNUNET_HashCode * in,
1369              int32_t mingle_number, 
1370              GNUNET_HashCode * hc)
1371 {
1372   GNUNET_HashCode m;
1373
1374   GNUNET_CRYPTO_hash (&mingle_number, 
1375                       sizeof (int32_t), 
1376                       &m);
1377   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1378 }
1379
1380
1381 /**
1382  * We've received an on-demand encoded block
1383  * from the datastore.  Attempt to do on-demand
1384  * encoding and (if successful), call the 
1385  * continuation with the resulting block.  On
1386  * error, clean up and ask the datastore for
1387  * more results.
1388  *
1389  * @param key key for the content
1390  * @param size number of bytes in data
1391  * @param data content stored
1392  * @param type type of the content
1393  * @param priority priority of the content
1394  * @param anonymity anonymity-level for the content
1395  * @param expiration expiration time for the content
1396  * @param uid unique identifier for the datum;
1397  *        maybe 0 if no unique identifier is available
1398  * @param cont function to call with the actual block
1399  * @param cont_cls closure for cont
1400  */
1401 static void
1402 handle_on_demand_block (const GNUNET_HashCode * key,
1403                         uint32_t size,
1404                         const void *data,
1405                         uint32_t type,
1406                         uint32_t priority,
1407                         uint32_t anonymity,
1408                         struct GNUNET_TIME_Absolute
1409                         expiration, uint64_t uid,
1410                         GNUNET_DATASTORE_Iterator cont,
1411                         void *cont_cls)
1412 {
1413   const struct OnDemandBlock *odb;
1414   GNUNET_HashCode nkey;
1415   struct GNUNET_CRYPTO_AesSessionKey skey;
1416   struct GNUNET_CRYPTO_AesInitializationVector iv;
1417   GNUNET_HashCode query;
1418   ssize_t nsize;
1419   char ndata[DBLOCK_SIZE];
1420   char edata[DBLOCK_SIZE];
1421   const char *fn;
1422   struct GNUNET_DISK_FileHandle *fh;
1423   uint64_t off;
1424
1425   if (size != sizeof (struct OnDemandBlock))
1426     {
1427       GNUNET_break (0);
1428       GNUNET_DATASTORE_remove (dsh, 
1429                                key,
1430                                size,
1431                                data,
1432                                &remove_cont,
1433                                NULL,
1434                                GNUNET_TIME_UNIT_FOREVER_REL);     
1435       return;
1436     }
1437   odb = (const struct OnDemandBlock*) data;
1438   off = GNUNET_ntohll (odb->offset);
1439   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1440                                                         &odb->file_id);
1441   fh = NULL;
1442   if ( (NULL == fn) ||
1443        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1444                                              GNUNET_DISK_OPEN_READ,
1445                                              GNUNET_DISK_PERM_NONE))) ||
1446        (off !=
1447         GNUNET_DISK_file_seek (fh,
1448                                off,
1449                                GNUNET_DISK_SEEK_SET)) ||
1450        (-1 ==
1451         (nsize = GNUNET_DISK_file_read (fh,
1452                                         ndata,
1453                                         sizeof (ndata)))) )
1454     {
1455       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1456                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1457                   GNUNET_h2s (&odb->file_id),
1458                   (unsigned long long) off,
1459                   STRERROR (errno));
1460       if (fh != NULL)
1461         GNUNET_DISK_file_close (fh);
1462       /* FIXME: if this happens often, we need
1463          to remove the OnDemand block from the DS! */
1464       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1465       return;
1466     }
1467   GNUNET_DISK_file_close (fh);
1468   GNUNET_CRYPTO_hash (ndata,
1469                       nsize,
1470                       &nkey);
1471   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1472   GNUNET_CRYPTO_aes_encrypt (ndata,
1473                              nsize,
1474                              &skey,
1475                              &iv,
1476                              edata);
1477   GNUNET_CRYPTO_hash (edata,
1478                       nsize,
1479                       &query);
1480   if (0 != memcmp (&query, 
1481                    key,
1482                    sizeof (GNUNET_HashCode)))
1483     {
1484       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1485                   _("Indexed file `%s' changed at offset %llu\n"),
1486                   fn,
1487                   (unsigned long long) off);
1488       /* FIXME: if this happens often, we need
1489          to remove the OnDemand block from the DS! */
1490       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1491       return;
1492     }
1493   cont (cont_cls,
1494         key,
1495         nsize,
1496         edata,
1497         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1498         priority,
1499         anonymity,
1500         expiration,
1501         uid);
1502 }
1503
1504
1505 /**
1506  * How many bytes should a bloomfilter be if we have already seen
1507  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1508  * of bits set per entry.  Furthermore, we should not re-size the
1509  * filter too often (to keep it cheap).
1510  *
1511  * Since other peers will also add entries but not resize the filter,
1512  * we should generally pick a slightly larger size than what the
1513  * strict math would suggest.
1514  *
1515  * @return must be a power of two and smaller or equal to 2^15.
1516  */
1517 static size_t
1518 compute_bloomfilter_size (unsigned int entry_count)
1519 {
1520   size_t size;
1521   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1522   uint16_t max = 1 << 15;
1523
1524   if (entry_count > max)
1525     return max;
1526   size = 8;
1527   while ((size < max) && (size < ideal))
1528     size *= 2;
1529   if (size > max)
1530     return max;
1531   return size;
1532 }
1533
1534
1535 /**
1536  * Recalculate our bloom filter for filtering replies.
1537  *
1538  * @param count number of entries we are filtering right now
1539  * @param mingle set to our new mingling value
1540  * @param bf_size set to the size of the bloomfilter
1541  * @param entries the entries to filter
1542  * @return updated bloomfilter, NULL for none
1543  */
1544 static struct GNUNET_CONTAINER_BloomFilter *
1545 refresh_bloomfilter (unsigned int count,
1546                      int32_t * mingle,
1547                      size_t *bf_size,
1548                      const GNUNET_HashCode *entries)
1549 {
1550   struct GNUNET_CONTAINER_BloomFilter *bf;
1551   size_t nsize;
1552   unsigned int i;
1553   GNUNET_HashCode mhash;
1554
1555   if (0 == count)
1556     return NULL;
1557   nsize = compute_bloomfilter_size (count);
1558   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1559   *bf_size = nsize;
1560   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1561                                           nsize,
1562                                           BLOOMFILTER_K);
1563   for (i=0;i<count;i++)
1564     {
1565       mingle_hash (&entries[i], *mingle, &mhash);
1566       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1567     }
1568   return bf;
1569 }
1570
1571
1572 /**
1573  * Closure used for "target_peer_select_cb".
1574  */
1575 struct PeerSelectionContext 
1576 {
1577   /**
1578    * The request for which we are selecting
1579    * peers.
1580    */
1581   struct PendingRequest *pr;
1582
1583   /**
1584    * Current "prime" target.
1585    */
1586   struct GNUNET_PeerIdentity target;
1587
1588   /**
1589    * How much do we like this target?
1590    */
1591   double target_score;
1592
1593 };
1594
1595
1596 /**
1597  * Function called for each connected peer to determine
1598  * which one(s) would make good targets for forwarding.
1599  *
1600  * @param cls closure (struct PeerSelectionContext)
1601  * @param key current key code (peer identity)
1602  * @param value value in the hash map (struct ConnectedPeer)
1603  * @return GNUNET_YES if we should continue to
1604  *         iterate,
1605  *         GNUNET_NO if not.
1606  */
1607 static int
1608 target_peer_select_cb (void *cls,
1609                        const GNUNET_HashCode * key,
1610                        void *value)
1611 {
1612   struct PeerSelectionContext *psc = cls;
1613   struct ConnectedPeer *cp = value;
1614   struct PendingRequest *pr = psc->pr;
1615   double score;
1616   unsigned int i;
1617
1618   /* 1) check if we have already (recently) forwarded to this peer */
1619   for (i=0;i<pr->used_pids_off;i++)
1620     if (pr->used_pids[i] == cp->pid)
1621       return GNUNET_YES; /* skip */
1622   // 2) calculate how much we'd like to forward to this peer
1623   score = 0; // FIXME!
1624   
1625   /* store best-fit in closure */
1626   if (score > psc->target_score)
1627     {
1628       psc->target_score = score;
1629       psc->target.hashPubKey = *key; 
1630     }
1631   return GNUNET_YES;
1632 }
1633
1634
1635 /**
1636  * We use a random delay to make the timing of requests
1637  * less predictable.  This function returns such a random
1638  * delay.
1639  *
1640  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1641  */
1642 static struct GNUNET_TIME_Relative
1643 get_processing_delay ()
1644 {
1645   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1646                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1647                                                                   TTL_DECREMENT));
1648 }
1649
1650
1651 /**
1652  * Task that is run for each request with the
1653  * goal of forwarding the associated query to
1654  * other peers.  The task should re-schedule
1655  * itself to be re-run once the TTL has expired.
1656  * (or at a later time if more peers should
1657  * be queried earlier).
1658  *
1659  * @param cls the requests "struct PendingRequest*"
1660  * @param tc task context (unused)
1661  */
1662 static void
1663 forward_request_task (void *cls,
1664                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1665
1666
1667 /**
1668  * We've selected a peer for forwarding of a query.
1669  * Construct the message and then re-schedule the
1670  * task to forward again to (other) peers.
1671  *
1672  * @param cls closure
1673  * @param size number of bytes available in buf
1674  * @param buf where the callee should write the message
1675  * @return number of bytes written to buf
1676  */
1677 static size_t
1678 transmit_request_cb (void *cls,
1679                      size_t size, 
1680                      void *buf)
1681 {
1682   struct PendingRequest *pr = cls;
1683   struct GetMessage *gm;
1684   GNUNET_HashCode *ext;
1685   char *bfdata;
1686   size_t msize;
1687   unsigned int k;
1688
1689   pr->cth = NULL;
1690   /* (1) check for timeout */
1691   if (NULL == buf)
1692     {
1693       /* timeout, try another peer immediately again */
1694       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1695                                                GNUNET_NO,
1696                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1697                                                GNUNET_SCHEDULER_NO_TASK,
1698                                                GNUNET_TIME_UNIT_ZERO,
1699                                                &forward_request_task,
1700                                                pr);
1701       return 0;
1702     }
1703   /* (2) build query message */
1704   k = 0; // FIXME: count hash codes!
1705   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1706   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1707   gm = (struct GetMessage*) buf;
1708   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1709   gm->header.size = htons (msize);
1710   gm->type = htonl (pr->type);
1711   pr->remaining_priority /= 2;
1712   gm->priority = htonl (pr->remaining_priority);
1713   gm->ttl = htonl (pr->ttl);
1714   gm->filter_mutator = htonl(pr->mingle);
1715   gm->hash_bitmap = htonl (42);
1716   gm->query = pr->query;
1717   ext = (GNUNET_HashCode*) &gm[1];
1718   // FIXME: setup "ext[0]..[k-1]"
1719   bfdata = (char *) &ext[k];
1720   if (pr->bf != NULL)
1721     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1722                                                bfdata,
1723                                                pr->bf_size);
1724   
1725   /* (3) schedule job to do it again (or another peer, etc.) */
1726   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1727                                            GNUNET_NO,
1728                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1729                                            GNUNET_SCHEDULER_NO_TASK,
1730                                            get_processing_delay (), // FIXME!
1731                                            &forward_request_task,
1732                                            pr);
1733
1734   return msize;
1735 }
1736
1737
1738 /**
1739  * Function called after we've tried to reserve
1740  * a certain amount of bandwidth for a reply.
1741  * Check if we succeeded and if so send our query.
1742  *
1743  * @param cls the requests "struct PendingRequest*"
1744  * @param peer identifies the peer
1745  * @param latency current latency estimate, "FOREVER" if we have been
1746  *                disconnected
1747  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1748  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1749  * @param amount set to the amount that was actually reserved or unreserved
1750  * @param preference current traffic preference for the given peer
1751  */
1752 static void
1753 target_reservation_cb (void *cls,
1754                        const struct
1755                        GNUNET_PeerIdentity * peer,
1756                        unsigned int bpm_in,
1757                        unsigned int bpm_out,
1758                        struct GNUNET_TIME_Relative
1759                        latency, int amount,
1760                        unsigned long long preference)
1761 {
1762   struct PendingRequest *pr = cls;
1763   uint32_t priority;
1764   uint16_t size;
1765   struct GNUNET_TIME_Relative maxdelay;
1766
1767   GNUNET_assert (peer != NULL);
1768   if ( (amount != DBLOCK_SIZE) ||
1769        (pr->cth != NULL) )
1770     {
1771       /* try again later; FIXME: we may need to un-reserve "amount"? */
1772       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1773                                                GNUNET_NO,
1774                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1775                                                GNUNET_SCHEDULER_NO_TASK,
1776                                                get_processing_delay (), // FIXME: longer?
1777                                                &forward_request_task,
1778                                                pr);
1779       return;
1780     }
1781   // (2) transmit, update ttl/priority
1782   // FIXME: calculate priority, maxdelay, size properly!
1783   priority = 0;
1784   size = 60000;
1785   maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
1786   pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1787                                                priority,
1788                                                maxdelay,
1789                                                peer,
1790                                                size,
1791                                                &transmit_request_cb,
1792                                                pr);
1793   if (pr->cth == NULL)
1794     {
1795       /* try again later */
1796       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1797                                                GNUNET_NO,
1798                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1799                                                GNUNET_SCHEDULER_NO_TASK,
1800                                                get_processing_delay (), // FIXME: longer?
1801                                                &forward_request_task,
1802                                                pr);
1803     }
1804 }
1805
1806
1807 /**
1808  * Task that is run for each request with the
1809  * goal of forwarding the associated query to
1810  * other peers.  The task should re-schedule
1811  * itself to be re-run once the TTL has expired.
1812  * (or at a later time if more peers should
1813  * be queried earlier).
1814  *
1815  * @param cls the requests "struct PendingRequest*"
1816  * @param tc task context (unused)
1817  */
1818 static void
1819 forward_request_task (void *cls,
1820                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1821 {
1822   struct PendingRequest *pr = cls;
1823   struct PeerSelectionContext psc;
1824
1825   pr->task = GNUNET_SCHEDULER_NO_TASK;
1826   if (pr->cth != NULL) 
1827     {
1828       /* we're busy transmitting a result, wait a bit */
1829       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1830                                                GNUNET_NO,
1831                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1832                                                GNUNET_SCHEDULER_NO_TASK,
1833                                                get_processing_delay (), 
1834                                                &forward_request_task,
1835                                                pr);
1836       return;
1837     }
1838   /* (1) select target */
1839   psc.pr = pr;
1840   psc.target_score = DBL_MIN;
1841   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1842                                          &target_peer_select_cb,
1843                                          &psc);
1844   if (psc.target_score == DBL_MIN)
1845     {
1846       /* no possible target found, wait some time */
1847       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1848                                                GNUNET_NO,
1849                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1850                                                GNUNET_SCHEDULER_NO_TASK,
1851                                                get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1852                                                &forward_request_task,
1853                                                pr);
1854       return;
1855     }
1856   /* (2) reserve reply bandwidth */
1857   // FIXME: need a way to cancel; this
1858   // async operation is problematic (segv-problematic)
1859   // if "pr" is destroyed while it happens!
1860   GNUNET_CORE_peer_configure (core,
1861                               &psc.target,
1862                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1863                               -1,
1864                               DBLOCK_SIZE, // FIXME: make dependent on type?
1865                               0,
1866                               &target_reservation_cb,
1867                               pr);
1868 }
1869
1870
1871 /**
1872  * We're processing (local) results for a search request
1873  * from a (local) client.  Pass applicable results to the
1874  * client and if we are done either clean up (operation
1875  * complete) or switch to P2P search (more results possible).
1876  *
1877  * @param cls our closure (struct LocalGetContext)
1878  * @param key key for the content
1879  * @param size number of bytes in data
1880  * @param data content stored
1881  * @param type type of the content
1882  * @param priority priority of the content
1883  * @param anonymity anonymity-level for the content
1884  * @param expiration expiration time for the content
1885  * @param uid unique identifier for the datum;
1886  *        maybe 0 if no unique identifier is available
1887  */
1888 static void
1889 process_local_get_result (void *cls,
1890                           const GNUNET_HashCode * key,
1891                           uint32_t size,
1892                           const void *data,
1893                           uint32_t type,
1894                           uint32_t priority,
1895                           uint32_t anonymity,
1896                           struct GNUNET_TIME_Absolute
1897                           expiration, 
1898                           uint64_t uid)
1899 {
1900   struct LocalGetContext *lgc = cls;
1901   struct PendingRequest *pr;
1902   struct ClientRequestList *crl;
1903   struct ClientList *cl;
1904   size_t msize;
1905   unsigned int i;
1906
1907   if (key == NULL)
1908     {
1909 #if DEBUG_FS
1910       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1911                   "Received last result for `%s' from local datastore, deciding what to do next.\n",
1912                   GNUNET_h2s (&lgc->query));
1913 #endif
1914       /* no further results from datastore; continue
1915          processing further requests from the client and
1916          allow the next task to use the datastore; also,
1917          switch to P2P requests or clean up our state. */
1918       next_ds_request ();
1919       GNUNET_SERVER_receive_done (lgc->client,
1920                                   GNUNET_OK);
1921       if ( (lgc->results_used == 0) ||
1922            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1923            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1924            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1925         {
1926 #if DEBUG_FS
1927           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1928                       "Forwarding query for `%s' to network.\n",
1929                       GNUNET_h2s (&lgc->query));
1930 #endif
1931           cl = clients;
1932           while ( (NULL != cl) &&
1933                   (cl->client != lgc->client) )
1934             cl = cl->next;
1935           if (cl == NULL)
1936             {
1937               cl = GNUNET_malloc (sizeof (struct ClientList));
1938               cl->client = lgc->client;
1939               cl->next = clients;
1940               clients = cl;
1941             }
1942           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1943           crl->cl = cl;
1944           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1945           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1946           pr->client = lgc->client;
1947           GNUNET_SERVER_client_keep (pr->client);
1948           pr->crl_entry = crl;
1949           crl->req = pr;
1950           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1951             {
1952               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1953               *pr->namespace = lgc->namespace;
1954             }
1955           pr->replies_seen = lgc->results;
1956           lgc->results = NULL;
1957           pr->start_time = GNUNET_TIME_absolute_get ();
1958           pr->query = lgc->query;
1959           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1960           pr->replies_seen_off = lgc->results_used;
1961           pr->replies_seen_size = lgc->results_size;
1962           lgc->results_size = 0;
1963           pr->type = lgc->type;
1964           pr->anonymity_level = lgc->anonymity_level;
1965           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1966                                         &pr->mingle,
1967                                         &pr->bf_size,
1968                                         pr->replies_seen);
1969           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1970                                              &pr->query,
1971                                              pr,
1972                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1973           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1974                                                    GNUNET_NO,
1975                                                    GNUNET_SCHEDULER_PRIORITY_IDLE,
1976                                                    GNUNET_SCHEDULER_NO_TASK,
1977                                                    get_processing_delay (),
1978                                                    &forward_request_task,
1979                                                    pr);
1980           local_get_context_free (lgc);
1981           return;
1982         }
1983       /* got all possible results, clean up! */
1984 #if DEBUG_FS
1985       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1986                   "Found all possible results for query for `%s', done!\n",
1987                   GNUNET_h2s (&lgc->query));
1988 #endif
1989       local_get_context_free (lgc);
1990       return;
1991     }
1992   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1993     {
1994 #if DEBUG_FS
1995       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1996                   "Received on-demand block for `%s' from local datastore, fetching data.\n",
1997                   GNUNET_h2s (&lgc->query));
1998 #endif
1999       handle_on_demand_block (key, size, data, type, priority, 
2000                               anonymity, expiration, uid,
2001                               &process_local_get_result,
2002                               lgc);
2003       return;
2004     }
2005   if ( (type != lgc->type) &&
2006        (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
2007     {
2008       /* this should be virtually impossible to reach (DBLOCK 
2009          query hash being identical to KBLOCK/SBLOCK query hash);
2010          nevertheless, if it happens, the correct thing is to
2011          simply skip the result. */
2012 #if DEBUG_FS
2013       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2014                   "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
2015                   type,
2016                   lgc->type,
2017                   GNUNET_h2s (&lgc->query));
2018 #endif
2019       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
2020       return;
2021     }
2022   /* check if this is a result we've alredy
2023      received */
2024   for (i=0;i<lgc->results_used;i++)
2025     if (0 == memcmp (key,
2026                      &lgc->results[i],
2027                      sizeof (GNUNET_HashCode)))
2028       {
2029 #if DEBUG_FS
2030         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2031                     "Received duplicate result for `%s' from local datastore, ignoring.\n",
2032                     GNUNET_h2s (&lgc->query));
2033 #endif
2034         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2035         return; 
2036       }
2037   if (lgc->results_used == lgc->results_size)
2038     GNUNET_array_grow (lgc->results,
2039                        lgc->results_size,
2040                        lgc->results_size * 2 + 2);
2041   GNUNET_CRYPTO_hash (data, 
2042                       size, 
2043                       &lgc->results[lgc->results_used++]);    
2044   msize = size + sizeof (struct ContentMessage);
2045   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2046   lgc->result = GNUNET_malloc (msize);
2047   lgc->result->header.size = htons (msize);
2048   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2049   lgc->result->type = htonl (type);
2050   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
2051   memcpy (&lgc->result[1],
2052           data,
2053           size);
2054 #if DEBUG_FS
2055   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2056               "Received new result for `%s' from local datastore, passing to client.\n",
2057               GNUNET_h2s (&lgc->query));
2058 #endif
2059   GNUNET_SERVER_notify_transmit_ready (lgc->client,
2060                                        msize,
2061                                        GNUNET_TIME_UNIT_FOREVER_REL,
2062                                        &transmit_local_result,
2063                                        lgc);
2064 }
2065
2066
2067 /**
2068  * We're processing a search request from a local
2069  * client.  Now it is our turn to query the datastore.
2070  * 
2071  * @param cls our closure (struct LocalGetContext)
2072  * @param tc unused
2073  */
2074 static void
2075 transmit_local_get (void *cls,
2076                     const struct GNUNET_SCHEDULER_TaskContext *tc)
2077 {
2078   struct LocalGetContext *lgc = cls;
2079   uint32_t type;
2080   
2081   type = lgc->type;
2082   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2083     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2084   GNUNET_DATASTORE_get (dsh,
2085                         &lgc->query,
2086                         type,
2087                         &process_local_get_result,
2088                         lgc,
2089                         GNUNET_TIME_UNIT_FOREVER_REL);
2090 }
2091
2092
2093 /**
2094  * We're processing a search request from a local
2095  * client.  Now it is our turn to query the datastore.
2096  * 
2097  * @param cls our closure (struct LocalGetContext)
2098  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
2099  */
2100 static void 
2101 transmit_local_get_ready (void *cls,
2102                           int ok)
2103 {
2104   struct LocalGetContext *lgc = cls;
2105
2106   GNUNET_assert (GNUNET_OK == ok);
2107   GNUNET_SCHEDULER_add_continuation (sched,
2108                                      GNUNET_NO,
2109                                      &transmit_local_get,
2110                                      lgc,
2111                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2112 }
2113
2114
2115 /**
2116  * Handle START_SEARCH-message (search request from client).
2117  *
2118  * @param cls closure
2119  * @param client identification of the client
2120  * @param message the actual message
2121  */
2122 static void
2123 handle_start_search (void *cls,
2124                      struct GNUNET_SERVER_Client *client,
2125                      const struct GNUNET_MessageHeader *message)
2126 {
2127   const struct SearchMessage *sm;
2128   struct LocalGetContext *lgc;
2129   uint16_t msize;
2130   unsigned int sc;
2131   
2132   msize = ntohs (message->size);
2133   if ( (msize < sizeof (struct SearchMessage)) ||
2134        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2135     {
2136       GNUNET_break (0);
2137       GNUNET_SERVER_receive_done (client,
2138                                   GNUNET_SYSERR);
2139       return;
2140     }
2141   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2142   sm = (const struct SearchMessage*) message;
2143   GNUNET_SERVER_client_keep (client);
2144   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
2145   if  (sc > 0)
2146     {
2147       lgc->results_used = sc;
2148       GNUNET_array_grow (lgc->results,
2149                          lgc->results_size,
2150                          sc * 2);
2151       memcpy (lgc->results,
2152               &sm[1],
2153               sc * sizeof (GNUNET_HashCode));
2154     }
2155   lgc->client = client;
2156   lgc->type = ntohl (sm->type);
2157   lgc->anonymity_level = ntohl (sm->anonymity_level);
2158   switch (lgc->type)
2159     {
2160     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2161     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2162       lgc->target.hashPubKey = sm->target;
2163       break;
2164     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2165       lgc->namespace = sm->target;
2166       break;
2167     default:
2168       break;
2169     }
2170   lgc->query = sm->query;
2171   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
2172   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
2173                                &transmit_local_get_ready,
2174                                lgc);
2175 }
2176
2177
2178 /**
2179  * List of handlers for the messages understood by this
2180  * service.
2181  */
2182 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2183   {&handle_index_start, NULL, 
2184    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2185   {&handle_index_list_get, NULL, 
2186    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2187   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2188    sizeof (struct UnindexMessage) },
2189   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2190    0 },
2191   {NULL, NULL, 0, 0}
2192 };
2193
2194
2195 /**
2196  * Clean up the memory used by the PendingRequest structure (except
2197  * for the client or peer list that the request may be part of).
2198  *
2199  * @param pr request to clean up
2200  */
2201 static void
2202 destroy_pending_request (struct PendingRequest *pr)
2203 {
2204   struct PendingReply *reply;
2205   struct ClientList *cl;
2206
2207   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
2208                                         &pr->query,
2209                                         pr);
2210   // FIXME: not sure how this can work (efficiently)
2211   // also, what does the return value mean?
2212   if (pr->client == NULL)
2213     {
2214       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
2215                                          pr);
2216     }
2217   else
2218     {
2219       cl = pr->crl_entry->cl;
2220       GNUNET_CONTAINER_DLL_remove (cl->head,
2221                                    cl->tail,
2222                                    pr->crl_entry);
2223     }
2224   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
2225     GNUNET_SCHEDULER_cancel (sched, pr->task);
2226   if (NULL != pr->cth)
2227     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
2228   if (NULL != pr->bf)
2229     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2230   if (NULL != pr->th)
2231     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
2232   while (NULL != (reply = pr->replies_pending))
2233     {
2234       pr->replies_pending = reply->next;
2235       GNUNET_free (reply);
2236     }
2237   GNUNET_PEER_change_rc (pr->source_pid, -1);
2238   GNUNET_PEER_change_rc (pr->target_pid, -1);
2239   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
2240   GNUNET_free_non_null (pr->used_pids);
2241   GNUNET_free_non_null (pr->replies_seen);
2242   GNUNET_free_non_null (pr->namespace);
2243   GNUNET_free (pr);
2244 }
2245
2246
2247 /**
2248  * A client disconnected.  Remove all of its pending queries.
2249  *
2250  * @param cls closure, NULL
2251  * @param client identification of the client
2252  */
2253 static void
2254 handle_client_disconnect (void *cls,
2255                           struct GNUNET_SERVER_Client
2256                           * client)
2257 {
2258   struct LocalGetContext *lgc;
2259   struct ClientList *cpos;
2260   struct ClientList *cprev;
2261   struct ClientRequestList *rl;
2262
2263   lgc = lgc_head;
2264   while ( (NULL != lgc) &&
2265           (lgc->client != client) )
2266     lgc = lgc->next;
2267   if (lgc != NULL)
2268     local_get_context_free (lgc);
2269   cprev = NULL;
2270   cpos = clients;
2271   while ( (NULL != cpos) &&
2272           (clients->client != client) )
2273     {
2274       cprev = cpos;
2275       cpos = cpos->next;
2276     }
2277   if (cpos != NULL)
2278     {
2279       if (cprev == NULL)
2280         clients = cpos->next;
2281       else
2282         cprev->next = cpos->next;
2283       while (NULL != (rl = cpos->head))
2284         {
2285           cpos->head = rl->next;
2286           destroy_pending_request (rl->req);
2287           GNUNET_free (rl);
2288         }
2289       GNUNET_free (cpos);
2290     }
2291 }
2292
2293
2294 /**
2295  * Iterator over entries in the "requests_by_query" map
2296  * that frees all the entries.
2297  *
2298  * @param cls closure, NULL
2299  * @param key current key code (the query, unused) 
2300  * @param value value in the hash map, of type "struct PendingRequest*"
2301  * @return GNUNET_YES (we should continue to  iterate)
2302  */
2303 static int 
2304 destroy_pending_request_cb (void *cls,
2305                             const GNUNET_HashCode * key,
2306                             void *value)
2307 {
2308   struct PendingRequest *pr = value;
2309
2310   destroy_pending_request (pr);
2311   return GNUNET_YES;
2312 }
2313
2314
2315 /**
2316  * Task run during shutdown.
2317  *
2318  * @param cls unused
2319  * @param tc unused
2320  */
2321 static void
2322 shutdown_task (void *cls,
2323                const struct GNUNET_SCHEDULER_TaskContext *tc)
2324 {
2325   struct IndexInfo *pos;  
2326
2327   if (NULL != core)
2328     GNUNET_CORE_disconnect (core);
2329   GNUNET_DATASTORE_disconnect (dsh,
2330                                GNUNET_NO);
2331   dsh = NULL;
2332   GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
2333                                          &destroy_pending_request_cb,
2334                                          NULL);
2335   while (clients != NULL)
2336     handle_client_disconnect (NULL,
2337                               clients->client);
2338   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
2339   requests_by_query = NULL;
2340   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
2341   requests_by_peer = NULL;
2342   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
2343   requests_by_expiration = NULL;
2344   // FIXME: iterate over entries and free individually?
2345   // (or do we get disconnect notifications?)
2346   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2347   connected_peers = NULL;
2348   GNUNET_CONTAINER_multihashmap_destroy (ifm);
2349   ifm = NULL;
2350   while (NULL != (pos = indexed_files))
2351     {
2352       indexed_files = pos->next;
2353       GNUNET_free (pos);
2354     }
2355 }
2356
2357
2358 /**
2359  * Free (each) request made by the peer.
2360  *
2361  * @param cls closure, points to peer that the request belongs to
2362  * @param key current key code
2363  * @param value value in the hash map
2364  * @return GNUNET_YES (we should continue to iterate)
2365  */
2366 static int
2367 destroy_request (void *cls,
2368                  const GNUNET_HashCode * key,
2369                  void *value)
2370 {
2371   const struct GNUNET_PeerIdentity * peer = cls;
2372   struct PendingRequest *pr = value;
2373   
2374   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2375                                         &peer->hashPubKey,
2376                                         pr);
2377   destroy_pending_request (pr);
2378   return GNUNET_YES;
2379 }
2380
2381
2382
2383 /**
2384  * Method called whenever a given peer connects.
2385  *
2386  * @param cls closure, not used
2387  * @param peer peer identity this notification is about
2388  */
2389 static void 
2390 peer_connect_handler (void *cls,
2391                       const struct
2392                       GNUNET_PeerIdentity * peer)
2393 {
2394   struct ConnectedPeer *cp;
2395
2396   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
2397   cp->pid = GNUNET_PEER_intern (peer);
2398   GNUNET_CONTAINER_multihashmap_put (connected_peers,
2399                                      &peer->hashPubKey,
2400                                      cp,
2401                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2402 }
2403
2404
2405 /**
2406  * Method called whenever a peer disconnects.
2407  *
2408  * @param cls closure, not used
2409  * @param peer peer identity this notification is about
2410  */
2411 static void
2412 peer_disconnect_handler (void *cls,
2413                          const struct
2414                          GNUNET_PeerIdentity * peer)
2415 {
2416   struct ConnectedPeer *cp;
2417
2418   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2419                                           &peer->hashPubKey);
2420   GNUNET_PEER_change_rc (cp->pid, -1);
2421   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
2422   GNUNET_free (cp);
2423   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
2424                                               &peer->hashPubKey,
2425                                               &destroy_request,
2426                                               (void*) peer);
2427 }
2428
2429
2430 /**
2431  * We're processing a GET request from
2432  * another peer and have decided to forward
2433  * it to other peers.
2434  *
2435  * @param cls our "struct ProcessGetContext *"
2436  * @param tc unused
2437  */
2438 static void
2439 forward_get_request (void *cls,
2440                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2441 {
2442   struct ProcessGetContext *pgc = cls;
2443   struct PendingRequest *pr;
2444   struct PendingRequest *eer;
2445   struct GNUNET_PeerIdentity target;
2446
2447   pr = GNUNET_malloc (sizeof (struct PendingRequest));
2448   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
2449     {
2450       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
2451       *pr->namespace = pgc->namespace;
2452     }
2453   pr->bf = pgc->bf;
2454   pr->bf_size = pgc->bf_size;
2455   pgc->bf = NULL;
2456   pr->start_time = pgc->start_time;
2457   pr->query = pgc->query;
2458   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
2459   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
2460     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
2461   pr->anonymity_level = 1; /* default */
2462   pr->priority = pgc->priority;
2463   pr->remaining_priority = pr->priority;
2464   pr->mingle = pgc->mingle;
2465   pr->ttl = pgc->ttl; 
2466   pr->type = pgc->type;
2467   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
2468                                      &pr->query,
2469                                      pr,
2470                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2471   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
2472                                      &pgc->reply_to.hashPubKey,
2473                                      pr,
2474                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2475   GNUNET_CONTAINER_heap_insert (requests_by_expiration,
2476                                 pr,
2477                                 pr->start_time.value + pr->ttl);
2478   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
2479     {
2480       /* expire oldest request! */
2481       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2482       GNUNET_PEER_resolve (eer->source_pid,
2483                            &target);    
2484       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2485                                             &target.hashPubKey,
2486                                             eer);
2487       destroy_pending_request (eer);     
2488     }
2489   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2490                                            GNUNET_NO,
2491                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
2492                                            GNUNET_SCHEDULER_NO_TASK,
2493                                            get_processing_delay (),
2494                                            &forward_request_task,
2495                                            pr);
2496   GNUNET_free (pgc); 
2497 }
2498 /**
2499  * Transmit the given message by copying it to
2500  * the target buffer "buf".  "buf" will be
2501  * NULL and "size" zero if the socket was closed for
2502  * writing in the meantime.  In that case, only
2503
2504  * free the message
2505  *
2506  * @param cls closure, pointer to the message
2507  * @param size number of bytes available in buf
2508  * @param buf where the callee should write the message
2509  * @return number of bytes written to buf
2510  */
2511 static size_t
2512 transmit_message (void *cls,
2513                   size_t size, void *buf)
2514 {
2515   struct GNUNET_MessageHeader *msg = cls;
2516   uint16_t msize;
2517   
2518   if (NULL == buf)
2519     {
2520 #if DEBUG_FS
2521       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2522                   "Dropping reply, core too busy.\n");
2523 #endif
2524       GNUNET_free (msg);
2525       return 0;
2526     }
2527   msize = ntohs (msg->size);
2528   GNUNET_assert (size >= msize);
2529   memcpy (buf, msg, msize);
2530   GNUNET_free (msg);
2531   return msize;
2532 }
2533
2534
2535 /**
2536  * Test if the load on this peer is too high
2537  * to even consider processing the query at
2538  * all.
2539  * 
2540  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2541  */
2542 static int
2543 test_load_too_high ()
2544 {
2545   return GNUNET_NO; // FIXME
2546 }
2547
2548
2549 /**
2550  * We're processing (local) results for a search request
2551  * from another peer.  Pass applicable results to the
2552  * peer and if we are done either clean up (operation
2553  * complete) or forward to other peers (more results possible).
2554  *
2555  * @param cls our closure (struct LocalGetContext)
2556  * @param key key for the content
2557  * @param size number of bytes in data
2558  * @param data content stored
2559  * @param type type of the content
2560  * @param priority priority of the content
2561  * @param anonymity anonymity-level for the content
2562  * @param expiration expiration time for the content
2563  * @param uid unique identifier for the datum;
2564  *        maybe 0 if no unique identifier is available
2565  */
2566 static void
2567 process_p2p_get_result (void *cls,
2568                         const GNUNET_HashCode * key,
2569                         uint32_t size,
2570                         const void *data,
2571                         uint32_t type,
2572                         uint32_t priority,
2573                         uint32_t anonymity,
2574                         struct GNUNET_TIME_Absolute
2575                         expiration, 
2576                         uint64_t uid)
2577 {
2578   struct ProcessGetContext *pgc = cls;
2579   GNUNET_HashCode dhash;
2580   GNUNET_HashCode mhash;
2581   struct PutMessage *reply;
2582   
2583   if (NULL == key)
2584     {
2585       /* no more results */
2586       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2587            ( (0 == pgc->results_found) ||
2588              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2589              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2590              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2591         {
2592           GNUNET_SCHEDULER_add_continuation (sched,
2593                                              GNUNET_NO,
2594                                              &forward_get_request,
2595                                              pgc,
2596                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2597         }
2598       else
2599         {
2600           if (pgc->bf != NULL)
2601             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2602           GNUNET_free (pgc); 
2603         }
2604       next_ds_request ();
2605       return;
2606     }
2607   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2608     {
2609       handle_on_demand_block (key, size, data, type, priority, 
2610                               anonymity, expiration, uid,
2611                               &process_p2p_get_result,
2612                               pgc);
2613       return;
2614     }
2615   /* check for duplicates */
2616   GNUNET_CRYPTO_hash (data, size, &dhash);
2617   mingle_hash (&dhash, 
2618                pgc->mingle,
2619                &mhash);
2620   if ( (pgc->bf != NULL) &&
2621        (GNUNET_YES ==
2622         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2623                                            &mhash)) )
2624     {      
2625 #if DEBUG_FS
2626       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2627                   "Result from datastore filtered by bloomfilter.\n");
2628 #endif
2629       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2630       return;
2631     }
2632   pgc->results_found++;
2633   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2634        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2635        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2636     {
2637       if (pgc->bf == NULL)
2638         {
2639           pgc->bf_size = 32;
2640           pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2641                                                        pgc->bf_size, 
2642                                                        BLOOMFILTER_K);
2643         }
2644       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2645                                         &mhash);
2646     }
2647
2648   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2649   reply->header.size = htons (sizeof (struct PutMessage) + size);
2650   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2651   reply->type = htonl (type);
2652   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2653   memcpy (&reply[1], data, size);
2654   GNUNET_CORE_notify_transmit_ready (core,
2655                                      pgc->priority,
2656                                      ACCEPTABLE_REPLY_DELAY,
2657                                      &pgc->reply_to,
2658                                      sizeof (struct PutMessage) + size,
2659                                      &transmit_message,
2660                                      reply);
2661   if ( (GNUNET_YES == test_load_too_high()) ||
2662        (pgc->results_found > 5 + 2 * pgc->priority) )
2663     {
2664       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2665       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2666       return;
2667     }
2668   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2669 }
2670   
2671
2672 /**
2673  * We're processing a GET request from another peer.  Give it to our
2674  * local datastore.
2675  *
2676  * @param cls our "struct ProcessGetContext"
2677  * @param ok did we get a datastore slice or not?
2678  */
2679 static void
2680 ds_get_request (void *cls, 
2681                 int ok)
2682 {
2683   struct ProcessGetContext *pgc = cls;
2684   uint32_t type;
2685   struct GNUNET_TIME_Relative timeout;
2686
2687   if (GNUNET_OK != ok)
2688     {
2689       /* no point in doing P2P stuff if we can't even do local */
2690       GNUNET_free (dsh);
2691       return;
2692     }
2693   type = pgc->type;
2694   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2695     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2696   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2697                                            (pgc->priority + 1));
2698   GNUNET_DATASTORE_get (dsh,
2699                         &pgc->query,
2700                         type,
2701                         &process_p2p_get_result,
2702                         pgc,
2703                         timeout);
2704 }
2705
2706
2707 /**
2708  * The priority level imposes a bound on the maximum
2709  * value for the ttl that can be requested.
2710  *
2711  * @param ttl_in requested ttl
2712  * @param prio given priority
2713  * @return ttl_in if ttl_in is below the limit,
2714  *         otherwise the ttl-limit for the given priority
2715  */
2716 static int32_t
2717 bound_ttl (int32_t ttl_in, uint32_t prio)
2718 {
2719   unsigned long long allowed;
2720
2721   if (ttl_in <= 0)
2722     return ttl_in;
2723   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2724   if (ttl_in > allowed)      
2725     {
2726       if (allowed >= (1 << 30))
2727         return 1 << 30;
2728       return allowed;
2729     }
2730   return ttl_in;
2731 }
2732
2733
2734 /**
2735  * We've received a request with the specified
2736  * priority.  Bound it according to how much
2737  * we trust the given peer.
2738  * 
2739  * @param prio_in requested priority
2740  * @param peer the peer making the request
2741  * @return effective priority
2742  */
2743 static uint32_t
2744 bound_priority (uint32_t prio_in,
2745                 const struct GNUNET_PeerIdentity *peer)
2746 {
2747   return 0; // FIXME!
2748 }
2749
2750
2751 /**
2752  * Handle P2P "GET" request.
2753  *
2754  * @param cls closure, always NULL
2755  * @param other the other peer involved (sender or receiver, NULL
2756  *        for loopback messages where we are both sender and receiver)
2757  * @param message the actual message
2758  * @return GNUNET_OK to keep the connection open,
2759  *         GNUNET_SYSERR to close it (signal serious error)
2760  */
2761 static int
2762 handle_p2p_get (void *cls,
2763                 const struct GNUNET_PeerIdentity *other,
2764                 const struct GNUNET_MessageHeader *message)
2765 {
2766   uint16_t msize;
2767   const struct GetMessage *gm;
2768   unsigned int bits;
2769   const GNUNET_HashCode *opt;
2770   struct ProcessGetContext *pgc;
2771   uint32_t bm;
2772   size_t bfsize;
2773   uint32_t ttl_decrement;
2774   double preference;
2775   int net_load_up;
2776   int net_load_down;
2777
2778   msize = ntohs(message->size);
2779   if (msize < sizeof (struct GetMessage))
2780     {
2781       GNUNET_break_op (0);
2782       return GNUNET_SYSERR;
2783     }
2784   gm = (const struct GetMessage*) message;
2785   bm = ntohl (gm->hash_bitmap);
2786   bits = 0;
2787   while (bm > 0)
2788     {
2789       if (1 == (bm & 1))
2790         bits++;
2791       bm >>= 1;
2792     }
2793   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2794     {
2795       GNUNET_break_op (0);
2796       return GNUNET_SYSERR;
2797     }  
2798   opt = (const GNUNET_HashCode*) &gm[1];
2799   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2800   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2801   if (bfsize > 0)
2802     {
2803       pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2804                                                    bfsize,
2805                                                    BLOOMFILTER_K);
2806       pgc->bf_size = bfsize;
2807     }
2808   pgc->type = ntohl (gm->type);
2809   pgc->bm = ntohl (gm->hash_bitmap);
2810   pgc->mingle = gm->filter_mutator;
2811   bits = 0;
2812   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2813     pgc->reply_to.hashPubKey = opt[bits++];
2814   else
2815     pgc->reply_to = *other;
2816   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2817     pgc->namespace = opt[bits++];
2818   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2819     {
2820       GNUNET_break_op (0);
2821       GNUNET_free (pgc);
2822       return GNUNET_SYSERR;
2823     }
2824   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2825     pgc->prime_target.hashPubKey = opt[bits++];
2826   /* note that we can really only check load here since otherwise
2827      peers could find out that we are overloaded by being disconnected
2828      after sending us a malformed query... */
2829   if (GNUNET_YES == test_load_too_high ())
2830     {
2831       if (NULL != pgc->bf)
2832         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2833       GNUNET_free (pgc);
2834 #if DEBUG_FS
2835       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2836                   "Dropping query from `%s', this peer is too busy.\n",
2837                   GNUNET_i2s (other));
2838 #endif
2839       return GNUNET_OK;
2840     }
2841   net_load_up = 50; // FIXME
2842   net_load_down = 50; // FIXME
2843   pgc->policy = ROUTING_POLICY_NONE;
2844   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2845        (net_load_down < IDLE_LOAD_THRESHOLD) )
2846     {
2847       pgc->policy |= ROUTING_POLICY_ALL;
2848       pgc->priority = 0; /* no charge */
2849     }
2850   else
2851     {
2852       pgc->priority = bound_priority (ntohl (gm->priority), other);
2853       if ( (net_load_up < 
2854             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2855            (net_load_down < 
2856             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2857         {
2858           pgc->policy |= ROUTING_POLICY_ALL;
2859         }
2860       else
2861         {
2862           // FIXME: is this sound?
2863           if (net_load_up < 90 + 10 * pgc->priority)
2864             pgc->policy |= ROUTING_POLICY_FORWARD;
2865           if (net_load_down < 90 + 10 * pgc->priority)
2866             pgc->policy |= ROUTING_POLICY_ANSWER;
2867         }
2868     }
2869   if (pgc->policy == ROUTING_POLICY_NONE)
2870     {
2871 #if DEBUG_FS
2872       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2873                   "Dropping query from `%s', network saturated.\n",
2874                   GNUNET_i2s (other));
2875 #endif
2876       if (NULL != pgc->bf)
2877         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2878       GNUNET_free (pgc);
2879       return GNUNET_OK;     /* drop */
2880     }
2881   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2882     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2883   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2884   /* decrement ttl (always) */
2885   ttl_decrement = 2 * TTL_DECREMENT +
2886     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2887                               TTL_DECREMENT);
2888   if ( (pgc->ttl < 0) &&
2889        (pgc->ttl - ttl_decrement > 0) )
2890     {
2891 #if DEBUG_FS
2892       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2893                   "Dropping query from `%s' due to TTL underflow.\n",
2894                   GNUNET_i2s (other));
2895 #endif
2896       /* integer underflow => drop (should be very rare)! */
2897       if (NULL != pgc->bf)
2898         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2899       GNUNET_free (pgc);
2900       return GNUNET_OK;
2901     }
2902   pgc->ttl -= ttl_decrement;
2903   pgc->start_time = GNUNET_TIME_absolute_get ();
2904   preference = (double) pgc->priority;
2905   if (preference < QUERY_BANDWIDTH_VALUE)
2906     preference = QUERY_BANDWIDTH_VALUE;
2907   // FIXME: also reserve bandwidth for reply?
2908   GNUNET_CORE_peer_configure (core,
2909                               other,
2910                               GNUNET_TIME_UNIT_FOREVER_REL,
2911                               0, 0, preference, NULL, NULL);
2912   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2913     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2914                                  &ds_get_request,
2915                                  pgc);
2916   else
2917     GNUNET_SCHEDULER_add_continuation (sched,
2918                                        GNUNET_NO,
2919                                        &forward_get_request,
2920                                        pgc,
2921                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2922   return GNUNET_OK;
2923 }
2924
2925
2926 /**
2927  * Function called to notify us that we can now transmit a reply to a
2928  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2929  * closed for writing in the meantime.
2930  *
2931  * @param cls closure, points to a "struct PendingRequest*" with
2932  *            one or more pending replies
2933  * @param size number of bytes available in buf
2934  * @param buf where the callee should write the message
2935  * @return number of bytes written to buf
2936  */
2937 static size_t
2938 transmit_result (void *cls,
2939                  size_t size, 
2940                  void *buf)
2941 {
2942   struct PendingRequest *pr = cls;
2943   char *cbuf = buf;
2944   struct PendingReply *reply;
2945   size_t ret;
2946
2947   ret = 0;
2948   while (NULL != (reply = pr->replies_pending))
2949     {
2950       if ( (reply->msize + ret < ret) ||
2951            (reply->msize + ret > size) )
2952         break;
2953       pr->replies_pending = reply->next;
2954       memcpy (&cbuf[ret], &reply[1], reply->msize);
2955       ret += reply->msize;
2956       GNUNET_free (pr);
2957     }
2958   return 0;
2959 }
2960
2961
2962 /**
2963  * Iterator over pending requests.
2964  *
2965  * @param cls response (struct ProcessReplyClosure)
2966  * @param key our query
2967  * @param value value in the hash map (meta-info about the query)
2968  * @return GNUNET_YES (we should continue to iterate)
2969  */
2970 static int
2971 process_reply (void *cls,
2972                const GNUNET_HashCode * key,
2973                void *value)
2974 {
2975   struct ProcessReplyClosure *prq = cls;
2976   struct PendingRequest *pr = value;
2977   struct PendingRequest *eer;
2978   struct PendingReply *reply;
2979   struct PutMessage *pm;
2980   struct ContentMessage *cm;
2981   GNUNET_HashCode chash;
2982   GNUNET_HashCode mhash;
2983   struct GNUNET_PeerIdentity target;
2984   size_t msize;
2985   uint32_t prio;
2986   struct GNUNET_TIME_Relative max_delay;
2987   
2988   GNUNET_CRYPTO_hash (prq->data,
2989                       prq->size,
2990                       &chash);
2991   switch (prq->type)
2992     {
2993     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2994     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2995       break;
2996     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2997       /* FIXME: does prq->namespace match our expectations? */
2998       /* then: fall-through??? */
2999     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3000       if (pr->bf != NULL) 
3001         {
3002           mingle_hash (&chash, pr->mingle, &mhash);
3003           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
3004                                                                &mhash))
3005             return GNUNET_YES; /* duplicate */
3006           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
3007                                             &mhash);
3008         }
3009       break;
3010     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3011       // FIXME: any checks against duplicates for SKBlocks?
3012       break;
3013     }
3014   prio = pr->priority;
3015   prq->priority += pr->remaining_priority;
3016   pr->remaining_priority = 0;
3017   if (pr->client != NULL)
3018     {
3019       if (pr->replies_seen_size == pr->replies_seen_off)
3020         GNUNET_array_grow (pr->replies_seen,
3021                            pr->replies_seen_size,
3022                            pr->replies_seen_size * 2 + 4);
3023       pr->replies_seen[pr->replies_seen_off++] = chash;
3024       // FIXME: possibly recalculate BF!
3025     }
3026   if (pr->client == NULL)
3027     {
3028       msize = sizeof (struct ContentMessage) + prq->size;
3029       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3030       reply->msize = msize;
3031       cm = (struct ContentMessage*) &reply[1];
3032       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
3033       cm->header.size = htons (msize);
3034       cm->type = htonl (prq->type);
3035       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3036       reply->next = pr->replies_pending;
3037       pr->replies_pending = reply;
3038       memcpy (&reply[1], prq->data, prq->size);
3039       if (pr->cth != NULL)
3040         return GNUNET_YES;
3041       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
3042       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
3043         {
3044           /* estimate expiration time from time difference between
3045              first request that will be discarded and this request */
3046           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
3047           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
3048                                                            eer->start_time);
3049         }
3050       GNUNET_PEER_resolve (pr->source_pid,
3051                            &target);
3052       pr->cth = GNUNET_CORE_notify_transmit_ready (core,
3053                                                    prio,
3054                                                    max_delay,
3055                                                    &target,
3056                                                    msize,
3057                                                    &transmit_result,
3058                                                    pr);
3059       if (NULL == pr->cth)
3060         {
3061           // FIXME: now what? discard?
3062         }
3063     }
3064   else
3065     {
3066       msize = sizeof (struct PutMessage) + prq->size;
3067       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3068       reply->msize = msize;
3069       reply->next = pr->replies_pending;
3070       pm = (struct PutMessage*) &reply[1];
3071       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3072       pm->header.size = htons (msize);
3073       pm->type = htonl (prq->type);
3074       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
3075       pr->replies_pending = reply;
3076       memcpy (&reply[1], prq->data, prq->size);
3077       if (pr->th != NULL)
3078         return GNUNET_YES;
3079       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
3080                                                     msize,
3081                                                     GNUNET_TIME_UNIT_FOREVER_REL,
3082                                                     &transmit_result,
3083                                                     pr);
3084       if (pr->th == NULL)
3085         {
3086           // FIXME: need to try again later (not much
3087           // to do here specifically, but we need to
3088           // check somewhere else to handle this case!)
3089         }
3090     }
3091   // FIXME: implement hot-path routing statistics keeping!
3092   return GNUNET_YES;
3093 }
3094
3095
3096 /**
3097  * Check if the given KBlock is well-formed.
3098  *
3099  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
3100  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
3101  * @param query where to store the query that this block answers
3102  * @return GNUNET_OK if this is actually a well-formed KBlock
3103  */
3104 static int
3105 check_kblock (const struct KBlock *kb,
3106               size_t dsize,
3107               GNUNET_HashCode *query)
3108 {
3109   if (dsize < sizeof (struct KBlock))
3110     {
3111       GNUNET_break_op (0);
3112       return GNUNET_SYSERR;
3113     }
3114   if (dsize - sizeof (struct KBlock) !=
3115       ntohs (kb->purpose.size) 
3116       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
3117       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
3118     {
3119       GNUNET_break_op (0);
3120       return GNUNET_SYSERR;
3121     }
3122   if (GNUNET_OK !=
3123       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
3124                                 &kb->purpose,
3125                                 &kb->signature,
3126                                 &kb->keyspace)) 
3127     {
3128       GNUNET_break_op (0);
3129       return GNUNET_SYSERR;
3130     }
3131   if (query != NULL)
3132     GNUNET_CRYPTO_hash (&kb->keyspace,
3133                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3134                         query);
3135   return GNUNET_OK;
3136 }
3137
3138
3139 /**
3140  * Check if the given SBlock is well-formed.
3141  *
3142  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
3143  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
3144  * @param query where to store the query that this block answers
3145  * @param namespace where to store the namespace that this block belongs to
3146  * @return GNUNET_OK if this is actually a well-formed SBlock
3147  */
3148 static int
3149 check_sblock (const struct SBlock *sb,
3150               size_t dsize,
3151               GNUNET_HashCode *query,   
3152               GNUNET_HashCode *namespace)
3153 {
3154   if (dsize < sizeof (struct SBlock))
3155     {
3156       GNUNET_break_op (0);
3157       return GNUNET_SYSERR;
3158     }
3159   if (dsize !=
3160       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
3161     {
3162       GNUNET_break_op (0);
3163       return GNUNET_SYSERR;
3164     }
3165   if (GNUNET_OK !=
3166       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
3167                                 &sb->purpose,
3168                                 &sb->signature,
3169                                 &sb->subspace)) 
3170     {
3171       GNUNET_break_op (0);
3172       return GNUNET_SYSERR;
3173     }
3174   if (query != NULL)
3175     *query = sb->identifier;
3176   if (namespace != NULL)
3177     GNUNET_CRYPTO_hash (&sb->subspace,
3178                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3179                         namespace);
3180   return GNUNET_OK;
3181 }
3182
3183
3184 /**
3185  * Handle P2P "PUT" request.
3186  *
3187  * @param cls closure, always NULL
3188  * @param other the other peer involved (sender or receiver, NULL
3189  *        for loopback messages where we are both sender and receiver)
3190  * @param message the actual message
3191  * @return GNUNET_OK to keep the connection open,
3192  *         GNUNET_SYSERR to close it (signal serious error)
3193  */
3194 static int
3195 handle_p2p_put (void *cls,
3196                 const struct GNUNET_PeerIdentity *other,
3197                 const struct GNUNET_MessageHeader *message)
3198 {
3199   const struct PutMessage *put;
3200   uint16_t msize;
3201   size_t dsize;
3202   uint32_t type;
3203   struct GNUNET_TIME_Absolute expiration;
3204   GNUNET_HashCode query;
3205   struct ProcessReplyClosure prq;
3206
3207   msize = ntohs (message->size);
3208   if (msize < sizeof (struct PutMessage))
3209     {
3210       GNUNET_break_op(0);
3211       return GNUNET_SYSERR;
3212     }
3213   put = (const struct PutMessage*) message;
3214   dsize = msize - sizeof (struct PutMessage);
3215   type = ntohl (put->type);
3216   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
3217
3218   /* first, validate! */
3219   switch (type)
3220     {
3221     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3222     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3223       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
3224       break;
3225     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3226       if (GNUNET_OK !=
3227           check_kblock ((const struct KBlock*) &put[1],
3228                         dsize,
3229                         &query))
3230         return GNUNET_SYSERR;
3231       break;
3232     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3233       if (GNUNET_OK !=
3234           check_sblock ((const struct SBlock*) &put[1],
3235                         dsize,
3236                         &query,
3237                         &prq.namespace))
3238         return GNUNET_SYSERR;
3239       break;
3240     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3241       // FIXME -- validate SKBLOCK!
3242       GNUNET_break (0);
3243       return GNUNET_OK;
3244     default:
3245       /* unknown block type */
3246       GNUNET_break_op (0);
3247       return GNUNET_SYSERR;
3248     }
3249
3250   /* now, lookup 'query' */
3251   prq.data = (const void*) &put[1];
3252   prq.size = dsize;
3253   prq.type = type;
3254   prq.expiration = expiration;
3255   prq.priority = 0;
3256   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
3257                                               &query,
3258                                               &process_reply,
3259                                               &prq);
3260   // FIXME: if migration is on and load is low,
3261   // queue to store data in datastore;
3262   // use "prq.priority" for that!
3263   return GNUNET_OK;
3264 }
3265
3266
3267 /**
3268  * List of handlers for P2P messages
3269  * that we care about.
3270  */
3271 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3272   {
3273     { &handle_p2p_get, 
3274       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3275     { &handle_p2p_put, 
3276       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3277     { NULL, 0, 0 }
3278   };
3279
3280
3281 /**
3282  * Task that will try to initiate a connection with the
3283  * core service.
3284  * 
3285  * @param cls unused
3286  * @param tc unused
3287  */
3288 static void
3289 core_connect_task (void *cls,
3290                    const struct GNUNET_SCHEDULER_TaskContext *tc);
3291
3292
3293 /**
3294  * Function called by the core after we've
3295  * connected.
3296  *
3297  * @param cls closure, unused
3298  * @param server handle to the core service
3299  * @param my_identity our peer identity (unused)
3300  * @param publicKey our public key (unused)
3301  */
3302 static void
3303 core_start_cb (void *cls,
3304                struct GNUNET_CORE_Handle * server,
3305                const struct GNUNET_PeerIdentity *
3306                my_identity,
3307                const struct
3308                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
3309                publicKey)
3310 {
3311   if (server == NULL)
3312     {
3313       GNUNET_SCHEDULER_add_delayed (sched,
3314                                     GNUNET_NO,
3315                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
3316                                     GNUNET_SCHEDULER_NO_TASK,
3317                                     GNUNET_TIME_UNIT_SECONDS,
3318                                     &core_connect_task,
3319                                     NULL);
3320       return;
3321     }
3322   core = server;
3323 }
3324
3325
3326 /**
3327  * Task that will try to initiate a connection with the
3328  * core service.
3329  * 
3330  * @param cls unused
3331  * @param tc unused
3332  */
3333 static void
3334 core_connect_task (void *cls,
3335                    const struct GNUNET_SCHEDULER_TaskContext *tc)
3336 {
3337   GNUNET_CORE_connect (sched,
3338                        cfg,
3339                        GNUNET_TIME_UNIT_FOREVER_REL,
3340                        NULL,
3341                        &core_start_cb,
3342                        &peer_connect_handler,
3343                        &peer_disconnect_handler,
3344                        NULL, 
3345                        NULL, GNUNET_NO,
3346                        NULL, GNUNET_NO,
3347                        p2p_handlers);
3348 }
3349
3350
3351 /**
3352  * Process fs requests.
3353  *
3354  * @param cls closure
3355  * @param s scheduler to use
3356  * @param server the initialized server
3357  * @param c configuration to use
3358  */
3359 static void
3360 run (void *cls,
3361      struct GNUNET_SCHEDULER_Handle *s,
3362      struct GNUNET_SERVER_Handle *server,
3363      const struct GNUNET_CONFIGURATION_Handle *c)
3364 {
3365   sched = s;
3366   cfg = c;
3367
3368   ifm = GNUNET_CONTAINER_multihashmap_create (128);
3369   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3370   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3371   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
3372   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3373   read_index_list ();
3374   dsh = GNUNET_DATASTORE_connect (cfg,
3375                                   sched);
3376   if (NULL == dsh)
3377     {
3378       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3379                   _("Failed to connect to datastore service.\n"));
3380       return;
3381     }
3382   GNUNET_SERVER_disconnect_notify (server, 
3383                                    &handle_client_disconnect,
3384                                    NULL);
3385   GNUNET_SERVER_add_handlers (server, handlers);
3386   core_connect_task (NULL, NULL);
3387   GNUNET_SCHEDULER_add_delayed (sched,
3388                                 GNUNET_YES,
3389                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3390                                 GNUNET_SCHEDULER_NO_TASK,
3391                                 GNUNET_TIME_UNIT_FOREVER_REL,
3392                                 &shutdown_task,
3393                                 NULL);
3394 }
3395
3396
3397 /**
3398  * The main function for the fs service.
3399  *
3400  * @param argc number of arguments from the command line
3401  * @param argv command line arguments
3402  * @return 0 ok, 1 on error
3403  */
3404 int
3405 main (int argc, char *const *argv)
3406 {
3407   return (GNUNET_OK ==
3408           GNUNET_SERVICE_run (argc,
3409                               argv,
3410                               "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
3411 }
3412
3413 /* end of gnunet-service-fs.c */