updates to peerinfo to use new nc API
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix gazillion of minor FIXME's
28  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29  *   core to transmit to another peer; need to make sure this is bounded overall...
30  * - randomly delay processing for improved anonymity (can wait)
31  * - content migration (put in local DS) (can wait)
32  * - handle some special cases when forwarding replies based on tracked requests (can wait)
33  * - tracking of success correlations for hot-path routing (can wait)
34  * - various load-based actions (can wait)
35  * - validation of KSBLOCKS (can wait)
36  * - remove on-demand blocks if they keep failing (can wait)
37  * - check that we decrement PIDs always where necessary (can wait)
38  * - find out how to use core-pulling instead of pushing (at least for some cases)
39  */
40 #include "platform.h"
41 #include <float.h>
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "fs.h"
49
50 #define DEBUG_FS GNUNET_NO
51
52
53 /**
54  * In-memory information about indexed files (also available
55  * on-disk).
56  */
57 struct IndexInfo
58 {
59   
60   /**
61    * This is a linked list.
62    */
63   struct IndexInfo *next;
64
65   /**
66    * Name of the indexed file.  Memory allocated
67    * at the end of this struct (do not free).
68    */
69   const char *filename;
70
71   /**
72    * Context for transmitting confirmation to client,
73    * NULL if we've done this already.
74    */
75   struct GNUNET_SERVER_TransmitContext *tc;
76   
77   /**
78    * Hash of the contents of the file.
79    */
80   GNUNET_HashCode file_id;
81
82 };
83
84
85 /**
86  * Signature of a function that is called whenever a datastore
87  * request can be processed (or an entry put on the queue times out).
88  *
89  * @param cls closure
90  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
91  */
92 typedef void (*RequestFunction)(void *cls,
93                                 int ok);
94
95
96 /**
97  * Doubly-linked list of our requests for the datastore.
98  */
99 struct DatastoreRequestQueue
100 {
101
102   /**
103    * This is a doubly-linked list.
104    */
105   struct DatastoreRequestQueue *next;
106
107   /**
108    * This is a doubly-linked list.
109    */
110   struct DatastoreRequestQueue *prev;
111
112   /**
113    * Function to call (will issue the request).
114    */
115   RequestFunction req;
116
117   /**
118    * Closure for req.
119    */
120   void *req_cls;
121
122   /**
123    * When should this request time-out because we don't care anymore?
124    */
125   struct GNUNET_TIME_Absolute timeout;
126     
127   /**
128    * ID of task used for signaling timeout.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132 };
133
134
135 /**
136  * Closure for processing START_SEARCH messages from a client.
137  */
138 struct LocalGetContext
139 {
140
141   /**
142    * This is a doubly-linked list.
143    */
144   struct LocalGetContext *next;
145
146   /**
147    * This is a doubly-linked list.
148    */
149   struct LocalGetContext *prev;
150
151   /**
152    * Client that initiated the search.
153    */
154   struct GNUNET_SERVER_Client *client;
155
156   /**
157    * Array of results that we've already received 
158    * (can be NULL).
159    */
160   GNUNET_HashCode *results; 
161
162   /**
163    * Bloomfilter over all results (for fast query construction);
164    * NULL if we don't have any results.
165    *
166    * FIXME: this member is not used, is that OK? If so, it should
167    * be removed!
168    */
169   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
170
171   /**
172    * DS request associated with this operation.
173    */
174   struct DatastoreRequestQueue *req;
175
176   /**
177    * Current result message to transmit to client (or NULL).
178    */
179   struct ContentMessage *result;
180   
181   /**
182    * Type of the content that we're looking for.
183    * 0 for any.
184    */
185   uint32_t type;
186
187   /**
188    * Desired anonymity level.
189    */
190   uint32_t anonymity_level;
191
192   /**
193    * Number of results actually stored in the results array.
194    */
195   unsigned int results_used;
196   
197   /**
198    * Size of the results array in memory.
199    */
200   unsigned int results_size;
201   
202   /**
203    * Size (in bytes) of the 'results_bf' bloomfilter.
204    *
205    * FIXME: this member is not used, is that OK? If so, it should
206    * be removed!
207    */
208   size_t results_bf_size;
209
210   /**
211    * If the request is for a DBLOCK or IBLOCK, this is the identity of
212    * the peer that is known to have a response.  Set to all-zeros if
213    * such a target is not known (note that even if OUR anonymity
214    * level is >0 we may happen to know the responder's identity;
215    * nevertheless, we should probably not use it for a DHT-lookup
216    * or similar blunt actions in order to avoid exposing ourselves).
217    */
218   struct GNUNET_PeerIdentity target;
219
220   /**
221    * If the request is for an SBLOCK, this is the identity of the
222    * pseudonym to which the SBLOCK belongs. 
223    */
224   GNUNET_HashCode namespace;
225
226   /**
227    * Hash of the keyword (aka query) for KBLOCKs; Hash of
228    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
229    * and hash of the identifier XORed with the target for
230    * SBLOCKS (aka query).
231    */
232   GNUNET_HashCode query;
233
234 };
235
236
237 /**
238  * Possible routing policies for an FS-GET request.
239  */
240 enum RoutingPolicy
241   {
242     /**
243      * Simply drop the request.
244      */
245     ROUTING_POLICY_NONE = 0,
246     
247     /**
248      * Answer it if we can from local datastore.
249      */
250     ROUTING_POLICY_ANSWER = 1,
251
252     /**
253      * Forward the request to other peers (if possible).
254      */
255     ROUTING_POLICY_FORWARD = 2,
256
257     /**
258      * Forward to other peers, and ask them to route
259      * the response via ourselves.
260      */
261     ROUTING_POLICY_INDIRECT = 6,
262     
263     /**
264      * Do everything we could possibly do (that would
265      * make sense).
266      */
267     ROUTING_POLICY_ALL = 7
268   };
269
270
271 /**
272  * Internal context we use for our initial processing
273  * of a GET request.
274  */
275 struct ProcessGetContext
276 {
277   /**
278    * The search query (used for datastore lookup).
279    */
280   GNUNET_HashCode query;
281   
282   /**
283    * Which peer we should forward the response to.
284    */
285   struct GNUNET_PeerIdentity reply_to;
286
287   /**
288    * Namespace for the result (only set for SKS requests)
289    */
290   GNUNET_HashCode namespace;
291
292   /**
293    * Peer that we should forward the query to if possible
294    * (since that peer likely has the content).
295    */
296   struct GNUNET_PeerIdentity prime_target;
297
298   /**
299    * When did we receive this request?
300    */
301   struct GNUNET_TIME_Absolute start_time;
302
303   /**
304    * Our entry in the DRQ (non-NULL while we wait for our
305    * turn to interact with the local database).
306    */
307   struct DatastoreRequestQueue *drq;
308
309   /**
310    * Filter used to eliminate duplicate results.  Can be NULL if we
311    * are not yet filtering any results.
312    */
313   struct GNUNET_CONTAINER_BloomFilter *bf;
314
315   /**
316    * Bitmap describing which of the optional
317    * hash codes / peer identities were given to us.
318    */
319   uint32_t bm;
320
321   /**
322    * Desired block type.
323    */
324   uint32_t type;
325
326   /**
327    * Priority of the request.
328    */
329   uint32_t priority;
330
331   /**
332    * Size of the 'bf' (in bytes).
333    */
334   size_t bf_size;
335
336   /**
337    * In what ways are we going to process
338    * the request?
339    */
340   enum RoutingPolicy policy;
341
342   /**
343    * Time-to-live for the request (value
344    * we use).
345    */
346   int32_t ttl;
347
348   /**
349    * Number to mingle hashes for bloom-filter
350    * tests with.
351    */
352   int32_t mingle;
353
354   /**
355    * Number of results that were found so far.
356    */
357   unsigned int results_found;
358 };
359
360
361 /**
362  * Information we keep for each pending reply.
363  */
364 struct PendingReply
365 {
366   /**
367    * This is a linked list.
368    */
369   struct PendingReply *next;
370
371   /**
372    * Size of the reply; actual reply message follows
373    * at the end of this struct.
374    */
375   size_t msize;
376
377 };
378
379
380 /**
381  * All requests from a client are kept in a doubly-linked list.
382  */
383 struct ClientRequestList;
384
385
386 /**
387  * Information we keep for each pending request.  We should try to
388  * keep this struct as small as possible since its memory consumption
389  * is key to how many requests we can have pending at once.
390  */
391 struct PendingRequest
392 {
393
394   /**
395    * ID of a client making a request, NULL if this entry is for a
396    * peer.
397    */
398   struct GNUNET_SERVER_Client *client;
399
400   /**
401    * If this request was made by a client,
402    * this is our entry in the client request
403    * list; otherwise NULL.
404    */
405   struct ClientRequestList *crl_entry;
406
407   /**
408    * If this is a namespace query, pointer to the hash of the public
409    * key of the namespace; otherwise NULL.
410    */
411   GNUNET_HashCode *namespace;
412
413   /**
414    * Bloomfilter we use to filter out replies that we don't care about
415    * (anymore).  NULL as long as we are interested in all replies.
416    */
417   struct GNUNET_CONTAINER_BloomFilter *bf;
418
419   /**
420    * Context of our GNUNET_CORE_peer_change_preference call.
421    */
422   struct GNUNET_CORE_InformationRequestContext *irc;
423
424   /**
425    * Replies that we have received but were unable to forward yet
426    * (typically non-null only if we have a pending transmission
427    * request with the client or the respective peer).
428    */
429   struct PendingReply *replies_pending;
430
431   /**
432    * Pending transmission request with the core service for the target
433    * peer (for processing of 'replies_pending') or Handle for a
434    * pending query-request for P2P-transmission with the core service.
435    * If non-NULL, this request must be cancelled should this struct be
436    * destroyed!
437    */
438   struct GNUNET_CORE_TransmitHandle *cth;
439
440   /**
441    * Pending transmission request for the target client (for processing of
442    * 'replies_pending').
443    */
444   struct GNUNET_CONNECTION_TransmitHandle *th;
445
446   /**
447    * Hash code of all replies that we have seen so far (only valid
448    * if client is not NULL since we only track replies like this for
449    * our own clients).
450    */
451   GNUNET_HashCode *replies_seen;
452
453   /**
454    * Node in the heap representing this entry.
455    */
456   struct GNUNET_CONTAINER_HeapNode *hnode;
457
458   /**
459    * When did we first see this request (form this peer), or, if our
460    * client is initiating, when did we last initiate a search?
461    */
462   struct GNUNET_TIME_Absolute start_time;
463
464   /**
465    * The query that this request is for.
466    */
467   GNUNET_HashCode query;
468
469   /**
470    * The task responsible for transmitting queries
471    * for this request.
472    */
473   GNUNET_SCHEDULER_TaskIdentifier task;
474
475   /**
476    * (Interned) Peer identifier (only valid if "client" is NULL)
477    * that identifies a peer that gave us this request.
478    */
479   GNUNET_PEER_Id source_pid;
480
481   /**
482    * (Interned) Peer identifier that identifies a preferred target
483    * for requests.
484    */
485   GNUNET_PEER_Id target_pid;
486
487   /**
488    * (Interned) Peer identifiers of peers that have already
489    * received our query for this content.
490    */
491   GNUNET_PEER_Id *used_pids;
492
493   /**
494    * Size of the 'bf' (in bytes).
495    */
496   size_t bf_size;
497
498   /**
499    * Desired anonymity level; only valid for requests from a local client.
500    */
501   uint32_t anonymity_level;
502
503   /**
504    * How many entries in "used_pids" are actually valid?
505    */
506   unsigned int used_pids_off;
507
508   /**
509    * How long is the "used_pids" array?
510    */
511   unsigned int used_pids_size;
512
513   /**
514    * How many entries in "replies_seen" are actually valid?
515    */
516   unsigned int replies_seen_off;
517
518   /**
519    * How long is the "replies_seen" array?
520    */
521   unsigned int replies_seen_size;
522   
523   /**
524    * Priority with which this request was made.  If one of our clients
525    * made the request, then this is the current priority that we are
526    * using when initiating the request.  This value is used when
527    * we decide to reward other peers with trust for providing a reply.
528    */
529   uint32_t priority;
530
531   /**
532    * Priority points left for us to spend when forwarding this request
533    * to other peers.
534    */
535   uint32_t remaining_priority;
536
537   /**
538    * Number to mingle hashes for bloom-filter
539    * tests with.
540    */
541   int32_t mingle;
542
543   /**
544    * TTL with which we saw this request (or, if we initiated, TTL that
545    * we used for the request).
546    */
547   int32_t ttl;
548   
549   /**
550    * Type of the content that this request is for.
551    */
552   uint32_t type;
553
554 };
555
556
557 /**
558  * All requests from a client are kept in a doubly-linked list.
559  */
560 struct ClientRequestList
561 {
562   /**
563    * This is a doubly-linked list.
564    */
565   struct ClientRequestList *next;
566
567   /**
568    * This is a doubly-linked list.
569    */ 
570   struct ClientRequestList *prev;
571
572   /**
573    * A request from this client.
574    */
575   struct PendingRequest *req;
576
577   /**
578    * Client list with the head and tail of this DLL.
579    */
580   struct ClientList *cl;
581 };
582
583
584 /**
585  * Linked list of all clients that we are 
586  * currently processing requests for.
587  */
588 struct ClientList
589 {
590
591   /**
592    * This is a linked list.
593    */
594   struct ClientList *next;
595
596   /**
597    * What client is this entry for?
598    */
599   struct GNUNET_SERVER_Client* client;
600
601   /**
602    * Head of the DLL of requests from this client.
603    */
604   struct ClientRequestList *head;
605
606   /**
607    * Tail of the DLL of requests from this client.
608    */
609   struct ClientRequestList *tail;
610
611 };
612
613
614 /**
615  * Closure for "process_reply" function.
616  */
617 struct ProcessReplyClosure
618 {
619   /**
620    * The data for the reply.
621    */
622   const void *data;
623
624   /**
625    * When the reply expires.
626    */
627   struct GNUNET_TIME_Absolute expiration;
628
629   /**
630    * Size of data.
631    */
632   size_t size;
633
634   /**
635    * Namespace that this reply belongs to
636    * (if it is of type SBLOCK).
637    */
638   GNUNET_HashCode namespace;
639
640   /**
641    * Type of the block.
642    */
643   uint32_t type;
644
645   /**
646    * How much was this reply worth to us?
647    */
648   uint32_t priority;
649 };
650
651
652 /**
653  * Information about a peer that we are connected to.
654  * We track data that is useful for determining which
655  * peers should receive our requests.
656  */
657 struct ConnectedPeer
658 {
659
660   /**
661    * List of the last clients for which this peer
662    * successfully answered a query. 
663    */
664   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
665
666   /**
667    * List of the last PIDs for which
668    * this peer successfully answered a query;
669    * We use 0 to indicate no successful reply.
670    */
671   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
672
673   /**
674    * Average delay between sending the peer a request and
675    * getting a reply (only calculated over the requests for
676    * which we actually got a reply).   Calculated
677    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
678    */ 
679   struct GNUNET_TIME_Relative avg_delay;
680
681   /**
682    * Average priority of successful replies.  Calculated
683    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
684    */
685   double avg_priority;
686
687   /**
688    * The peer's identity.
689    */
690   GNUNET_PEER_Id pid;  
691
692   /**
693    * Number of requests we have currently pending
694    * with this peer (that is, requests that were
695    * transmitted so recently that we would not retransmit
696    * them right now).
697    */
698   unsigned int pending_requests;
699
700   /**
701    * Which offset in "last_p2p_replies" will be updated next?
702    * (we go round-robin).
703    */
704   unsigned int last_p2p_replies_woff;
705
706   /**
707    * Which offset in "last_client_replies" will be updated next?
708    * (we go round-robin).
709    */
710   unsigned int last_client_replies_woff;
711
712 };
713
714
715 /**
716  * Our connection to the datastore.
717  */
718 static struct GNUNET_DATASTORE_Handle *dsh;
719
720 /**
721  * Our scheduler.
722  */
723 static struct GNUNET_SCHEDULER_Handle *sched;
724
725 /**
726  * Our configuration.
727  */
728 const struct GNUNET_CONFIGURATION_Handle *cfg;
729
730 /**
731  * Handle to the core service (NULL until we've connected to it).
732  */
733 struct GNUNET_CORE_Handle *core;
734
735 /**
736  * Head of doubly-linked LGC list.
737  */
738 static struct LocalGetContext *lgc_head;
739
740 /**
741  * Tail of doubly-linked LGC list.
742  */
743 static struct LocalGetContext *lgc_tail;
744
745 /**
746  * Head of request queue for the datastore, sorted by timeout.
747  */
748 static struct DatastoreRequestQueue *drq_head;
749
750 /**
751  * Tail of request queue for the datastore.
752  */
753 static struct DatastoreRequestQueue *drq_tail;
754
755 /**
756  * Linked list of indexed files.
757  */
758 static struct IndexInfo *indexed_files;
759
760 /**
761  * Maps hash over content of indexed files to the respective filename.
762  * The filenames are pointers into the indexed_files linked list and
763  * do not need to be freed.
764  */
765 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
766
767 /**
768  * Map of query hash codes to requests.
769  */
770 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
771
772 /**
773  * Map of peer IDs to requests (for those requests coming
774  * from other peers).
775  */
776 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
777
778 /**
779  * Linked list of all of our clients and their requests.
780  */
781 static struct ClientList *clients;
782
783 /**
784  * Heap with the request that will expire next at the top.  Contains
785  * pointers of type "struct PendingRequest*"; these will *also* be
786  * aliased from the "requests_by_peer" data structures and the
787  * "requests_by_query" table.  Note that requests from our clients
788  * don't expire and are thus NOT in the "requests_by_expiration"
789  * (or the "requests_by_peer" tables).
790  */
791 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
792
793 /**
794  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
795  */
796 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
797
798 /**
799  * Maximum number of requests (from other peers) that we're
800  * willing to have pending at any given point in time.
801  * FIXME: set from configuration (and 32 is a tiny value for testing only).
802  */
803 static uint64_t max_pending_requests = 32;
804
805
806 /**
807  * Write the current index information list to disk.
808  */ 
809 static void
810 write_index_list ()
811 {
812   struct GNUNET_BIO_WriteHandle *wh;
813   char *fn;
814   struct IndexInfo *pos;  
815
816   if (GNUNET_OK !=
817       GNUNET_CONFIGURATION_get_value_filename (cfg,
818                                                "FS",
819                                                "INDEXDB",
820                                                &fn))
821     {
822       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
823                   _("Configuration option `%s' in section `%s' missing.\n"),
824                   "INDEXDB",
825                   "FS");
826       return;
827     }
828   wh = GNUNET_BIO_write_open (fn);
829   if (NULL == wh)
830     {
831       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
832                   _("Could not open `%s'.\n"),
833                   fn);
834       GNUNET_free (fn);
835       return;
836     }
837   pos = indexed_files;
838   while (pos != NULL)
839     {
840       if ( (GNUNET_OK !=
841             GNUNET_BIO_write (wh,
842                               &pos->file_id,
843                               sizeof (GNUNET_HashCode))) ||
844            (GNUNET_OK !=
845             GNUNET_BIO_write_string (wh,
846                                      pos->filename)) )
847         break;
848       pos = pos->next;
849     }
850   if (GNUNET_OK != 
851       GNUNET_BIO_write_close (wh))
852     {
853       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
854                   _("Error writing `%s'.\n"),
855                   fn);
856       GNUNET_free (fn);
857       return;
858     }
859   GNUNET_free (fn);
860 }
861
862
863 /**
864  * Read index information from disk.
865  */
866 static void
867 read_index_list ()
868 {
869   struct GNUNET_BIO_ReadHandle *rh;
870   char *fn;
871   struct IndexInfo *pos;  
872   char *fname;
873   GNUNET_HashCode hc;
874   size_t slen;
875   char *emsg;
876
877   if (GNUNET_OK !=
878       GNUNET_CONFIGURATION_get_value_filename (cfg,
879                                                "FS",
880                                                "INDEXDB",
881                                                &fn))
882     {
883       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
884                   _("Configuration option `%s' in section `%s' missing.\n"),
885                   "INDEXDB",
886                   "FS");
887       return;
888     }
889   if (GNUNET_NO == GNUNET_DISK_file_test (fn))
890     {
891       /* no index info  yet */
892       GNUNET_free (fn);
893       return;
894     }
895   rh = GNUNET_BIO_read_open (fn);
896   if (NULL == rh)
897     {
898       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
899                   _("Could not open `%s'.\n"),
900                   fn);
901       GNUNET_free (fn);
902       return;
903     }
904
905   while ( (GNUNET_OK ==
906            GNUNET_BIO_read (rh,
907                             "Hash of indexed file",
908                             &hc,
909                             sizeof (GNUNET_HashCode))) &&
910           (GNUNET_OK ==
911            GNUNET_BIO_read_string (rh, 
912                                    "Name of indexed file",
913                                    &fname,
914                                    1024 * 16)) )
915     {
916       slen = strlen (fname) + 1;
917       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
918       pos->file_id = hc;
919       pos->filename = (const char *) &pos[1];
920       memcpy (&pos[1], fname, slen);
921       if (GNUNET_SYSERR ==
922           GNUNET_CONTAINER_multihashmap_put (ifm,
923                                              &hc,
924                                              (void*) pos->filename,
925                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
926         {
927           GNUNET_free (pos);
928         }
929       else
930         {
931           pos->next = indexed_files;
932           indexed_files = pos;
933         }
934       GNUNET_free (fname);
935     }
936   if (GNUNET_OK != 
937       GNUNET_BIO_read_close (rh, &emsg))
938     GNUNET_free (emsg);
939   GNUNET_free (fn);
940 }
941
942
943 /**
944  * We've validated the hash of the file we're about to
945  * index.  Signal success to the client and update
946  * our internal data structures.
947  *
948  * @param ii the index info entry for the request
949  */
950 static void
951 signal_index_ok (struct IndexInfo *ii)
952 {
953   if (GNUNET_SYSERR ==
954       GNUNET_CONTAINER_multihashmap_put (ifm,
955                                          &ii->file_id,
956                                          (void*) ii->filename,
957                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
958     {
959       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
960                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
961                   ii->filename,
962                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
963                                                                    &ii->file_id));
964       GNUNET_SERVER_transmit_context_append (ii->tc,
965                                              NULL, 0,
966                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
967       GNUNET_SERVER_transmit_context_run (ii->tc,
968                                           GNUNET_TIME_UNIT_MINUTES);
969       GNUNET_free (ii);
970       return;
971     }
972   ii->next = indexed_files;
973   indexed_files = ii;
974   write_index_list ();
975   GNUNET_SERVER_transmit_context_append (ii->tc,
976                                          NULL, 0,
977                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
978   GNUNET_SERVER_transmit_context_run (ii->tc,
979                                       GNUNET_TIME_UNIT_MINUTES);
980   ii->tc = NULL;
981 }
982
983
984 /**
985  * Function called once the hash computation over an
986  * indexed file has completed.
987  *
988  * @param cls closure, our publishing context
989  * @param res resulting hash, NULL on error
990  */
991 static void 
992 hash_for_index_val (void *cls,
993                     const GNUNET_HashCode *
994                     res)
995 {
996   struct IndexInfo *ii = cls;
997   
998   if ( (res == NULL) ||
999        (0 != memcmp (res,
1000                      &ii->file_id,
1001                      sizeof(GNUNET_HashCode))) )
1002     {
1003       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1004                   _("Hash mismatch trying to index file `%s' which has hash `%s'\n"),
1005                   ii->filename,
1006                   GNUNET_h2s (res));
1007 #if DEBUG_FS
1008       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1009                   "Wanted `%s'\n",
1010                   GNUNET_h2s (&ii->file_id));
1011 #endif
1012       GNUNET_SERVER_transmit_context_append (ii->tc,
1013                                              NULL, 0,
1014                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
1015       GNUNET_SERVER_transmit_context_run (ii->tc,
1016                                           GNUNET_TIME_UNIT_MINUTES);
1017       GNUNET_free (ii);
1018       return;
1019     }
1020   signal_index_ok (ii);
1021 }
1022
1023
1024 /**
1025  * Handle INDEX_START-message.
1026  *
1027  * @param cls closure
1028  * @param client identification of the client
1029  * @param message the actual message
1030  */
1031 static void
1032 handle_index_start (void *cls,
1033                     struct GNUNET_SERVER_Client *client,
1034                     const struct GNUNET_MessageHeader *message)
1035 {
1036   const struct IndexStartMessage *ism;
1037   const char *fn;
1038   uint16_t msize;
1039   struct IndexInfo *ii;
1040   size_t slen;
1041   uint32_t dev;
1042   uint64_t ino;
1043   uint32_t mydev;
1044   uint64_t myino;
1045
1046   msize = ntohs(message->size);
1047   if ( (msize <= sizeof (struct IndexStartMessage)) ||
1048        ( ((const char *)message)[msize-1] != '\0') )
1049     {
1050       GNUNET_break (0);
1051       GNUNET_SERVER_receive_done (client,
1052                                   GNUNET_SYSERR);
1053       return;
1054     }
1055   ism = (const struct IndexStartMessage*) message;
1056   fn = (const char*) &ism[1];
1057   dev = ntohl (ism->device);
1058   ino = GNUNET_ntohll (ism->inode);
1059   ism = (const struct IndexStartMessage*) message;
1060   slen = strlen (fn) + 1;
1061   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
1062   ii->filename = (const char*) &ii[1];
1063   memcpy (&ii[1], fn, slen);
1064   ii->file_id = ism->file_id;  
1065   ii->tc = GNUNET_SERVER_transmit_context_create (client);
1066   if ( ( (dev != 0) ||
1067          (ino != 0) ) &&
1068        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
1069                                                        &mydev,
1070                                                        &myino)) &&
1071        ( (dev == mydev) &&
1072          (ino == myino) ) )
1073     {      
1074       /* fast validation OK! */
1075       signal_index_ok (ii);
1076       return;
1077     }
1078 #if DEBUG_FS
1079   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1080               "Mismatch in file identifiers (%llu != %llu or %u != %u), need to hash.\n",
1081               (unsigned long long) ino,
1082               (unsigned long long) myino,
1083               (unsigned int) dev,
1084               (unsigned int) mydev);
1085 #endif
1086   /* slow validation, need to hash full file (again) */
1087   GNUNET_CRYPTO_hash_file (sched,
1088                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1089                            fn,
1090                            HASHING_BLOCKSIZE,
1091                            &hash_for_index_val,
1092                            ii);
1093 }
1094
1095
1096 /**
1097  * Handle INDEX_LIST_GET-message.
1098  *
1099  * @param cls closure
1100  * @param client identification of the client
1101  * @param message the actual message
1102  */
1103 static void
1104 handle_index_list_get (void *cls,
1105                        struct GNUNET_SERVER_Client *client,
1106                        const struct GNUNET_MessageHeader *message)
1107 {
1108   struct GNUNET_SERVER_TransmitContext *tc;
1109   struct IndexInfoMessage *iim;
1110   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1111   size_t slen;
1112   const char *fn;
1113   struct GNUNET_MessageHeader *msg;
1114   struct IndexInfo *pos;
1115
1116   tc = GNUNET_SERVER_transmit_context_create (client);
1117   iim = (struct IndexInfoMessage*) buf;
1118   msg = &iim->header;
1119   pos = indexed_files;
1120   while (NULL != pos)
1121     {
1122       iim->reserved = 0;
1123       iim->file_id = pos->file_id;
1124       fn = pos->filename;
1125       slen = strlen (fn) + 1;
1126       if (slen + sizeof (struct IndexInfoMessage) > 
1127           GNUNET_SERVER_MAX_MESSAGE_SIZE)
1128         {
1129           GNUNET_break (0);
1130           break;
1131         }
1132       memcpy (&iim[1], fn, slen);
1133       GNUNET_SERVER_transmit_context_append
1134         (tc,
1135          &msg[1],
1136          sizeof (struct IndexInfoMessage) 
1137          - sizeof (struct GNUNET_MessageHeader) + slen,
1138          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1139       pos = pos->next;
1140     }
1141   GNUNET_SERVER_transmit_context_append (tc,
1142                                          NULL, 0,
1143                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1144   GNUNET_SERVER_transmit_context_run (tc,
1145                                       GNUNET_TIME_UNIT_MINUTES);
1146 }
1147
1148
1149 /**
1150  * Handle UNINDEX-message.
1151  *
1152  * @param cls closure
1153  * @param client identification of the client
1154  * @param message the actual message
1155  */
1156 static void
1157 handle_unindex (void *cls,
1158                 struct GNUNET_SERVER_Client *client,
1159                 const struct GNUNET_MessageHeader *message)
1160 {
1161   const struct UnindexMessage *um;
1162   struct IndexInfo *pos;
1163   struct IndexInfo *prev;
1164   struct IndexInfo *next;
1165   struct GNUNET_SERVER_TransmitContext *tc;
1166   int found;
1167   
1168   um = (const struct UnindexMessage*) message;
1169   found = GNUNET_NO;
1170   prev = NULL;
1171   pos = indexed_files;
1172   while (NULL != pos)
1173     {
1174       next = pos->next;
1175       if (0 == memcmp (&pos->file_id,
1176                        &um->file_id,
1177                        sizeof (GNUNET_HashCode)))
1178         {
1179           if (prev == NULL)
1180             indexed_files = next;
1181           else
1182             prev->next = next;
1183           GNUNET_free (pos);
1184           found = GNUNET_YES;
1185         }
1186       else
1187         {
1188           prev = pos;
1189         }
1190       pos = next;
1191     }
1192 #if DEBUG_FS
1193   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1194               "Client requested unindexing of file `%s': %s\n",
1195               GNUNET_h2s (&um->file_id),
1196               found ? "found" : "not found");
1197 #endif
1198   if (GNUNET_YES == found)    
1199     write_index_list ();
1200   tc = GNUNET_SERVER_transmit_context_create (client);
1201   GNUNET_SERVER_transmit_context_append (tc,
1202                                          NULL, 0,
1203                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1204   GNUNET_SERVER_transmit_context_run (tc,
1205                                       GNUNET_TIME_UNIT_MINUTES);
1206 }
1207
1208
1209 /**
1210  * Run the next DS request in our
1211  * queue, we're done with the current one.
1212  */
1213 static void
1214 next_ds_request ()
1215 {
1216   struct DatastoreRequestQueue *e;
1217   
1218   while (NULL != (e = drq_head))
1219     {
1220       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1221         break;
1222       if (e->task != GNUNET_SCHEDULER_NO_TASK)
1223         GNUNET_SCHEDULER_cancel (sched, e->task);
1224       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1225       e->req (e->req_cls, GNUNET_NO);
1226       GNUNET_free (e);  
1227     }
1228   if (e == NULL)
1229     return;
1230   if (e->task != GNUNET_SCHEDULER_NO_TASK)
1231     GNUNET_SCHEDULER_cancel (sched, e->task);
1232   e->task = GNUNET_SCHEDULER_NO_TASK;
1233   e->req (e->req_cls, GNUNET_YES);
1234   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1235   GNUNET_free (e);  
1236 }
1237
1238
1239 /**
1240  * A datastore request had to be timed out. 
1241  *
1242  * @param cls closure (of type "struct DatastoreRequestQueue*")
1243  * @param tc task context, unused
1244  */
1245 static void
1246 timeout_ds_request (void *cls,
1247                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1248 {
1249   struct DatastoreRequestQueue *e = cls;
1250
1251   e->task = GNUNET_SCHEDULER_NO_TASK;
1252   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1253   e->req (e->req_cls, GNUNET_NO);
1254   GNUNET_free (e);  
1255 }
1256
1257
1258 /**
1259  * Queue a request for the datastore.
1260  *
1261  * @param deadline by when the request should run
1262  * @param fun function to call once the request can be run
1263  * @param fun_cls closure for fun
1264  */
1265 static struct DatastoreRequestQueue *
1266 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1267                   RequestFunction fun,
1268                   void *fun_cls)
1269 {
1270   struct DatastoreRequestQueue *e;
1271   struct DatastoreRequestQueue *bef;
1272
1273   if (drq_head == NULL)
1274     {
1275       /* no other requests pending, run immediately */
1276       fun (fun_cls, GNUNET_OK);
1277       return NULL;
1278     }
1279   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1280   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1281   e->req = fun;
1282   e->req_cls = fun_cls;
1283   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1284     {
1285       /* local request, highest prio, put at head of queue
1286          regardless of deadline */
1287       bef = NULL;
1288     }
1289   else
1290     {
1291       bef = drq_tail;
1292       while ( (NULL != bef) &&
1293               (e->timeout.value < bef->timeout.value) )
1294         bef = bef->prev;
1295     }
1296   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1297   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1298     return e;
1299   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1300                                           deadline,
1301                                           &timeout_ds_request,
1302                                           e);
1303   return e;                                    
1304 }
1305
1306
1307 /**
1308  * Free the state associated with a local get context.
1309  *
1310  * @param lgc the lgc to free
1311  */
1312 static void
1313 local_get_context_free (struct LocalGetContext *lgc) 
1314 {
1315   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1316   GNUNET_SERVER_client_drop (lgc->client); 
1317   GNUNET_free_non_null (lgc->results);
1318   if (lgc->results_bf != NULL)
1319     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1320   if (lgc->req != NULL)
1321     {
1322       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1323         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1324       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1325       GNUNET_free (lgc->req);
1326     }
1327   GNUNET_free (lgc);
1328 }
1329
1330
1331 /**
1332  * We're able to transmit the next (local) result to the client.
1333  * Do it and ask the datastore for more.  Or, on error, tell
1334  * the datastore to stop giving us more.
1335  *
1336  * @param cls our closure (struct LocalGetContext)
1337  * @param max maximum number of bytes we can transmit
1338  * @param buf where to copy our message
1339  * @return number of bytes copied to buf
1340  */
1341 static size_t
1342 transmit_local_result (void *cls,
1343                        size_t max,
1344                        void *buf)
1345 {
1346   struct LocalGetContext *lgc = cls;  
1347   uint16_t msize;
1348
1349   if (NULL == buf)
1350     {
1351 #if DEBUG_FS
1352       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1353                   "Failed to transmit result to local client, aborting datastore iteration.\n");
1354 #endif
1355       /* error, abort! */
1356       GNUNET_free (lgc->result);
1357       lgc->result = NULL;
1358       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1359       return 0;
1360     }
1361   msize = ntohs (lgc->result->header.size);
1362 #if DEBUG_FS
1363   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1364               "Transmitting %u bytes of result to local client.\n",
1365               msize);
1366 #endif
1367   GNUNET_assert (max >= msize);
1368   memcpy (buf, lgc->result, msize);
1369   GNUNET_free (lgc->result);
1370   lgc->result = NULL;
1371   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1372   return msize;
1373 }
1374
1375
1376 /**
1377  * Continuation called from datastore's remove
1378  * function.
1379  *
1380  * @param cls unused
1381  * @param success did the deletion work?
1382  * @param msg error message
1383  */
1384 static void
1385 remove_cont (void *cls,
1386              int success,
1387              const char *msg)
1388 {
1389   if (GNUNET_OK != success)
1390     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1391                 _("Failed to delete bogus block: %s\n"),
1392                 msg);
1393   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1394 }
1395
1396
1397 /**
1398  * Mingle hash with the mingle_number to
1399  * produce different bits.
1400  */
1401 static void
1402 mingle_hash (const GNUNET_HashCode * in,
1403              int32_t mingle_number, 
1404              GNUNET_HashCode * hc)
1405 {
1406   GNUNET_HashCode m;
1407
1408   GNUNET_CRYPTO_hash (&mingle_number, 
1409                       sizeof (int32_t), 
1410                       &m);
1411   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1412 }
1413
1414
1415 /**
1416  * We've received an on-demand encoded block
1417  * from the datastore.  Attempt to do on-demand
1418  * encoding and (if successful), call the 
1419  * continuation with the resulting block.  On
1420  * error, clean up and ask the datastore for
1421  * more results.
1422  *
1423  * @param key key for the content
1424  * @param size number of bytes in data
1425  * @param data content stored
1426  * @param type type of the content
1427  * @param priority priority of the content
1428  * @param anonymity anonymity-level for the content
1429  * @param expiration expiration time for the content
1430  * @param uid unique identifier for the datum;
1431  *        maybe 0 if no unique identifier is available
1432  * @param cont function to call with the actual block
1433  * @param cont_cls closure for cont
1434  */
1435 static void
1436 handle_on_demand_block (const GNUNET_HashCode * key,
1437                         uint32_t size,
1438                         const void *data,
1439                         uint32_t type,
1440                         uint32_t priority,
1441                         uint32_t anonymity,
1442                         struct GNUNET_TIME_Absolute
1443                         expiration, uint64_t uid,
1444                         GNUNET_DATASTORE_Iterator cont,
1445                         void *cont_cls)
1446 {
1447   const struct OnDemandBlock *odb;
1448   GNUNET_HashCode nkey;
1449   struct GNUNET_CRYPTO_AesSessionKey skey;
1450   struct GNUNET_CRYPTO_AesInitializationVector iv;
1451   GNUNET_HashCode query;
1452   ssize_t nsize;
1453   char ndata[DBLOCK_SIZE];
1454   char edata[DBLOCK_SIZE];
1455   const char *fn;
1456   struct GNUNET_DISK_FileHandle *fh;
1457   uint64_t off;
1458
1459   if (size != sizeof (struct OnDemandBlock))
1460     {
1461       GNUNET_break (0);
1462       GNUNET_DATASTORE_remove (dsh, 
1463                                key,
1464                                size,
1465                                data,
1466                                &remove_cont,
1467                                NULL,
1468                                GNUNET_TIME_UNIT_FOREVER_REL);     
1469       return;
1470     }
1471   odb = (const struct OnDemandBlock*) data;
1472   off = GNUNET_ntohll (odb->offset);
1473   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1474                                                         &odb->file_id);
1475   fh = NULL;
1476   if ( (NULL == fn) ||
1477        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1478                                              GNUNET_DISK_OPEN_READ,
1479                                              GNUNET_DISK_PERM_NONE))) ||
1480        (off !=
1481         GNUNET_DISK_file_seek (fh,
1482                                off,
1483                                GNUNET_DISK_SEEK_SET)) ||
1484        (-1 ==
1485         (nsize = GNUNET_DISK_file_read (fh,
1486                                         ndata,
1487                                         sizeof (ndata)))) )
1488     {
1489       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1490                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1491                   GNUNET_h2s (&odb->file_id),
1492                   (unsigned long long) off,
1493                   STRERROR (errno));
1494       if (fh != NULL)
1495         GNUNET_DISK_file_close (fh);
1496       /* FIXME: if this happens often, we need
1497          to remove the OnDemand block from the DS! */
1498       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1499       return;
1500     }
1501   GNUNET_DISK_file_close (fh);
1502   GNUNET_CRYPTO_hash (ndata,
1503                       nsize,
1504                       &nkey);
1505   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1506   GNUNET_CRYPTO_aes_encrypt (ndata,
1507                              nsize,
1508                              &skey,
1509                              &iv,
1510                              edata);
1511   GNUNET_CRYPTO_hash (edata,
1512                       nsize,
1513                       &query);
1514   if (0 != memcmp (&query, 
1515                    key,
1516                    sizeof (GNUNET_HashCode)))
1517     {
1518       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1519                   _("Indexed file `%s' changed at offset %llu\n"),
1520                   fn,
1521                   (unsigned long long) off);
1522       /* FIXME: if this happens often, we need
1523          to remove the OnDemand block from the DS! */
1524       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1525       return;
1526     }
1527   cont (cont_cls,
1528         key,
1529         nsize,
1530         edata,
1531         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1532         priority,
1533         anonymity,
1534         expiration,
1535         uid);
1536 }
1537
1538
1539 /**
1540  * How many bytes should a bloomfilter be if we have already seen
1541  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1542  * of bits set per entry.  Furthermore, we should not re-size the
1543  * filter too often (to keep it cheap).
1544  *
1545  * Since other peers will also add entries but not resize the filter,
1546  * we should generally pick a slightly larger size than what the
1547  * strict math would suggest.
1548  *
1549  * @return must be a power of two and smaller or equal to 2^15.
1550  */
1551 static size_t
1552 compute_bloomfilter_size (unsigned int entry_count)
1553 {
1554   size_t size;
1555   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1556   uint16_t max = 1 << 15;
1557
1558   if (entry_count > max)
1559     return max;
1560   size = 8;
1561   while ((size < max) && (size < ideal))
1562     size *= 2;
1563   if (size > max)
1564     return max;
1565   return size;
1566 }
1567
1568
1569 /**
1570  * Recalculate our bloom filter for filtering replies.
1571  *
1572  * @param count number of entries we are filtering right now
1573  * @param mingle set to our new mingling value
1574  * @param bf_size set to the size of the bloomfilter
1575  * @param entries the entries to filter
1576  * @return updated bloomfilter, NULL for none
1577  */
1578 static struct GNUNET_CONTAINER_BloomFilter *
1579 refresh_bloomfilter (unsigned int count,
1580                      int32_t * mingle,
1581                      size_t *bf_size,
1582                      const GNUNET_HashCode *entries)
1583 {
1584   struct GNUNET_CONTAINER_BloomFilter *bf;
1585   size_t nsize;
1586   unsigned int i;
1587   GNUNET_HashCode mhash;
1588
1589   if (0 == count)
1590     return NULL;
1591   nsize = compute_bloomfilter_size (count);
1592   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1593   *bf_size = nsize;
1594   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1595                                           nsize,
1596                                           BLOOMFILTER_K);
1597   for (i=0;i<count;i++)
1598     {
1599       mingle_hash (&entries[i], *mingle, &mhash);
1600       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1601     }
1602   return bf;
1603 }
1604
1605
1606 /**
1607  * Closure used for "target_peer_select_cb".
1608  */
1609 struct PeerSelectionContext 
1610 {
1611   /**
1612    * The request for which we are selecting
1613    * peers.
1614    */
1615   struct PendingRequest *pr;
1616
1617   /**
1618    * Current "prime" target.
1619    */
1620   struct GNUNET_PeerIdentity target;
1621
1622   /**
1623    * How much do we like this target?
1624    */
1625   double target_score;
1626
1627 };
1628
1629
1630 /**
1631  * Function called for each connected peer to determine
1632  * which one(s) would make good targets for forwarding.
1633  *
1634  * @param cls closure (struct PeerSelectionContext)
1635  * @param key current key code (peer identity)
1636  * @param value value in the hash map (struct ConnectedPeer)
1637  * @return GNUNET_YES if we should continue to
1638  *         iterate,
1639  *         GNUNET_NO if not.
1640  */
1641 static int
1642 target_peer_select_cb (void *cls,
1643                        const GNUNET_HashCode * key,
1644                        void *value)
1645 {
1646   struct PeerSelectionContext *psc = cls;
1647   struct ConnectedPeer *cp = value;
1648   struct PendingRequest *pr = psc->pr;
1649   double score;
1650   unsigned int i;
1651
1652   /* 1) check if we have already (recently) forwarded to this peer */
1653   for (i=0;i<pr->used_pids_off;i++)
1654     if (pr->used_pids[i] == cp->pid)
1655       return GNUNET_YES; /* skip */
1656   // 2) calculate how much we'd like to forward to this peer
1657   score = 0; // FIXME!
1658   
1659   /* store best-fit in closure */
1660   if (score > psc->target_score)
1661     {
1662       psc->target_score = score;
1663       psc->target.hashPubKey = *key; 
1664     }
1665   return GNUNET_YES;
1666 }
1667
1668
1669 /**
1670  * We use a random delay to make the timing of requests
1671  * less predictable.  This function returns such a random
1672  * delay.
1673  *
1674  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1675  */
1676 static struct GNUNET_TIME_Relative
1677 get_processing_delay ()
1678 {
1679   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1680                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1681                                                                   TTL_DECREMENT));
1682 }
1683
1684
1685 /**
1686  * Task that is run for each request with the
1687  * goal of forwarding the associated query to
1688  * other peers.  The task should re-schedule
1689  * itself to be re-run once the TTL has expired.
1690  * (or at a later time if more peers should
1691  * be queried earlier).
1692  *
1693  * @param cls the requests "struct PendingRequest*"
1694  * @param tc task context (unused)
1695  */
1696 static void
1697 forward_request_task (void *cls,
1698                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1699
1700
1701 /**
1702  * We've selected a peer for forwarding of a query.
1703  * Construct the message and then re-schedule the
1704  * task to forward again to (other) peers.
1705  *
1706  * @param cls closure
1707  * @param size number of bytes available in buf
1708  * @param buf where the callee should write the message
1709  * @return number of bytes written to buf
1710  */
1711 static size_t
1712 transmit_request_cb (void *cls,
1713                      size_t size, 
1714                      void *buf)
1715 {
1716   struct PendingRequest *pr = cls;
1717   struct GetMessage *gm;
1718   GNUNET_HashCode *ext;
1719   char *bfdata;
1720   size_t msize;
1721   unsigned int k;
1722
1723   pr->cth = NULL;
1724   /* (1) check for timeout */
1725   if (NULL == buf)
1726     {
1727       /* timeout, try another peer immediately again */
1728       pr->task = GNUNET_SCHEDULER_add_with_priority (sched,
1729                                                      GNUNET_SCHEDULER_PRIORITY_IDLE,
1730                                                      &forward_request_task,
1731                                                      pr);
1732       return 0;
1733     }
1734   /* (2) build query message */
1735   k = 0; // FIXME: count hash codes!
1736   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1737   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1738   gm = (struct GetMessage*) buf;
1739   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1740   gm->header.size = htons (msize);
1741   gm->type = htonl (pr->type);
1742   pr->remaining_priority /= 2;
1743   gm->priority = htonl (pr->remaining_priority);
1744   gm->ttl = htonl (pr->ttl);
1745   gm->filter_mutator = htonl(pr->mingle);
1746   gm->hash_bitmap = htonl (42);
1747   gm->query = pr->query;
1748   ext = (GNUNET_HashCode*) &gm[1];
1749   // FIXME: setup "ext[0]..[k-1]"
1750   bfdata = (char *) &ext[k];
1751   if (pr->bf != NULL)
1752     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1753                                                bfdata,
1754                                                pr->bf_size);
1755   
1756   /* (3) schedule job to do it again (or another peer, etc.) */
1757   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1758                                            get_processing_delay (), // FIXME!
1759                                            &forward_request_task,
1760                                            pr);
1761
1762   return msize;
1763 }
1764
1765
1766 /**
1767  * Function called after we've tried to reserve
1768  * a certain amount of bandwidth for a reply.
1769  * Check if we succeeded and if so send our query.
1770  *
1771  * @param cls the requests "struct PendingRequest*"
1772  * @param peer identifies the peer
1773  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1774  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1775  * @param amount set to the amount that was actually reserved or unreserved
1776  * @param preference current traffic preference for the given peer
1777  */
1778 static void
1779 target_reservation_cb (void *cls,
1780                        const struct
1781                        GNUNET_PeerIdentity * peer,
1782                        unsigned int bpm_in,
1783                        unsigned int bpm_out,
1784                        int amount,
1785                        uint64_t preference)
1786 {
1787   struct PendingRequest *pr = cls;
1788   uint32_t priority;
1789   uint16_t size;
1790   struct GNUNET_TIME_Relative maxdelay;
1791
1792   pr->irc = NULL;
1793   GNUNET_assert (peer != NULL);
1794   if ( (amount != DBLOCK_SIZE) ||
1795        (pr->cth != NULL) )
1796     {
1797       /* try again later; FIXME: we may need to un-reserve "amount"? */
1798       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1799                                                get_processing_delay (), // FIXME: longer?
1800                                                &forward_request_task,
1801                                                pr);
1802       return;
1803     }
1804   // (2) transmit, update ttl/priority
1805   // FIXME: calculate priority, maxdelay, size properly!
1806   priority = 0;
1807   size = 60000;
1808   maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
1809   pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1810                                                priority,
1811                                                maxdelay,
1812                                                peer,
1813                                                size,
1814                                                &transmit_request_cb,
1815                                                pr);
1816   if (pr->cth == NULL)
1817     {
1818       /* try again later */
1819       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1820                                                get_processing_delay (), // FIXME: longer?
1821                                                &forward_request_task,
1822                                                pr);
1823     }
1824 }
1825
1826
1827 /**
1828  * Task that is run for each request with the
1829  * goal of forwarding the associated query to
1830  * other peers.  The task should re-schedule
1831  * itself to be re-run once the TTL has expired.
1832  * (or at a later time if more peers should
1833  * be queried earlier).
1834  *
1835  * @param cls the requests "struct PendingRequest*"
1836  * @param tc task context (unused)
1837  */
1838 static void
1839 forward_request_task (void *cls,
1840                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1841 {
1842   struct PendingRequest *pr = cls;
1843   struct PeerSelectionContext psc;
1844
1845   pr->task = GNUNET_SCHEDULER_NO_TASK;
1846   if (pr->cth != NULL) 
1847     {
1848       /* we're busy transmitting a result, wait a bit */
1849       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1850                                                get_processing_delay (), 
1851                                                &forward_request_task,
1852                                                pr);
1853       return;
1854     }
1855   /* (1) select target */
1856   psc.pr = pr;
1857   psc.target_score = DBL_MIN;
1858   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1859                                          &target_peer_select_cb,
1860                                          &psc);
1861   if (psc.target_score == DBL_MIN)
1862     {
1863       /* no possible target found, wait some time */
1864       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1865                                                get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1866                                                &forward_request_task,
1867                                                pr);
1868       return;
1869     }
1870   /* (2) reserve reply bandwidth */
1871   GNUNET_assert (NULL == pr->irc);
1872   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1873                                        &psc.target,
1874                                        GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1875                                        -1,
1876                                        DBLOCK_SIZE, // FIXME: make dependent on type?
1877                                        0,
1878                                        &target_reservation_cb,
1879                                        pr);
1880 }
1881
1882
1883 /**
1884  * We're processing (local) results for a search request
1885  * from a (local) client.  Pass applicable results to the
1886  * client and if we are done either clean up (operation
1887  * complete) or switch to P2P search (more results possible).
1888  *
1889  * @param cls our closure (struct LocalGetContext)
1890  * @param key key for the content
1891  * @param size number of bytes in data
1892  * @param data content stored
1893  * @param type type of the content
1894  * @param priority priority of the content
1895  * @param anonymity anonymity-level for the content
1896  * @param expiration expiration time for the content
1897  * @param uid unique identifier for the datum;
1898  *        maybe 0 if no unique identifier is available
1899  */
1900 static void
1901 process_local_get_result (void *cls,
1902                           const GNUNET_HashCode * key,
1903                           uint32_t size,
1904                           const void *data,
1905                           uint32_t type,
1906                           uint32_t priority,
1907                           uint32_t anonymity,
1908                           struct GNUNET_TIME_Absolute
1909                           expiration, 
1910                           uint64_t uid)
1911 {
1912   struct LocalGetContext *lgc = cls;
1913   struct PendingRequest *pr;
1914   struct ClientRequestList *crl;
1915   struct ClientList *cl;
1916   size_t msize;
1917   unsigned int i;
1918
1919   if (key == NULL)
1920     {
1921 #if DEBUG_FS
1922       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1923                   "Received last result for `%s' from local datastore, deciding what to do next.\n",
1924                   GNUNET_h2s (&lgc->query));
1925 #endif
1926       /* no further results from datastore; continue
1927          processing further requests from the client and
1928          allow the next task to use the datastore; also,
1929          switch to P2P requests or clean up our state. */
1930       next_ds_request ();
1931       GNUNET_SERVER_receive_done (lgc->client,
1932                                   GNUNET_OK);
1933       if ( (lgc->results_used == 0) ||
1934            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1935            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1936            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1937         {
1938 #if DEBUG_FS
1939           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1940                       "Forwarding query for `%s' to network.\n",
1941                       GNUNET_h2s (&lgc->query));
1942 #endif
1943           cl = clients;
1944           while ( (NULL != cl) &&
1945                   (cl->client != lgc->client) )
1946             cl = cl->next;
1947           if (cl == NULL)
1948             {
1949               cl = GNUNET_malloc (sizeof (struct ClientList));
1950               cl->client = lgc->client;
1951               cl->next = clients;
1952               clients = cl;
1953             }
1954           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1955           crl->cl = cl;
1956           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1957           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1958           pr->client = lgc->client;
1959           GNUNET_SERVER_client_keep (pr->client);
1960           pr->crl_entry = crl;
1961           crl->req = pr;
1962           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1963             {
1964               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1965               *pr->namespace = lgc->namespace;
1966             }
1967           pr->replies_seen = lgc->results;
1968           lgc->results = NULL;
1969           pr->start_time = GNUNET_TIME_absolute_get ();
1970           pr->query = lgc->query;
1971           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1972           pr->replies_seen_off = lgc->results_used;
1973           pr->replies_seen_size = lgc->results_size;
1974           lgc->results_size = 0;
1975           pr->type = lgc->type;
1976           pr->anonymity_level = lgc->anonymity_level;
1977           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1978                                         &pr->mingle,
1979                                         &pr->bf_size,
1980                                         pr->replies_seen);
1981           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1982                                              &pr->query,
1983                                              pr,
1984                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1985           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1986                                                    get_processing_delay (),
1987                                                    &forward_request_task,
1988                                                    pr);
1989           local_get_context_free (lgc);
1990           return;
1991         }
1992       /* got all possible results, clean up! */
1993 #if DEBUG_FS
1994       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1995                   "Found all possible results for query for `%s', done!\n",
1996                   GNUNET_h2s (&lgc->query));
1997 #endif
1998       local_get_context_free (lgc);
1999       return;
2000     }
2001   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2002     {
2003 #if DEBUG_FS
2004       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2005                   "Received on-demand block for `%s' from local datastore, fetching data.\n",
2006                   GNUNET_h2s (&lgc->query));
2007 #endif
2008       handle_on_demand_block (key, size, data, type, priority, 
2009                               anonymity, expiration, uid,
2010                               &process_local_get_result,
2011                               lgc);
2012       return;
2013     }
2014   if ( (type != lgc->type) &&
2015        (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
2016     {
2017       /* this should be virtually impossible to reach (DBLOCK 
2018          query hash being identical to KBLOCK/SBLOCK query hash);
2019          nevertheless, if it happens, the correct thing is to
2020          simply skip the result. */
2021 #if DEBUG_FS
2022       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2023                   "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
2024                   type,
2025                   lgc->type,
2026                   GNUNET_h2s (&lgc->query));
2027 #endif
2028       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
2029       return;
2030     }
2031   /* check if this is a result we've alredy
2032      received */
2033   for (i=0;i<lgc->results_used;i++)
2034     if (0 == memcmp (key,
2035                      &lgc->results[i],
2036                      sizeof (GNUNET_HashCode)))
2037       {
2038 #if DEBUG_FS
2039         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2040                     "Received duplicate result for `%s' from local datastore, ignoring.\n",
2041                     GNUNET_h2s (&lgc->query));
2042 #endif
2043         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2044         return; 
2045       }
2046   if (lgc->results_used == lgc->results_size)
2047     GNUNET_array_grow (lgc->results,
2048                        lgc->results_size,
2049                        lgc->results_size * 2 + 2);
2050   GNUNET_CRYPTO_hash (data, 
2051                       size, 
2052                       &lgc->results[lgc->results_used++]);    
2053   msize = size + sizeof (struct ContentMessage);
2054   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2055   lgc->result = GNUNET_malloc (msize);
2056   lgc->result->header.size = htons (msize);
2057   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2058   lgc->result->type = htonl (type);
2059   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
2060   memcpy (&lgc->result[1],
2061           data,
2062           size);
2063 #if DEBUG_FS
2064   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2065               "Received new result for `%s' from local datastore, passing to client.\n",
2066               GNUNET_h2s (&lgc->query));
2067 #endif
2068   GNUNET_SERVER_notify_transmit_ready (lgc->client,
2069                                        msize,
2070                                        GNUNET_TIME_UNIT_FOREVER_REL,
2071                                        &transmit_local_result,
2072                                        lgc);
2073 }
2074
2075
2076 /**
2077  * We're processing a search request from a local
2078  * client.  Now it is our turn to query the datastore.
2079  * 
2080  * @param cls our closure (struct LocalGetContext)
2081  * @param tc unused
2082  */
2083 static void
2084 transmit_local_get (void *cls,
2085                     const struct GNUNET_SCHEDULER_TaskContext *tc)
2086 {
2087   struct LocalGetContext *lgc = cls;
2088   uint32_t type;
2089   
2090   type = lgc->type;
2091   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2092     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2093   GNUNET_DATASTORE_get (dsh,
2094                         &lgc->query,
2095                         type,
2096                         &process_local_get_result,
2097                         lgc,
2098                         GNUNET_TIME_UNIT_FOREVER_REL);
2099 }
2100
2101
2102 /**
2103  * We're processing a search request from a local
2104  * client.  Now it is our turn to query the datastore.
2105  * 
2106  * @param cls our closure (struct LocalGetContext)
2107  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
2108  */
2109 static void 
2110 transmit_local_get_ready (void *cls,
2111                           int ok)
2112 {
2113   struct LocalGetContext *lgc = cls;
2114
2115   GNUNET_assert (GNUNET_OK == ok);
2116   GNUNET_SCHEDULER_add_continuation (sched,
2117                                      &transmit_local_get,
2118                                      lgc,
2119                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2120 }
2121
2122
2123 /**
2124  * Handle START_SEARCH-message (search request from client).
2125  *
2126  * @param cls closure
2127  * @param client identification of the client
2128  * @param message the actual message
2129  */
2130 static void
2131 handle_start_search (void *cls,
2132                      struct GNUNET_SERVER_Client *client,
2133                      const struct GNUNET_MessageHeader *message)
2134 {
2135   const struct SearchMessage *sm;
2136   struct LocalGetContext *lgc;
2137   uint16_t msize;
2138   unsigned int sc;
2139   
2140   msize = ntohs (message->size);
2141   if ( (msize < sizeof (struct SearchMessage)) ||
2142        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2143     {
2144       GNUNET_break (0);
2145       GNUNET_SERVER_receive_done (client,
2146                                   GNUNET_SYSERR);
2147       return;
2148     }
2149   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2150   sm = (const struct SearchMessage*) message;
2151   GNUNET_SERVER_client_keep (client);
2152   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
2153   if  (sc > 0)
2154     {
2155       lgc->results_used = sc;
2156       GNUNET_array_grow (lgc->results,
2157                          lgc->results_size,
2158                          sc * 2);
2159       memcpy (lgc->results,
2160               &sm[1],
2161               sc * sizeof (GNUNET_HashCode));
2162     }
2163   lgc->client = client;
2164   lgc->type = ntohl (sm->type);
2165   lgc->anonymity_level = ntohl (sm->anonymity_level);
2166   switch (lgc->type)
2167     {
2168     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2169     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2170       lgc->target.hashPubKey = sm->target;
2171       break;
2172     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2173       lgc->namespace = sm->target;
2174       break;
2175     default:
2176       break;
2177     }
2178   lgc->query = sm->query;
2179   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
2180   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
2181                                &transmit_local_get_ready,
2182                                lgc);
2183 }
2184
2185
2186 /**
2187  * List of handlers for the messages understood by this
2188  * service.
2189  */
2190 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2191   {&handle_index_start, NULL, 
2192    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2193   {&handle_index_list_get, NULL, 
2194    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2195   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2196    sizeof (struct UnindexMessage) },
2197   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2198    0 },
2199   {NULL, NULL, 0, 0}
2200 };
2201
2202
2203 /**
2204  * Clean up the memory used by the PendingRequest structure (except
2205  * for the client or peer list that the request may be part of).
2206  *
2207  * @param pr request to clean up
2208  */
2209 static void
2210 destroy_pending_request (struct PendingRequest *pr)
2211 {
2212   struct PendingReply *reply;
2213   struct ClientList *cl;
2214
2215   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
2216                                         &pr->query,
2217                                         pr);
2218   // FIXME: not sure how this can work (efficiently)
2219   // also, what does the return value mean?
2220   if (pr->irc != NULL)
2221     {
2222       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
2223       pr->irc = NULL;
2224     }
2225   if (pr->client == NULL)
2226     {
2227       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
2228                                          pr->hnode);
2229     }
2230   else
2231     {
2232       cl = pr->crl_entry->cl;
2233       GNUNET_CONTAINER_DLL_remove (cl->head,
2234                                    cl->tail,
2235                                    pr->crl_entry);
2236     }
2237   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
2238     GNUNET_SCHEDULER_cancel (sched, pr->task);
2239   if (NULL != pr->cth)
2240     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
2241   if (NULL != pr->bf)
2242     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2243   if (NULL != pr->th)
2244     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
2245   while (NULL != (reply = pr->replies_pending))
2246     {
2247       pr->replies_pending = reply->next;
2248       GNUNET_free (reply);
2249     }
2250   GNUNET_PEER_change_rc (pr->source_pid, -1);
2251   GNUNET_PEER_change_rc (pr->target_pid, -1);
2252   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
2253   GNUNET_free_non_null (pr->used_pids);
2254   GNUNET_free_non_null (pr->replies_seen);
2255   GNUNET_free_non_null (pr->namespace);
2256   GNUNET_free (pr);
2257 }
2258
2259
2260 /**
2261  * A client disconnected.  Remove all of its pending queries.
2262  *
2263  * @param cls closure, NULL
2264  * @param client identification of the client
2265  */
2266 static void
2267 handle_client_disconnect (void *cls,
2268                           struct GNUNET_SERVER_Client
2269                           * client)
2270 {
2271   struct LocalGetContext *lgc;
2272   struct ClientList *cpos;
2273   struct ClientList *cprev;
2274   struct ClientRequestList *rl;
2275
2276   if (client == NULL)
2277     return;
2278   lgc = lgc_head;
2279   while ( (NULL != lgc) &&
2280           (lgc->client != client) )
2281     lgc = lgc->next;
2282   if (lgc != NULL)
2283     local_get_context_free (lgc);
2284   cprev = NULL;
2285   cpos = clients;
2286   while ( (NULL != cpos) &&
2287           (clients->client != client) )
2288     {
2289       cprev = cpos;
2290       cpos = cpos->next;
2291     }
2292   if (cpos != NULL)
2293     {
2294       if (cprev == NULL)
2295         clients = cpos->next;
2296       else
2297         cprev->next = cpos->next;
2298       while (NULL != (rl = cpos->head))
2299         {
2300           cpos->head = rl->next;
2301           destroy_pending_request (rl->req);
2302           GNUNET_free (rl);
2303         }
2304       GNUNET_free (cpos);
2305     }
2306 }
2307
2308
2309 /**
2310  * Iterator over entries in the "requests_by_query" map
2311  * that frees all the entries.
2312  *
2313  * @param cls closure, NULL
2314  * @param key current key code (the query, unused) 
2315  * @param value value in the hash map, of type "struct PendingRequest*"
2316  * @return GNUNET_YES (we should continue to  iterate)
2317  */
2318 static int 
2319 destroy_pending_request_cb (void *cls,
2320                             const GNUNET_HashCode * key,
2321                             void *value)
2322 {
2323   struct PendingRequest *pr = value;
2324
2325   destroy_pending_request (pr);
2326   return GNUNET_YES;
2327 }
2328
2329
2330 /**
2331  * Task run during shutdown.
2332  *
2333  * @param cls unused
2334  * @param tc unused
2335  */
2336 static void
2337 shutdown_task (void *cls,
2338                const struct GNUNET_SCHEDULER_TaskContext *tc)
2339 {
2340   struct IndexInfo *pos;  
2341
2342   if (NULL != core)
2343     {
2344       GNUNET_CORE_disconnect (core);
2345       core = NULL;
2346     }
2347   if (NULL != dsh)
2348     {
2349       GNUNET_DATASTORE_disconnect (dsh,
2350                                    GNUNET_NO);
2351       dsh = NULL;
2352     }
2353   GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
2354                                          &destroy_pending_request_cb,
2355                                          NULL);
2356   while (clients != NULL)
2357     handle_client_disconnect (NULL,
2358                               clients->client);
2359   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
2360   requests_by_query = NULL;
2361   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
2362   requests_by_peer = NULL;
2363   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
2364   requests_by_expiration = NULL;
2365   // FIXME: iterate over entries and free individually?
2366   // (or do we get disconnect notifications?)
2367   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2368   connected_peers = NULL;
2369   GNUNET_CONTAINER_multihashmap_destroy (ifm);
2370   ifm = NULL;
2371   while (NULL != (pos = indexed_files))
2372     {
2373       indexed_files = pos->next;
2374       GNUNET_free (pos);
2375     }
2376 }
2377
2378
2379 /**
2380  * Free (each) request made by the peer.
2381  *
2382  * @param cls closure, points to peer that the request belongs to
2383  * @param key current key code
2384  * @param value value in the hash map
2385  * @return GNUNET_YES (we should continue to iterate)
2386  */
2387 static int
2388 destroy_request (void *cls,
2389                  const GNUNET_HashCode * key,
2390                  void *value)
2391 {
2392   const struct GNUNET_PeerIdentity * peer = cls;
2393   struct PendingRequest *pr = value;
2394   
2395   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2396                                         &peer->hashPubKey,
2397                                         pr);
2398   destroy_pending_request (pr);
2399   return GNUNET_YES;
2400 }
2401
2402
2403
2404 /**
2405  * Method called whenever a given peer connects.
2406  *
2407  * @param cls closure, not used
2408  * @param peer peer identity this notification is about
2409  * @param latency reported latency of the connection with 'other'
2410  * @param distance reported distance (DV) to 'other' 
2411  */
2412 static void 
2413 peer_connect_handler (void *cls,
2414                       const struct
2415                       GNUNET_PeerIdentity * peer,
2416                       struct GNUNET_TIME_Relative latency,
2417                       uint32_t distance)
2418 {
2419   struct ConnectedPeer *cp;
2420
2421   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
2422   cp->pid = GNUNET_PEER_intern (peer);
2423   GNUNET_CONTAINER_multihashmap_put (connected_peers,
2424                                      &peer->hashPubKey,
2425                                      cp,
2426                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2427 }
2428
2429
2430 /**
2431  * Method called whenever a peer disconnects.
2432  *
2433  * @param cls closure, not used
2434  * @param peer peer identity this notification is about
2435  */
2436 static void
2437 peer_disconnect_handler (void *cls,
2438                          const struct
2439                          GNUNET_PeerIdentity * peer)
2440 {
2441   struct ConnectedPeer *cp;
2442
2443   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2444                                           &peer->hashPubKey);
2445   GNUNET_PEER_change_rc (cp->pid, -1);
2446   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
2447   GNUNET_free (cp);
2448   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
2449                                               &peer->hashPubKey,
2450                                               &destroy_request,
2451                                               (void*) peer);
2452 }
2453
2454
2455 /**
2456  * We're processing a GET request from
2457  * another peer and have decided to forward
2458  * it to other peers.
2459  *
2460  * @param cls our "struct ProcessGetContext *"
2461  * @param tc unused
2462  */
2463 static void
2464 forward_get_request (void *cls,
2465                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2466 {
2467   struct ProcessGetContext *pgc = cls;
2468   struct PendingRequest *pr;
2469   struct PendingRequest *eer;
2470   struct GNUNET_PeerIdentity target;
2471
2472   pr = GNUNET_malloc (sizeof (struct PendingRequest));
2473   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
2474     {
2475       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
2476       *pr->namespace = pgc->namespace;
2477     }
2478   pr->bf = pgc->bf;
2479   pr->bf_size = pgc->bf_size;
2480   pgc->bf = NULL;
2481   pr->start_time = pgc->start_time;
2482   pr->query = pgc->query;
2483   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
2484   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
2485     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
2486   pr->anonymity_level = 1; /* default */
2487   pr->priority = pgc->priority;
2488   pr->remaining_priority = pr->priority;
2489   pr->mingle = pgc->mingle;
2490   pr->ttl = pgc->ttl; 
2491   pr->type = pgc->type;
2492   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
2493                                      &pr->query,
2494                                      pr,
2495                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2496   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
2497                                      &pgc->reply_to.hashPubKey,
2498                                      pr,
2499                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2500   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration,
2501                                             pr,
2502                                             pr->start_time.value + pr->ttl);
2503   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
2504     {
2505       /* expire oldest request! */
2506       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2507       GNUNET_PEER_resolve (eer->source_pid,
2508                            &target);    
2509       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2510                                             &target.hashPubKey,
2511                                             eer);
2512       destroy_pending_request (eer);     
2513     }
2514   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2515                                            get_processing_delay (),
2516                                            &forward_request_task,
2517                                            pr);
2518   GNUNET_free (pgc); 
2519 }
2520 /**
2521  * Transmit the given message by copying it to
2522  * the target buffer "buf".  "buf" will be
2523  * NULL and "size" zero if the socket was closed for
2524  * writing in the meantime.  In that case, only
2525
2526  * free the message
2527  *
2528  * @param cls closure, pointer to the message
2529  * @param size number of bytes available in buf
2530  * @param buf where the callee should write the message
2531  * @return number of bytes written to buf
2532  */
2533 static size_t
2534 transmit_message (void *cls,
2535                   size_t size, void *buf)
2536 {
2537   struct GNUNET_MessageHeader *msg = cls;
2538   uint16_t msize;
2539   
2540   if (NULL == buf)
2541     {
2542 #if DEBUG_FS
2543       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2544                   "Dropping reply, core too busy.\n");
2545 #endif
2546       GNUNET_free (msg);
2547       return 0;
2548     }
2549   msize = ntohs (msg->size);
2550   GNUNET_assert (size >= msize);
2551   memcpy (buf, msg, msize);
2552   GNUNET_free (msg);
2553   return msize;
2554 }
2555
2556
2557 /**
2558  * Test if the load on this peer is too high
2559  * to even consider processing the query at
2560  * all.
2561  * 
2562  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2563  */
2564 static int
2565 test_load_too_high ()
2566 {
2567   return GNUNET_NO; // FIXME
2568 }
2569
2570
2571 /**
2572  * We're processing (local) results for a search request
2573  * from another peer.  Pass applicable results to the
2574  * peer and if we are done either clean up (operation
2575  * complete) or forward to other peers (more results possible).
2576  *
2577  * @param cls our closure (struct LocalGetContext)
2578  * @param key key for the content
2579  * @param size number of bytes in data
2580  * @param data content stored
2581  * @param type type of the content
2582  * @param priority priority of the content
2583  * @param anonymity anonymity-level for the content
2584  * @param expiration expiration time for the content
2585  * @param uid unique identifier for the datum;
2586  *        maybe 0 if no unique identifier is available
2587  */
2588 static void
2589 process_p2p_get_result (void *cls,
2590                         const GNUNET_HashCode * key,
2591                         uint32_t size,
2592                         const void *data,
2593                         uint32_t type,
2594                         uint32_t priority,
2595                         uint32_t anonymity,
2596                         struct GNUNET_TIME_Absolute
2597                         expiration, 
2598                         uint64_t uid)
2599 {
2600   struct ProcessGetContext *pgc = cls;
2601   GNUNET_HashCode dhash;
2602   GNUNET_HashCode mhash;
2603   struct PutMessage *reply;
2604   
2605   if (NULL == key)
2606     {
2607       /* no more results */
2608       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2609            ( (0 == pgc->results_found) ||
2610              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2611              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2612              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2613         {
2614           GNUNET_SCHEDULER_add_continuation (sched,
2615                                              &forward_get_request,
2616                                              pgc,
2617                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2618         }
2619       else
2620         {
2621           if (pgc->bf != NULL)
2622             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2623           GNUNET_free (pgc); 
2624         }
2625       next_ds_request ();
2626       return;
2627     }
2628   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2629     {
2630       handle_on_demand_block (key, size, data, type, priority, 
2631                               anonymity, expiration, uid,
2632                               &process_p2p_get_result,
2633                               pgc);
2634       return;
2635     }
2636   /* check for duplicates */
2637   GNUNET_CRYPTO_hash (data, size, &dhash);
2638   mingle_hash (&dhash, 
2639                pgc->mingle,
2640                &mhash);
2641   if ( (pgc->bf != NULL) &&
2642        (GNUNET_YES ==
2643         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2644                                            &mhash)) )
2645     {      
2646 #if DEBUG_FS
2647       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2648                   "Result from datastore filtered by bloomfilter.\n");
2649 #endif
2650       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2651       return;
2652     }
2653   pgc->results_found++;
2654   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2655        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2656        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2657     {
2658       if (pgc->bf == NULL)
2659         {
2660           pgc->bf_size = 32;
2661           pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2662                                                        pgc->bf_size, 
2663                                                        BLOOMFILTER_K);
2664         }
2665       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2666                                         &mhash);
2667     }
2668
2669   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2670   reply->header.size = htons (sizeof (struct PutMessage) + size);
2671   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2672   reply->type = htonl (type);
2673   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2674   memcpy (&reply[1], data, size);
2675   GNUNET_CORE_notify_transmit_ready (core,
2676                                      pgc->priority,
2677                                      ACCEPTABLE_REPLY_DELAY,
2678                                      &pgc->reply_to,
2679                                      sizeof (struct PutMessage) + size,
2680                                      &transmit_message,
2681                                      reply);
2682   if ( (GNUNET_YES == test_load_too_high()) ||
2683        (pgc->results_found > 5 + 2 * pgc->priority) )
2684     {
2685       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2686       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2687       return;
2688     }
2689   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2690 }
2691   
2692
2693 /**
2694  * We're processing a GET request from another peer.  Give it to our
2695  * local datastore.
2696  *
2697  * @param cls our "struct ProcessGetContext"
2698  * @param ok did we get a datastore slice or not?
2699  */
2700 static void
2701 ds_get_request (void *cls, 
2702                 int ok)
2703 {
2704   struct ProcessGetContext *pgc = cls;
2705   uint32_t type;
2706   struct GNUNET_TIME_Relative timeout;
2707
2708   if (GNUNET_OK != ok)
2709     {
2710       /* no point in doing P2P stuff if we can't even do local */
2711       GNUNET_free (dsh);
2712       return;
2713     }
2714   type = pgc->type;
2715   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2716     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2717   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2718                                            (pgc->priority + 1));
2719   GNUNET_DATASTORE_get (dsh,
2720                         &pgc->query,
2721                         type,
2722                         &process_p2p_get_result,
2723                         pgc,
2724                         timeout);
2725 }
2726
2727
2728 /**
2729  * The priority level imposes a bound on the maximum
2730  * value for the ttl that can be requested.
2731  *
2732  * @param ttl_in requested ttl
2733  * @param prio given priority
2734  * @return ttl_in if ttl_in is below the limit,
2735  *         otherwise the ttl-limit for the given priority
2736  */
2737 static int32_t
2738 bound_ttl (int32_t ttl_in, uint32_t prio)
2739 {
2740   unsigned long long allowed;
2741
2742   if (ttl_in <= 0)
2743     return ttl_in;
2744   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2745   if (ttl_in > allowed)      
2746     {
2747       if (allowed >= (1 << 30))
2748         return 1 << 30;
2749       return allowed;
2750     }
2751   return ttl_in;
2752 }
2753
2754
2755 /**
2756  * We've received a request with the specified
2757  * priority.  Bound it according to how much
2758  * we trust the given peer.
2759  * 
2760  * @param prio_in requested priority
2761  * @param peer the peer making the request
2762  * @return effective priority
2763  */
2764 static uint32_t
2765 bound_priority (uint32_t prio_in,
2766                 const struct GNUNET_PeerIdentity *peer)
2767 {
2768   return 0; // FIXME!
2769 }
2770
2771
2772 /**
2773  * Handle P2P "GET" request.
2774  *
2775  * @param cls closure, always NULL
2776  * @param other the other peer involved (sender or receiver, NULL
2777  *        for loopback messages where we are both sender and receiver)
2778  * @param message the actual message
2779  * @param latency reported latency of the connection with 'other'
2780  * @param distance reported distance (DV) to 'other' 
2781  * @return GNUNET_OK to keep the connection open,
2782  *         GNUNET_SYSERR to close it (signal serious error)
2783  */
2784 static int
2785 handle_p2p_get (void *cls,
2786                 const struct GNUNET_PeerIdentity *other,
2787                 const struct GNUNET_MessageHeader *message,
2788                                   struct GNUNET_TIME_Relative latency,
2789                                   uint32_t distance)
2790 {
2791   uint16_t msize;
2792   const struct GetMessage *gm;
2793   unsigned int bits;
2794   const GNUNET_HashCode *opt;
2795   struct ProcessGetContext *pgc;
2796   uint32_t bm;
2797   size_t bfsize;
2798   uint32_t ttl_decrement;
2799   double preference;
2800   int net_load_up;
2801   int net_load_down;
2802
2803   msize = ntohs(message->size);
2804   if (msize < sizeof (struct GetMessage))
2805     {
2806       GNUNET_break_op (0);
2807       return GNUNET_SYSERR;
2808     }
2809   gm = (const struct GetMessage*) message;
2810   bm = ntohl (gm->hash_bitmap);
2811   bits = 0;
2812   while (bm > 0)
2813     {
2814       if (1 == (bm & 1))
2815         bits++;
2816       bm >>= 1;
2817     }
2818   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2819     {
2820       GNUNET_break_op (0);
2821       return GNUNET_SYSERR;
2822     }  
2823   opt = (const GNUNET_HashCode*) &gm[1];
2824   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2825   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2826   if (bfsize > 0)
2827     {
2828       pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2829                                                    bfsize,
2830                                                    BLOOMFILTER_K);
2831       pgc->bf_size = bfsize;
2832     }
2833   pgc->type = ntohl (gm->type);
2834   pgc->bm = ntohl (gm->hash_bitmap);
2835   pgc->mingle = gm->filter_mutator;
2836   bits = 0;
2837   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2838     pgc->reply_to.hashPubKey = opt[bits++];
2839   else
2840     pgc->reply_to = *other;
2841   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2842     pgc->namespace = opt[bits++];
2843   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2844     {
2845       GNUNET_break_op (0);
2846       GNUNET_free (pgc);
2847       return GNUNET_SYSERR;
2848     }
2849   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2850     pgc->prime_target.hashPubKey = opt[bits++];
2851   /* note that we can really only check load here since otherwise
2852      peers could find out that we are overloaded by being disconnected
2853      after sending us a malformed query... */
2854   if (GNUNET_YES == test_load_too_high ())
2855     {
2856       if (NULL != pgc->bf)
2857         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2858       GNUNET_free (pgc);
2859 #if DEBUG_FS
2860       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2861                   "Dropping query from `%s', this peer is too busy.\n",
2862                   GNUNET_i2s (other));
2863 #endif
2864       return GNUNET_OK;
2865     }
2866   net_load_up = 50; // FIXME
2867   net_load_down = 50; // FIXME
2868   pgc->policy = ROUTING_POLICY_NONE;
2869   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2870        (net_load_down < IDLE_LOAD_THRESHOLD) )
2871     {
2872       pgc->policy |= ROUTING_POLICY_ALL;
2873       pgc->priority = 0; /* no charge */
2874     }
2875   else
2876     {
2877       pgc->priority = bound_priority (ntohl (gm->priority), other);
2878       if ( (net_load_up < 
2879             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2880            (net_load_down < 
2881             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2882         {
2883           pgc->policy |= ROUTING_POLICY_ALL;
2884         }
2885       else
2886         {
2887           // FIXME: is this sound?
2888           if (net_load_up < 90 + 10 * pgc->priority)
2889             pgc->policy |= ROUTING_POLICY_FORWARD;
2890           if (net_load_down < 90 + 10 * pgc->priority)
2891             pgc->policy |= ROUTING_POLICY_ANSWER;
2892         }
2893     }
2894   if (pgc->policy == ROUTING_POLICY_NONE)
2895     {
2896 #if DEBUG_FS
2897       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2898                   "Dropping query from `%s', network saturated.\n",
2899                   GNUNET_i2s (other));
2900 #endif
2901       if (NULL != pgc->bf)
2902         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2903       GNUNET_free (pgc);
2904       return GNUNET_OK;     /* drop */
2905     }
2906   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2907     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2908   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2909   /* decrement ttl (always) */
2910   ttl_decrement = 2 * TTL_DECREMENT +
2911     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2912                               TTL_DECREMENT);
2913   if ( (pgc->ttl < 0) &&
2914        (pgc->ttl - ttl_decrement > 0) )
2915     {
2916 #if DEBUG_FS
2917       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2918                   "Dropping query from `%s' due to TTL underflow.\n",
2919                   GNUNET_i2s (other));
2920 #endif
2921       /* integer underflow => drop (should be very rare)! */
2922       if (NULL != pgc->bf)
2923         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2924       GNUNET_free (pgc);
2925       return GNUNET_OK;
2926     }
2927   pgc->ttl -= ttl_decrement;
2928   pgc->start_time = GNUNET_TIME_absolute_get ();
2929   preference = (double) pgc->priority;
2930   if (preference < QUERY_BANDWIDTH_VALUE)
2931     preference = QUERY_BANDWIDTH_VALUE;
2932   // FIXME: also reserve bandwidth for reply?
2933   (void) GNUNET_CORE_peer_change_preference (sched, cfg,
2934                                              other,
2935                                              GNUNET_TIME_UNIT_FOREVER_REL,
2936                                              0, 0, preference, NULL, NULL);
2937   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2938     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2939                                  &ds_get_request,
2940                                  pgc);
2941   else
2942     GNUNET_SCHEDULER_add_continuation (sched,
2943                                        &forward_get_request,
2944                                        pgc,
2945                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2946   return GNUNET_OK;
2947 }
2948
2949
2950 /**
2951  * Function called to notify us that we can now transmit a reply to a
2952  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2953  * closed for writing in the meantime.
2954  *
2955  * @param cls closure, points to a "struct PendingRequest*" with
2956  *            one or more pending replies
2957  * @param size number of bytes available in buf
2958  * @param buf where the callee should write the message
2959  * @return number of bytes written to buf
2960  */
2961 static size_t
2962 transmit_result (void *cls,
2963                  size_t size, 
2964                  void *buf)
2965 {
2966   struct PendingRequest *pr = cls;
2967   char *cbuf = buf;
2968   struct PendingReply *reply;
2969   size_t ret;
2970
2971   ret = 0;
2972   while (NULL != (reply = pr->replies_pending))
2973     {
2974       if ( (reply->msize + ret < ret) ||
2975            (reply->msize + ret > size) )
2976         break;
2977       pr->replies_pending = reply->next;
2978       memcpy (&cbuf[ret], &reply[1], reply->msize);
2979       ret += reply->msize;
2980       GNUNET_free (reply);
2981     }
2982   return ret;
2983 }
2984
2985
2986 /**
2987  * Iterator over pending requests.
2988  *
2989  * @param cls response (struct ProcessReplyClosure)
2990  * @param key our query
2991  * @param value value in the hash map (meta-info about the query)
2992  * @return GNUNET_YES (we should continue to iterate)
2993  */
2994 static int
2995 process_reply (void *cls,
2996                const GNUNET_HashCode * key,
2997                void *value)
2998 {
2999   struct ProcessReplyClosure *prq = cls;
3000   struct PendingRequest *pr = value;
3001   struct PendingRequest *eer;
3002   struct PendingReply *reply;
3003   struct PutMessage *pm;
3004   struct ContentMessage *cm;
3005   GNUNET_HashCode chash;
3006   GNUNET_HashCode mhash;
3007   struct GNUNET_PeerIdentity target;
3008   size_t msize;
3009   uint32_t prio;
3010   struct GNUNET_TIME_Relative max_delay;
3011   
3012   GNUNET_CRYPTO_hash (prq->data,
3013                       prq->size,
3014                       &chash);
3015   switch (prq->type)
3016     {
3017     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3018     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3019       break;
3020     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3021       /* FIXME: does prq->namespace match our expectations? */
3022       /* then: fall-through??? */
3023     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3024       if (pr->bf != NULL) 
3025         {
3026           mingle_hash (&chash, pr->mingle, &mhash);
3027           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
3028                                                                &mhash))
3029             return GNUNET_YES; /* duplicate */
3030           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
3031                                             &mhash);
3032         }
3033       break;
3034     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3035       // FIXME: any checks against duplicates for SKBlocks?
3036       break;
3037     }
3038   prio = pr->priority;
3039   prq->priority += pr->remaining_priority;
3040   pr->remaining_priority = 0;
3041   if (pr->client != NULL)
3042     {
3043       if (pr->replies_seen_size == pr->replies_seen_off)
3044         GNUNET_array_grow (pr->replies_seen,
3045                            pr->replies_seen_size,
3046                            pr->replies_seen_size * 2 + 4);
3047       pr->replies_seen[pr->replies_seen_off++] = chash;
3048       // FIXME: possibly recalculate BF!
3049     }
3050   if (pr->client == NULL)
3051     {
3052       msize = sizeof (struct ContentMessage) + prq->size;
3053       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3054       reply->msize = msize;
3055       cm = (struct ContentMessage*) &reply[1];
3056       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
3057       cm->header.size = htons (msize);
3058       cm->type = htonl (prq->type);
3059       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3060       reply->next = pr->replies_pending;
3061       pr->replies_pending = reply;
3062       memcpy (&reply[1], prq->data, prq->size);
3063       if (pr->cth != NULL)
3064         return GNUNET_YES;
3065       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
3066       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
3067         {
3068           /* estimate expiration time from time difference between
3069              first request that will be discarded and this request */
3070           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
3071           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
3072                                                            eer->start_time);
3073         }
3074       GNUNET_PEER_resolve (pr->source_pid,
3075                            &target);
3076       pr->cth = GNUNET_CORE_notify_transmit_ready (core,
3077                                                    prio,
3078                                                    max_delay,
3079                                                    &target,
3080                                                    msize,
3081                                                    &transmit_result,
3082                                                    pr);
3083       if (NULL == pr->cth)
3084         {
3085           // FIXME: now what? discard?
3086         }
3087     }
3088   else
3089     {
3090       msize = sizeof (struct PutMessage) + prq->size;
3091       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3092       reply->msize = msize;
3093       reply->next = pr->replies_pending;
3094       pm = (struct PutMessage*) &reply[1];
3095       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3096       pm->header.size = htons (msize);
3097       pm->type = htonl (prq->type);
3098       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
3099       pr->replies_pending = reply;
3100       memcpy (&reply[1], prq->data, prq->size);
3101       if (pr->th != NULL)
3102         return GNUNET_YES;
3103       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
3104                                                     msize,
3105                                                     GNUNET_TIME_UNIT_FOREVER_REL,
3106                                                     &transmit_result,
3107                                                     pr);
3108       if (pr->th == NULL)
3109         {
3110           // FIXME: need to try again later (not much
3111           // to do here specifically, but we need to
3112           // check somewhere else to handle this case!)
3113         }
3114     }
3115   // FIXME: implement hot-path routing statistics keeping!
3116   return GNUNET_YES;
3117 }
3118
3119
3120 /**
3121  * Check if the given KBlock is well-formed.
3122  *
3123  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
3124  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
3125  * @param query where to store the query that this block answers
3126  * @return GNUNET_OK if this is actually a well-formed KBlock
3127  */
3128 static int
3129 check_kblock (const struct KBlock *kb,
3130               size_t dsize,
3131               GNUNET_HashCode *query)
3132 {
3133   if (dsize < sizeof (struct KBlock))
3134     {
3135       GNUNET_break_op (0);
3136       return GNUNET_SYSERR;
3137     }
3138   if (dsize - sizeof (struct KBlock) !=
3139       ntohs (kb->purpose.size) 
3140       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
3141       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
3142     {
3143       GNUNET_break_op (0);
3144       return GNUNET_SYSERR;
3145     }
3146   if (GNUNET_OK !=
3147       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
3148                                 &kb->purpose,
3149                                 &kb->signature,
3150                                 &kb->keyspace)) 
3151     {
3152       GNUNET_break_op (0);
3153       return GNUNET_SYSERR;
3154     }
3155   if (query != NULL)
3156     GNUNET_CRYPTO_hash (&kb->keyspace,
3157                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3158                         query);
3159   return GNUNET_OK;
3160 }
3161
3162
3163 /**
3164  * Check if the given SBlock is well-formed.
3165  *
3166  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
3167  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
3168  * @param query where to store the query that this block answers
3169  * @param namespace where to store the namespace that this block belongs to
3170  * @return GNUNET_OK if this is actually a well-formed SBlock
3171  */
3172 static int
3173 check_sblock (const struct SBlock *sb,
3174               size_t dsize,
3175               GNUNET_HashCode *query,   
3176               GNUNET_HashCode *namespace)
3177 {
3178   if (dsize < sizeof (struct SBlock))
3179     {
3180       GNUNET_break_op (0);
3181       return GNUNET_SYSERR;
3182     }
3183   if (dsize !=
3184       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
3185     {
3186       GNUNET_break_op (0);
3187       return GNUNET_SYSERR;
3188     }
3189   if (GNUNET_OK !=
3190       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
3191                                 &sb->purpose,
3192                                 &sb->signature,
3193                                 &sb->subspace)) 
3194     {
3195       GNUNET_break_op (0);
3196       return GNUNET_SYSERR;
3197     }
3198   if (query != NULL)
3199     *query = sb->identifier;
3200   if (namespace != NULL)
3201     GNUNET_CRYPTO_hash (&sb->subspace,
3202                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3203                         namespace);
3204   return GNUNET_OK;
3205 }
3206
3207
3208 /**
3209  * Handle P2P "PUT" request.
3210  *
3211  * @param cls closure, always NULL
3212  * @param other the other peer involved (sender or receiver, NULL
3213  *        for loopback messages where we are both sender and receiver)
3214  * @param message the actual message
3215  * @param latency reported latency of the connection with 'other'
3216  * @param distance reported distance (DV) to 'other' 
3217  * @return GNUNET_OK to keep the connection open,
3218  *         GNUNET_SYSERR to close it (signal serious error)
3219  */
3220 static int
3221 handle_p2p_put (void *cls,
3222                 const struct GNUNET_PeerIdentity *other,
3223                 const struct GNUNET_MessageHeader *message,
3224                 struct GNUNET_TIME_Relative latency,
3225                 uint32_t distance)
3226 {
3227   const struct PutMessage *put;
3228   uint16_t msize;
3229   size_t dsize;
3230   uint32_t type;
3231   struct GNUNET_TIME_Absolute expiration;
3232   GNUNET_HashCode query;
3233   struct ProcessReplyClosure prq;
3234
3235   msize = ntohs (message->size);
3236   if (msize < sizeof (struct PutMessage))
3237     {
3238       GNUNET_break_op(0);
3239       return GNUNET_SYSERR;
3240     }
3241   put = (const struct PutMessage*) message;
3242   dsize = msize - sizeof (struct PutMessage);
3243   type = ntohl (put->type);
3244   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
3245
3246   /* first, validate! */
3247   switch (type)
3248     {
3249     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3250     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3251       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
3252       break;
3253     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3254       if (GNUNET_OK !=
3255           check_kblock ((const struct KBlock*) &put[1],
3256                         dsize,
3257                         &query))
3258         return GNUNET_SYSERR;
3259       break;
3260     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3261       if (GNUNET_OK !=
3262           check_sblock ((const struct SBlock*) &put[1],
3263                         dsize,
3264                         &query,
3265                         &prq.namespace))
3266         return GNUNET_SYSERR;
3267       break;
3268     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3269       // FIXME -- validate SKBLOCK!
3270       GNUNET_break (0);
3271       return GNUNET_OK;
3272     default:
3273       /* unknown block type */
3274       GNUNET_break_op (0);
3275       return GNUNET_SYSERR;
3276     }
3277
3278   /* now, lookup 'query' */
3279   prq.data = (const void*) &put[1];
3280   prq.size = dsize;
3281   prq.type = type;
3282   prq.expiration = expiration;
3283   prq.priority = 0;
3284   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
3285                                               &query,
3286                                               &process_reply,
3287                                               &prq);
3288   // FIXME: if migration is on and load is low,
3289   // queue to store data in datastore;
3290   // use "prq.priority" for that!
3291   return GNUNET_OK;
3292 }
3293
3294
3295 /**
3296  * List of handlers for P2P messages
3297  * that we care about.
3298  */
3299 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3300   {
3301     { &handle_p2p_get, 
3302       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3303     { &handle_p2p_put, 
3304       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3305     { NULL, 0, 0 }
3306   };
3307
3308
3309 /**
3310  * Process fs requests.
3311  *
3312  * @param cls closure
3313  * @param s scheduler to use
3314  * @param server the initialized server
3315  * @param c configuration to use
3316  */
3317 static void
3318 run (void *cls,
3319      struct GNUNET_SCHEDULER_Handle *s,
3320      struct GNUNET_SERVER_Handle *server,
3321      const struct GNUNET_CONFIGURATION_Handle *c)
3322 {
3323   sched = s;
3324   cfg = c;
3325
3326   ifm = GNUNET_CONTAINER_multihashmap_create (128);
3327   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3328   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3329   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
3330   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3331   read_index_list ();
3332   dsh = GNUNET_DATASTORE_connect (cfg,
3333                                   sched);
3334   core = GNUNET_CORE_connect (sched,
3335                               cfg,
3336                               GNUNET_TIME_UNIT_FOREVER_REL,
3337                               NULL,
3338                               NULL,
3339                               NULL,
3340                               &peer_connect_handler,
3341                               &peer_disconnect_handler,
3342                               NULL, GNUNET_NO,
3343                               NULL, GNUNET_NO,
3344                               p2p_handlers);
3345
3346   GNUNET_SERVER_disconnect_notify (server, 
3347                                    &handle_client_disconnect,
3348                                    NULL);
3349   GNUNET_SERVER_add_handlers (server, handlers);
3350   GNUNET_SCHEDULER_add_delayed (sched,
3351                                 GNUNET_TIME_UNIT_FOREVER_REL,
3352                                 &shutdown_task,
3353                                 NULL);
3354   if (NULL == dsh)
3355     {
3356       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3357                   _("Failed to connect to `%s' service.\n"),
3358                   "datastore");
3359       GNUNET_SCHEDULER_shutdown (sched);
3360       return;
3361     }
3362   if (NULL == core)
3363     {
3364       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3365                   _("Failed to connect to `%s' service.\n"),
3366                   "core");
3367       GNUNET_SCHEDULER_shutdown (sched);
3368       return;
3369     }
3370 }
3371
3372
3373 /**
3374  * The main function for the fs service.
3375  *
3376  * @param argc number of arguments from the command line
3377  * @param argv command line arguments
3378  * @return 0 ok, 1 on error
3379  */
3380 int
3381 main (int argc, char *const *argv)
3382 {
3383   return (GNUNET_OK ==
3384           GNUNET_SERVICE_run (argc,
3385                               argv,
3386                               "fs",
3387                               GNUNET_SERVICE_OPTION_NONE,
3388                               &run, NULL)) ? 0 : 1;
3389 }
3390
3391 /* end of gnunet-service-fs.c */