updating heap API and implementation
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix gazillion of minor FIXME's
28  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29  *   core to transmit to another peer; need to make sure this is bounded overall...
30  * - randomly delay processing for improved anonymity (can wait)
31  * - content migration (put in local DS) (can wait)
32  * - handle some special cases when forwarding replies based on tracked requests (can wait)
33  * - tracking of success correlations for hot-path routing (can wait)
34  * - various load-based actions (can wait)
35  * - validation of KSBLOCKS (can wait)
36  * - remove on-demand blocks if they keep failing (can wait)
37  * - check that we decrement PIDs always where necessary (can wait)
38  * - find out how to use core-pulling instead of pushing (at least for some cases)
39  */
40 #include "platform.h"
41 #include <float.h>
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "fs.h"
49
50 #define DEBUG_FS GNUNET_NO
51
52
53 /**
54  * In-memory information about indexed files (also available
55  * on-disk).
56  */
57 struct IndexInfo
58 {
59   
60   /**
61    * This is a linked list.
62    */
63   struct IndexInfo *next;
64
65   /**
66    * Name of the indexed file.  Memory allocated
67    * at the end of this struct (do not free).
68    */
69   const char *filename;
70
71   /**
72    * Context for transmitting confirmation to client,
73    * NULL if we've done this already.
74    */
75   struct GNUNET_SERVER_TransmitContext *tc;
76   
77   /**
78    * Hash of the contents of the file.
79    */
80   GNUNET_HashCode file_id;
81
82 };
83
84
85 /**
86  * Signature of a function that is called whenever a datastore
87  * request can be processed (or an entry put on the queue times out).
88  *
89  * @param cls closure
90  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
91  */
92 typedef void (*RequestFunction)(void *cls,
93                                 int ok);
94
95
96 /**
97  * Doubly-linked list of our requests for the datastore.
98  */
99 struct DatastoreRequestQueue
100 {
101
102   /**
103    * This is a doubly-linked list.
104    */
105   struct DatastoreRequestQueue *next;
106
107   /**
108    * This is a doubly-linked list.
109    */
110   struct DatastoreRequestQueue *prev;
111
112   /**
113    * Function to call (will issue the request).
114    */
115   RequestFunction req;
116
117   /**
118    * Closure for req.
119    */
120   void *req_cls;
121
122   /**
123    * When should this request time-out because we don't care anymore?
124    */
125   struct GNUNET_TIME_Absolute timeout;
126     
127   /**
128    * ID of task used for signaling timeout.
129    */
130   GNUNET_SCHEDULER_TaskIdentifier task;
131
132 };
133
134
135 /**
136  * Closure for processing START_SEARCH messages from a client.
137  */
138 struct LocalGetContext
139 {
140
141   /**
142    * This is a doubly-linked list.
143    */
144   struct LocalGetContext *next;
145
146   /**
147    * This is a doubly-linked list.
148    */
149   struct LocalGetContext *prev;
150
151   /**
152    * Client that initiated the search.
153    */
154   struct GNUNET_SERVER_Client *client;
155
156   /**
157    * Array of results that we've already received 
158    * (can be NULL).
159    */
160   GNUNET_HashCode *results; 
161
162   /**
163    * Bloomfilter over all results (for fast query construction);
164    * NULL if we don't have any results.
165    *
166    * FIXME: this member is not used, is that OK? If so, it should
167    * be removed!
168    */
169   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
170
171   /**
172    * DS request associated with this operation.
173    */
174   struct DatastoreRequestQueue *req;
175
176   /**
177    * Current result message to transmit to client (or NULL).
178    */
179   struct ContentMessage *result;
180   
181   /**
182    * Type of the content that we're looking for.
183    * 0 for any.
184    */
185   uint32_t type;
186
187   /**
188    * Desired anonymity level.
189    */
190   uint32_t anonymity_level;
191
192   /**
193    * Number of results actually stored in the results array.
194    */
195   unsigned int results_used;
196   
197   /**
198    * Size of the results array in memory.
199    */
200   unsigned int results_size;
201   
202   /**
203    * Size (in bytes) of the 'results_bf' bloomfilter.
204    *
205    * FIXME: this member is not used, is that OK? If so, it should
206    * be removed!
207    */
208   size_t results_bf_size;
209
210   /**
211    * If the request is for a DBLOCK or IBLOCK, this is the identity of
212    * the peer that is known to have a response.  Set to all-zeros if
213    * such a target is not known (note that even if OUR anonymity
214    * level is >0 we may happen to know the responder's identity;
215    * nevertheless, we should probably not use it for a DHT-lookup
216    * or similar blunt actions in order to avoid exposing ourselves).
217    */
218   struct GNUNET_PeerIdentity target;
219
220   /**
221    * If the request is for an SBLOCK, this is the identity of the
222    * pseudonym to which the SBLOCK belongs. 
223    */
224   GNUNET_HashCode namespace;
225
226   /**
227    * Hash of the keyword (aka query) for KBLOCKs; Hash of
228    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
229    * and hash of the identifier XORed with the target for
230    * SBLOCKS (aka query).
231    */
232   GNUNET_HashCode query;
233
234 };
235
236
237 /**
238  * Possible routing policies for an FS-GET request.
239  */
240 enum RoutingPolicy
241   {
242     /**
243      * Simply drop the request.
244      */
245     ROUTING_POLICY_NONE = 0,
246     
247     /**
248      * Answer it if we can from local datastore.
249      */
250     ROUTING_POLICY_ANSWER = 1,
251
252     /**
253      * Forward the request to other peers (if possible).
254      */
255     ROUTING_POLICY_FORWARD = 2,
256
257     /**
258      * Forward to other peers, and ask them to route
259      * the response via ourselves.
260      */
261     ROUTING_POLICY_INDIRECT = 6,
262     
263     /**
264      * Do everything we could possibly do (that would
265      * make sense).
266      */
267     ROUTING_POLICY_ALL = 7
268   };
269
270
271 /**
272  * Internal context we use for our initial processing
273  * of a GET request.
274  */
275 struct ProcessGetContext
276 {
277   /**
278    * The search query (used for datastore lookup).
279    */
280   GNUNET_HashCode query;
281   
282   /**
283    * Which peer we should forward the response to.
284    */
285   struct GNUNET_PeerIdentity reply_to;
286
287   /**
288    * Namespace for the result (only set for SKS requests)
289    */
290   GNUNET_HashCode namespace;
291
292   /**
293    * Peer that we should forward the query to if possible
294    * (since that peer likely has the content).
295    */
296   struct GNUNET_PeerIdentity prime_target;
297
298   /**
299    * When did we receive this request?
300    */
301   struct GNUNET_TIME_Absolute start_time;
302
303   /**
304    * Our entry in the DRQ (non-NULL while we wait for our
305    * turn to interact with the local database).
306    */
307   struct DatastoreRequestQueue *drq;
308
309   /**
310    * Filter used to eliminate duplicate results.  Can be NULL if we
311    * are not yet filtering any results.
312    */
313   struct GNUNET_CONTAINER_BloomFilter *bf;
314
315   /**
316    * Bitmap describing which of the optional
317    * hash codes / peer identities were given to us.
318    */
319   uint32_t bm;
320
321   /**
322    * Desired block type.
323    */
324   uint32_t type;
325
326   /**
327    * Priority of the request.
328    */
329   uint32_t priority;
330
331   /**
332    * Size of the 'bf' (in bytes).
333    */
334   size_t bf_size;
335
336   /**
337    * In what ways are we going to process
338    * the request?
339    */
340   enum RoutingPolicy policy;
341
342   /**
343    * Time-to-live for the request (value
344    * we use).
345    */
346   int32_t ttl;
347
348   /**
349    * Number to mingle hashes for bloom-filter
350    * tests with.
351    */
352   int32_t mingle;
353
354   /**
355    * Number of results that were found so far.
356    */
357   unsigned int results_found;
358 };
359
360
361 /**
362  * Information we keep for each pending reply.
363  */
364 struct PendingReply
365 {
366   /**
367    * This is a linked list.
368    */
369   struct PendingReply *next;
370
371   /**
372    * Size of the reply; actual reply message follows
373    * at the end of this struct.
374    */
375   size_t msize;
376
377 };
378
379
380 /**
381  * All requests from a client are kept in a doubly-linked list.
382  */
383 struct ClientRequestList;
384
385
386 /**
387  * Information we keep for each pending request.  We should try to
388  * keep this struct as small as possible since its memory consumption
389  * is key to how many requests we can have pending at once.
390  */
391 struct PendingRequest
392 {
393
394   /**
395    * ID of a client making a request, NULL if this entry is for a
396    * peer.
397    */
398   struct GNUNET_SERVER_Client *client;
399
400   /**
401    * If this request was made by a client,
402    * this is our entry in the client request
403    * list; otherwise NULL.
404    */
405   struct ClientRequestList *crl_entry;
406
407   /**
408    * If this is a namespace query, pointer to the hash of the public
409    * key of the namespace; otherwise NULL.
410    */
411   GNUNET_HashCode *namespace;
412
413   /**
414    * Bloomfilter we use to filter out replies that we don't care about
415    * (anymore).  NULL as long as we are interested in all replies.
416    */
417   struct GNUNET_CONTAINER_BloomFilter *bf;
418
419   /**
420    * Replies that we have received but were unable to forward yet
421    * (typically non-null only if we have a pending transmission
422    * request with the client or the respective peer).
423    */
424   struct PendingReply *replies_pending;
425
426   /**
427    * Pending transmission request with the core service for the target
428    * peer (for processing of 'replies_pending') or Handle for a
429    * pending query-request for P2P-transmission with the core service.
430    * If non-NULL, this request must be cancelled should this struct be
431    * destroyed!
432    */
433   struct GNUNET_CORE_TransmitHandle *cth;
434
435   /**
436    * Pending transmission request for the target client (for processing of
437    * 'replies_pending').
438    */
439   struct GNUNET_CONNECTION_TransmitHandle *th;
440
441   /**
442    * Hash code of all replies that we have seen so far (only valid
443    * if client is not NULL since we only track replies like this for
444    * our own clients).
445    */
446   GNUNET_HashCode *replies_seen;
447
448   /**
449    * Node in the heap representing this entry.
450    */
451   struct GNUNET_CONTAINER_HeapNode *hnode;
452
453   /**
454    * When did we first see this request (form this peer), or, if our
455    * client is initiating, when did we last initiate a search?
456    */
457   struct GNUNET_TIME_Absolute start_time;
458
459   /**
460    * The query that this request is for.
461    */
462   GNUNET_HashCode query;
463
464   /**
465    * The task responsible for transmitting queries
466    * for this request.
467    */
468   GNUNET_SCHEDULER_TaskIdentifier task;
469
470   /**
471    * (Interned) Peer identifier (only valid if "client" is NULL)
472    * that identifies a peer that gave us this request.
473    */
474   GNUNET_PEER_Id source_pid;
475
476   /**
477    * (Interned) Peer identifier that identifies a preferred target
478    * for requests.
479    */
480   GNUNET_PEER_Id target_pid;
481
482   /**
483    * (Interned) Peer identifiers of peers that have already
484    * received our query for this content.
485    */
486   GNUNET_PEER_Id *used_pids;
487
488   /**
489    * Size of the 'bf' (in bytes).
490    */
491   size_t bf_size;
492
493   /**
494    * Desired anonymity level; only valid for requests from a local client.
495    */
496   uint32_t anonymity_level;
497
498   /**
499    * How many entries in "used_pids" are actually valid?
500    */
501   unsigned int used_pids_off;
502
503   /**
504    * How long is the "used_pids" array?
505    */
506   unsigned int used_pids_size;
507
508   /**
509    * How many entries in "replies_seen" are actually valid?
510    */
511   unsigned int replies_seen_off;
512
513   /**
514    * How long is the "replies_seen" array?
515    */
516   unsigned int replies_seen_size;
517   
518   /**
519    * Priority with which this request was made.  If one of our clients
520    * made the request, then this is the current priority that we are
521    * using when initiating the request.  This value is used when
522    * we decide to reward other peers with trust for providing a reply.
523    */
524   uint32_t priority;
525
526   /**
527    * Priority points left for us to spend when forwarding this request
528    * to other peers.
529    */
530   uint32_t remaining_priority;
531
532   /**
533    * Number to mingle hashes for bloom-filter
534    * tests with.
535    */
536   int32_t mingle;
537
538   /**
539    * TTL with which we saw this request (or, if we initiated, TTL that
540    * we used for the request).
541    */
542   int32_t ttl;
543   
544   /**
545    * Type of the content that this request is for.
546    */
547   uint32_t type;
548
549 };
550
551
552 /**
553  * All requests from a client are kept in a doubly-linked list.
554  */
555 struct ClientRequestList
556 {
557   /**
558    * This is a doubly-linked list.
559    */
560   struct ClientRequestList *next;
561
562   /**
563    * This is a doubly-linked list.
564    */ 
565   struct ClientRequestList *prev;
566
567   /**
568    * A request from this client.
569    */
570   struct PendingRequest *req;
571
572   /**
573    * Client list with the head and tail of this DLL.
574    */
575   struct ClientList *cl;
576 };
577
578
579 /**
580  * Linked list of all clients that we are 
581  * currently processing requests for.
582  */
583 struct ClientList
584 {
585
586   /**
587    * This is a linked list.
588    */
589   struct ClientList *next;
590
591   /**
592    * What client is this entry for?
593    */
594   struct GNUNET_SERVER_Client* client;
595
596   /**
597    * Head of the DLL of requests from this client.
598    */
599   struct ClientRequestList *head;
600
601   /**
602    * Tail of the DLL of requests from this client.
603    */
604   struct ClientRequestList *tail;
605
606 };
607
608
609 /**
610  * Closure for "process_reply" function.
611  */
612 struct ProcessReplyClosure
613 {
614   /**
615    * The data for the reply.
616    */
617   const void *data;
618
619   /**
620    * When the reply expires.
621    */
622   struct GNUNET_TIME_Absolute expiration;
623
624   /**
625    * Size of data.
626    */
627   size_t size;
628
629   /**
630    * Namespace that this reply belongs to
631    * (if it is of type SBLOCK).
632    */
633   GNUNET_HashCode namespace;
634
635   /**
636    * Type of the block.
637    */
638   uint32_t type;
639
640   /**
641    * How much was this reply worth to us?
642    */
643   uint32_t priority;
644 };
645
646
647 /**
648  * Information about a peer that we are connected to.
649  * We track data that is useful for determining which
650  * peers should receive our requests.
651  */
652 struct ConnectedPeer
653 {
654
655   /**
656    * List of the last clients for which this peer
657    * successfully answered a query. 
658    */
659   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
660
661   /**
662    * List of the last PIDs for which
663    * this peer successfully answered a query;
664    * We use 0 to indicate no successful reply.
665    */
666   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
667
668   /**
669    * Average delay between sending the peer a request and
670    * getting a reply (only calculated over the requests for
671    * which we actually got a reply).   Calculated
672    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
673    */ 
674   struct GNUNET_TIME_Relative avg_delay;
675
676   /**
677    * Average priority of successful replies.  Calculated
678    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
679    */
680   double avg_priority;
681
682   /**
683    * The peer's identity.
684    */
685   GNUNET_PEER_Id pid;  
686
687   /**
688    * Number of requests we have currently pending
689    * with this peer (that is, requests that were
690    * transmitted so recently that we would not retransmit
691    * them right now).
692    */
693   unsigned int pending_requests;
694
695   /**
696    * Which offset in "last_p2p_replies" will be updated next?
697    * (we go round-robin).
698    */
699   unsigned int last_p2p_replies_woff;
700
701   /**
702    * Which offset in "last_client_replies" will be updated next?
703    * (we go round-robin).
704    */
705   unsigned int last_client_replies_woff;
706
707 };
708
709
710 /**
711  * Our connection to the datastore.
712  */
713 static struct GNUNET_DATASTORE_Handle *dsh;
714
715 /**
716  * Our scheduler.
717  */
718 static struct GNUNET_SCHEDULER_Handle *sched;
719
720 /**
721  * Our configuration.
722  */
723 const struct GNUNET_CONFIGURATION_Handle *cfg;
724
725 /**
726  * Handle to the core service (NULL until we've connected to it).
727  */
728 struct GNUNET_CORE_Handle *core;
729
730 /**
731  * Head of doubly-linked LGC list.
732  */
733 static struct LocalGetContext *lgc_head;
734
735 /**
736  * Tail of doubly-linked LGC list.
737  */
738 static struct LocalGetContext *lgc_tail;
739
740 /**
741  * Head of request queue for the datastore, sorted by timeout.
742  */
743 static struct DatastoreRequestQueue *drq_head;
744
745 /**
746  * Tail of request queue for the datastore.
747  */
748 static struct DatastoreRequestQueue *drq_tail;
749
750 /**
751  * Linked list of indexed files.
752  */
753 static struct IndexInfo *indexed_files;
754
755 /**
756  * Maps hash over content of indexed files to the respective filename.
757  * The filenames are pointers into the indexed_files linked list and
758  * do not need to be freed.
759  */
760 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
761
762 /**
763  * Map of query hash codes to requests.
764  */
765 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
766
767 /**
768  * Map of peer IDs to requests (for those requests coming
769  * from other peers).
770  */
771 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
772
773 /**
774  * Linked list of all of our clients and their requests.
775  */
776 static struct ClientList *clients;
777
778 /**
779  * Heap with the request that will expire next at the top.  Contains
780  * pointers of type "struct PendingRequest*"; these will *also* be
781  * aliased from the "requests_by_peer" data structures and the
782  * "requests_by_query" table.  Note that requests from our clients
783  * don't expire and are thus NOT in the "requests_by_expiration"
784  * (or the "requests_by_peer" tables).
785  */
786 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
787
788 /**
789  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
790  */
791 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
792
793 /**
794  * Maximum number of requests (from other peers) that we're
795  * willing to have pending at any given point in time.
796  * FIXME: set from configuration (and 32 is a tiny value for testing only).
797  */
798 static uint64_t max_pending_requests = 32;
799
800
801 /**
802  * Write the current index information list to disk.
803  */ 
804 static void
805 write_index_list ()
806 {
807   struct GNUNET_BIO_WriteHandle *wh;
808   char *fn;
809   struct IndexInfo *pos;  
810
811   if (GNUNET_OK !=
812       GNUNET_CONFIGURATION_get_value_filename (cfg,
813                                                "FS",
814                                                "INDEXDB",
815                                                &fn))
816     {
817       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
818                   _("Configuration option `%s' in section `%s' missing.\n"),
819                   "INDEXDB",
820                   "FS");
821       return;
822     }
823   wh = GNUNET_BIO_write_open (fn);
824   if (NULL == wh)
825     {
826       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
827                   _("Could not open `%s'.\n"),
828                   fn);
829       GNUNET_free (fn);
830       return;
831     }
832   pos = indexed_files;
833   while (pos != NULL)
834     {
835       if ( (GNUNET_OK !=
836             GNUNET_BIO_write (wh,
837                               &pos->file_id,
838                               sizeof (GNUNET_HashCode))) ||
839            (GNUNET_OK !=
840             GNUNET_BIO_write_string (wh,
841                                      pos->filename)) )
842         break;
843       pos = pos->next;
844     }
845   if (GNUNET_OK != 
846       GNUNET_BIO_write_close (wh))
847     {
848       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
849                   _("Error writing `%s'.\n"),
850                   fn);
851       GNUNET_free (fn);
852       return;
853     }
854   GNUNET_free (fn);
855 }
856
857
858 /**
859  * Read index information from disk.
860  */
861 static void
862 read_index_list ()
863 {
864   struct GNUNET_BIO_ReadHandle *rh;
865   char *fn;
866   struct IndexInfo *pos;  
867   char *fname;
868   GNUNET_HashCode hc;
869   size_t slen;
870   char *emsg;
871
872   if (GNUNET_OK !=
873       GNUNET_CONFIGURATION_get_value_filename (cfg,
874                                                "FS",
875                                                "INDEXDB",
876                                                &fn))
877     {
878       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
879                   _("Configuration option `%s' in section `%s' missing.\n"),
880                   "INDEXDB",
881                   "FS");
882       return;
883     }
884   if (GNUNET_NO == GNUNET_DISK_file_test (fn))
885     {
886       /* no index info  yet */
887       GNUNET_free (fn);
888       return;
889     }
890   rh = GNUNET_BIO_read_open (fn);
891   if (NULL == rh)
892     {
893       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
894                   _("Could not open `%s'.\n"),
895                   fn);
896       GNUNET_free (fn);
897       return;
898     }
899
900   while ( (GNUNET_OK ==
901            GNUNET_BIO_read (rh,
902                             "Hash of indexed file",
903                             &hc,
904                             sizeof (GNUNET_HashCode))) &&
905           (GNUNET_OK ==
906            GNUNET_BIO_read_string (rh, 
907                                    "Name of indexed file",
908                                    &fname,
909                                    1024 * 16)) )
910     {
911       slen = strlen (fname) + 1;
912       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
913       pos->file_id = hc;
914       pos->filename = (const char *) &pos[1];
915       memcpy (&pos[1], fname, slen);
916       if (GNUNET_SYSERR ==
917           GNUNET_CONTAINER_multihashmap_put (ifm,
918                                              &hc,
919                                              (void*) pos->filename,
920                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
921         {
922           GNUNET_free (pos);
923         }
924       else
925         {
926           pos->next = indexed_files;
927           indexed_files = pos;
928         }
929       GNUNET_free (fname);
930     }
931   if (GNUNET_OK != 
932       GNUNET_BIO_read_close (rh, &emsg))
933     GNUNET_free (emsg);
934   GNUNET_free (fn);
935 }
936
937
938 /**
939  * We've validated the hash of the file we're about to
940  * index.  Signal success to the client and update
941  * our internal data structures.
942  *
943  * @param ii the index info entry for the request
944  */
945 static void
946 signal_index_ok (struct IndexInfo *ii)
947 {
948   if (GNUNET_SYSERR ==
949       GNUNET_CONTAINER_multihashmap_put (ifm,
950                                          &ii->file_id,
951                                          (void*) ii->filename,
952                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
953     {
954       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
955                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
956                   ii->filename,
957                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
958                                                                    &ii->file_id));
959       GNUNET_SERVER_transmit_context_append (ii->tc,
960                                              NULL, 0,
961                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
962       GNUNET_SERVER_transmit_context_run (ii->tc,
963                                           GNUNET_TIME_UNIT_MINUTES);
964       GNUNET_free (ii);
965       return;
966     }
967   ii->next = indexed_files;
968   indexed_files = ii;
969   write_index_list ();
970   GNUNET_SERVER_transmit_context_append (ii->tc,
971                                          NULL, 0,
972                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
973   GNUNET_SERVER_transmit_context_run (ii->tc,
974                                       GNUNET_TIME_UNIT_MINUTES);
975   ii->tc = NULL;
976 }
977
978
979 /**
980  * Function called once the hash computation over an
981  * indexed file has completed.
982  *
983  * @param cls closure, our publishing context
984  * @param res resulting hash, NULL on error
985  */
986 static void 
987 hash_for_index_val (void *cls,
988                     const GNUNET_HashCode *
989                     res)
990 {
991   struct IndexInfo *ii = cls;
992   
993   if ( (res == NULL) ||
994        (0 != memcmp (res,
995                      &ii->file_id,
996                      sizeof(GNUNET_HashCode))) )
997     {
998       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
999                   _("Hash mismatch trying to index file `%s' which has hash `%s'\n"),
1000                   ii->filename,
1001                   GNUNET_h2s (res));
1002 #if DEBUG_FS
1003       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004                   "Wanted `%s'\n",
1005                   GNUNET_h2s (&ii->file_id));
1006 #endif
1007       GNUNET_SERVER_transmit_context_append (ii->tc,
1008                                              NULL, 0,
1009                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
1010       GNUNET_SERVER_transmit_context_run (ii->tc,
1011                                           GNUNET_TIME_UNIT_MINUTES);
1012       GNUNET_free (ii);
1013       return;
1014     }
1015   signal_index_ok (ii);
1016 }
1017
1018
1019 /**
1020  * Handle INDEX_START-message.
1021  *
1022  * @param cls closure
1023  * @param client identification of the client
1024  * @param message the actual message
1025  */
1026 static void
1027 handle_index_start (void *cls,
1028                     struct GNUNET_SERVER_Client *client,
1029                     const struct GNUNET_MessageHeader *message)
1030 {
1031   const struct IndexStartMessage *ism;
1032   const char *fn;
1033   uint16_t msize;
1034   struct IndexInfo *ii;
1035   size_t slen;
1036   uint32_t dev;
1037   uint64_t ino;
1038   uint32_t mydev;
1039   uint64_t myino;
1040
1041   msize = ntohs(message->size);
1042   if ( (msize <= sizeof (struct IndexStartMessage)) ||
1043        ( ((const char *)message)[msize-1] != '\0') )
1044     {
1045       GNUNET_break (0);
1046       GNUNET_SERVER_receive_done (client,
1047                                   GNUNET_SYSERR);
1048       return;
1049     }
1050   ism = (const struct IndexStartMessage*) message;
1051   fn = (const char*) &ism[1];
1052   dev = ntohl (ism->device);
1053   ino = GNUNET_ntohll (ism->inode);
1054   ism = (const struct IndexStartMessage*) message;
1055   slen = strlen (fn) + 1;
1056   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
1057   ii->filename = (const char*) &ii[1];
1058   memcpy (&ii[1], fn, slen);
1059   ii->file_id = ism->file_id;  
1060   ii->tc = GNUNET_SERVER_transmit_context_create (client);
1061   if ( ( (dev != 0) ||
1062          (ino != 0) ) &&
1063        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
1064                                                        &mydev,
1065                                                        &myino)) &&
1066        ( (dev == mydev) &&
1067          (ino == myino) ) )
1068     {      
1069       /* fast validation OK! */
1070       signal_index_ok (ii);
1071       return;
1072     }
1073 #if DEBUG_FS
1074   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1075               "Mismatch in file identifiers (%llu != %llu or %u != %u), need to hash.\n",
1076               (unsigned long long) ino,
1077               (unsigned long long) myino,
1078               (unsigned int) dev,
1079               (unsigned int) mydev);
1080 #endif
1081   /* slow validation, need to hash full file (again) */
1082   GNUNET_CRYPTO_hash_file (sched,
1083                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1084                            fn,
1085                            HASHING_BLOCKSIZE,
1086                            &hash_for_index_val,
1087                            ii);
1088 }
1089
1090
1091 /**
1092  * Handle INDEX_LIST_GET-message.
1093  *
1094  * @param cls closure
1095  * @param client identification of the client
1096  * @param message the actual message
1097  */
1098 static void
1099 handle_index_list_get (void *cls,
1100                        struct GNUNET_SERVER_Client *client,
1101                        const struct GNUNET_MessageHeader *message)
1102 {
1103   struct GNUNET_SERVER_TransmitContext *tc;
1104   struct IndexInfoMessage *iim;
1105   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1106   size_t slen;
1107   const char *fn;
1108   struct GNUNET_MessageHeader *msg;
1109   struct IndexInfo *pos;
1110
1111   tc = GNUNET_SERVER_transmit_context_create (client);
1112   iim = (struct IndexInfoMessage*) buf;
1113   msg = &iim->header;
1114   pos = indexed_files;
1115   while (NULL != pos)
1116     {
1117       iim->reserved = 0;
1118       iim->file_id = pos->file_id;
1119       fn = pos->filename;
1120       slen = strlen (fn) + 1;
1121       if (slen + sizeof (struct IndexInfoMessage) > 
1122           GNUNET_SERVER_MAX_MESSAGE_SIZE)
1123         {
1124           GNUNET_break (0);
1125           break;
1126         }
1127       memcpy (&iim[1], fn, slen);
1128       GNUNET_SERVER_transmit_context_append
1129         (tc,
1130          &msg[1],
1131          sizeof (struct IndexInfoMessage) 
1132          - sizeof (struct GNUNET_MessageHeader) + slen,
1133          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1134       pos = pos->next;
1135     }
1136   GNUNET_SERVER_transmit_context_append (tc,
1137                                          NULL, 0,
1138                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1139   GNUNET_SERVER_transmit_context_run (tc,
1140                                       GNUNET_TIME_UNIT_MINUTES);
1141 }
1142
1143
1144 /**
1145  * Handle UNINDEX-message.
1146  *
1147  * @param cls closure
1148  * @param client identification of the client
1149  * @param message the actual message
1150  */
1151 static void
1152 handle_unindex (void *cls,
1153                 struct GNUNET_SERVER_Client *client,
1154                 const struct GNUNET_MessageHeader *message)
1155 {
1156   const struct UnindexMessage *um;
1157   struct IndexInfo *pos;
1158   struct IndexInfo *prev;
1159   struct IndexInfo *next;
1160   struct GNUNET_SERVER_TransmitContext *tc;
1161   int found;
1162   
1163   um = (const struct UnindexMessage*) message;
1164   found = GNUNET_NO;
1165   prev = NULL;
1166   pos = indexed_files;
1167   while (NULL != pos)
1168     {
1169       next = pos->next;
1170       if (0 == memcmp (&pos->file_id,
1171                        &um->file_id,
1172                        sizeof (GNUNET_HashCode)))
1173         {
1174           if (prev == NULL)
1175             indexed_files = pos->next;
1176           else
1177             prev->next = pos->next;
1178           GNUNET_free (pos);
1179           found = GNUNET_YES;
1180         }
1181       else
1182         {
1183           prev = pos;
1184         }
1185       pos = next;
1186     }
1187 #if DEBUG_FS
1188   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1189               "Client requested unindexing of file `%s': %s\n",
1190               GNUNET_h2s (&um->file_id),
1191               found ? "found" : "not found");
1192 #endif
1193   if (GNUNET_YES == found)    
1194     write_index_list ();
1195   tc = GNUNET_SERVER_transmit_context_create (client);
1196   GNUNET_SERVER_transmit_context_append (tc,
1197                                          NULL, 0,
1198                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1199   GNUNET_SERVER_transmit_context_run (tc,
1200                                       GNUNET_TIME_UNIT_MINUTES);
1201 }
1202
1203
1204 /**
1205  * Run the next DS request in our
1206  * queue, we're done with the current one.
1207  */
1208 static void
1209 next_ds_request ()
1210 {
1211   struct DatastoreRequestQueue *e;
1212   
1213   while (NULL != (e = drq_head))
1214     {
1215       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1216         break;
1217       if (e->task != GNUNET_SCHEDULER_NO_TASK)
1218         GNUNET_SCHEDULER_cancel (sched, e->task);
1219       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1220       e->req (e->req_cls, GNUNET_NO);
1221       GNUNET_free (e);  
1222     }
1223   if (e == NULL)
1224     return;
1225   if (e->task != GNUNET_SCHEDULER_NO_TASK)
1226     GNUNET_SCHEDULER_cancel (sched, e->task);
1227   e->task = GNUNET_SCHEDULER_NO_TASK;
1228   e->req (e->req_cls, GNUNET_YES);
1229   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1230   GNUNET_free (e);  
1231 }
1232
1233
1234 /**
1235  * A datastore request had to be timed out. 
1236  *
1237  * @param cls closure (of type "struct DatastoreRequestQueue*")
1238  * @param tc task context, unused
1239  */
1240 static void
1241 timeout_ds_request (void *cls,
1242                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1243 {
1244   struct DatastoreRequestQueue *e = cls;
1245
1246   e->task = GNUNET_SCHEDULER_NO_TASK;
1247   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1248   e->req (e->req_cls, GNUNET_NO);
1249   GNUNET_free (e);  
1250 }
1251
1252
1253 /**
1254  * Queue a request for the datastore.
1255  *
1256  * @param deadline by when the request should run
1257  * @param fun function to call once the request can be run
1258  * @param fun_cls closure for fun
1259  */
1260 static struct DatastoreRequestQueue *
1261 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1262                   RequestFunction fun,
1263                   void *fun_cls)
1264 {
1265   struct DatastoreRequestQueue *e;
1266   struct DatastoreRequestQueue *bef;
1267
1268   if (drq_head == NULL)
1269     {
1270       /* no other requests pending, run immediately */
1271       fun (fun_cls, GNUNET_OK);
1272       return NULL;
1273     }
1274   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1275   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1276   e->req = fun;
1277   e->req_cls = fun_cls;
1278   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1279     {
1280       /* local request, highest prio, put at head of queue
1281          regardless of deadline */
1282       bef = NULL;
1283     }
1284   else
1285     {
1286       bef = drq_tail;
1287       while ( (NULL != bef) &&
1288               (e->timeout.value < bef->timeout.value) )
1289         bef = bef->prev;
1290     }
1291   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1292   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1293     return e;
1294   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1295                                           deadline,
1296                                           &timeout_ds_request,
1297                                           e);
1298   return e;                                    
1299 }
1300
1301
1302 /**
1303  * Free the state associated with a local get context.
1304  *
1305  * @param lgc the lgc to free
1306  */
1307 static void
1308 local_get_context_free (struct LocalGetContext *lgc) 
1309 {
1310   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1311   GNUNET_SERVER_client_drop (lgc->client); 
1312   GNUNET_free_non_null (lgc->results);
1313   if (lgc->results_bf != NULL)
1314     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1315   if (lgc->req != NULL)
1316     {
1317       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1318         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1319       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1320       GNUNET_free (lgc->req);
1321     }
1322   GNUNET_free (lgc);
1323 }
1324
1325
1326 /**
1327  * We're able to transmit the next (local) result to the client.
1328  * Do it and ask the datastore for more.  Or, on error, tell
1329  * the datastore to stop giving us more.
1330  *
1331  * @param cls our closure (struct LocalGetContext)
1332  * @param max maximum number of bytes we can transmit
1333  * @param buf where to copy our message
1334  * @return number of bytes copied to buf
1335  */
1336 static size_t
1337 transmit_local_result (void *cls,
1338                        size_t max,
1339                        void *buf)
1340 {
1341   struct LocalGetContext *lgc = cls;  
1342   uint16_t msize;
1343
1344   if (NULL == buf)
1345     {
1346 #if DEBUG_FS
1347       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1348                   "Failed to transmit result to local client, aborting datastore iteration.\n");
1349 #endif
1350       /* error, abort! */
1351       GNUNET_free (lgc->result);
1352       lgc->result = NULL;
1353       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1354       return 0;
1355     }
1356   msize = ntohs (lgc->result->header.size);
1357 #if DEBUG_FS
1358   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1359               "Transmitting %u bytes of result to local client.\n",
1360               msize);
1361 #endif
1362   GNUNET_assert (max >= msize);
1363   memcpy (buf, lgc->result, msize);
1364   GNUNET_free (lgc->result);
1365   lgc->result = NULL;
1366   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1367   return msize;
1368 }
1369
1370
1371 /**
1372  * Continuation called from datastore's remove
1373  * function.
1374  *
1375  * @param cls unused
1376  * @param success did the deletion work?
1377  * @param msg error message
1378  */
1379 static void
1380 remove_cont (void *cls,
1381              int success,
1382              const char *msg)
1383 {
1384   if (GNUNET_OK != success)
1385     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1386                 _("Failed to delete bogus block: %s\n"),
1387                 msg);
1388   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1389 }
1390
1391
1392 /**
1393  * Mingle hash with the mingle_number to
1394  * produce different bits.
1395  */
1396 static void
1397 mingle_hash (const GNUNET_HashCode * in,
1398              int32_t mingle_number, 
1399              GNUNET_HashCode * hc)
1400 {
1401   GNUNET_HashCode m;
1402
1403   GNUNET_CRYPTO_hash (&mingle_number, 
1404                       sizeof (int32_t), 
1405                       &m);
1406   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1407 }
1408
1409
1410 /**
1411  * We've received an on-demand encoded block
1412  * from the datastore.  Attempt to do on-demand
1413  * encoding and (if successful), call the 
1414  * continuation with the resulting block.  On
1415  * error, clean up and ask the datastore for
1416  * more results.
1417  *
1418  * @param key key for the content
1419  * @param size number of bytes in data
1420  * @param data content stored
1421  * @param type type of the content
1422  * @param priority priority of the content
1423  * @param anonymity anonymity-level for the content
1424  * @param expiration expiration time for the content
1425  * @param uid unique identifier for the datum;
1426  *        maybe 0 if no unique identifier is available
1427  * @param cont function to call with the actual block
1428  * @param cont_cls closure for cont
1429  */
1430 static void
1431 handle_on_demand_block (const GNUNET_HashCode * key,
1432                         uint32_t size,
1433                         const void *data,
1434                         uint32_t type,
1435                         uint32_t priority,
1436                         uint32_t anonymity,
1437                         struct GNUNET_TIME_Absolute
1438                         expiration, uint64_t uid,
1439                         GNUNET_DATASTORE_Iterator cont,
1440                         void *cont_cls)
1441 {
1442   const struct OnDemandBlock *odb;
1443   GNUNET_HashCode nkey;
1444   struct GNUNET_CRYPTO_AesSessionKey skey;
1445   struct GNUNET_CRYPTO_AesInitializationVector iv;
1446   GNUNET_HashCode query;
1447   ssize_t nsize;
1448   char ndata[DBLOCK_SIZE];
1449   char edata[DBLOCK_SIZE];
1450   const char *fn;
1451   struct GNUNET_DISK_FileHandle *fh;
1452   uint64_t off;
1453
1454   if (size != sizeof (struct OnDemandBlock))
1455     {
1456       GNUNET_break (0);
1457       GNUNET_DATASTORE_remove (dsh, 
1458                                key,
1459                                size,
1460                                data,
1461                                &remove_cont,
1462                                NULL,
1463                                GNUNET_TIME_UNIT_FOREVER_REL);     
1464       return;
1465     }
1466   odb = (const struct OnDemandBlock*) data;
1467   off = GNUNET_ntohll (odb->offset);
1468   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1469                                                         &odb->file_id);
1470   fh = NULL;
1471   if ( (NULL == fn) ||
1472        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1473                                              GNUNET_DISK_OPEN_READ,
1474                                              GNUNET_DISK_PERM_NONE))) ||
1475        (off !=
1476         GNUNET_DISK_file_seek (fh,
1477                                off,
1478                                GNUNET_DISK_SEEK_SET)) ||
1479        (-1 ==
1480         (nsize = GNUNET_DISK_file_read (fh,
1481                                         ndata,
1482                                         sizeof (ndata)))) )
1483     {
1484       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1485                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1486                   GNUNET_h2s (&odb->file_id),
1487                   (unsigned long long) off,
1488                   STRERROR (errno));
1489       if (fh != NULL)
1490         GNUNET_DISK_file_close (fh);
1491       /* FIXME: if this happens often, we need
1492          to remove the OnDemand block from the DS! */
1493       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1494       return;
1495     }
1496   GNUNET_DISK_file_close (fh);
1497   GNUNET_CRYPTO_hash (ndata,
1498                       nsize,
1499                       &nkey);
1500   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1501   GNUNET_CRYPTO_aes_encrypt (ndata,
1502                              nsize,
1503                              &skey,
1504                              &iv,
1505                              edata);
1506   GNUNET_CRYPTO_hash (edata,
1507                       nsize,
1508                       &query);
1509   if (0 != memcmp (&query, 
1510                    key,
1511                    sizeof (GNUNET_HashCode)))
1512     {
1513       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1514                   _("Indexed file `%s' changed at offset %llu\n"),
1515                   fn,
1516                   (unsigned long long) off);
1517       /* FIXME: if this happens often, we need
1518          to remove the OnDemand block from the DS! */
1519       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1520       return;
1521     }
1522   cont (cont_cls,
1523         key,
1524         nsize,
1525         edata,
1526         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1527         priority,
1528         anonymity,
1529         expiration,
1530         uid);
1531 }
1532
1533
1534 /**
1535  * How many bytes should a bloomfilter be if we have already seen
1536  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1537  * of bits set per entry.  Furthermore, we should not re-size the
1538  * filter too often (to keep it cheap).
1539  *
1540  * Since other peers will also add entries but not resize the filter,
1541  * we should generally pick a slightly larger size than what the
1542  * strict math would suggest.
1543  *
1544  * @return must be a power of two and smaller or equal to 2^15.
1545  */
1546 static size_t
1547 compute_bloomfilter_size (unsigned int entry_count)
1548 {
1549   size_t size;
1550   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1551   uint16_t max = 1 << 15;
1552
1553   if (entry_count > max)
1554     return max;
1555   size = 8;
1556   while ((size < max) && (size < ideal))
1557     size *= 2;
1558   if (size > max)
1559     return max;
1560   return size;
1561 }
1562
1563
1564 /**
1565  * Recalculate our bloom filter for filtering replies.
1566  *
1567  * @param count number of entries we are filtering right now
1568  * @param mingle set to our new mingling value
1569  * @param bf_size set to the size of the bloomfilter
1570  * @param entries the entries to filter
1571  * @return updated bloomfilter, NULL for none
1572  */
1573 static struct GNUNET_CONTAINER_BloomFilter *
1574 refresh_bloomfilter (unsigned int count,
1575                      int32_t * mingle,
1576                      size_t *bf_size,
1577                      const GNUNET_HashCode *entries)
1578 {
1579   struct GNUNET_CONTAINER_BloomFilter *bf;
1580   size_t nsize;
1581   unsigned int i;
1582   GNUNET_HashCode mhash;
1583
1584   if (0 == count)
1585     return NULL;
1586   nsize = compute_bloomfilter_size (count);
1587   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1588   *bf_size = nsize;
1589   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1590                                           nsize,
1591                                           BLOOMFILTER_K);
1592   for (i=0;i<count;i++)
1593     {
1594       mingle_hash (&entries[i], *mingle, &mhash);
1595       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1596     }
1597   return bf;
1598 }
1599
1600
1601 /**
1602  * Closure used for "target_peer_select_cb".
1603  */
1604 struct PeerSelectionContext 
1605 {
1606   /**
1607    * The request for which we are selecting
1608    * peers.
1609    */
1610   struct PendingRequest *pr;
1611
1612   /**
1613    * Current "prime" target.
1614    */
1615   struct GNUNET_PeerIdentity target;
1616
1617   /**
1618    * How much do we like this target?
1619    */
1620   double target_score;
1621
1622 };
1623
1624
1625 /**
1626  * Function called for each connected peer to determine
1627  * which one(s) would make good targets for forwarding.
1628  *
1629  * @param cls closure (struct PeerSelectionContext)
1630  * @param key current key code (peer identity)
1631  * @param value value in the hash map (struct ConnectedPeer)
1632  * @return GNUNET_YES if we should continue to
1633  *         iterate,
1634  *         GNUNET_NO if not.
1635  */
1636 static int
1637 target_peer_select_cb (void *cls,
1638                        const GNUNET_HashCode * key,
1639                        void *value)
1640 {
1641   struct PeerSelectionContext *psc = cls;
1642   struct ConnectedPeer *cp = value;
1643   struct PendingRequest *pr = psc->pr;
1644   double score;
1645   unsigned int i;
1646
1647   /* 1) check if we have already (recently) forwarded to this peer */
1648   for (i=0;i<pr->used_pids_off;i++)
1649     if (pr->used_pids[i] == cp->pid)
1650       return GNUNET_YES; /* skip */
1651   // 2) calculate how much we'd like to forward to this peer
1652   score = 0; // FIXME!
1653   
1654   /* store best-fit in closure */
1655   if (score > psc->target_score)
1656     {
1657       psc->target_score = score;
1658       psc->target.hashPubKey = *key; 
1659     }
1660   return GNUNET_YES;
1661 }
1662
1663
1664 /**
1665  * We use a random delay to make the timing of requests
1666  * less predictable.  This function returns such a random
1667  * delay.
1668  *
1669  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1670  */
1671 static struct GNUNET_TIME_Relative
1672 get_processing_delay ()
1673 {
1674   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1675                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1676                                                                   TTL_DECREMENT));
1677 }
1678
1679
1680 /**
1681  * Task that is run for each request with the
1682  * goal of forwarding the associated query to
1683  * other peers.  The task should re-schedule
1684  * itself to be re-run once the TTL has expired.
1685  * (or at a later time if more peers should
1686  * be queried earlier).
1687  *
1688  * @param cls the requests "struct PendingRequest*"
1689  * @param tc task context (unused)
1690  */
1691 static void
1692 forward_request_task (void *cls,
1693                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1694
1695
1696 /**
1697  * We've selected a peer for forwarding of a query.
1698  * Construct the message and then re-schedule the
1699  * task to forward again to (other) peers.
1700  *
1701  * @param cls closure
1702  * @param size number of bytes available in buf
1703  * @param buf where the callee should write the message
1704  * @return number of bytes written to buf
1705  */
1706 static size_t
1707 transmit_request_cb (void *cls,
1708                      size_t size, 
1709                      void *buf)
1710 {
1711   struct PendingRequest *pr = cls;
1712   struct GetMessage *gm;
1713   GNUNET_HashCode *ext;
1714   char *bfdata;
1715   size_t msize;
1716   unsigned int k;
1717
1718   pr->cth = NULL;
1719   /* (1) check for timeout */
1720   if (NULL == buf)
1721     {
1722       /* timeout, try another peer immediately again */
1723       pr->task = GNUNET_SCHEDULER_add_with_priority (sched,
1724                                                      GNUNET_SCHEDULER_PRIORITY_IDLE,
1725                                                      &forward_request_task,
1726                                                      pr);
1727       return 0;
1728     }
1729   /* (2) build query message */
1730   k = 0; // FIXME: count hash codes!
1731   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1732   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1733   gm = (struct GetMessage*) buf;
1734   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1735   gm->header.size = htons (msize);
1736   gm->type = htonl (pr->type);
1737   pr->remaining_priority /= 2;
1738   gm->priority = htonl (pr->remaining_priority);
1739   gm->ttl = htonl (pr->ttl);
1740   gm->filter_mutator = htonl(pr->mingle);
1741   gm->hash_bitmap = htonl (42);
1742   gm->query = pr->query;
1743   ext = (GNUNET_HashCode*) &gm[1];
1744   // FIXME: setup "ext[0]..[k-1]"
1745   bfdata = (char *) &ext[k];
1746   if (pr->bf != NULL)
1747     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1748                                                bfdata,
1749                                                pr->bf_size);
1750   
1751   /* (3) schedule job to do it again (or another peer, etc.) */
1752   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1753                                            get_processing_delay (), // FIXME!
1754                                            &forward_request_task,
1755                                            pr);
1756
1757   return msize;
1758 }
1759
1760
1761 /**
1762  * Function called after we've tried to reserve
1763  * a certain amount of bandwidth for a reply.
1764  * Check if we succeeded and if so send our query.
1765  *
1766  * @param cls the requests "struct PendingRequest*"
1767  * @param peer identifies the peer
1768  * @param latency current latency estimate, "FOREVER" if we have been
1769  *                disconnected
1770  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1771  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1772  * @param amount set to the amount that was actually reserved or unreserved
1773  * @param preference current traffic preference for the given peer
1774  */
1775 static void
1776 target_reservation_cb (void *cls,
1777                        const struct
1778                        GNUNET_PeerIdentity * peer,
1779                        unsigned int bpm_in,
1780                        unsigned int bpm_out,
1781                        struct GNUNET_TIME_Relative
1782                        latency, int amount,
1783                        unsigned long long preference)
1784 {
1785   struct PendingRequest *pr = cls;
1786   uint32_t priority;
1787   uint16_t size;
1788   struct GNUNET_TIME_Relative maxdelay;
1789
1790   GNUNET_assert (peer != NULL);
1791   if ( (amount != DBLOCK_SIZE) ||
1792        (pr->cth != NULL) )
1793     {
1794       /* try again later; FIXME: we may need to un-reserve "amount"? */
1795       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1796                                                get_processing_delay (), // FIXME: longer?
1797                                                &forward_request_task,
1798                                                pr);
1799       return;
1800     }
1801   // (2) transmit, update ttl/priority
1802   // FIXME: calculate priority, maxdelay, size properly!
1803   priority = 0;
1804   size = 60000;
1805   maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
1806   pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1807                                                priority,
1808                                                maxdelay,
1809                                                peer,
1810                                                size,
1811                                                &transmit_request_cb,
1812                                                pr);
1813   if (pr->cth == NULL)
1814     {
1815       /* try again later */
1816       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1817                                                get_processing_delay (), // FIXME: longer?
1818                                                &forward_request_task,
1819                                                pr);
1820     }
1821 }
1822
1823
1824 /**
1825  * Task that is run for each request with the
1826  * goal of forwarding the associated query to
1827  * other peers.  The task should re-schedule
1828  * itself to be re-run once the TTL has expired.
1829  * (or at a later time if more peers should
1830  * be queried earlier).
1831  *
1832  * @param cls the requests "struct PendingRequest*"
1833  * @param tc task context (unused)
1834  */
1835 static void
1836 forward_request_task (void *cls,
1837                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1838 {
1839   struct PendingRequest *pr = cls;
1840   struct PeerSelectionContext psc;
1841
1842   pr->task = GNUNET_SCHEDULER_NO_TASK;
1843   if (pr->cth != NULL) 
1844     {
1845       /* we're busy transmitting a result, wait a bit */
1846       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1847                                                get_processing_delay (), 
1848                                                &forward_request_task,
1849                                                pr);
1850       return;
1851     }
1852   /* (1) select target */
1853   psc.pr = pr;
1854   psc.target_score = DBL_MIN;
1855   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1856                                          &target_peer_select_cb,
1857                                          &psc);
1858   if (psc.target_score == DBL_MIN)
1859     {
1860       /* no possible target found, wait some time */
1861       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1862                                                get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1863                                                &forward_request_task,
1864                                                pr);
1865       return;
1866     }
1867   /* (2) reserve reply bandwidth */
1868   // FIXME: need a way to cancel; this
1869   // async operation is problematic (segv-problematic)
1870   // if "pr" is destroyed while it happens!
1871   GNUNET_CORE_peer_configure (core,
1872                               &psc.target,
1873                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1874                               -1,
1875                               DBLOCK_SIZE, // FIXME: make dependent on type?
1876                               0,
1877                               &target_reservation_cb,
1878                               pr);
1879 }
1880
1881
1882 /**
1883  * We're processing (local) results for a search request
1884  * from a (local) client.  Pass applicable results to the
1885  * client and if we are done either clean up (operation
1886  * complete) or switch to P2P search (more results possible).
1887  *
1888  * @param cls our closure (struct LocalGetContext)
1889  * @param key key for the content
1890  * @param size number of bytes in data
1891  * @param data content stored
1892  * @param type type of the content
1893  * @param priority priority of the content
1894  * @param anonymity anonymity-level for the content
1895  * @param expiration expiration time for the content
1896  * @param uid unique identifier for the datum;
1897  *        maybe 0 if no unique identifier is available
1898  */
1899 static void
1900 process_local_get_result (void *cls,
1901                           const GNUNET_HashCode * key,
1902                           uint32_t size,
1903                           const void *data,
1904                           uint32_t type,
1905                           uint32_t priority,
1906                           uint32_t anonymity,
1907                           struct GNUNET_TIME_Absolute
1908                           expiration, 
1909                           uint64_t uid)
1910 {
1911   struct LocalGetContext *lgc = cls;
1912   struct PendingRequest *pr;
1913   struct ClientRequestList *crl;
1914   struct ClientList *cl;
1915   size_t msize;
1916   unsigned int i;
1917
1918   if (key == NULL)
1919     {
1920 #if DEBUG_FS
1921       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1922                   "Received last result for `%s' from local datastore, deciding what to do next.\n",
1923                   GNUNET_h2s (&lgc->query));
1924 #endif
1925       /* no further results from datastore; continue
1926          processing further requests from the client and
1927          allow the next task to use the datastore; also,
1928          switch to P2P requests or clean up our state. */
1929       next_ds_request ();
1930       GNUNET_SERVER_receive_done (lgc->client,
1931                                   GNUNET_OK);
1932       if ( (lgc->results_used == 0) ||
1933            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1934            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1935            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1936         {
1937 #if DEBUG_FS
1938           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1939                       "Forwarding query for `%s' to network.\n",
1940                       GNUNET_h2s (&lgc->query));
1941 #endif
1942           cl = clients;
1943           while ( (NULL != cl) &&
1944                   (cl->client != lgc->client) )
1945             cl = cl->next;
1946           if (cl == NULL)
1947             {
1948               cl = GNUNET_malloc (sizeof (struct ClientList));
1949               cl->client = lgc->client;
1950               cl->next = clients;
1951               clients = cl;
1952             }
1953           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1954           crl->cl = cl;
1955           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1956           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1957           pr->client = lgc->client;
1958           GNUNET_SERVER_client_keep (pr->client);
1959           pr->crl_entry = crl;
1960           crl->req = pr;
1961           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1962             {
1963               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1964               *pr->namespace = lgc->namespace;
1965             }
1966           pr->replies_seen = lgc->results;
1967           lgc->results = NULL;
1968           pr->start_time = GNUNET_TIME_absolute_get ();
1969           pr->query = lgc->query;
1970           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1971           pr->replies_seen_off = lgc->results_used;
1972           pr->replies_seen_size = lgc->results_size;
1973           lgc->results_size = 0;
1974           pr->type = lgc->type;
1975           pr->anonymity_level = lgc->anonymity_level;
1976           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1977                                         &pr->mingle,
1978                                         &pr->bf_size,
1979                                         pr->replies_seen);
1980           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1981                                              &pr->query,
1982                                              pr,
1983                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1984           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1985                                                    get_processing_delay (),
1986                                                    &forward_request_task,
1987                                                    pr);
1988           local_get_context_free (lgc);
1989           return;
1990         }
1991       /* got all possible results, clean up! */
1992 #if DEBUG_FS
1993       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1994                   "Found all possible results for query for `%s', done!\n",
1995                   GNUNET_h2s (&lgc->query));
1996 #endif
1997       local_get_context_free (lgc);
1998       return;
1999     }
2000   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2001     {
2002 #if DEBUG_FS
2003       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2004                   "Received on-demand block for `%s' from local datastore, fetching data.\n",
2005                   GNUNET_h2s (&lgc->query));
2006 #endif
2007       handle_on_demand_block (key, size, data, type, priority, 
2008                               anonymity, expiration, uid,
2009                               &process_local_get_result,
2010                               lgc);
2011       return;
2012     }
2013   if ( (type != lgc->type) &&
2014        (lgc->type != GNUNET_DATASTORE_BLOCKTYPE_ANY) )
2015     {
2016       /* this should be virtually impossible to reach (DBLOCK 
2017          query hash being identical to KBLOCK/SBLOCK query hash);
2018          nevertheless, if it happens, the correct thing is to
2019          simply skip the result. */
2020 #if DEBUG_FS
2021       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2022                   "Received block of unexpected type (%u, want %u) for `%s' from local datastore, ignoring.\n",
2023                   type,
2024                   lgc->type,
2025                   GNUNET_h2s (&lgc->query));
2026 #endif
2027       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
2028       return;
2029     }
2030   /* check if this is a result we've alredy
2031      received */
2032   for (i=0;i<lgc->results_used;i++)
2033     if (0 == memcmp (key,
2034                      &lgc->results[i],
2035                      sizeof (GNUNET_HashCode)))
2036       {
2037 #if DEBUG_FS
2038         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2039                     "Received duplicate result for `%s' from local datastore, ignoring.\n",
2040                     GNUNET_h2s (&lgc->query));
2041 #endif
2042         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2043         return; 
2044       }
2045   if (lgc->results_used == lgc->results_size)
2046     GNUNET_array_grow (lgc->results,
2047                        lgc->results_size,
2048                        lgc->results_size * 2 + 2);
2049   GNUNET_CRYPTO_hash (data, 
2050                       size, 
2051                       &lgc->results[lgc->results_used++]);    
2052   msize = size + sizeof (struct ContentMessage);
2053   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2054   lgc->result = GNUNET_malloc (msize);
2055   lgc->result->header.size = htons (msize);
2056   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2057   lgc->result->type = htonl (type);
2058   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
2059   memcpy (&lgc->result[1],
2060           data,
2061           size);
2062 #if DEBUG_FS
2063   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2064               "Received new result for `%s' from local datastore, passing to client.\n",
2065               GNUNET_h2s (&lgc->query));
2066 #endif
2067   GNUNET_SERVER_notify_transmit_ready (lgc->client,
2068                                        msize,
2069                                        GNUNET_TIME_UNIT_FOREVER_REL,
2070                                        &transmit_local_result,
2071                                        lgc);
2072 }
2073
2074
2075 /**
2076  * We're processing a search request from a local
2077  * client.  Now it is our turn to query the datastore.
2078  * 
2079  * @param cls our closure (struct LocalGetContext)
2080  * @param tc unused
2081  */
2082 static void
2083 transmit_local_get (void *cls,
2084                     const struct GNUNET_SCHEDULER_TaskContext *tc)
2085 {
2086   struct LocalGetContext *lgc = cls;
2087   uint32_t type;
2088   
2089   type = lgc->type;
2090   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2091     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2092   GNUNET_DATASTORE_get (dsh,
2093                         &lgc->query,
2094                         type,
2095                         &process_local_get_result,
2096                         lgc,
2097                         GNUNET_TIME_UNIT_FOREVER_REL);
2098 }
2099
2100
2101 /**
2102  * We're processing a search request from a local
2103  * client.  Now it is our turn to query the datastore.
2104  * 
2105  * @param cls our closure (struct LocalGetContext)
2106  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
2107  */
2108 static void 
2109 transmit_local_get_ready (void *cls,
2110                           int ok)
2111 {
2112   struct LocalGetContext *lgc = cls;
2113
2114   GNUNET_assert (GNUNET_OK == ok);
2115   GNUNET_SCHEDULER_add_continuation (sched,
2116                                      &transmit_local_get,
2117                                      lgc,
2118                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2119 }
2120
2121
2122 /**
2123  * Handle START_SEARCH-message (search request from client).
2124  *
2125  * @param cls closure
2126  * @param client identification of the client
2127  * @param message the actual message
2128  */
2129 static void
2130 handle_start_search (void *cls,
2131                      struct GNUNET_SERVER_Client *client,
2132                      const struct GNUNET_MessageHeader *message)
2133 {
2134   const struct SearchMessage *sm;
2135   struct LocalGetContext *lgc;
2136   uint16_t msize;
2137   unsigned int sc;
2138   
2139   msize = ntohs (message->size);
2140   if ( (msize < sizeof (struct SearchMessage)) ||
2141        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2142     {
2143       GNUNET_break (0);
2144       GNUNET_SERVER_receive_done (client,
2145                                   GNUNET_SYSERR);
2146       return;
2147     }
2148   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2149   sm = (const struct SearchMessage*) message;
2150   GNUNET_SERVER_client_keep (client);
2151   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
2152   if  (sc > 0)
2153     {
2154       lgc->results_used = sc;
2155       GNUNET_array_grow (lgc->results,
2156                          lgc->results_size,
2157                          sc * 2);
2158       memcpy (lgc->results,
2159               &sm[1],
2160               sc * sizeof (GNUNET_HashCode));
2161     }
2162   lgc->client = client;
2163   lgc->type = ntohl (sm->type);
2164   lgc->anonymity_level = ntohl (sm->anonymity_level);
2165   switch (lgc->type)
2166     {
2167     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2168     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2169       lgc->target.hashPubKey = sm->target;
2170       break;
2171     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2172       lgc->namespace = sm->target;
2173       break;
2174     default:
2175       break;
2176     }
2177   lgc->query = sm->query;
2178   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
2179   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
2180                                &transmit_local_get_ready,
2181                                lgc);
2182 }
2183
2184
2185 /**
2186  * List of handlers for the messages understood by this
2187  * service.
2188  */
2189 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2190   {&handle_index_start, NULL, 
2191    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2192   {&handle_index_list_get, NULL, 
2193    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2194   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2195    sizeof (struct UnindexMessage) },
2196   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2197    0 },
2198   {NULL, NULL, 0, 0}
2199 };
2200
2201
2202 /**
2203  * Clean up the memory used by the PendingRequest structure (except
2204  * for the client or peer list that the request may be part of).
2205  *
2206  * @param pr request to clean up
2207  */
2208 static void
2209 destroy_pending_request (struct PendingRequest *pr)
2210 {
2211   struct PendingReply *reply;
2212   struct ClientList *cl;
2213
2214   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
2215                                         &pr->query,
2216                                         pr);
2217   // FIXME: not sure how this can work (efficiently)
2218   // also, what does the return value mean?
2219   if (pr->client == NULL)
2220     {
2221       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
2222                                          pr->hnode);
2223     }
2224   else
2225     {
2226       cl = pr->crl_entry->cl;
2227       GNUNET_CONTAINER_DLL_remove (cl->head,
2228                                    cl->tail,
2229                                    pr->crl_entry);
2230     }
2231   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
2232     GNUNET_SCHEDULER_cancel (sched, pr->task);
2233   if (NULL != pr->cth)
2234     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
2235   if (NULL != pr->bf)
2236     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2237   if (NULL != pr->th)
2238     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
2239   while (NULL != (reply = pr->replies_pending))
2240     {
2241       pr->replies_pending = reply->next;
2242       GNUNET_free (reply);
2243     }
2244   GNUNET_PEER_change_rc (pr->source_pid, -1);
2245   GNUNET_PEER_change_rc (pr->target_pid, -1);
2246   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
2247   GNUNET_free_non_null (pr->used_pids);
2248   GNUNET_free_non_null (pr->replies_seen);
2249   GNUNET_free_non_null (pr->namespace);
2250   GNUNET_free (pr);
2251 }
2252
2253
2254 /**
2255  * A client disconnected.  Remove all of its pending queries.
2256  *
2257  * @param cls closure, NULL
2258  * @param client identification of the client
2259  */
2260 static void
2261 handle_client_disconnect (void *cls,
2262                           struct GNUNET_SERVER_Client
2263                           * client)
2264 {
2265   struct LocalGetContext *lgc;
2266   struct ClientList *cpos;
2267   struct ClientList *cprev;
2268   struct ClientRequestList *rl;
2269
2270   lgc = lgc_head;
2271   while ( (NULL != lgc) &&
2272           (lgc->client != client) )
2273     lgc = lgc->next;
2274   if (lgc != NULL)
2275     local_get_context_free (lgc);
2276   cprev = NULL;
2277   cpos = clients;
2278   while ( (NULL != cpos) &&
2279           (clients->client != client) )
2280     {
2281       cprev = cpos;
2282       cpos = cpos->next;
2283     }
2284   if (cpos != NULL)
2285     {
2286       if (cprev == NULL)
2287         clients = cpos->next;
2288       else
2289         cprev->next = cpos->next;
2290       while (NULL != (rl = cpos->head))
2291         {
2292           cpos->head = rl->next;
2293           destroy_pending_request (rl->req);
2294           GNUNET_free (rl);
2295         }
2296       GNUNET_free (cpos);
2297     }
2298 }
2299
2300
2301 /**
2302  * Iterator over entries in the "requests_by_query" map
2303  * that frees all the entries.
2304  *
2305  * @param cls closure, NULL
2306  * @param key current key code (the query, unused) 
2307  * @param value value in the hash map, of type "struct PendingRequest*"
2308  * @return GNUNET_YES (we should continue to  iterate)
2309  */
2310 static int 
2311 destroy_pending_request_cb (void *cls,
2312                             const GNUNET_HashCode * key,
2313                             void *value)
2314 {
2315   struct PendingRequest *pr = value;
2316
2317   destroy_pending_request (pr);
2318   return GNUNET_YES;
2319 }
2320
2321
2322 /**
2323  * Task run during shutdown.
2324  *
2325  * @param cls unused
2326  * @param tc unused
2327  */
2328 static void
2329 shutdown_task (void *cls,
2330                const struct GNUNET_SCHEDULER_TaskContext *tc)
2331 {
2332   struct IndexInfo *pos;  
2333
2334   if (NULL != core)
2335     {
2336       GNUNET_CORE_disconnect (core);
2337       core = NULL;
2338     }
2339   if (NULL != dsh)
2340     {
2341       GNUNET_DATASTORE_disconnect (dsh,
2342                                    GNUNET_NO);
2343       dsh = NULL;
2344     }
2345   GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
2346                                          &destroy_pending_request_cb,
2347                                          NULL);
2348   while (clients != NULL)
2349     handle_client_disconnect (NULL,
2350                               clients->client);
2351   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
2352   requests_by_query = NULL;
2353   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
2354   requests_by_peer = NULL;
2355   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
2356   requests_by_expiration = NULL;
2357   // FIXME: iterate over entries and free individually?
2358   // (or do we get disconnect notifications?)
2359   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2360   connected_peers = NULL;
2361   GNUNET_CONTAINER_multihashmap_destroy (ifm);
2362   ifm = NULL;
2363   while (NULL != (pos = indexed_files))
2364     {
2365       indexed_files = pos->next;
2366       GNUNET_free (pos);
2367     }
2368 }
2369
2370
2371 /**
2372  * Free (each) request made by the peer.
2373  *
2374  * @param cls closure, points to peer that the request belongs to
2375  * @param key current key code
2376  * @param value value in the hash map
2377  * @return GNUNET_YES (we should continue to iterate)
2378  */
2379 static int
2380 destroy_request (void *cls,
2381                  const GNUNET_HashCode * key,
2382                  void *value)
2383 {
2384   const struct GNUNET_PeerIdentity * peer = cls;
2385   struct PendingRequest *pr = value;
2386   
2387   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2388                                         &peer->hashPubKey,
2389                                         pr);
2390   destroy_pending_request (pr);
2391   return GNUNET_YES;
2392 }
2393
2394
2395
2396 /**
2397  * Method called whenever a given peer connects.
2398  *
2399  * @param cls closure, not used
2400  * @param peer peer identity this notification is about
2401  */
2402 static void 
2403 peer_connect_handler (void *cls,
2404                       const struct
2405                       GNUNET_PeerIdentity * peer)
2406 {
2407   struct ConnectedPeer *cp;
2408
2409   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
2410   cp->pid = GNUNET_PEER_intern (peer);
2411   GNUNET_CONTAINER_multihashmap_put (connected_peers,
2412                                      &peer->hashPubKey,
2413                                      cp,
2414                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2415 }
2416
2417
2418 /**
2419  * Method called whenever a peer disconnects.
2420  *
2421  * @param cls closure, not used
2422  * @param peer peer identity this notification is about
2423  */
2424 static void
2425 peer_disconnect_handler (void *cls,
2426                          const struct
2427                          GNUNET_PeerIdentity * peer)
2428 {
2429   struct ConnectedPeer *cp;
2430
2431   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2432                                           &peer->hashPubKey);
2433   GNUNET_PEER_change_rc (cp->pid, -1);
2434   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
2435   GNUNET_free (cp);
2436   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
2437                                               &peer->hashPubKey,
2438                                               &destroy_request,
2439                                               (void*) peer);
2440 }
2441
2442
2443 /**
2444  * We're processing a GET request from
2445  * another peer and have decided to forward
2446  * it to other peers.
2447  *
2448  * @param cls our "struct ProcessGetContext *"
2449  * @param tc unused
2450  */
2451 static void
2452 forward_get_request (void *cls,
2453                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2454 {
2455   struct ProcessGetContext *pgc = cls;
2456   struct PendingRequest *pr;
2457   struct PendingRequest *eer;
2458   struct GNUNET_PeerIdentity target;
2459
2460   pr = GNUNET_malloc (sizeof (struct PendingRequest));
2461   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
2462     {
2463       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
2464       *pr->namespace = pgc->namespace;
2465     }
2466   pr->bf = pgc->bf;
2467   pr->bf_size = pgc->bf_size;
2468   pgc->bf = NULL;
2469   pr->start_time = pgc->start_time;
2470   pr->query = pgc->query;
2471   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
2472   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
2473     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
2474   pr->anonymity_level = 1; /* default */
2475   pr->priority = pgc->priority;
2476   pr->remaining_priority = pr->priority;
2477   pr->mingle = pgc->mingle;
2478   pr->ttl = pgc->ttl; 
2479   pr->type = pgc->type;
2480   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
2481                                      &pr->query,
2482                                      pr,
2483                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2484   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
2485                                      &pgc->reply_to.hashPubKey,
2486                                      pr,
2487                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2488   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration,
2489                                             pr,
2490                                             pr->start_time.value + pr->ttl);
2491   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
2492     {
2493       /* expire oldest request! */
2494       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2495       GNUNET_PEER_resolve (eer->source_pid,
2496                            &target);    
2497       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2498                                             &target.hashPubKey,
2499                                             eer);
2500       destroy_pending_request (eer);     
2501     }
2502   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2503                                            get_processing_delay (),
2504                                            &forward_request_task,
2505                                            pr);
2506   GNUNET_free (pgc); 
2507 }
2508 /**
2509  * Transmit the given message by copying it to
2510  * the target buffer "buf".  "buf" will be
2511  * NULL and "size" zero if the socket was closed for
2512  * writing in the meantime.  In that case, only
2513
2514  * free the message
2515  *
2516  * @param cls closure, pointer to the message
2517  * @param size number of bytes available in buf
2518  * @param buf where the callee should write the message
2519  * @return number of bytes written to buf
2520  */
2521 static size_t
2522 transmit_message (void *cls,
2523                   size_t size, void *buf)
2524 {
2525   struct GNUNET_MessageHeader *msg = cls;
2526   uint16_t msize;
2527   
2528   if (NULL == buf)
2529     {
2530 #if DEBUG_FS
2531       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2532                   "Dropping reply, core too busy.\n");
2533 #endif
2534       GNUNET_free (msg);
2535       return 0;
2536     }
2537   msize = ntohs (msg->size);
2538   GNUNET_assert (size >= msize);
2539   memcpy (buf, msg, msize);
2540   GNUNET_free (msg);
2541   return msize;
2542 }
2543
2544
2545 /**
2546  * Test if the load on this peer is too high
2547  * to even consider processing the query at
2548  * all.
2549  * 
2550  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2551  */
2552 static int
2553 test_load_too_high ()
2554 {
2555   return GNUNET_NO; // FIXME
2556 }
2557
2558
2559 /**
2560  * We're processing (local) results for a search request
2561  * from another peer.  Pass applicable results to the
2562  * peer and if we are done either clean up (operation
2563  * complete) or forward to other peers (more results possible).
2564  *
2565  * @param cls our closure (struct LocalGetContext)
2566  * @param key key for the content
2567  * @param size number of bytes in data
2568  * @param data content stored
2569  * @param type type of the content
2570  * @param priority priority of the content
2571  * @param anonymity anonymity-level for the content
2572  * @param expiration expiration time for the content
2573  * @param uid unique identifier for the datum;
2574  *        maybe 0 if no unique identifier is available
2575  */
2576 static void
2577 process_p2p_get_result (void *cls,
2578                         const GNUNET_HashCode * key,
2579                         uint32_t size,
2580                         const void *data,
2581                         uint32_t type,
2582                         uint32_t priority,
2583                         uint32_t anonymity,
2584                         struct GNUNET_TIME_Absolute
2585                         expiration, 
2586                         uint64_t uid)
2587 {
2588   struct ProcessGetContext *pgc = cls;
2589   GNUNET_HashCode dhash;
2590   GNUNET_HashCode mhash;
2591   struct PutMessage *reply;
2592   
2593   if (NULL == key)
2594     {
2595       /* no more results */
2596       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2597            ( (0 == pgc->results_found) ||
2598              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2599              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2600              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2601         {
2602           GNUNET_SCHEDULER_add_continuation (sched,
2603                                              &forward_get_request,
2604                                              pgc,
2605                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2606         }
2607       else
2608         {
2609           if (pgc->bf != NULL)
2610             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2611           GNUNET_free (pgc); 
2612         }
2613       next_ds_request ();
2614       return;
2615     }
2616   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2617     {
2618       handle_on_demand_block (key, size, data, type, priority, 
2619                               anonymity, expiration, uid,
2620                               &process_p2p_get_result,
2621                               pgc);
2622       return;
2623     }
2624   /* check for duplicates */
2625   GNUNET_CRYPTO_hash (data, size, &dhash);
2626   mingle_hash (&dhash, 
2627                pgc->mingle,
2628                &mhash);
2629   if ( (pgc->bf != NULL) &&
2630        (GNUNET_YES ==
2631         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2632                                            &mhash)) )
2633     {      
2634 #if DEBUG_FS
2635       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2636                   "Result from datastore filtered by bloomfilter.\n");
2637 #endif
2638       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2639       return;
2640     }
2641   pgc->results_found++;
2642   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2643        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2644        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2645     {
2646       if (pgc->bf == NULL)
2647         {
2648           pgc->bf_size = 32;
2649           pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2650                                                        pgc->bf_size, 
2651                                                        BLOOMFILTER_K);
2652         }
2653       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2654                                         &mhash);
2655     }
2656
2657   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2658   reply->header.size = htons (sizeof (struct PutMessage) + size);
2659   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2660   reply->type = htonl (type);
2661   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2662   memcpy (&reply[1], data, size);
2663   GNUNET_CORE_notify_transmit_ready (core,
2664                                      pgc->priority,
2665                                      ACCEPTABLE_REPLY_DELAY,
2666                                      &pgc->reply_to,
2667                                      sizeof (struct PutMessage) + size,
2668                                      &transmit_message,
2669                                      reply);
2670   if ( (GNUNET_YES == test_load_too_high()) ||
2671        (pgc->results_found > 5 + 2 * pgc->priority) )
2672     {
2673       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2674       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2675       return;
2676     }
2677   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2678 }
2679   
2680
2681 /**
2682  * We're processing a GET request from another peer.  Give it to our
2683  * local datastore.
2684  *
2685  * @param cls our "struct ProcessGetContext"
2686  * @param ok did we get a datastore slice or not?
2687  */
2688 static void
2689 ds_get_request (void *cls, 
2690                 int ok)
2691 {
2692   struct ProcessGetContext *pgc = cls;
2693   uint32_t type;
2694   struct GNUNET_TIME_Relative timeout;
2695
2696   if (GNUNET_OK != ok)
2697     {
2698       /* no point in doing P2P stuff if we can't even do local */
2699       GNUNET_free (dsh);
2700       return;
2701     }
2702   type = pgc->type;
2703   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2704     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2705   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2706                                            (pgc->priority + 1));
2707   GNUNET_DATASTORE_get (dsh,
2708                         &pgc->query,
2709                         type,
2710                         &process_p2p_get_result,
2711                         pgc,
2712                         timeout);
2713 }
2714
2715
2716 /**
2717  * The priority level imposes a bound on the maximum
2718  * value for the ttl that can be requested.
2719  *
2720  * @param ttl_in requested ttl
2721  * @param prio given priority
2722  * @return ttl_in if ttl_in is below the limit,
2723  *         otherwise the ttl-limit for the given priority
2724  */
2725 static int32_t
2726 bound_ttl (int32_t ttl_in, uint32_t prio)
2727 {
2728   unsigned long long allowed;
2729
2730   if (ttl_in <= 0)
2731     return ttl_in;
2732   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2733   if (ttl_in > allowed)      
2734     {
2735       if (allowed >= (1 << 30))
2736         return 1 << 30;
2737       return allowed;
2738     }
2739   return ttl_in;
2740 }
2741
2742
2743 /**
2744  * We've received a request with the specified
2745  * priority.  Bound it according to how much
2746  * we trust the given peer.
2747  * 
2748  * @param prio_in requested priority
2749  * @param peer the peer making the request
2750  * @return effective priority
2751  */
2752 static uint32_t
2753 bound_priority (uint32_t prio_in,
2754                 const struct GNUNET_PeerIdentity *peer)
2755 {
2756   return 0; // FIXME!
2757 }
2758
2759
2760 /**
2761  * Handle P2P "GET" request.
2762  *
2763  * @param cls closure, always NULL
2764  * @param other the other peer involved (sender or receiver, NULL
2765  *        for loopback messages where we are both sender and receiver)
2766  * @param message the actual message
2767  * @return GNUNET_OK to keep the connection open,
2768  *         GNUNET_SYSERR to close it (signal serious error)
2769  */
2770 static int
2771 handle_p2p_get (void *cls,
2772                 const struct GNUNET_PeerIdentity *other,
2773                 const struct GNUNET_MessageHeader *message)
2774 {
2775   uint16_t msize;
2776   const struct GetMessage *gm;
2777   unsigned int bits;
2778   const GNUNET_HashCode *opt;
2779   struct ProcessGetContext *pgc;
2780   uint32_t bm;
2781   size_t bfsize;
2782   uint32_t ttl_decrement;
2783   double preference;
2784   int net_load_up;
2785   int net_load_down;
2786
2787   msize = ntohs(message->size);
2788   if (msize < sizeof (struct GetMessage))
2789     {
2790       GNUNET_break_op (0);
2791       return GNUNET_SYSERR;
2792     }
2793   gm = (const struct GetMessage*) message;
2794   bm = ntohl (gm->hash_bitmap);
2795   bits = 0;
2796   while (bm > 0)
2797     {
2798       if (1 == (bm & 1))
2799         bits++;
2800       bm >>= 1;
2801     }
2802   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2803     {
2804       GNUNET_break_op (0);
2805       return GNUNET_SYSERR;
2806     }  
2807   opt = (const GNUNET_HashCode*) &gm[1];
2808   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2809   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2810   if (bfsize > 0)
2811     {
2812       pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2813                                                    bfsize,
2814                                                    BLOOMFILTER_K);
2815       pgc->bf_size = bfsize;
2816     }
2817   pgc->type = ntohl (gm->type);
2818   pgc->bm = ntohl (gm->hash_bitmap);
2819   pgc->mingle = gm->filter_mutator;
2820   bits = 0;
2821   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2822     pgc->reply_to.hashPubKey = opt[bits++];
2823   else
2824     pgc->reply_to = *other;
2825   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2826     pgc->namespace = opt[bits++];
2827   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2828     {
2829       GNUNET_break_op (0);
2830       GNUNET_free (pgc);
2831       return GNUNET_SYSERR;
2832     }
2833   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2834     pgc->prime_target.hashPubKey = opt[bits++];
2835   /* note that we can really only check load here since otherwise
2836      peers could find out that we are overloaded by being disconnected
2837      after sending us a malformed query... */
2838   if (GNUNET_YES == test_load_too_high ())
2839     {
2840       if (NULL != pgc->bf)
2841         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2842       GNUNET_free (pgc);
2843 #if DEBUG_FS
2844       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2845                   "Dropping query from `%s', this peer is too busy.\n",
2846                   GNUNET_i2s (other));
2847 #endif
2848       return GNUNET_OK;
2849     }
2850   net_load_up = 50; // FIXME
2851   net_load_down = 50; // FIXME
2852   pgc->policy = ROUTING_POLICY_NONE;
2853   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2854        (net_load_down < IDLE_LOAD_THRESHOLD) )
2855     {
2856       pgc->policy |= ROUTING_POLICY_ALL;
2857       pgc->priority = 0; /* no charge */
2858     }
2859   else
2860     {
2861       pgc->priority = bound_priority (ntohl (gm->priority), other);
2862       if ( (net_load_up < 
2863             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2864            (net_load_down < 
2865             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2866         {
2867           pgc->policy |= ROUTING_POLICY_ALL;
2868         }
2869       else
2870         {
2871           // FIXME: is this sound?
2872           if (net_load_up < 90 + 10 * pgc->priority)
2873             pgc->policy |= ROUTING_POLICY_FORWARD;
2874           if (net_load_down < 90 + 10 * pgc->priority)
2875             pgc->policy |= ROUTING_POLICY_ANSWER;
2876         }
2877     }
2878   if (pgc->policy == ROUTING_POLICY_NONE)
2879     {
2880 #if DEBUG_FS
2881       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2882                   "Dropping query from `%s', network saturated.\n",
2883                   GNUNET_i2s (other));
2884 #endif
2885       if (NULL != pgc->bf)
2886         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2887       GNUNET_free (pgc);
2888       return GNUNET_OK;     /* drop */
2889     }
2890   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2891     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2892   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2893   /* decrement ttl (always) */
2894   ttl_decrement = 2 * TTL_DECREMENT +
2895     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2896                               TTL_DECREMENT);
2897   if ( (pgc->ttl < 0) &&
2898        (pgc->ttl - ttl_decrement > 0) )
2899     {
2900 #if DEBUG_FS
2901       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2902                   "Dropping query from `%s' due to TTL underflow.\n",
2903                   GNUNET_i2s (other));
2904 #endif
2905       /* integer underflow => drop (should be very rare)! */
2906       if (NULL != pgc->bf)
2907         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2908       GNUNET_free (pgc);
2909       return GNUNET_OK;
2910     }
2911   pgc->ttl -= ttl_decrement;
2912   pgc->start_time = GNUNET_TIME_absolute_get ();
2913   preference = (double) pgc->priority;
2914   if (preference < QUERY_BANDWIDTH_VALUE)
2915     preference = QUERY_BANDWIDTH_VALUE;
2916   // FIXME: also reserve bandwidth for reply?
2917   GNUNET_CORE_peer_configure (core,
2918                               other,
2919                               GNUNET_TIME_UNIT_FOREVER_REL,
2920                               0, 0, preference, NULL, NULL);
2921   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2922     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2923                                  &ds_get_request,
2924                                  pgc);
2925   else
2926     GNUNET_SCHEDULER_add_continuation (sched,
2927                                        &forward_get_request,
2928                                        pgc,
2929                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2930   return GNUNET_OK;
2931 }
2932
2933
2934 /**
2935  * Function called to notify us that we can now transmit a reply to a
2936  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2937  * closed for writing in the meantime.
2938  *
2939  * @param cls closure, points to a "struct PendingRequest*" with
2940  *            one or more pending replies
2941  * @param size number of bytes available in buf
2942  * @param buf where the callee should write the message
2943  * @return number of bytes written to buf
2944  */
2945 static size_t
2946 transmit_result (void *cls,
2947                  size_t size, 
2948                  void *buf)
2949 {
2950   struct PendingRequest *pr = cls;
2951   char *cbuf = buf;
2952   struct PendingReply *reply;
2953   size_t ret;
2954
2955   ret = 0;
2956   while (NULL != (reply = pr->replies_pending))
2957     {
2958       if ( (reply->msize + ret < ret) ||
2959            (reply->msize + ret > size) )
2960         break;
2961       pr->replies_pending = reply->next;
2962       memcpy (&cbuf[ret], &reply[1], reply->msize);
2963       ret += reply->msize;
2964       GNUNET_free (reply);
2965     }
2966   return ret;
2967 }
2968
2969
2970 /**
2971  * Iterator over pending requests.
2972  *
2973  * @param cls response (struct ProcessReplyClosure)
2974  * @param key our query
2975  * @param value value in the hash map (meta-info about the query)
2976  * @return GNUNET_YES (we should continue to iterate)
2977  */
2978 static int
2979 process_reply (void *cls,
2980                const GNUNET_HashCode * key,
2981                void *value)
2982 {
2983   struct ProcessReplyClosure *prq = cls;
2984   struct PendingRequest *pr = value;
2985   struct PendingRequest *eer;
2986   struct PendingReply *reply;
2987   struct PutMessage *pm;
2988   struct ContentMessage *cm;
2989   GNUNET_HashCode chash;
2990   GNUNET_HashCode mhash;
2991   struct GNUNET_PeerIdentity target;
2992   size_t msize;
2993   uint32_t prio;
2994   struct GNUNET_TIME_Relative max_delay;
2995   
2996   GNUNET_CRYPTO_hash (prq->data,
2997                       prq->size,
2998                       &chash);
2999   switch (prq->type)
3000     {
3001     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3002     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3003       break;
3004     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3005       /* FIXME: does prq->namespace match our expectations? */
3006       /* then: fall-through??? */
3007     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3008       if (pr->bf != NULL) 
3009         {
3010           mingle_hash (&chash, pr->mingle, &mhash);
3011           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
3012                                                                &mhash))
3013             return GNUNET_YES; /* duplicate */
3014           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
3015                                             &mhash);
3016         }
3017       break;
3018     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3019       // FIXME: any checks against duplicates for SKBlocks?
3020       break;
3021     }
3022   prio = pr->priority;
3023   prq->priority += pr->remaining_priority;
3024   pr->remaining_priority = 0;
3025   if (pr->client != NULL)
3026     {
3027       if (pr->replies_seen_size == pr->replies_seen_off)
3028         GNUNET_array_grow (pr->replies_seen,
3029                            pr->replies_seen_size,
3030                            pr->replies_seen_size * 2 + 4);
3031       pr->replies_seen[pr->replies_seen_off++] = chash;
3032       // FIXME: possibly recalculate BF!
3033     }
3034   if (pr->client == NULL)
3035     {
3036       msize = sizeof (struct ContentMessage) + prq->size;
3037       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3038       reply->msize = msize;
3039       cm = (struct ContentMessage*) &reply[1];
3040       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
3041       cm->header.size = htons (msize);
3042       cm->type = htonl (prq->type);
3043       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3044       reply->next = pr->replies_pending;
3045       pr->replies_pending = reply;
3046       memcpy (&reply[1], prq->data, prq->size);
3047       if (pr->cth != NULL)
3048         return GNUNET_YES;
3049       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
3050       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
3051         {
3052           /* estimate expiration time from time difference between
3053              first request that will be discarded and this request */
3054           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
3055           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
3056                                                            eer->start_time);
3057         }
3058       GNUNET_PEER_resolve (pr->source_pid,
3059                            &target);
3060       pr->cth = GNUNET_CORE_notify_transmit_ready (core,
3061                                                    prio,
3062                                                    max_delay,
3063                                                    &target,
3064                                                    msize,
3065                                                    &transmit_result,
3066                                                    pr);
3067       if (NULL == pr->cth)
3068         {
3069           // FIXME: now what? discard?
3070         }
3071     }
3072   else
3073     {
3074       msize = sizeof (struct PutMessage) + prq->size;
3075       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3076       reply->msize = msize;
3077       reply->next = pr->replies_pending;
3078       pm = (struct PutMessage*) &reply[1];
3079       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3080       pm->header.size = htons (msize);
3081       pm->type = htonl (prq->type);
3082       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
3083       pr->replies_pending = reply;
3084       memcpy (&reply[1], prq->data, prq->size);
3085       if (pr->th != NULL)
3086         return GNUNET_YES;
3087       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
3088                                                     msize,
3089                                                     GNUNET_TIME_UNIT_FOREVER_REL,
3090                                                     &transmit_result,
3091                                                     pr);
3092       if (pr->th == NULL)
3093         {
3094           // FIXME: need to try again later (not much
3095           // to do here specifically, but we need to
3096           // check somewhere else to handle this case!)
3097         }
3098     }
3099   // FIXME: implement hot-path routing statistics keeping!
3100   return GNUNET_YES;
3101 }
3102
3103
3104 /**
3105  * Check if the given KBlock is well-formed.
3106  *
3107  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
3108  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
3109  * @param query where to store the query that this block answers
3110  * @return GNUNET_OK if this is actually a well-formed KBlock
3111  */
3112 static int
3113 check_kblock (const struct KBlock *kb,
3114               size_t dsize,
3115               GNUNET_HashCode *query)
3116 {
3117   if (dsize < sizeof (struct KBlock))
3118     {
3119       GNUNET_break_op (0);
3120       return GNUNET_SYSERR;
3121     }
3122   if (dsize - sizeof (struct KBlock) !=
3123       ntohs (kb->purpose.size) 
3124       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
3125       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
3126     {
3127       GNUNET_break_op (0);
3128       return GNUNET_SYSERR;
3129     }
3130   if (GNUNET_OK !=
3131       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
3132                                 &kb->purpose,
3133                                 &kb->signature,
3134                                 &kb->keyspace)) 
3135     {
3136       GNUNET_break_op (0);
3137       return GNUNET_SYSERR;
3138     }
3139   if (query != NULL)
3140     GNUNET_CRYPTO_hash (&kb->keyspace,
3141                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3142                         query);
3143   return GNUNET_OK;
3144 }
3145
3146
3147 /**
3148  * Check if the given SBlock is well-formed.
3149  *
3150  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
3151  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
3152  * @param query where to store the query that this block answers
3153  * @param namespace where to store the namespace that this block belongs to
3154  * @return GNUNET_OK if this is actually a well-formed SBlock
3155  */
3156 static int
3157 check_sblock (const struct SBlock *sb,
3158               size_t dsize,
3159               GNUNET_HashCode *query,   
3160               GNUNET_HashCode *namespace)
3161 {
3162   if (dsize < sizeof (struct SBlock))
3163     {
3164       GNUNET_break_op (0);
3165       return GNUNET_SYSERR;
3166     }
3167   if (dsize !=
3168       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
3169     {
3170       GNUNET_break_op (0);
3171       return GNUNET_SYSERR;
3172     }
3173   if (GNUNET_OK !=
3174       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
3175                                 &sb->purpose,
3176                                 &sb->signature,
3177                                 &sb->subspace)) 
3178     {
3179       GNUNET_break_op (0);
3180       return GNUNET_SYSERR;
3181     }
3182   if (query != NULL)
3183     *query = sb->identifier;
3184   if (namespace != NULL)
3185     GNUNET_CRYPTO_hash (&sb->subspace,
3186                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3187                         namespace);
3188   return GNUNET_OK;
3189 }
3190
3191
3192 /**
3193  * Handle P2P "PUT" request.
3194  *
3195  * @param cls closure, always NULL
3196  * @param other the other peer involved (sender or receiver, NULL
3197  *        for loopback messages where we are both sender and receiver)
3198  * @param message the actual message
3199  * @return GNUNET_OK to keep the connection open,
3200  *         GNUNET_SYSERR to close it (signal serious error)
3201  */
3202 static int
3203 handle_p2p_put (void *cls,
3204                 const struct GNUNET_PeerIdentity *other,
3205                 const struct GNUNET_MessageHeader *message)
3206 {
3207   const struct PutMessage *put;
3208   uint16_t msize;
3209   size_t dsize;
3210   uint32_t type;
3211   struct GNUNET_TIME_Absolute expiration;
3212   GNUNET_HashCode query;
3213   struct ProcessReplyClosure prq;
3214
3215   msize = ntohs (message->size);
3216   if (msize < sizeof (struct PutMessage))
3217     {
3218       GNUNET_break_op(0);
3219       return GNUNET_SYSERR;
3220     }
3221   put = (const struct PutMessage*) message;
3222   dsize = msize - sizeof (struct PutMessage);
3223   type = ntohl (put->type);
3224   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
3225
3226   /* first, validate! */
3227   switch (type)
3228     {
3229     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3230     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3231       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
3232       break;
3233     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3234       if (GNUNET_OK !=
3235           check_kblock ((const struct KBlock*) &put[1],
3236                         dsize,
3237                         &query))
3238         return GNUNET_SYSERR;
3239       break;
3240     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3241       if (GNUNET_OK !=
3242           check_sblock ((const struct SBlock*) &put[1],
3243                         dsize,
3244                         &query,
3245                         &prq.namespace))
3246         return GNUNET_SYSERR;
3247       break;
3248     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3249       // FIXME -- validate SKBLOCK!
3250       GNUNET_break (0);
3251       return GNUNET_OK;
3252     default:
3253       /* unknown block type */
3254       GNUNET_break_op (0);
3255       return GNUNET_SYSERR;
3256     }
3257
3258   /* now, lookup 'query' */
3259   prq.data = (const void*) &put[1];
3260   prq.size = dsize;
3261   prq.type = type;
3262   prq.expiration = expiration;
3263   prq.priority = 0;
3264   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
3265                                               &query,
3266                                               &process_reply,
3267                                               &prq);
3268   // FIXME: if migration is on and load is low,
3269   // queue to store data in datastore;
3270   // use "prq.priority" for that!
3271   return GNUNET_OK;
3272 }
3273
3274
3275 /**
3276  * List of handlers for P2P messages
3277  * that we care about.
3278  */
3279 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3280   {
3281     { &handle_p2p_get, 
3282       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3283     { &handle_p2p_put, 
3284       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3285     { NULL, 0, 0 }
3286   };
3287
3288
3289 /**
3290  * Process fs requests.
3291  *
3292  * @param cls closure
3293  * @param s scheduler to use
3294  * @param server the initialized server
3295  * @param c configuration to use
3296  */
3297 static void
3298 run (void *cls,
3299      struct GNUNET_SCHEDULER_Handle *s,
3300      struct GNUNET_SERVER_Handle *server,
3301      const struct GNUNET_CONFIGURATION_Handle *c)
3302 {
3303   sched = s;
3304   cfg = c;
3305
3306   ifm = GNUNET_CONTAINER_multihashmap_create (128);
3307   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3308   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3309   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
3310   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3311   read_index_list ();
3312   dsh = GNUNET_DATASTORE_connect (cfg,
3313                                   sched);
3314   core = GNUNET_CORE_connect (sched,
3315                               cfg,
3316                               GNUNET_TIME_UNIT_FOREVER_REL,
3317                               NULL,
3318                               NULL,
3319                               &peer_connect_handler,
3320                               &peer_disconnect_handler,
3321                               NULL, 
3322                               NULL, GNUNET_NO,
3323                               NULL, GNUNET_NO,
3324                               p2p_handlers);
3325
3326   GNUNET_SERVER_disconnect_notify (server, 
3327                                    &handle_client_disconnect,
3328                                    NULL);
3329   GNUNET_SERVER_add_handlers (server, handlers);
3330   GNUNET_SCHEDULER_add_delayed (sched,
3331                                 GNUNET_TIME_UNIT_FOREVER_REL,
3332                                 &shutdown_task,
3333                                 NULL);
3334   if (NULL == dsh)
3335     {
3336       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3337                   _("Failed to connect to `%s' service.\n"),
3338                   "datastore");
3339       GNUNET_SCHEDULER_shutdown (sched);
3340       return;
3341     }
3342   if (NULL == core)
3343     {
3344       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3345                   _("Failed to connect to `%s' service.\n"),
3346                   "core");
3347       GNUNET_SCHEDULER_shutdown (sched);
3348       return;
3349     }
3350 }
3351
3352
3353 /**
3354  * The main function for the fs service.
3355  *
3356  * @param argc number of arguments from the command line
3357  * @param argv command line arguments
3358  * @return 0 ok, 1 on error
3359  */
3360 int
3361 main (int argc, char *const *argv)
3362 {
3363   return (GNUNET_OK ==
3364           GNUNET_SERVICE_run (argc,
3365                               argv,
3366                               "fs",
3367                               GNUNET_SERVICE_OPTION_NONE,
3368                               &run, NULL)) ? 0 : 1;
3369 }
3370
3371 /* end of gnunet-service-fs.c */