bugfixes, including double frees
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix gazillion of minor FIXME's
28  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29  *   core to transmit to another peer; need to make sure this is bounded overall...
30  * - randomly delay processing for improved anonymity (can wait)
31  * - content migration (put in local DS) (can wait)
32  * - handle some special cases when forwarding replies based on tracked requests (can wait)
33  * - tracking of success correlations for hot-path routing (can wait)
34  * - various load-based actions (can wait)
35  * - validation of KSBLOCKS (can wait)
36  * - remove on-demand blocks if they keep failing (can wait)
37  * - check that we decrement PIDs always where necessary (can wait)
38  * - find out how to use core-pulling instead of pushing (at least for some cases)
39  */
40 #include "platform.h"
41 #include <float.h>
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "fs.h"
49
50 #define DEBUG_FS GNUNET_YES
51
52
53 /**
54  * In-memory information about indexed files (also available
55  * on-disk).
56  */
57 struct IndexInfo
58 {
59   
60   /**
61    * This is a linked list.
62    */
63   struct IndexInfo *next;
64
65   /**
66    * Name of the indexed file.  Memory allocated
67    * at the end of this struct (do not free).
68    */
69   const char *filename;
70
71   /**
72    * Context for transmitting confirmation to client,
73    * NULL if we've done this already.
74    */
75   struct GNUNET_SERVER_TransmitContext *tc;
76   
77   /**
78    * Hash of the contents of the file.
79    */
80   GNUNET_HashCode file_id;
81
82 };
83
84
85 /**
86  * Signature of a function that is called whenever a datastore
87  * request can be processed (or an entry put on the queue times out).
88  *
89  * @param cls closure
90  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
91  */
92 typedef void (*RequestFunction)(void *cls,
93                                 int ok);
94
95
96 /**
97  * Doubly-linked list of our requests for the datastore.
98  */
99 struct DatastoreRequestQueue
100 {
101
102   /**
103    * This is a doubly-linked list.
104    */
105   struct DatastoreRequestQueue *next;
106
107   /**
108    * This is a doubly-linked list.
109    */
110   struct DatastoreRequestQueue *prev;
111
112   /**
113    * Function to call (will issue the request).
114    */
115   RequestFunction req;
116
117   /**
118    * Closure for req.
119    */
120   void *req_cls;
121
122   /**
123    * When should this request time-out because we don't care anymore?
124    */
125   struct GNUNET_TIME_Absolute timeout;
126     
127   /**
128    * ID of task used for signaling timeout.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132 };
133
134
135 /**
136  * Closure for processing START_SEARCH messages from a client.
137  */
138 struct LocalGetContext
139 {
140
141   /**
142    * This is a doubly-linked list.
143    */
144   struct LocalGetContext *next;
145
146   /**
147    * This is a doubly-linked list.
148    */
149   struct LocalGetContext *prev;
150
151   /**
152    * Client that initiated the search.
153    */
154   struct GNUNET_SERVER_Client *client;
155
156   /**
157    * Array of results that we've already received 
158    * (can be NULL).
159    */
160   GNUNET_HashCode *results; 
161
162   /**
163    * Bloomfilter over all results (for fast query construction);
164    * NULL if we don't have any results.
165    *
166    * FIXME: this member is not used, is that OK? If so, it should
167    * be removed!
168    */
169   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
170
171   /**
172    * DS request associated with this operation.
173    */
174   struct DatastoreRequestQueue *req;
175
176   /**
177    * Current result message to transmit to client (or NULL).
178    */
179   struct ContentMessage *result;
180   
181   /**
182    * Type of the content that we're looking for.
183    * 0 for any.
184    */
185   uint32_t type;
186
187   /**
188    * Desired anonymity level.
189    */
190   uint32_t anonymity_level;
191
192   /**
193    * Number of results actually stored in the results array.
194    */
195   unsigned int results_used;
196   
197   /**
198    * Size of the results array in memory.
199    */
200   unsigned int results_size;
201   
202   /**
203    * Size (in bytes) of the 'results_bf' bloomfilter.
204    *
205    * FIXME: this member is not used, is that OK? If so, it should
206    * be removed!
207    */
208   size_t results_bf_size;
209
210   /**
211    * If the request is for a DBLOCK or IBLOCK, this is the identity of
212    * the peer that is known to have a response.  Set to all-zeros if
213    * such a target is not known (note that even if OUR anonymity
214    * level is >0 we may happen to know the responder's identity;
215    * nevertheless, we should probably not use it for a DHT-lookup
216    * or similar blunt actions in order to avoid exposing ourselves).
217    */
218   struct GNUNET_PeerIdentity target;
219
220   /**
221    * If the request is for an SBLOCK, this is the identity of the
222    * pseudonym to which the SBLOCK belongs. 
223    */
224   GNUNET_HashCode namespace;
225
226   /**
227    * Hash of the keyword (aka query) for KBLOCKs; Hash of
228    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
229    * and hash of the identifier XORed with the target for
230    * SBLOCKS (aka query).
231    */
232   GNUNET_HashCode query;
233
234 };
235
236
237 /**
238  * Possible routing policies for an FS-GET request.
239  */
240 enum RoutingPolicy
241   {
242     /**
243      * Simply drop the request.
244      */
245     ROUTING_POLICY_NONE = 0,
246     
247     /**
248      * Answer it if we can from local datastore.
249      */
250     ROUTING_POLICY_ANSWER = 1,
251
252     /**
253      * Forward the request to other peers (if possible).
254      */
255     ROUTING_POLICY_FORWARD = 2,
256
257     /**
258      * Forward to other peers, and ask them to route
259      * the response via ourselves.
260      */
261     ROUTING_POLICY_INDIRECT = 6,
262     
263     /**
264      * Do everything we could possibly do (that would
265      * make sense).
266      */
267     ROUTING_POLICY_ALL = 7
268   };
269
270
271 /**
272  * Internal context we use for our initial processing
273  * of a GET request.
274  */
275 struct ProcessGetContext
276 {
277   /**
278    * The search query (used for datastore lookup).
279    */
280   GNUNET_HashCode query;
281   
282   /**
283    * Which peer we should forward the response to.
284    */
285   struct GNUNET_PeerIdentity reply_to;
286
287   /**
288    * Namespace for the result (only set for SKS requests)
289    */
290   GNUNET_HashCode namespace;
291
292   /**
293    * Peer that we should forward the query to if possible
294    * (since that peer likely has the content).
295    */
296   struct GNUNET_PeerIdentity prime_target;
297
298   /**
299    * When did we receive this request?
300    */
301   struct GNUNET_TIME_Absolute start_time;
302
303   /**
304    * Our entry in the DRQ (non-NULL while we wait for our
305    * turn to interact with the local database).
306    */
307   struct DatastoreRequestQueue *drq;
308
309   /**
310    * Filter used to eliminate duplicate results.  Can be NULL if we
311    * are not yet filtering any results.
312    */
313   struct GNUNET_CONTAINER_BloomFilter *bf;
314
315   /**
316    * Bitmap describing which of the optional
317    * hash codes / peer identities were given to us.
318    */
319   uint32_t bm;
320
321   /**
322    * Desired block type.
323    */
324   uint32_t type;
325
326   /**
327    * Priority of the request.
328    */
329   uint32_t priority;
330
331   /**
332    * Size of the 'bf' (in bytes).
333    */
334   size_t bf_size;
335
336   /**
337    * In what ways are we going to process
338    * the request?
339    */
340   enum RoutingPolicy policy;
341
342   /**
343    * Time-to-live for the request (value
344    * we use).
345    */
346   int32_t ttl;
347
348   /**
349    * Number to mingle hashes for bloom-filter
350    * tests with.
351    */
352   int32_t mingle;
353
354   /**
355    * Number of results that were found so far.
356    */
357   unsigned int results_found;
358 };
359
360
361 /**
362  * Information we keep for each pending reply.
363  */
364 struct PendingReply
365 {
366   /**
367    * This is a linked list.
368    */
369   struct PendingReply *next;
370
371   /**
372    * Size of the reply; actual reply message follows
373    * at the end of this struct.
374    */
375   size_t msize;
376
377 };
378
379
380 /**
381  * All requests from a client are kept in a doubly-linked list.
382  */
383 struct ClientRequestList;
384
385
386 /**
387  * Information we keep for each pending request.  We should try to
388  * keep this struct as small as possible since its memory consumption
389  * is key to how many requests we can have pending at once.
390  */
391 struct PendingRequest
392 {
393
394   /**
395    * ID of a client making a request, NULL if this entry is for a
396    * peer.
397    */
398   struct GNUNET_SERVER_Client *client;
399
400   /**
401    * If this request was made by a client,
402    * this is our entry in the client request
403    * list; otherwise NULL.
404    */
405   struct ClientRequestList *crl_entry;
406
407   /**
408    * If this is a namespace query, pointer to the hash of the public
409    * key of the namespace; otherwise NULL.
410    */
411   GNUNET_HashCode *namespace;
412
413   /**
414    * Bloomfilter we use to filter out replies that we don't care about
415    * (anymore).  NULL as long as we are interested in all replies.
416    */
417   struct GNUNET_CONTAINER_BloomFilter *bf;
418
419   /**
420    * Replies that we have received but were unable to forward yet
421    * (typically non-null only if we have a pending transmission
422    * request with the client or the respective peer).
423    */
424   struct PendingReply *replies_pending;
425
426   /**
427    * Pending transmission request with the core service for the target
428    * peer (for processing of 'replies_pending') or Handle for a
429    * pending query-request for P2P-transmission with the core service.
430    * If non-NULL, this request must be cancelled should this struct be
431    * destroyed!
432    */
433   struct GNUNET_CORE_TransmitHandle *cth;
434
435   /**
436    * Pending transmission request for the target client (for processing of
437    * 'replies_pending').
438    */
439   struct GNUNET_CONNECTION_TransmitHandle *th;
440
441   /**
442    * Hash code of all replies that we have seen so far (only valid
443    * if client is not NULL since we only track replies like this for
444    * our own clients).
445    */
446   GNUNET_HashCode *replies_seen;
447
448   /**
449    * When did we first see this request (form this peer), or, if our
450    * client is initiating, when did we last initiate a search?
451    */
452   struct GNUNET_TIME_Absolute start_time;
453
454   /**
455    * The query that this request is for.
456    */
457   GNUNET_HashCode query;
458
459   /**
460    * The task responsible for transmitting queries
461    * for this request.
462    */
463   GNUNET_SCHEDULER_TaskIdentifier task;
464
465   /**
466    * (Interned) Peer identifier (only valid if "client" is NULL)
467    * that identifies a peer that gave us this request.
468    */
469   GNUNET_PEER_Id source_pid;
470
471   /**
472    * (Interned) Peer identifier that identifies a preferred target
473    * for requests.
474    */
475   GNUNET_PEER_Id target_pid;
476
477   /**
478    * (Interned) Peer identifiers of peers that have already
479    * received our query for this content.
480    */
481   GNUNET_PEER_Id *used_pids;
482
483   /**
484    * Size of the 'bf' (in bytes).
485    */
486   size_t bf_size;
487
488   /**
489    * Desired anonymity level; only valid for requests from a local client.
490    */
491   uint32_t anonymity_level;
492
493   /**
494    * How many entries in "used_pids" are actually valid?
495    */
496   unsigned int used_pids_off;
497
498   /**
499    * How long is the "used_pids" array?
500    */
501   unsigned int used_pids_size;
502
503   /**
504    * How many entries in "replies_seen" are actually valid?
505    */
506   unsigned int replies_seen_off;
507
508   /**
509    * How long is the "replies_seen" array?
510    */
511   unsigned int replies_seen_size;
512   
513   /**
514    * Priority with which this request was made.  If one of our clients
515    * made the request, then this is the current priority that we are
516    * using when initiating the request.  This value is used when
517    * we decide to reward other peers with trust for providing a reply.
518    */
519   uint32_t priority;
520
521   /**
522    * Priority points left for us to spend when forwarding this request
523    * to other peers.
524    */
525   uint32_t remaining_priority;
526
527   /**
528    * Number to mingle hashes for bloom-filter
529    * tests with.
530    */
531   int32_t mingle;
532
533   /**
534    * TTL with which we saw this request (or, if we initiated, TTL that
535    * we used for the request).
536    */
537   int32_t ttl;
538   
539   /**
540    * Type of the content that this request is for.
541    */
542   uint32_t type;
543
544 };
545
546
547 /**
548  * All requests from a client are kept in a doubly-linked list.
549  */
550 struct ClientRequestList
551 {
552   /**
553    * This is a doubly-linked list.
554    */
555   struct ClientRequestList *next;
556
557   /**
558    * This is a doubly-linked list.
559    */ 
560   struct ClientRequestList *prev;
561
562   /**
563    * A request from this client.
564    */
565   struct PendingRequest *req;
566
567   /**
568    * Client list with the head and tail of this DLL.
569    */
570   struct ClientList *cl;
571 };
572
573
574 /**
575  * Linked list of all clients that we are 
576  * currently processing requests for.
577  */
578 struct ClientList
579 {
580
581   /**
582    * This is a linked list.
583    */
584   struct ClientList *next;
585
586   /**
587    * What client is this entry for?
588    */
589   struct GNUNET_SERVER_Client* client;
590
591   /**
592    * Head of the DLL of requests from this client.
593    */
594   struct ClientRequestList *head;
595
596   /**
597    * Tail of the DLL of requests from this client.
598    */
599   struct ClientRequestList *tail;
600
601 };
602
603
604 /**
605  * Closure for "process_reply" function.
606  */
607 struct ProcessReplyClosure
608 {
609   /**
610    * The data for the reply.
611    */
612   const void *data;
613
614   /**
615    * When the reply expires.
616    */
617   struct GNUNET_TIME_Absolute expiration;
618
619   /**
620    * Size of data.
621    */
622   size_t size;
623
624   /**
625    * Namespace that this reply belongs to
626    * (if it is of type SBLOCK).
627    */
628   GNUNET_HashCode namespace;
629
630   /**
631    * Type of the block.
632    */
633   uint32_t type;
634
635   /**
636    * How much was this reply worth to us?
637    */
638   uint32_t priority;
639 };
640
641
642 /**
643  * Information about a peer that we are connected to.
644  * We track data that is useful for determining which
645  * peers should receive our requests.
646  */
647 struct ConnectedPeer
648 {
649
650   /**
651    * List of the last clients for which this peer
652    * successfully answered a query. 
653    */
654   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
655
656   /**
657    * List of the last PIDs for which
658    * this peer successfully answered a query;
659    * We use 0 to indicate no successful reply.
660    */
661   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
662
663   /**
664    * Average delay between sending the peer a request and
665    * getting a reply (only calculated over the requests for
666    * which we actually got a reply).   Calculated
667    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
668    */ 
669   struct GNUNET_TIME_Relative avg_delay;
670
671   /**
672    * Average priority of successful replies.  Calculated
673    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
674    */
675   double avg_priority;
676
677   /**
678    * The peer's identity.
679    */
680   GNUNET_PEER_Id pid;  
681
682   /**
683    * Number of requests we have currently pending
684    * with this peer (that is, requests that were
685    * transmitted so recently that we would not retransmit
686    * them right now).
687    */
688   unsigned int pending_requests;
689
690   /**
691    * Which offset in "last_p2p_replies" will be updated next?
692    * (we go round-robin).
693    */
694   unsigned int last_p2p_replies_woff;
695
696   /**
697    * Which offset in "last_client_replies" will be updated next?
698    * (we go round-robin).
699    */
700   unsigned int last_client_replies_woff;
701
702 };
703
704
705 /**
706  * Our connection to the datastore.
707  */
708 static struct GNUNET_DATASTORE_Handle *dsh;
709
710 /**
711  * Our scheduler.
712  */
713 static struct GNUNET_SCHEDULER_Handle *sched;
714
715 /**
716  * Our configuration.
717  */
718 const struct GNUNET_CONFIGURATION_Handle *cfg;
719
720 /**
721  * Handle to the core service (NULL until we've connected to it).
722  */
723 struct GNUNET_CORE_Handle *core;
724
725 /**
726  * Head of doubly-linked LGC list.
727  */
728 static struct LocalGetContext *lgc_head;
729
730 /**
731  * Tail of doubly-linked LGC list.
732  */
733 static struct LocalGetContext *lgc_tail;
734
735 /**
736  * Head of request queue for the datastore, sorted by timeout.
737  */
738 static struct DatastoreRequestQueue *drq_head;
739
740 /**
741  * Tail of request queue for the datastore.
742  */
743 static struct DatastoreRequestQueue *drq_tail;
744
745 /**
746  * Linked list of indexed files.
747  */
748 static struct IndexInfo *indexed_files;
749
750 /**
751  * Maps hash over content of indexed files to the respective filename.
752  * The filenames are pointers into the indexed_files linked list and
753  * do not need to be freed.
754  */
755 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
756
757 /**
758  * Map of query hash codes to requests.
759  */
760 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
761
762 /**
763  * Map of peer IDs to requests (for those requests coming
764  * from other peers).
765  */
766 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
767
768 /**
769  * Linked list of all of our clients and their requests.
770  */
771 static struct ClientList *clients;
772
773 /**
774  * Heap with the request that will expire next at the top.  Contains
775  * pointers of type "struct PendingRequest*"; these will *also* be
776  * aliased from the "requests_by_peer" data structures and the
777  * "requests_by_query" table.  Note that requests from our clients
778  * don't expire and are thus NOT in the "requests_by_expiration"
779  * (or the "requests_by_peer" tables).
780  */
781 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
782
783 /**
784  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
785  */
786 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
787
788 /**
789  * Maximum number of requests (from other peers) that we're
790  * willing to have pending at any given point in time.
791  * FIXME: set from configuration (and 32 is a tiny value for testing only).
792  */
793 static uint64_t max_pending_requests = 32;
794
795 /**
796  * Write the current index information list to disk.
797  */ 
798 static void
799 write_index_list ()
800 {
801   struct GNUNET_BIO_WriteHandle *wh;
802   char *fn;
803   struct IndexInfo *pos;  
804
805   if (GNUNET_OK !=
806       GNUNET_CONFIGURATION_get_value_filename (cfg,
807                                                "FS",
808                                                "INDEXDB",
809                                                &fn))
810     {
811       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
812                   _("Configuration option `%s' in section `%s' missing.\n"),
813                   "INDEXDB",
814                   "FS");
815       return;
816     }
817   wh = GNUNET_BIO_write_open (fn);
818   if (NULL == wh)
819     {
820       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
821                   _("Could not open `%s'.\n"),
822                   fn);
823       GNUNET_free (fn);
824       return;
825     }
826   pos = indexed_files;
827   while (pos != NULL)
828     {
829       if ( (GNUNET_OK !=
830             GNUNET_BIO_write (wh,
831                               &pos->file_id,
832                               sizeof (GNUNET_HashCode))) ||
833            (GNUNET_OK !=
834             GNUNET_BIO_write_string (wh,
835                                      pos->filename)) )
836         break;
837       pos = pos->next;
838     }
839   if (GNUNET_OK != 
840       GNUNET_BIO_write_close (wh))
841     {
842       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
843                   _("Error writing `%s'.\n"),
844                   fn);
845       GNUNET_free (fn);
846       return;
847     }
848   GNUNET_free (fn);
849 }
850
851
852 /**
853  * Read index information from disk.
854  */
855 static void
856 read_index_list ()
857 {
858   struct GNUNET_BIO_ReadHandle *rh;
859   char *fn;
860   struct IndexInfo *pos;  
861   char *fname;
862   GNUNET_HashCode hc;
863   size_t slen;
864   char *emsg;
865
866   if (GNUNET_OK !=
867       GNUNET_CONFIGURATION_get_value_filename (cfg,
868                                                "FS",
869                                                "INDEXDB",
870                                                &fn))
871     {
872       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
873                   _("Configuration option `%s' in section `%s' missing.\n"),
874                   "INDEXDB",
875                   "FS");
876       return;
877     }
878   if (GNUNET_NO == GNUNET_DISK_file_test (fn))
879     {
880       /* no index info  yet */
881       GNUNET_free (fn);
882       return;
883     }
884   rh = GNUNET_BIO_read_open (fn);
885   if (NULL == rh)
886     {
887       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
888                   _("Could not open `%s'.\n"),
889                   fn);
890       GNUNET_free (fn);
891       return;
892     }
893
894   while ( (GNUNET_OK ==
895            GNUNET_BIO_read (rh,
896                             "Hash of indexed file",
897                             &hc,
898                             sizeof (GNUNET_HashCode))) &&
899           (GNUNET_OK ==
900            GNUNET_BIO_read_string (rh, 
901                                    "Name of indexed file",
902                                    &fname,
903                                    1024 * 16)) )
904     {
905       slen = strlen (fname) + 1;
906       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
907       pos->file_id = hc;
908       pos->filename = (const char *) &pos[1];
909       memcpy (&pos[1], fname, slen);
910       if (GNUNET_SYSERR ==
911           GNUNET_CONTAINER_multihashmap_put (ifm,
912                                              &hc,
913                                              (void*) pos->filename,
914                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
915         {
916           GNUNET_free (pos);
917         }
918       else
919         {
920           pos->next = indexed_files;
921           indexed_files = pos;
922         }
923     }
924   if (GNUNET_OK != 
925       GNUNET_BIO_read_close (rh, &emsg))
926     GNUNET_free (emsg);
927   GNUNET_free (fn);
928 }
929
930
931 /**
932  * We've validated the hash of the file we're about to
933  * index.  Signal success to the client and update
934  * our internal data structures.
935  *
936  * @param ii the index info entry for the request
937  */
938 static void
939 signal_index_ok (struct IndexInfo *ii)
940 {
941   if (GNUNET_SYSERR ==
942       GNUNET_CONTAINER_multihashmap_put (ifm,
943                                          &ii->file_id,
944                                          (void*) ii->filename,
945                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
946     {
947       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
948                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
949                   ii->filename,
950                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
951                                                                    &ii->file_id));
952       GNUNET_SERVER_transmit_context_append (ii->tc,
953                                              NULL, 0,
954                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
955       GNUNET_SERVER_transmit_context_run (ii->tc,
956                                           GNUNET_TIME_UNIT_MINUTES);
957       GNUNET_free (ii);
958       return;
959     }
960   ii->next = indexed_files;
961   indexed_files = ii;
962   write_index_list ();
963   GNUNET_SERVER_transmit_context_append (ii->tc,
964                                          NULL, 0,
965                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
966   GNUNET_SERVER_transmit_context_run (ii->tc,
967                                       GNUNET_TIME_UNIT_MINUTES);
968   ii->tc = NULL;
969 }
970
971
972 /**
973  * Function called once the hash computation over an
974  * indexed file has completed.
975  *
976  * @param cls closure, our publishing context
977  * @param res resulting hash, NULL on error
978  */
979 static void 
980 hash_for_index_val (void *cls,
981                     const GNUNET_HashCode *
982                     res)
983 {
984   struct IndexInfo *ii = cls;
985   
986   if ( (res == NULL) ||
987        (0 != memcmp (res,
988                      &ii->file_id,
989                      sizeof(GNUNET_HashCode))) )
990     {
991       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
992                   _("Hash mismatch trying to index file `%s'\n"),
993                   ii->filename);
994       GNUNET_SERVER_transmit_context_append (ii->tc,
995                                              NULL, 0,
996                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
997       GNUNET_SERVER_transmit_context_run (ii->tc,
998                                           GNUNET_TIME_UNIT_MINUTES);
999       GNUNET_free (ii);
1000       return;
1001     }
1002   signal_index_ok (ii);
1003 }
1004
1005
1006 /**
1007  * Handle INDEX_START-message.
1008  *
1009  * @param cls closure
1010  * @param client identification of the client
1011  * @param message the actual message
1012  */
1013 static void
1014 handle_index_start (void *cls,
1015                     struct GNUNET_SERVER_Client *client,
1016                     const struct GNUNET_MessageHeader *message)
1017 {
1018   const struct IndexStartMessage *ism;
1019   const char *fn;
1020   uint16_t msize;
1021   struct IndexInfo *ii;
1022   size_t slen;
1023   uint32_t dev;
1024   uint64_t ino;
1025   uint32_t mydev;
1026   uint64_t myino;
1027
1028   msize = ntohs(message->size);
1029   if ( (msize <= sizeof (struct IndexStartMessage)) ||
1030        ( ((const char *)message)[msize-1] != '\0') )
1031     {
1032       GNUNET_break (0);
1033       GNUNET_SERVER_receive_done (client,
1034                                   GNUNET_SYSERR);
1035       return;
1036     }
1037   ism = (const struct IndexStartMessage*) message;
1038   fn = (const char*) &ism[1];
1039   dev = ntohl (ism->device);
1040   ino = GNUNET_ntohll (ism->inode);
1041   ism = (const struct IndexStartMessage*) message;
1042   slen = strlen (fn) + 1;
1043   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
1044   ii->filename = (const char*) &ii[1];
1045   memcpy (&ii[1], fn, slen);
1046   ii->file_id = ism->file_id;  
1047   ii->tc = GNUNET_SERVER_transmit_context_create (client);
1048   if ( ( (dev != 0) ||
1049          (ino != 0) ) &&
1050        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
1051                                                        &mydev,
1052                                                        &myino)) &&
1053        ( (dev == mydev) &&
1054          (ino == myino) ) )
1055     {      
1056       /* fast validation OK! */
1057       signal_index_ok (ii);
1058       return;
1059     }
1060   /* slow validation, need to hash full file (again) */
1061   GNUNET_CRYPTO_hash_file (sched,
1062                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1063                            GNUNET_NO,
1064                            fn,
1065                            HASHING_BLOCKSIZE,
1066                            &hash_for_index_val,
1067                            ii);
1068 }
1069
1070
1071 /**
1072  * Handle INDEX_LIST_GET-message.
1073  *
1074  * @param cls closure
1075  * @param client identification of the client
1076  * @param message the actual message
1077  */
1078 static void
1079 handle_index_list_get (void *cls,
1080                        struct GNUNET_SERVER_Client *client,
1081                        const struct GNUNET_MessageHeader *message)
1082 {
1083   struct GNUNET_SERVER_TransmitContext *tc;
1084   struct IndexInfoMessage *iim;
1085   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1086   size_t slen;
1087   const char *fn;
1088   struct GNUNET_MessageHeader *msg;
1089   struct IndexInfo *pos;
1090
1091   tc = GNUNET_SERVER_transmit_context_create (client);
1092   iim = (struct IndexInfoMessage*) buf;
1093   msg = &iim->header;
1094   pos = indexed_files;
1095   while (NULL != pos)
1096     {
1097       iim->reserved = 0;
1098       iim->file_id = pos->file_id;
1099       fn = pos->filename;
1100       slen = strlen (fn) + 1;
1101       if (slen + sizeof (struct IndexInfoMessage) > 
1102           GNUNET_SERVER_MAX_MESSAGE_SIZE)
1103         {
1104           GNUNET_break (0);
1105           break;
1106         }
1107       memcpy (&iim[1], fn, slen);
1108       GNUNET_SERVER_transmit_context_append
1109         (tc,
1110          &msg[1],
1111          sizeof (struct IndexInfoMessage) 
1112          - sizeof (struct GNUNET_MessageHeader) + slen,
1113          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1114       pos = pos->next;
1115     }
1116   GNUNET_SERVER_transmit_context_append (tc,
1117                                          NULL, 0,
1118                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1119   GNUNET_SERVER_transmit_context_run (tc,
1120                                       GNUNET_TIME_UNIT_MINUTES);
1121 }
1122
1123
1124 /**
1125  * Handle UNINDEX-message.
1126  *
1127  * @param cls closure
1128  * @param client identification of the client
1129  * @param message the actual message
1130  */
1131 static void
1132 handle_unindex (void *cls,
1133                 struct GNUNET_SERVER_Client *client,
1134                 const struct GNUNET_MessageHeader *message)
1135 {
1136   const struct UnindexMessage *um;
1137   struct IndexInfo *pos;
1138   struct IndexInfo *prev;
1139   struct IndexInfo *next;
1140   struct GNUNET_SERVER_TransmitContext *tc;
1141   int found;
1142   
1143   um = (const struct UnindexMessage*) message;
1144   found = GNUNET_NO;
1145   prev = NULL;
1146   pos = indexed_files;
1147   while (NULL != pos)
1148     {
1149       next = pos->next;
1150       if (0 == memcmp (&pos->file_id,
1151                        &um->file_id,
1152                        sizeof (GNUNET_HashCode)))
1153         {
1154           if (prev == NULL)
1155             indexed_files = pos->next;
1156           else
1157             prev->next = pos->next;
1158           GNUNET_free (pos);
1159           found = GNUNET_YES;
1160         }
1161       else
1162         {
1163           prev = pos;
1164         }
1165       pos = next;
1166     }
1167   if (GNUNET_YES == found)
1168     write_index_list ();
1169   tc = GNUNET_SERVER_transmit_context_create (client);
1170   GNUNET_SERVER_transmit_context_append (tc,
1171                                          NULL, 0,
1172                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1173   GNUNET_SERVER_transmit_context_run (tc,
1174                                       GNUNET_TIME_UNIT_MINUTES);
1175 }
1176
1177
1178 /**
1179  * Run the next DS request in our
1180  * queue, we're done with the current one.
1181  */
1182 static void
1183 next_ds_request ()
1184 {
1185   struct DatastoreRequestQueue *e;
1186   
1187   while (NULL != (e = drq_head))
1188     {
1189       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1190         break;
1191       if (e->task != GNUNET_SCHEDULER_NO_TASK)
1192         GNUNET_SCHEDULER_cancel (sched, e->task);
1193       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1194       e->req (e->req_cls, GNUNET_NO);
1195       GNUNET_free (e);  
1196     }
1197   if (e == NULL)
1198     return;
1199   if (e->task != GNUNET_SCHEDULER_NO_TASK)
1200     GNUNET_SCHEDULER_cancel (sched, e->task);
1201   e->task = GNUNET_SCHEDULER_NO_TASK;
1202   e->req (e->req_cls, GNUNET_YES);
1203   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1204   GNUNET_free (e);  
1205 }
1206
1207
1208 /**
1209  * A datastore request had to be timed out. 
1210  *
1211  * @param cls closure (of type "struct DatastoreRequestQueue*")
1212  * @param tc task context, unused
1213  */
1214 static void
1215 timeout_ds_request (void *cls,
1216                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1217 {
1218   struct DatastoreRequestQueue *e = cls;
1219
1220   e->task = GNUNET_SCHEDULER_NO_TASK;
1221   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1222   e->req (e->req_cls, GNUNET_NO);
1223   GNUNET_free (e);  
1224 }
1225
1226
1227 /**
1228  * Queue a request for the datastore.
1229  *
1230  * @param deadline by when the request should run
1231  * @param fun function to call once the request can be run
1232  * @param fun_cls closure for fun
1233  */
1234 static struct DatastoreRequestQueue *
1235 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1236                   RequestFunction fun,
1237                   void *fun_cls)
1238 {
1239   struct DatastoreRequestQueue *e;
1240   struct DatastoreRequestQueue *bef;
1241
1242   if (drq_head == NULL)
1243     {
1244       /* no other requests pending, run immediately */
1245       fun (fun_cls, GNUNET_OK);
1246       return NULL;
1247     }
1248   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1249   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1250   e->req = fun;
1251   e->req_cls = fun_cls;
1252   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1253     {
1254       /* local request, highest prio, put at head of queue
1255          regardless of deadline */
1256       bef = NULL;
1257     }
1258   else
1259     {
1260       bef = drq_tail;
1261       while ( (NULL != bef) &&
1262               (e->timeout.value < bef->timeout.value) )
1263         bef = bef->prev;
1264     }
1265   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1266   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1267     return e;
1268   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1269                                           GNUNET_NO,
1270                                           GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1271                                           GNUNET_SCHEDULER_NO_TASK,
1272                                           deadline,
1273                                           &timeout_ds_request,
1274                                           e);
1275   return e;                                    
1276 }
1277
1278
1279 /**
1280  * Free the state associated with a local get context.
1281  *
1282  * @param lgc the lgc to free
1283  */
1284 static void
1285 local_get_context_free (struct LocalGetContext *lgc) 
1286 {
1287   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1288   GNUNET_SERVER_client_drop (lgc->client); 
1289   GNUNET_free_non_null (lgc->results);
1290   if (lgc->results_bf != NULL)
1291     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1292   if (lgc->req != NULL)
1293     {
1294       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1295         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1296       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1297       GNUNET_free (lgc->req);
1298     }
1299   GNUNET_free (lgc);
1300 }
1301
1302
1303 /**
1304  * We're able to transmit the next (local) result to the client.
1305  * Do it and ask the datastore for more.  Or, on error, tell
1306  * the datastore to stop giving us more.
1307  *
1308  * @param cls our closure (struct LocalGetContext)
1309  * @param max maximum number of bytes we can transmit
1310  * @param buf where to copy our message
1311  * @return number of bytes copied to buf
1312  */
1313 static size_t
1314 transmit_local_result (void *cls,
1315                        size_t max,
1316                        void *buf)
1317 {
1318   struct LocalGetContext *lgc = cls;  
1319   uint16_t msize;
1320
1321   if (NULL == buf)
1322     {
1323 #if DEBUG_FS
1324       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1325                   "Failed to transmit result to local client, aborting datastore iteration.\n");
1326 #endif
1327       /* error, abort! */
1328       GNUNET_free (lgc->result);
1329       lgc->result = NULL;
1330       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1331       return 0;
1332     }
1333   msize = ntohs (lgc->result->header.size);
1334 #if DEBUG_FS
1335   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1336               "Transmitting %u bytes of result to local client.\n",
1337               msize);
1338 #endif
1339   GNUNET_assert (max >= msize);
1340   memcpy (buf, lgc->result, msize);
1341   GNUNET_free (lgc->result);
1342   lgc->result = NULL;
1343   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1344   return msize;
1345 }
1346
1347
1348 /**
1349  * Continuation called from datastore's remove
1350  * function.
1351  *
1352  * @param cls unused
1353  * @param success did the deletion work?
1354  * @param msg error message
1355  */
1356 static void
1357 remove_cont (void *cls,
1358              int success,
1359              const char *msg)
1360 {
1361   if (GNUNET_OK != success)
1362     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1363                 _("Failed to delete bogus block: %s\n"),
1364                 msg);
1365   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1366 }
1367
1368
1369 /**
1370  * Mingle hash with the mingle_number to
1371  * produce different bits.
1372  */
1373 static void
1374 mingle_hash (const GNUNET_HashCode * in,
1375              int32_t mingle_number, 
1376              GNUNET_HashCode * hc)
1377 {
1378   GNUNET_HashCode m;
1379
1380   GNUNET_CRYPTO_hash (&mingle_number, 
1381                       sizeof (int32_t), 
1382                       &m);
1383   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1384 }
1385
1386
1387 /**
1388  * We've received an on-demand encoded block
1389  * from the datastore.  Attempt to do on-demand
1390  * encoding and (if successful), call the 
1391  * continuation with the resulting block.  On
1392  * error, clean up and ask the datastore for
1393  * more results.
1394  *
1395  * @param key key for the content
1396  * @param size number of bytes in data
1397  * @param data content stored
1398  * @param type type of the content
1399  * @param priority priority of the content
1400  * @param anonymity anonymity-level for the content
1401  * @param expiration expiration time for the content
1402  * @param uid unique identifier for the datum;
1403  *        maybe 0 if no unique identifier is available
1404  * @param cont function to call with the actual block
1405  * @param cont_cls closure for cont
1406  */
1407 static void
1408 handle_on_demand_block (const GNUNET_HashCode * key,
1409                         uint32_t size,
1410                         const void *data,
1411                         uint32_t type,
1412                         uint32_t priority,
1413                         uint32_t anonymity,
1414                         struct GNUNET_TIME_Absolute
1415                         expiration, uint64_t uid,
1416                         GNUNET_DATASTORE_Iterator cont,
1417                         void *cont_cls)
1418 {
1419   const struct OnDemandBlock *odb;
1420   GNUNET_HashCode nkey;
1421   struct GNUNET_CRYPTO_AesSessionKey skey;
1422   struct GNUNET_CRYPTO_AesInitializationVector iv;
1423   GNUNET_HashCode query;
1424   ssize_t nsize;
1425   char ndata[DBLOCK_SIZE];
1426   char edata[DBLOCK_SIZE];
1427   const char *fn;
1428   struct GNUNET_DISK_FileHandle *fh;
1429   uint64_t off;
1430
1431   if (size != sizeof (struct OnDemandBlock))
1432     {
1433       GNUNET_break (0);
1434       GNUNET_DATASTORE_remove (dsh, 
1435                                key,
1436                                size,
1437                                data,
1438                                &remove_cont,
1439                                NULL,
1440                                GNUNET_TIME_UNIT_FOREVER_REL);     
1441       return;
1442     }
1443   odb = (const struct OnDemandBlock*) data;
1444   off = GNUNET_ntohll (odb->offset);
1445   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1446                                                         &odb->file_id);
1447   fh = NULL;
1448   if ( (NULL == fn) ||
1449        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1450                                              GNUNET_DISK_OPEN_READ,
1451                                              GNUNET_DISK_PERM_NONE))) ||
1452        (off !=
1453         GNUNET_DISK_file_seek (fh,
1454                                off,
1455                                GNUNET_DISK_SEEK_SET)) ||
1456        (-1 ==
1457         (nsize = GNUNET_DISK_file_read (fh,
1458                                         ndata,
1459                                         sizeof (ndata)))) )
1460     {
1461       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1462                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1463                   GNUNET_h2s (&odb->file_id),
1464                   (unsigned long long) off,
1465                   STRERROR (errno));
1466       if (fh != NULL)
1467         GNUNET_DISK_file_close (fh);
1468       /* FIXME: if this happens often, we need
1469          to remove the OnDemand block from the DS! */
1470       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1471       return;
1472     }
1473   GNUNET_DISK_file_close (fh);
1474   GNUNET_CRYPTO_hash (ndata,
1475                       nsize,
1476                       &nkey);
1477   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1478   GNUNET_CRYPTO_aes_encrypt (ndata,
1479                              nsize,
1480                              &skey,
1481                              &iv,
1482                              edata);
1483   GNUNET_CRYPTO_hash (edata,
1484                       nsize,
1485                       &query);
1486   if (0 != memcmp (&query, 
1487                    key,
1488                    sizeof (GNUNET_HashCode)))
1489     {
1490       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1491                   _("Indexed file `%s' changed at offset %llu\n"),
1492                   fn,
1493                   (unsigned long long) off);
1494       /* FIXME: if this happens often, we need
1495          to remove the OnDemand block from the DS! */
1496       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1497       return;
1498     }
1499   cont (cont_cls,
1500         key,
1501         nsize,
1502         edata,
1503         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1504         priority,
1505         anonymity,
1506         expiration,
1507         uid);
1508 }
1509
1510
1511 /**
1512  * How many bytes should a bloomfilter be if we have already seen
1513  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1514  * of bits set per entry.  Furthermore, we should not re-size the
1515  * filter too often (to keep it cheap).
1516  *
1517  * Since other peers will also add entries but not resize the filter,
1518  * we should generally pick a slightly larger size than what the
1519  * strict math would suggest.
1520  *
1521  * @return must be a power of two and smaller or equal to 2^15.
1522  */
1523 static size_t
1524 compute_bloomfilter_size (unsigned int entry_count)
1525 {
1526   size_t size;
1527   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1528   uint16_t max = 1 << 15;
1529
1530   if (entry_count > max)
1531     return max;
1532   size = 8;
1533   while ((size < max) && (size < ideal))
1534     size *= 2;
1535   if (size > max)
1536     return max;
1537   return size;
1538 }
1539
1540
1541 /**
1542  * Recalculate our bloom filter for filtering replies.
1543  *
1544  * @param count number of entries we are filtering right now
1545  * @param mingle set to our new mingling value
1546  * @param bf_size set to the size of the bloomfilter
1547  * @param entries the entries to filter
1548  * @return updated bloomfilter, NULL for none
1549  */
1550 static struct GNUNET_CONTAINER_BloomFilter *
1551 refresh_bloomfilter (unsigned int count,
1552                      int32_t * mingle,
1553                      size_t *bf_size,
1554                      const GNUNET_HashCode *entries)
1555 {
1556   struct GNUNET_CONTAINER_BloomFilter *bf;
1557   size_t nsize;
1558   unsigned int i;
1559   GNUNET_HashCode mhash;
1560
1561   if (0 == count)
1562     return NULL;
1563   nsize = compute_bloomfilter_size (count);
1564   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1565   *bf_size = nsize;
1566   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1567                                           nsize,
1568                                           BLOOMFILTER_K);
1569   for (i=0;i<count;i++)
1570     {
1571       mingle_hash (&entries[i], *mingle, &mhash);
1572       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1573     }
1574   return bf;
1575 }
1576
1577
1578 /**
1579  * Closure used for "target_peer_select_cb".
1580  */
1581 struct PeerSelectionContext 
1582 {
1583   /**
1584    * The request for which we are selecting
1585    * peers.
1586    */
1587   struct PendingRequest *pr;
1588
1589   /**
1590    * Current "prime" target.
1591    */
1592   struct GNUNET_PeerIdentity target;
1593
1594   /**
1595    * How much do we like this target?
1596    */
1597   double target_score;
1598
1599 };
1600
1601
1602 /**
1603  * Function called for each connected peer to determine
1604  * which one(s) would make good targets for forwarding.
1605  *
1606  * @param cls closure (struct PeerSelectionContext)
1607  * @param key current key code (peer identity)
1608  * @param value value in the hash map (struct ConnectedPeer)
1609  * @return GNUNET_YES if we should continue to
1610  *         iterate,
1611  *         GNUNET_NO if not.
1612  */
1613 static int
1614 target_peer_select_cb (void *cls,
1615                        const GNUNET_HashCode * key,
1616                        void *value)
1617 {
1618   struct PeerSelectionContext *psc = cls;
1619   struct ConnectedPeer *cp = value;
1620   struct PendingRequest *pr = psc->pr;
1621   double score;
1622   unsigned int i;
1623
1624   /* 1) check if we have already (recently) forwarded to this peer */
1625   for (i=0;i<pr->used_pids_off;i++)
1626     if (pr->used_pids[i] == cp->pid)
1627       return GNUNET_YES; /* skip */
1628   // 2) calculate how much we'd like to forward to this peer
1629   score = 0; // FIXME!
1630   
1631   /* store best-fit in closure */
1632   if (score > psc->target_score)
1633     {
1634       psc->target_score = score;
1635       psc->target.hashPubKey = *key; 
1636     }
1637   return GNUNET_YES;
1638 }
1639
1640
1641 /**
1642  * We use a random delay to make the timing of requests
1643  * less predictable.  This function returns such a random
1644  * delay.
1645  *
1646  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1647  */
1648 static struct GNUNET_TIME_Relative
1649 get_processing_delay ()
1650 {
1651   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1652                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1653                                                                   TTL_DECREMENT));
1654 }
1655
1656
1657 /**
1658  * Task that is run for each request with the
1659  * goal of forwarding the associated query to
1660  * other peers.  The task should re-schedule
1661  * itself to be re-run once the TTL has expired.
1662  * (or at a later time if more peers should
1663  * be queried earlier).
1664  *
1665  * @param cls the requests "struct PendingRequest*"
1666  * @param tc task context (unused)
1667  */
1668 static void
1669 forward_request_task (void *cls,
1670                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1671
1672
1673 /**
1674  * We've selected a peer for forwarding of a query.
1675  * Construct the message and then re-schedule the
1676  * task to forward again to (other) peers.
1677  *
1678  * @param cls closure
1679  * @param size number of bytes available in buf
1680  * @param buf where the callee should write the message
1681  * @return number of bytes written to buf
1682  */
1683 static size_t
1684 transmit_request_cb (void *cls,
1685                      size_t size, 
1686                      void *buf)
1687 {
1688   struct PendingRequest *pr = cls;
1689   struct GetMessage *gm;
1690   GNUNET_HashCode *ext;
1691   char *bfdata;
1692   size_t msize;
1693   unsigned int k;
1694
1695   pr->cth = NULL;
1696   /* (1) check for timeout */
1697   if (NULL == buf)
1698     {
1699       /* timeout, try another peer immediately again */
1700       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1701                                                GNUNET_NO,
1702                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1703                                                GNUNET_SCHEDULER_NO_TASK,
1704                                                GNUNET_TIME_UNIT_ZERO,
1705                                                &forward_request_task,
1706                                                pr);
1707       return 0;
1708     }
1709   /* (2) build query message */
1710   k = 0; // FIXME: count hash codes!
1711   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1712   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1713   gm = (struct GetMessage*) buf;
1714   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1715   gm->header.size = htons (msize);
1716   gm->type = htonl (pr->type);
1717   pr->remaining_priority /= 2;
1718   gm->priority = htonl (pr->remaining_priority);
1719   gm->ttl = htonl (pr->ttl);
1720   gm->filter_mutator = htonl(pr->mingle);
1721   gm->hash_bitmap = htonl (42);
1722   gm->query = pr->query;
1723   ext = (GNUNET_HashCode*) &gm[1];
1724   // FIXME: setup "ext[0]..[k-1]"
1725   bfdata = (char *) &ext[k];
1726   if (pr->bf != NULL)
1727     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1728                                                bfdata,
1729                                                pr->bf_size);
1730   
1731   /* (3) schedule job to do it again (or another peer, etc.) */
1732   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1733                                            GNUNET_NO,
1734                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1735                                            GNUNET_SCHEDULER_NO_TASK,
1736                                            get_processing_delay (), // FIXME!
1737                                            &forward_request_task,
1738                                            pr);
1739
1740   return msize;
1741 }
1742
1743
1744 /**
1745  * Function called after we've tried to reserve
1746  * a certain amount of bandwidth for a reply.
1747  * Check if we succeeded and if so send our query.
1748  *
1749  * @param cls the requests "struct PendingRequest*"
1750  * @param peer identifies the peer
1751  * @param latency current latency estimate, "FOREVER" if we have been
1752  *                disconnected
1753  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1754  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1755  * @param amount set to the amount that was actually reserved or unreserved
1756  * @param preference current traffic preference for the given peer
1757  */
1758 static void
1759 target_reservation_cb (void *cls,
1760                        const struct
1761                        GNUNET_PeerIdentity * peer,
1762                        unsigned int bpm_in,
1763                        unsigned int bpm_out,
1764                        struct GNUNET_TIME_Relative
1765                        latency, int amount,
1766                        unsigned long long preference)
1767 {
1768   struct PendingRequest *pr = cls;
1769   uint32_t priority;
1770   uint16_t size;
1771   struct GNUNET_TIME_Relative maxdelay;
1772
1773   GNUNET_assert (peer != NULL);
1774   if ( (amount != DBLOCK_SIZE) ||
1775        (pr->cth != NULL) )
1776     {
1777       /* try again later; FIXME: we may need to un-reserve "amount"? */
1778       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1779                                                GNUNET_NO,
1780                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1781                                                GNUNET_SCHEDULER_NO_TASK,
1782                                                get_processing_delay (), // FIXME: longer?
1783                                                &forward_request_task,
1784                                                pr);
1785       return;
1786     }
1787   // (2) transmit, update ttl/priority
1788   // FIXME: calculate priority, maxdelay, size properly!
1789   priority = 0;
1790   size = 60000;
1791   maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
1792   pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1793                                                priority,
1794                                                maxdelay,
1795                                                peer,
1796                                                size,
1797                                                &transmit_request_cb,
1798                                                pr);
1799   if (pr->cth == NULL)
1800     {
1801       /* try again later */
1802       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1803                                                GNUNET_NO,
1804                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1805                                                GNUNET_SCHEDULER_NO_TASK,
1806                                                get_processing_delay (), // FIXME: longer?
1807                                                &forward_request_task,
1808                                                pr);
1809     }
1810 }
1811
1812
1813 /**
1814  * Task that is run for each request with the
1815  * goal of forwarding the associated query to
1816  * other peers.  The task should re-schedule
1817  * itself to be re-run once the TTL has expired.
1818  * (or at a later time if more peers should
1819  * be queried earlier).
1820  *
1821  * @param cls the requests "struct PendingRequest*"
1822  * @param tc task context (unused)
1823  */
1824 static void
1825 forward_request_task (void *cls,
1826                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1827 {
1828   struct PendingRequest *pr = cls;
1829   struct PeerSelectionContext psc;
1830
1831   pr->task = GNUNET_SCHEDULER_NO_TASK;
1832   if (pr->cth != NULL) 
1833     {
1834       /* we're busy transmitting a result, wait a bit */
1835       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1836                                                GNUNET_NO,
1837                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1838                                                GNUNET_SCHEDULER_NO_TASK,
1839                                                get_processing_delay (), 
1840                                                &forward_request_task,
1841                                                pr);
1842       return;
1843     }
1844   /* (1) select target */
1845   psc.pr = pr;
1846   psc.target_score = DBL_MIN;
1847   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1848                                          &target_peer_select_cb,
1849                                          &psc);
1850   if (psc.target_score == DBL_MIN)
1851     {
1852       /* no possible target found, wait some time */
1853       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1854                                                GNUNET_NO,
1855                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1856                                                GNUNET_SCHEDULER_NO_TASK,
1857                                                get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1858                                                &forward_request_task,
1859                                                pr);
1860       return;
1861     }
1862   /* (2) reserve reply bandwidth */
1863   // FIXME: need a way to cancel; this
1864   // async operation is problematic (segv-problematic)
1865   // if "pr" is destroyed while it happens!
1866   GNUNET_CORE_peer_configure (core,
1867                               &psc.target,
1868                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1869                               -1,
1870                               DBLOCK_SIZE, // FIXME: make dependent on type?
1871                               0,
1872                               &target_reservation_cb,
1873                               pr);
1874 }
1875
1876
1877 /**
1878  * We're processing (local) results for a search request
1879  * from a (local) client.  Pass applicable results to the
1880  * client and if we are done either clean up (operation
1881  * complete) or switch to P2P search (more results possible).
1882  *
1883  * @param cls our closure (struct LocalGetContext)
1884  * @param key key for the content
1885  * @param size number of bytes in data
1886  * @param data content stored
1887  * @param type type of the content
1888  * @param priority priority of the content
1889  * @param anonymity anonymity-level for the content
1890  * @param expiration expiration time for the content
1891  * @param uid unique identifier for the datum;
1892  *        maybe 0 if no unique identifier is available
1893  */
1894 static void
1895 process_local_get_result (void *cls,
1896                           const GNUNET_HashCode * key,
1897                           uint32_t size,
1898                           const void *data,
1899                           uint32_t type,
1900                           uint32_t priority,
1901                           uint32_t anonymity,
1902                           struct GNUNET_TIME_Absolute
1903                           expiration, 
1904                           uint64_t uid)
1905 {
1906   struct LocalGetContext *lgc = cls;
1907   struct PendingRequest *pr;
1908   struct ClientRequestList *crl;
1909   struct ClientList *cl;
1910   size_t msize;
1911   unsigned int i;
1912
1913   if (key == NULL)
1914     {
1915 #if DEBUG_FS
1916       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1917                   "Received last result for `%s' from local datastore, deciding what to do next.\n",
1918                   GNUNET_h2s (&lgc->query));
1919 #endif
1920       /* no further results from datastore; continue
1921          processing further requests from the client and
1922          allow the next task to use the datastore; also,
1923          switch to P2P requests or clean up our state. */
1924       next_ds_request ();
1925       GNUNET_SERVER_receive_done (lgc->client,
1926                                   GNUNET_OK);
1927       if ( (lgc->results_used == 0) ||
1928            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1929            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1930            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1931         {
1932 #if DEBUG_FS
1933           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1934                       "Forwarding query for `%s' to network.\n",
1935                       GNUNET_h2s (&lgc->query));
1936 #endif
1937           cl = clients;
1938           while ( (NULL != cl) &&
1939                   (cl->client != lgc->client) )
1940             cl = cl->next;
1941           if (cl == NULL)
1942             {
1943               cl = GNUNET_malloc (sizeof (struct ClientList));
1944               cl->client = lgc->client;
1945               cl->next = clients;
1946               clients = cl;
1947             }
1948           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1949           crl->cl = cl;
1950           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1951           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1952           pr->client = lgc->client;
1953           GNUNET_SERVER_client_keep (pr->client);
1954           pr->crl_entry = crl;
1955           crl->req = pr;
1956           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1957             {
1958               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1959               *pr->namespace = lgc->namespace;
1960             }
1961           pr->replies_seen = lgc->results;
1962           lgc->results = NULL;
1963           pr->start_time = GNUNET_TIME_absolute_get ();
1964           pr->query = lgc->query;
1965           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1966           pr->replies_seen_off = lgc->results_used;
1967           pr->replies_seen_size = lgc->results_size;
1968           lgc->results_size = 0;
1969           pr->type = lgc->type;
1970           pr->anonymity_level = lgc->anonymity_level;
1971           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1972                                         &pr->mingle,
1973                                         &pr->bf_size,
1974                                         pr->replies_seen);
1975           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1976                                              &pr->query,
1977                                              pr,
1978                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1979           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1980                                                    GNUNET_NO,
1981                                                    GNUNET_SCHEDULER_PRIORITY_IDLE,
1982                                                    GNUNET_SCHEDULER_NO_TASK,
1983                                                    get_processing_delay (),
1984                                                    &forward_request_task,
1985                                                    pr);
1986           local_get_context_free (lgc);
1987           return;
1988         }
1989       /* got all possible results, clean up! */
1990 #if DEBUG_FS
1991       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1992                   "Found all possible results for query for `%s', done!\n",
1993                   GNUNET_h2s (&lgc->query));
1994 #endif
1995       local_get_context_free (lgc);
1996       return;
1997     }
1998   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1999     {
2000 #if DEBUG_FS
2001       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2002                   "Received on-demand block for `%s' from local datastore, fetching data.\n",
2003                   GNUNET_h2s (&lgc->query));
2004 #endif
2005       handle_on_demand_block (key, size, data, type, priority, 
2006                               anonymity, expiration, uid,
2007                               &process_local_get_result,
2008                               lgc);
2009       return;
2010     }
2011   if ( (type != lgc->type) &&
2012        (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
2013     {
2014       /* this should be virtually impossible to reach (DBLOCK 
2015          query hash being identical to KBLOCK/SBLOCK query hash);
2016          nevertheless, if it happens, the correct thing is to
2017          simply skip the result. */
2018 #if DEBUG_FS
2019       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2020                   "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
2021                   type,
2022                   lgc->type,
2023                   GNUNET_h2s (&lgc->query));
2024 #endif
2025       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
2026       return;
2027     }
2028   /* check if this is a result we've alredy
2029      received */
2030   for (i=0;i<lgc->results_used;i++)
2031     if (0 == memcmp (key,
2032                      &lgc->results[i],
2033                      sizeof (GNUNET_HashCode)))
2034       {
2035 #if DEBUG_FS
2036         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2037                     "Received duplicate result for `%s' from local datastore, ignoring.\n",
2038                     GNUNET_h2s (&lgc->query));
2039 #endif
2040         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2041         return; 
2042       }
2043   if (lgc->results_used == lgc->results_size)
2044     GNUNET_array_grow (lgc->results,
2045                        lgc->results_size,
2046                        lgc->results_size * 2 + 2);
2047   GNUNET_CRYPTO_hash (data, 
2048                       size, 
2049                       &lgc->results[lgc->results_used++]);    
2050   msize = size + sizeof (struct ContentMessage);
2051   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2052   lgc->result = GNUNET_malloc (msize);
2053   lgc->result->header.size = htons (msize);
2054   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2055   lgc->result->type = htonl (type);
2056   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
2057   memcpy (&lgc->result[1],
2058           data,
2059           size);
2060 #if DEBUG_FS
2061   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2062               "Received new result for `%s' from local datastore, passing to client.\n",
2063               GNUNET_h2s (&lgc->query));
2064 #endif
2065   GNUNET_SERVER_notify_transmit_ready (lgc->client,
2066                                        msize,
2067                                        GNUNET_TIME_UNIT_FOREVER_REL,
2068                                        &transmit_local_result,
2069                                        lgc);
2070 }
2071
2072
2073 /**
2074  * We're processing a search request from a local
2075  * client.  Now it is our turn to query the datastore.
2076  * 
2077  * @param cls our closure (struct LocalGetContext)
2078  * @param tc unused
2079  */
2080 static void
2081 transmit_local_get (void *cls,
2082                     const struct GNUNET_SCHEDULER_TaskContext *tc)
2083 {
2084   struct LocalGetContext *lgc = cls;
2085   uint32_t type;
2086   
2087   type = lgc->type;
2088   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2089     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2090   GNUNET_DATASTORE_get (dsh,
2091                         &lgc->query,
2092                         type,
2093                         &process_local_get_result,
2094                         lgc,
2095                         GNUNET_TIME_UNIT_FOREVER_REL);
2096 }
2097
2098
2099 /**
2100  * We're processing a search request from a local
2101  * client.  Now it is our turn to query the datastore.
2102  * 
2103  * @param cls our closure (struct LocalGetContext)
2104  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
2105  */
2106 static void 
2107 transmit_local_get_ready (void *cls,
2108                           int ok)
2109 {
2110   struct LocalGetContext *lgc = cls;
2111
2112   GNUNET_assert (GNUNET_OK == ok);
2113   GNUNET_SCHEDULER_add_continuation (sched,
2114                                      GNUNET_NO,
2115                                      &transmit_local_get,
2116                                      lgc,
2117                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2118 }
2119
2120
2121 /**
2122  * Handle START_SEARCH-message (search request from client).
2123  *
2124  * @param cls closure
2125  * @param client identification of the client
2126  * @param message the actual message
2127  */
2128 static void
2129 handle_start_search (void *cls,
2130                      struct GNUNET_SERVER_Client *client,
2131                      const struct GNUNET_MessageHeader *message)
2132 {
2133   const struct SearchMessage *sm;
2134   struct LocalGetContext *lgc;
2135   uint16_t msize;
2136   unsigned int sc;
2137   
2138   msize = ntohs (message->size);
2139   if ( (msize < sizeof (struct SearchMessage)) ||
2140        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2141     {
2142       GNUNET_break (0);
2143       GNUNET_SERVER_receive_done (client,
2144                                   GNUNET_SYSERR);
2145       return;
2146     }
2147   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2148   sm = (const struct SearchMessage*) message;
2149   GNUNET_SERVER_client_keep (client);
2150   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
2151   if  (sc > 0)
2152     {
2153       lgc->results_used = sc;
2154       GNUNET_array_grow (lgc->results,
2155                          lgc->results_size,
2156                          sc * 2);
2157       memcpy (lgc->results,
2158               &sm[1],
2159               sc * sizeof (GNUNET_HashCode));
2160     }
2161   lgc->client = client;
2162   lgc->type = ntohl (sm->type);
2163   lgc->anonymity_level = ntohl (sm->anonymity_level);
2164   switch (lgc->type)
2165     {
2166     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2167     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2168       lgc->target.hashPubKey = sm->target;
2169       break;
2170     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2171       lgc->namespace = sm->target;
2172       break;
2173     default:
2174       break;
2175     }
2176   lgc->query = sm->query;
2177   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
2178   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
2179                                &transmit_local_get_ready,
2180                                lgc);
2181 }
2182
2183
2184 /**
2185  * List of handlers for the messages understood by this
2186  * service.
2187  */
2188 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2189   {&handle_index_start, NULL, 
2190    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2191   {&handle_index_list_get, NULL, 
2192    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2193   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2194    sizeof (struct UnindexMessage) },
2195   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2196    0 },
2197   {NULL, NULL, 0, 0}
2198 };
2199
2200
2201 /**
2202  * Clean up the memory used by the PendingRequest structure (except
2203  * for the client or peer list that the request may be part of).
2204  *
2205  * @param pr request to clean up
2206  */
2207 static void
2208 destroy_pending_request (struct PendingRequest *pr)
2209 {
2210   struct PendingReply *reply;
2211   struct ClientList *cl;
2212
2213   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
2214                                         &pr->query,
2215                                         pr);
2216   // FIXME: not sure how this can work (efficiently)
2217   // also, what does the return value mean?
2218   if (pr->client == NULL)
2219     {
2220       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
2221                                          pr);
2222     }
2223   else
2224     {
2225       cl = pr->crl_entry->cl;
2226       GNUNET_CONTAINER_DLL_remove (cl->head,
2227                                    cl->tail,
2228                                    pr->crl_entry);
2229     }
2230   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
2231     GNUNET_SCHEDULER_cancel (sched, pr->task);
2232   if (NULL != pr->cth)
2233     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
2234   if (NULL != pr->bf)
2235     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2236   if (NULL != pr->th)
2237     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
2238   while (NULL != (reply = pr->replies_pending))
2239     {
2240       pr->replies_pending = reply->next;
2241       GNUNET_free (reply);
2242     }
2243   GNUNET_PEER_change_rc (pr->source_pid, -1);
2244   GNUNET_PEER_change_rc (pr->target_pid, -1);
2245   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
2246   GNUNET_free_non_null (pr->used_pids);
2247   GNUNET_free_non_null (pr->replies_seen);
2248   GNUNET_free_non_null (pr->namespace);
2249   GNUNET_free (pr);
2250 }
2251
2252
2253 /**
2254  * A client disconnected.  Remove all of its pending queries.
2255  *
2256  * @param cls closure, NULL
2257  * @param client identification of the client
2258  */
2259 static void
2260 handle_client_disconnect (void *cls,
2261                           struct GNUNET_SERVER_Client
2262                           * client)
2263 {
2264   struct LocalGetContext *lgc;
2265   struct ClientList *cpos;
2266   struct ClientList *cprev;
2267   struct ClientRequestList *rl;
2268
2269   lgc = lgc_head;
2270   while ( (NULL != lgc) &&
2271           (lgc->client != client) )
2272     lgc = lgc->next;
2273   if (lgc != NULL)
2274     local_get_context_free (lgc);
2275   cprev = NULL;
2276   cpos = clients;
2277   while ( (NULL != cpos) &&
2278           (clients->client != client) )
2279     {
2280       cprev = cpos;
2281       cpos = cpos->next;
2282     }
2283   if (cpos != NULL)
2284     {
2285       if (cprev == NULL)
2286         clients = cpos->next;
2287       else
2288         cprev->next = cpos->next;
2289       while (NULL != (rl = cpos->head))
2290         {
2291           cpos->head = rl->next;
2292           destroy_pending_request (rl->req);
2293           GNUNET_free (rl);
2294         }
2295       GNUNET_free (cpos);
2296     }
2297 }
2298
2299
2300 /**
2301  * Iterator over entries in the "requests_by_query" map
2302  * that frees all the entries.
2303  *
2304  * @param cls closure, NULL
2305  * @param key current key code (the query, unused) 
2306  * @param value value in the hash map, of type "struct PendingRequest*"
2307  * @return GNUNET_YES (we should continue to  iterate)
2308  */
2309 static int 
2310 destroy_pending_request_cb (void *cls,
2311                             const GNUNET_HashCode * key,
2312                             void *value)
2313 {
2314   struct PendingRequest *pr = value;
2315
2316   destroy_pending_request (pr);
2317   return GNUNET_YES;
2318 }
2319
2320
2321 /**
2322  * Task run during shutdown.
2323  *
2324  * @param cls unused
2325  * @param tc unused
2326  */
2327 static void
2328 shutdown_task (void *cls,
2329                const struct GNUNET_SCHEDULER_TaskContext *tc)
2330 {
2331   struct IndexInfo *pos;  
2332
2333   if (NULL != core)
2334     GNUNET_CORE_disconnect (core);
2335   GNUNET_DATASTORE_disconnect (dsh,
2336                                GNUNET_NO);
2337   dsh = NULL;
2338   GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
2339                                          &destroy_pending_request_cb,
2340                                          NULL);
2341   while (clients != NULL)
2342     handle_client_disconnect (NULL,
2343                               clients->client);
2344   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
2345   requests_by_query = NULL;
2346   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
2347   requests_by_peer = NULL;
2348   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
2349   requests_by_expiration = NULL;
2350   // FIXME: iterate over entries and free individually?
2351   // (or do we get disconnect notifications?)
2352   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2353   connected_peers = NULL;
2354   GNUNET_CONTAINER_multihashmap_destroy (ifm);
2355   ifm = NULL;
2356   while (NULL != (pos = indexed_files))
2357     {
2358       indexed_files = pos->next;
2359       GNUNET_free (pos);
2360     }
2361 }
2362
2363
2364 /**
2365  * Free (each) request made by the peer.
2366  *
2367  * @param cls closure, points to peer that the request belongs to
2368  * @param key current key code
2369  * @param value value in the hash map
2370  * @return GNUNET_YES (we should continue to iterate)
2371  */
2372 static int
2373 destroy_request (void *cls,
2374                  const GNUNET_HashCode * key,
2375                  void *value)
2376 {
2377   const struct GNUNET_PeerIdentity * peer = cls;
2378   struct PendingRequest *pr = value;
2379   
2380   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2381                                         &peer->hashPubKey,
2382                                         pr);
2383   destroy_pending_request (pr);
2384   return GNUNET_YES;
2385 }
2386
2387
2388
2389 /**
2390  * Method called whenever a given peer connects.
2391  *
2392  * @param cls closure, not used
2393  * @param peer peer identity this notification is about
2394  */
2395 static void 
2396 peer_connect_handler (void *cls,
2397                       const struct
2398                       GNUNET_PeerIdentity * peer)
2399 {
2400   struct ConnectedPeer *cp;
2401
2402   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
2403   cp->pid = GNUNET_PEER_intern (peer);
2404   GNUNET_CONTAINER_multihashmap_put (connected_peers,
2405                                      &peer->hashPubKey,
2406                                      cp,
2407                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2408 }
2409
2410
2411 /**
2412  * Method called whenever a peer disconnects.
2413  *
2414  * @param cls closure, not used
2415  * @param peer peer identity this notification is about
2416  */
2417 static void
2418 peer_disconnect_handler (void *cls,
2419                          const struct
2420                          GNUNET_PeerIdentity * peer)
2421 {
2422   struct ConnectedPeer *cp;
2423
2424   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2425                                           &peer->hashPubKey);
2426   GNUNET_PEER_change_rc (cp->pid, -1);
2427   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
2428   GNUNET_free (cp);
2429   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
2430                                               &peer->hashPubKey,
2431                                               &destroy_request,
2432                                               (void*) peer);
2433 }
2434
2435
2436 /**
2437  * We're processing a GET request from
2438  * another peer and have decided to forward
2439  * it to other peers.
2440  *
2441  * @param cls our "struct ProcessGetContext *"
2442  * @param tc unused
2443  */
2444 static void
2445 forward_get_request (void *cls,
2446                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2447 {
2448   struct ProcessGetContext *pgc = cls;
2449   struct PendingRequest *pr;
2450   struct PendingRequest *eer;
2451   struct GNUNET_PeerIdentity target;
2452
2453   pr = GNUNET_malloc (sizeof (struct PendingRequest));
2454   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
2455     {
2456       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
2457       *pr->namespace = pgc->namespace;
2458     }
2459   pr->bf = pgc->bf;
2460   pr->bf_size = pgc->bf_size;
2461   pgc->bf = NULL;
2462   pr->start_time = pgc->start_time;
2463   pr->query = pgc->query;
2464   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
2465   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
2466     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
2467   pr->anonymity_level = 1; /* default */
2468   pr->priority = pgc->priority;
2469   pr->remaining_priority = pr->priority;
2470   pr->mingle = pgc->mingle;
2471   pr->ttl = pgc->ttl; 
2472   pr->type = pgc->type;
2473   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
2474                                      &pr->query,
2475                                      pr,
2476                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2477   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
2478                                      &pgc->reply_to.hashPubKey,
2479                                      pr,
2480                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2481   GNUNET_CONTAINER_heap_insert (requests_by_expiration,
2482                                 pr,
2483                                 pr->start_time.value + pr->ttl);
2484   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
2485     {
2486       /* expire oldest request! */
2487       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2488       GNUNET_PEER_resolve (eer->source_pid,
2489                            &target);    
2490       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2491                                             &target.hashPubKey,
2492                                             eer);
2493       destroy_pending_request (eer);     
2494     }
2495   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2496                                            GNUNET_NO,
2497                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
2498                                            GNUNET_SCHEDULER_NO_TASK,
2499                                            get_processing_delay (),
2500                                            &forward_request_task,
2501                                            pr);
2502   GNUNET_free (pgc); 
2503 }
2504 /**
2505  * Transmit the given message by copying it to
2506  * the target buffer "buf".  "buf" will be
2507  * NULL and "size" zero if the socket was closed for
2508  * writing in the meantime.  In that case, only
2509
2510  * free the message
2511  *
2512  * @param cls closure, pointer to the message
2513  * @param size number of bytes available in buf
2514  * @param buf where the callee should write the message
2515  * @return number of bytes written to buf
2516  */
2517 static size_t
2518 transmit_message (void *cls,
2519                   size_t size, void *buf)
2520 {
2521   struct GNUNET_MessageHeader *msg = cls;
2522   uint16_t msize;
2523   
2524   if (NULL == buf)
2525     {
2526 #if DEBUG_FS
2527       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2528                   "Dropping reply, core too busy.\n");
2529 #endif
2530       GNUNET_free (msg);
2531       return 0;
2532     }
2533   msize = ntohs (msg->size);
2534   GNUNET_assert (size >= msize);
2535   memcpy (buf, msg, msize);
2536   GNUNET_free (msg);
2537   return msize;
2538 }
2539
2540
2541 /**
2542  * Test if the load on this peer is too high
2543  * to even consider processing the query at
2544  * all.
2545  * 
2546  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2547  */
2548 static int
2549 test_load_too_high ()
2550 {
2551   return GNUNET_NO; // FIXME
2552 }
2553
2554
2555 /**
2556  * We're processing (local) results for a search request
2557  * from another peer.  Pass applicable results to the
2558  * peer and if we are done either clean up (operation
2559  * complete) or forward to other peers (more results possible).
2560  *
2561  * @param cls our closure (struct LocalGetContext)
2562  * @param key key for the content
2563  * @param size number of bytes in data
2564  * @param data content stored
2565  * @param type type of the content
2566  * @param priority priority of the content
2567  * @param anonymity anonymity-level for the content
2568  * @param expiration expiration time for the content
2569  * @param uid unique identifier for the datum;
2570  *        maybe 0 if no unique identifier is available
2571  */
2572 static void
2573 process_p2p_get_result (void *cls,
2574                         const GNUNET_HashCode * key,
2575                         uint32_t size,
2576                         const void *data,
2577                         uint32_t type,
2578                         uint32_t priority,
2579                         uint32_t anonymity,
2580                         struct GNUNET_TIME_Absolute
2581                         expiration, 
2582                         uint64_t uid)
2583 {
2584   struct ProcessGetContext *pgc = cls;
2585   GNUNET_HashCode dhash;
2586   GNUNET_HashCode mhash;
2587   struct PutMessage *reply;
2588   
2589   if (NULL == key)
2590     {
2591       /* no more results */
2592       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2593            ( (0 == pgc->results_found) ||
2594              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2595              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2596              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2597         {
2598           GNUNET_SCHEDULER_add_continuation (sched,
2599                                              GNUNET_NO,
2600                                              &forward_get_request,
2601                                              pgc,
2602                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2603         }
2604       else
2605         {
2606           if (pgc->bf != NULL)
2607             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2608           GNUNET_free (pgc); 
2609         }
2610       next_ds_request ();
2611       return;
2612     }
2613   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2614     {
2615       handle_on_demand_block (key, size, data, type, priority, 
2616                               anonymity, expiration, uid,
2617                               &process_p2p_get_result,
2618                               pgc);
2619       return;
2620     }
2621   /* check for duplicates */
2622   GNUNET_CRYPTO_hash (data, size, &dhash);
2623   mingle_hash (&dhash, 
2624                pgc->mingle,
2625                &mhash);
2626   if ( (pgc->bf != NULL) &&
2627        (GNUNET_YES ==
2628         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2629                                            &mhash)) )
2630     {      
2631 #if DEBUG_FS
2632       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2633                   "Result from datastore filtered by bloomfilter.\n");
2634 #endif
2635       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2636       return;
2637     }
2638   pgc->results_found++;
2639   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2640        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2641        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2642     {
2643       if (pgc->bf == NULL)
2644         {
2645           pgc->bf_size = 32;
2646           pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2647                                                        pgc->bf_size, 
2648                                                        BLOOMFILTER_K);
2649         }
2650       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2651                                         &mhash);
2652     }
2653
2654   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2655   reply->header.size = htons (sizeof (struct PutMessage) + size);
2656   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2657   reply->type = htonl (type);
2658   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2659   memcpy (&reply[1], data, size);
2660   GNUNET_CORE_notify_transmit_ready (core,
2661                                      pgc->priority,
2662                                      ACCEPTABLE_REPLY_DELAY,
2663                                      &pgc->reply_to,
2664                                      sizeof (struct PutMessage) + size,
2665                                      &transmit_message,
2666                                      reply);
2667   if ( (GNUNET_YES == test_load_too_high()) ||
2668        (pgc->results_found > 5 + 2 * pgc->priority) )
2669     {
2670       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2671       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2672       return;
2673     }
2674   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2675 }
2676   
2677
2678 /**
2679  * We're processing a GET request from another peer.  Give it to our
2680  * local datastore.
2681  *
2682  * @param cls our "struct ProcessGetContext"
2683  * @param ok did we get a datastore slice or not?
2684  */
2685 static void
2686 ds_get_request (void *cls, 
2687                 int ok)
2688 {
2689   struct ProcessGetContext *pgc = cls;
2690   uint32_t type;
2691   struct GNUNET_TIME_Relative timeout;
2692
2693   if (GNUNET_OK != ok)
2694     {
2695       /* no point in doing P2P stuff if we can't even do local */
2696       GNUNET_free (dsh);
2697       return;
2698     }
2699   type = pgc->type;
2700   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2701     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2702   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2703                                            (pgc->priority + 1));
2704   GNUNET_DATASTORE_get (dsh,
2705                         &pgc->query,
2706                         type,
2707                         &process_p2p_get_result,
2708                         pgc,
2709                         timeout);
2710 }
2711
2712
2713 /**
2714  * The priority level imposes a bound on the maximum
2715  * value for the ttl that can be requested.
2716  *
2717  * @param ttl_in requested ttl
2718  * @param prio given priority
2719  * @return ttl_in if ttl_in is below the limit,
2720  *         otherwise the ttl-limit for the given priority
2721  */
2722 static int32_t
2723 bound_ttl (int32_t ttl_in, uint32_t prio)
2724 {
2725   unsigned long long allowed;
2726
2727   if (ttl_in <= 0)
2728     return ttl_in;
2729   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2730   if (ttl_in > allowed)      
2731     {
2732       if (allowed >= (1 << 30))
2733         return 1 << 30;
2734       return allowed;
2735     }
2736   return ttl_in;
2737 }
2738
2739
2740 /**
2741  * We've received a request with the specified
2742  * priority.  Bound it according to how much
2743  * we trust the given peer.
2744  * 
2745  * @param prio_in requested priority
2746  * @param peer the peer making the request
2747  * @return effective priority
2748  */
2749 static uint32_t
2750 bound_priority (uint32_t prio_in,
2751                 const struct GNUNET_PeerIdentity *peer)
2752 {
2753   return 0; // FIXME!
2754 }
2755
2756
2757 /**
2758  * Handle P2P "GET" request.
2759  *
2760  * @param cls closure, always NULL
2761  * @param other the other peer involved (sender or receiver, NULL
2762  *        for loopback messages where we are both sender and receiver)
2763  * @param message the actual message
2764  * @return GNUNET_OK to keep the connection open,
2765  *         GNUNET_SYSERR to close it (signal serious error)
2766  */
2767 static int
2768 handle_p2p_get (void *cls,
2769                 const struct GNUNET_PeerIdentity *other,
2770                 const struct GNUNET_MessageHeader *message)
2771 {
2772   uint16_t msize;
2773   const struct GetMessage *gm;
2774   unsigned int bits;
2775   const GNUNET_HashCode *opt;
2776   struct ProcessGetContext *pgc;
2777   uint32_t bm;
2778   size_t bfsize;
2779   uint32_t ttl_decrement;
2780   double preference;
2781   int net_load_up;
2782   int net_load_down;
2783
2784   msize = ntohs(message->size);
2785   if (msize < sizeof (struct GetMessage))
2786     {
2787       GNUNET_break_op (0);
2788       return GNUNET_SYSERR;
2789     }
2790   gm = (const struct GetMessage*) message;
2791   bm = ntohl (gm->hash_bitmap);
2792   bits = 0;
2793   while (bm > 0)
2794     {
2795       if (1 == (bm & 1))
2796         bits++;
2797       bm >>= 1;
2798     }
2799   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2800     {
2801       GNUNET_break_op (0);
2802       return GNUNET_SYSERR;
2803     }  
2804   opt = (const GNUNET_HashCode*) &gm[1];
2805   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2806   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2807   if (bfsize > 0)
2808     {
2809       pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2810                                                    bfsize,
2811                                                    BLOOMFILTER_K);
2812       pgc->bf_size = bfsize;
2813     }
2814   pgc->type = ntohl (gm->type);
2815   pgc->bm = ntohl (gm->hash_bitmap);
2816   pgc->mingle = gm->filter_mutator;
2817   bits = 0;
2818   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2819     pgc->reply_to.hashPubKey = opt[bits++];
2820   else
2821     pgc->reply_to = *other;
2822   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2823     pgc->namespace = opt[bits++];
2824   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2825     {
2826       GNUNET_break_op (0);
2827       GNUNET_free (pgc);
2828       return GNUNET_SYSERR;
2829     }
2830   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2831     pgc->prime_target.hashPubKey = opt[bits++];
2832   /* note that we can really only check load here since otherwise
2833      peers could find out that we are overloaded by being disconnected
2834      after sending us a malformed query... */
2835   if (GNUNET_YES == test_load_too_high ())
2836     {
2837       if (NULL != pgc->bf)
2838         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2839       GNUNET_free (pgc);
2840 #if DEBUG_FS
2841       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2842                   "Dropping query from `%s', this peer is too busy.\n",
2843                   GNUNET_i2s (other));
2844 #endif
2845       return GNUNET_OK;
2846     }
2847   net_load_up = 50; // FIXME
2848   net_load_down = 50; // FIXME
2849   pgc->policy = ROUTING_POLICY_NONE;
2850   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2851        (net_load_down < IDLE_LOAD_THRESHOLD) )
2852     {
2853       pgc->policy |= ROUTING_POLICY_ALL;
2854       pgc->priority = 0; /* no charge */
2855     }
2856   else
2857     {
2858       pgc->priority = bound_priority (ntohl (gm->priority), other);
2859       if ( (net_load_up < 
2860             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2861            (net_load_down < 
2862             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2863         {
2864           pgc->policy |= ROUTING_POLICY_ALL;
2865         }
2866       else
2867         {
2868           // FIXME: is this sound?
2869           if (net_load_up < 90 + 10 * pgc->priority)
2870             pgc->policy |= ROUTING_POLICY_FORWARD;
2871           if (net_load_down < 90 + 10 * pgc->priority)
2872             pgc->policy |= ROUTING_POLICY_ANSWER;
2873         }
2874     }
2875   if (pgc->policy == ROUTING_POLICY_NONE)
2876     {
2877 #if DEBUG_FS
2878       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2879                   "Dropping query from `%s', network saturated.\n",
2880                   GNUNET_i2s (other));
2881 #endif
2882       if (NULL != pgc->bf)
2883         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2884       GNUNET_free (pgc);
2885       return GNUNET_OK;     /* drop */
2886     }
2887   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2888     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2889   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2890   /* decrement ttl (always) */
2891   ttl_decrement = 2 * TTL_DECREMENT +
2892     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2893                               TTL_DECREMENT);
2894   if ( (pgc->ttl < 0) &&
2895        (pgc->ttl - ttl_decrement > 0) )
2896     {
2897 #if DEBUG_FS
2898       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2899                   "Dropping query from `%s' due to TTL underflow.\n",
2900                   GNUNET_i2s (other));
2901 #endif
2902       /* integer underflow => drop (should be very rare)! */
2903       if (NULL != pgc->bf)
2904         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2905       GNUNET_free (pgc);
2906       return GNUNET_OK;
2907     }
2908   pgc->ttl -= ttl_decrement;
2909   pgc->start_time = GNUNET_TIME_absolute_get ();
2910   preference = (double) pgc->priority;
2911   if (preference < QUERY_BANDWIDTH_VALUE)
2912     preference = QUERY_BANDWIDTH_VALUE;
2913   // FIXME: also reserve bandwidth for reply?
2914   GNUNET_CORE_peer_configure (core,
2915                               other,
2916                               GNUNET_TIME_UNIT_FOREVER_REL,
2917                               0, 0, preference, NULL, NULL);
2918   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2919     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2920                                  &ds_get_request,
2921                                  pgc);
2922   else
2923     GNUNET_SCHEDULER_add_continuation (sched,
2924                                        GNUNET_NO,
2925                                        &forward_get_request,
2926                                        pgc,
2927                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2928   return GNUNET_OK;
2929 }
2930
2931
2932 /**
2933  * Function called to notify us that we can now transmit a reply to a
2934  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2935  * closed for writing in the meantime.
2936  *
2937  * @param cls closure, points to a "struct PendingRequest*" with
2938  *            one or more pending replies
2939  * @param size number of bytes available in buf
2940  * @param buf where the callee should write the message
2941  * @return number of bytes written to buf
2942  */
2943 static size_t
2944 transmit_result (void *cls,
2945                  size_t size, 
2946                  void *buf)
2947 {
2948   struct PendingRequest *pr = cls;
2949   char *cbuf = buf;
2950   struct PendingReply *reply;
2951   size_t ret;
2952
2953   ret = 0;
2954   while (NULL != (reply = pr->replies_pending))
2955     {
2956       if ( (reply->msize + ret < ret) ||
2957            (reply->msize + ret > size) )
2958         break;
2959       pr->replies_pending = reply->next;
2960       memcpy (&cbuf[ret], &reply[1], reply->msize);
2961       ret += reply->msize;
2962       GNUNET_free (reply);
2963     }
2964   return ret;
2965 }
2966
2967
2968 /**
2969  * Iterator over pending requests.
2970  *
2971  * @param cls response (struct ProcessReplyClosure)
2972  * @param key our query
2973  * @param value value in the hash map (meta-info about the query)
2974  * @return GNUNET_YES (we should continue to iterate)
2975  */
2976 static int
2977 process_reply (void *cls,
2978                const GNUNET_HashCode * key,
2979                void *value)
2980 {
2981   struct ProcessReplyClosure *prq = cls;
2982   struct PendingRequest *pr = value;
2983   struct PendingRequest *eer;
2984   struct PendingReply *reply;
2985   struct PutMessage *pm;
2986   struct ContentMessage *cm;
2987   GNUNET_HashCode chash;
2988   GNUNET_HashCode mhash;
2989   struct GNUNET_PeerIdentity target;
2990   size_t msize;
2991   uint32_t prio;
2992   struct GNUNET_TIME_Relative max_delay;
2993   
2994   GNUNET_CRYPTO_hash (prq->data,
2995                       prq->size,
2996                       &chash);
2997   switch (prq->type)
2998     {
2999     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3000     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3001       break;
3002     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3003       /* FIXME: does prq->namespace match our expectations? */
3004       /* then: fall-through??? */
3005     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3006       if (pr->bf != NULL) 
3007         {
3008           mingle_hash (&chash, pr->mingle, &mhash);
3009           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
3010                                                                &mhash))
3011             return GNUNET_YES; /* duplicate */
3012           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
3013                                             &mhash);
3014         }
3015       break;
3016     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3017       // FIXME: any checks against duplicates for SKBlocks?
3018       break;
3019     }
3020   prio = pr->priority;
3021   prq->priority += pr->remaining_priority;
3022   pr->remaining_priority = 0;
3023   if (pr->client != NULL)
3024     {
3025       if (pr->replies_seen_size == pr->replies_seen_off)
3026         GNUNET_array_grow (pr->replies_seen,
3027                            pr->replies_seen_size,
3028                            pr->replies_seen_size * 2 + 4);
3029       pr->replies_seen[pr->replies_seen_off++] = chash;
3030       // FIXME: possibly recalculate BF!
3031     }
3032   if (pr->client == NULL)
3033     {
3034       msize = sizeof (struct ContentMessage) + prq->size;
3035       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3036       reply->msize = msize;
3037       cm = (struct ContentMessage*) &reply[1];
3038       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
3039       cm->header.size = htons (msize);
3040       cm->type = htonl (prq->type);
3041       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3042       reply->next = pr->replies_pending;
3043       pr->replies_pending = reply;
3044       memcpy (&reply[1], prq->data, prq->size);
3045       if (pr->cth != NULL)
3046         return GNUNET_YES;
3047       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
3048       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
3049         {
3050           /* estimate expiration time from time difference between
3051              first request that will be discarded and this request */
3052           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
3053           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
3054                                                            eer->start_time);
3055         }
3056       GNUNET_PEER_resolve (pr->source_pid,
3057                            &target);
3058       pr->cth = GNUNET_CORE_notify_transmit_ready (core,
3059                                                    prio,
3060                                                    max_delay,
3061                                                    &target,
3062                                                    msize,
3063                                                    &transmit_result,
3064                                                    pr);
3065       if (NULL == pr->cth)
3066         {
3067           // FIXME: now what? discard?
3068         }
3069     }
3070   else
3071     {
3072       msize = sizeof (struct PutMessage) + prq->size;
3073       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3074       reply->msize = msize;
3075       reply->next = pr->replies_pending;
3076       pm = (struct PutMessage*) &reply[1];
3077       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3078       pm->header.size = htons (msize);
3079       pm->type = htonl (prq->type);
3080       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
3081       pr->replies_pending = reply;
3082       memcpy (&reply[1], prq->data, prq->size);
3083       if (pr->th != NULL)
3084         return GNUNET_YES;
3085       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
3086                                                     msize,
3087                                                     GNUNET_TIME_UNIT_FOREVER_REL,
3088                                                     &transmit_result,
3089                                                     pr);
3090       if (pr->th == NULL)
3091         {
3092           // FIXME: need to try again later (not much
3093           // to do here specifically, but we need to
3094           // check somewhere else to handle this case!)
3095         }
3096     }
3097   // FIXME: implement hot-path routing statistics keeping!
3098   return GNUNET_YES;
3099 }
3100
3101
3102 /**
3103  * Check if the given KBlock is well-formed.
3104  *
3105  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
3106  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
3107  * @param query where to store the query that this block answers
3108  * @return GNUNET_OK if this is actually a well-formed KBlock
3109  */
3110 static int
3111 check_kblock (const struct KBlock *kb,
3112               size_t dsize,
3113               GNUNET_HashCode *query)
3114 {
3115   if (dsize < sizeof (struct KBlock))
3116     {
3117       GNUNET_break_op (0);
3118       return GNUNET_SYSERR;
3119     }
3120   if (dsize - sizeof (struct KBlock) !=
3121       ntohs (kb->purpose.size) 
3122       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
3123       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
3124     {
3125       GNUNET_break_op (0);
3126       return GNUNET_SYSERR;
3127     }
3128   if (GNUNET_OK !=
3129       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
3130                                 &kb->purpose,
3131                                 &kb->signature,
3132                                 &kb->keyspace)) 
3133     {
3134       GNUNET_break_op (0);
3135       return GNUNET_SYSERR;
3136     }
3137   if (query != NULL)
3138     GNUNET_CRYPTO_hash (&kb->keyspace,
3139                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3140                         query);
3141   return GNUNET_OK;
3142 }
3143
3144
3145 /**
3146  * Check if the given SBlock is well-formed.
3147  *
3148  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
3149  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
3150  * @param query where to store the query that this block answers
3151  * @param namespace where to store the namespace that this block belongs to
3152  * @return GNUNET_OK if this is actually a well-formed SBlock
3153  */
3154 static int
3155 check_sblock (const struct SBlock *sb,
3156               size_t dsize,
3157               GNUNET_HashCode *query,   
3158               GNUNET_HashCode *namespace)
3159 {
3160   if (dsize < sizeof (struct SBlock))
3161     {
3162       GNUNET_break_op (0);
3163       return GNUNET_SYSERR;
3164     }
3165   if (dsize !=
3166       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
3167     {
3168       GNUNET_break_op (0);
3169       return GNUNET_SYSERR;
3170     }
3171   if (GNUNET_OK !=
3172       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
3173                                 &sb->purpose,
3174                                 &sb->signature,
3175                                 &sb->subspace)) 
3176     {
3177       GNUNET_break_op (0);
3178       return GNUNET_SYSERR;
3179     }
3180   if (query != NULL)
3181     *query = sb->identifier;
3182   if (namespace != NULL)
3183     GNUNET_CRYPTO_hash (&sb->subspace,
3184                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3185                         namespace);
3186   return GNUNET_OK;
3187 }
3188
3189
3190 /**
3191  * Handle P2P "PUT" request.
3192  *
3193  * @param cls closure, always NULL
3194  * @param other the other peer involved (sender or receiver, NULL
3195  *        for loopback messages where we are both sender and receiver)
3196  * @param message the actual message
3197  * @return GNUNET_OK to keep the connection open,
3198  *         GNUNET_SYSERR to close it (signal serious error)
3199  */
3200 static int
3201 handle_p2p_put (void *cls,
3202                 const struct GNUNET_PeerIdentity *other,
3203                 const struct GNUNET_MessageHeader *message)
3204 {
3205   const struct PutMessage *put;
3206   uint16_t msize;
3207   size_t dsize;
3208   uint32_t type;
3209   struct GNUNET_TIME_Absolute expiration;
3210   GNUNET_HashCode query;
3211   struct ProcessReplyClosure prq;
3212
3213   msize = ntohs (message->size);
3214   if (msize < sizeof (struct PutMessage))
3215     {
3216       GNUNET_break_op(0);
3217       return GNUNET_SYSERR;
3218     }
3219   put = (const struct PutMessage*) message;
3220   dsize = msize - sizeof (struct PutMessage);
3221   type = ntohl (put->type);
3222   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
3223
3224   /* first, validate! */
3225   switch (type)
3226     {
3227     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3228     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3229       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
3230       break;
3231     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3232       if (GNUNET_OK !=
3233           check_kblock ((const struct KBlock*) &put[1],
3234                         dsize,
3235                         &query))
3236         return GNUNET_SYSERR;
3237       break;
3238     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3239       if (GNUNET_OK !=
3240           check_sblock ((const struct SBlock*) &put[1],
3241                         dsize,
3242                         &query,
3243                         &prq.namespace))
3244         return GNUNET_SYSERR;
3245       break;
3246     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3247       // FIXME -- validate SKBLOCK!
3248       GNUNET_break (0);
3249       return GNUNET_OK;
3250     default:
3251       /* unknown block type */
3252       GNUNET_break_op (0);
3253       return GNUNET_SYSERR;
3254     }
3255
3256   /* now, lookup 'query' */
3257   prq.data = (const void*) &put[1];
3258   prq.size = dsize;
3259   prq.type = type;
3260   prq.expiration = expiration;
3261   prq.priority = 0;
3262   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
3263                                               &query,
3264                                               &process_reply,
3265                                               &prq);
3266   // FIXME: if migration is on and load is low,
3267   // queue to store data in datastore;
3268   // use "prq.priority" for that!
3269   return GNUNET_OK;
3270 }
3271
3272
3273 /**
3274  * List of handlers for P2P messages
3275  * that we care about.
3276  */
3277 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3278   {
3279     { &handle_p2p_get, 
3280       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3281     { &handle_p2p_put, 
3282       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3283     { NULL, 0, 0 }
3284   };
3285
3286
3287 /**
3288  * Task that will try to initiate a connection with the
3289  * core service.
3290  * 
3291  * @param cls unused
3292  * @param tc unused
3293  */
3294 static void
3295 core_connect_task (void *cls,
3296                    const struct GNUNET_SCHEDULER_TaskContext *tc);
3297
3298
3299 /**
3300  * Function called by the core after we've
3301  * connected.
3302  *
3303  * @param cls closure, unused
3304  * @param server handle to the core service
3305  * @param my_identity our peer identity (unused)
3306  * @param publicKey our public key (unused)
3307  */
3308 static void
3309 core_start_cb (void *cls,
3310                struct GNUNET_CORE_Handle * server,
3311                const struct GNUNET_PeerIdentity *
3312                my_identity,
3313                const struct
3314                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
3315                publicKey)
3316 {
3317   if (server == NULL)
3318     {
3319       GNUNET_SCHEDULER_add_delayed (sched,
3320                                     GNUNET_NO,
3321                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
3322                                     GNUNET_SCHEDULER_NO_TASK,
3323                                     GNUNET_TIME_UNIT_SECONDS,
3324                                     &core_connect_task,
3325                                     NULL);
3326       return;
3327     }
3328   core = server;
3329 }
3330
3331
3332 /**
3333  * Task that will try to initiate a connection with the
3334  * core service.
3335  * 
3336  * @param cls unused
3337  * @param tc unused
3338  */
3339 static void
3340 core_connect_task (void *cls,
3341                    const struct GNUNET_SCHEDULER_TaskContext *tc)
3342 {
3343   GNUNET_CORE_connect (sched,
3344                        cfg,
3345                        GNUNET_TIME_UNIT_FOREVER_REL,
3346                        NULL,
3347                        &core_start_cb,
3348                        &peer_connect_handler,
3349                        &peer_disconnect_handler,
3350                        NULL, 
3351                        NULL, GNUNET_NO,
3352                        NULL, GNUNET_NO,
3353                        p2p_handlers);
3354 }
3355
3356
3357 /**
3358  * Process fs requests.
3359  *
3360  * @param cls closure
3361  * @param s scheduler to use
3362  * @param server the initialized server
3363  * @param c configuration to use
3364  */
3365 static void
3366 run (void *cls,
3367      struct GNUNET_SCHEDULER_Handle *s,
3368      struct GNUNET_SERVER_Handle *server,
3369      const struct GNUNET_CONFIGURATION_Handle *c)
3370 {
3371   sched = s;
3372   cfg = c;
3373
3374   ifm = GNUNET_CONTAINER_multihashmap_create (128);
3375   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3376   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3377   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
3378   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3379   read_index_list ();
3380   dsh = GNUNET_DATASTORE_connect (cfg,
3381                                   sched);
3382   if (NULL == dsh)
3383     {
3384       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3385                   _("Failed to connect to datastore service.\n"));
3386       return;
3387     }
3388   GNUNET_SERVER_disconnect_notify (server, 
3389                                    &handle_client_disconnect,
3390                                    NULL);
3391   GNUNET_SERVER_add_handlers (server, handlers);
3392   core_connect_task (NULL, NULL);
3393   GNUNET_SCHEDULER_add_delayed (sched,
3394                                 GNUNET_YES,
3395                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3396                                 GNUNET_SCHEDULER_NO_TASK,
3397                                 GNUNET_TIME_UNIT_FOREVER_REL,
3398                                 &shutdown_task,
3399                                 NULL);
3400 }
3401
3402
3403 /**
3404  * The main function for the fs service.
3405  *
3406  * @param argc number of arguments from the command line
3407  * @param argv command line arguments
3408  * @return 0 ok, 1 on error
3409  */
3410 int
3411 main (int argc, char *const *argv)
3412 {
3413   return (GNUNET_OK ==
3414           GNUNET_SERVICE_run (argc,
3415                               argv,
3416                               "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
3417 }
3418
3419 /* end of gnunet-service-fs.c */