debug code and fixes
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - fix gazillion of minor FIXME's
28  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
29  *   core to transmit to another peer; need to make sure this is bounded overall...
30  * - randomly delay processing for improved anonymity (can wait)
31  * - content migration (put in local DS) (can wait)
32  * - handle some special cases when forwarding replies based on tracked requests (can wait)
33  * - tracking of success correlations for hot-path routing (can wait)
34  * - various load-based actions (can wait)
35  * - validation of KSBLOCKS (can wait)
36  * - remove on-demand blocks if they keep failing (can wait)
37  * - check that we decrement PIDs always where necessary (can wait)
38  * - find out how to use core-pulling instead of pushing (at least for some cases)
39  */
40 #include "platform.h"
41 #include <float.h>
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "fs.h"
49
50
51 /**
52  * In-memory information about indexed files (also available
53  * on-disk).
54  */
55 struct IndexInfo
56 {
57   
58   /**
59    * This is a linked list.
60    */
61   struct IndexInfo *next;
62
63   /**
64    * Name of the indexed file.  Memory allocated
65    * at the end of this struct (do not free).
66    */
67   const char *filename;
68
69   /**
70    * Context for transmitting confirmation to client,
71    * NULL if we've done this already.
72    */
73   struct GNUNET_SERVER_TransmitContext *tc;
74   
75   /**
76    * Hash of the contents of the file.
77    */
78   GNUNET_HashCode file_id;
79
80 };
81
82
83 /**
84  * Signature of a function that is called whenever a datastore
85  * request can be processed (or an entry put on the queue times out).
86  *
87  * @param cls closure
88  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
89  */
90 typedef void (*RequestFunction)(void *cls,
91                                 int ok);
92
93
94 /**
95  * Doubly-linked list of our requests for the datastore.
96  */
97 struct DatastoreRequestQueue
98 {
99
100   /**
101    * This is a doubly-linked list.
102    */
103   struct DatastoreRequestQueue *next;
104
105   /**
106    * This is a doubly-linked list.
107    */
108   struct DatastoreRequestQueue *prev;
109
110   /**
111    * Function to call (will issue the request).
112    */
113   RequestFunction req;
114
115   /**
116    * Closure for req.
117    */
118   void *req_cls;
119
120   /**
121    * When should this request time-out because we don't care anymore?
122    */
123   struct GNUNET_TIME_Absolute timeout;
124     
125   /**
126    * ID of task used for signaling timeout.
127    */
128   GNUNET_SCHEDULER_TaskIdentifier task;
129
130 };
131
132
133 /**
134  * Closure for processing START_SEARCH messages from a client.
135  */
136 struct LocalGetContext
137 {
138
139   /**
140    * This is a doubly-linked list.
141    */
142   struct LocalGetContext *next;
143
144   /**
145    * This is a doubly-linked list.
146    */
147   struct LocalGetContext *prev;
148
149   /**
150    * Client that initiated the search.
151    */
152   struct GNUNET_SERVER_Client *client;
153
154   /**
155    * Array of results that we've already received 
156    * (can be NULL).
157    */
158   GNUNET_HashCode *results; 
159
160   /**
161    * Bloomfilter over all results (for fast query construction);
162    * NULL if we don't have any results.
163    *
164    * FIXME: this member is not used, is that OK? If so, it should
165    * be removed!
166    */
167   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
168
169   /**
170    * DS request associated with this operation.
171    */
172   struct DatastoreRequestQueue *req;
173
174   /**
175    * Current result message to transmit to client (or NULL).
176    */
177   struct ContentMessage *result;
178   
179   /**
180    * Type of the content that we're looking for.
181    * 0 for any.
182    */
183   uint32_t type;
184
185   /**
186    * Desired anonymity level.
187    */
188   uint32_t anonymity_level;
189
190   /**
191    * Number of results actually stored in the results array.
192    */
193   unsigned int results_used;
194   
195   /**
196    * Size of the results array in memory.
197    */
198   unsigned int results_size;
199   
200   /**
201    * Size (in bytes) of the 'results_bf' bloomfilter.
202    *
203    * FIXME: this member is not used, is that OK? If so, it should
204    * be removed!
205    */
206   size_t results_bf_size;
207
208   /**
209    * If the request is for a DBLOCK or IBLOCK, this is the identity of
210    * the peer that is known to have a response.  Set to all-zeros if
211    * such a target is not known (note that even if OUR anonymity
212    * level is >0 we may happen to know the responder's identity;
213    * nevertheless, we should probably not use it for a DHT-lookup
214    * or similar blunt actions in order to avoid exposing ourselves).
215    */
216   struct GNUNET_PeerIdentity target;
217
218   /**
219    * If the request is for an SBLOCK, this is the identity of the
220    * pseudonym to which the SBLOCK belongs. 
221    */
222   GNUNET_HashCode namespace;
223
224   /**
225    * Hash of the keyword (aka query) for KBLOCKs; Hash of
226    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
227    * and hash of the identifier XORed with the target for
228    * SBLOCKS (aka query).
229    */
230   GNUNET_HashCode query;
231
232 };
233
234
235 /**
236  * Possible routing policies for an FS-GET request.
237  */
238 enum RoutingPolicy
239   {
240     /**
241      * Simply drop the request.
242      */
243     ROUTING_POLICY_NONE = 0,
244     
245     /**
246      * Answer it if we can from local datastore.
247      */
248     ROUTING_POLICY_ANSWER = 1,
249
250     /**
251      * Forward the request to other peers (if possible).
252      */
253     ROUTING_POLICY_FORWARD = 2,
254
255     /**
256      * Forward to other peers, and ask them to route
257      * the response via ourselves.
258      */
259     ROUTING_POLICY_INDIRECT = 6,
260     
261     /**
262      * Do everything we could possibly do (that would
263      * make sense).
264      */
265     ROUTING_POLICY_ALL = 7
266   };
267
268
269 /**
270  * Internal context we use for our initial processing
271  * of a GET request.
272  */
273 struct ProcessGetContext
274 {
275   /**
276    * The search query (used for datastore lookup).
277    */
278   GNUNET_HashCode query;
279   
280   /**
281    * Which peer we should forward the response to.
282    */
283   struct GNUNET_PeerIdentity reply_to;
284
285   /**
286    * Namespace for the result (only set for SKS requests)
287    */
288   GNUNET_HashCode namespace;
289
290   /**
291    * Peer that we should forward the query to if possible
292    * (since that peer likely has the content).
293    */
294   struct GNUNET_PeerIdentity prime_target;
295
296   /**
297    * When did we receive this request?
298    */
299   struct GNUNET_TIME_Absolute start_time;
300
301   /**
302    * Our entry in the DRQ (non-NULL while we wait for our
303    * turn to interact with the local database).
304    */
305   struct DatastoreRequestQueue *drq;
306
307   /**
308    * Filter used to eliminate duplicate results.  Can be NULL if we
309    * are not yet filtering any results.
310    */
311   struct GNUNET_CONTAINER_BloomFilter *bf;
312
313   /**
314    * Bitmap describing which of the optional
315    * hash codes / peer identities were given to us.
316    */
317   uint32_t bm;
318
319   /**
320    * Desired block type.
321    */
322   uint32_t type;
323
324   /**
325    * Priority of the request.
326    */
327   uint32_t priority;
328
329   /**
330    * Size of the 'bf' (in bytes).
331    */
332   size_t bf_size;
333
334   /**
335    * In what ways are we going to process
336    * the request?
337    */
338   enum RoutingPolicy policy;
339
340   /**
341    * Time-to-live for the request (value
342    * we use).
343    */
344   int32_t ttl;
345
346   /**
347    * Number to mingle hashes for bloom-filter
348    * tests with.
349    */
350   int32_t mingle;
351
352   /**
353    * Number of results that were found so far.
354    */
355   unsigned int results_found;
356 };
357
358
359 /**
360  * Information we keep for each pending reply.
361  */
362 struct PendingReply
363 {
364   /**
365    * This is a linked list.
366    */
367   struct PendingReply *next;
368
369   /**
370    * Size of the reply; actual reply message follows
371    * at the end of this struct.
372    */
373   size_t msize;
374
375 };
376
377
378 /**
379  * All requests from a client are kept in a doubly-linked list.
380  */
381 struct ClientRequestList;
382
383
384 /**
385  * Information we keep for each pending request.  We should try to
386  * keep this struct as small as possible since its memory consumption
387  * is key to how many requests we can have pending at once.
388  */
389 struct PendingRequest
390 {
391
392   /**
393    * ID of a client making a request, NULL if this entry is for a
394    * peer.
395    */
396   struct GNUNET_SERVER_Client *client;
397
398   /**
399    * If this request was made by a client,
400    * this is our entry in the client request
401    * list; otherwise NULL.
402    */
403   struct ClientRequestList *crl_entry;
404
405   /**
406    * If this is a namespace query, pointer to the hash of the public
407    * key of the namespace; otherwise NULL.
408    */
409   GNUNET_HashCode *namespace;
410
411   /**
412    * Bloomfilter we use to filter out replies that we don't care about
413    * (anymore).  NULL as long as we are interested in all replies.
414    */
415   struct GNUNET_CONTAINER_BloomFilter *bf;
416
417   /**
418    * Replies that we have received but were unable to forward yet
419    * (typically non-null only if we have a pending transmission
420    * request with the client or the respective peer).
421    */
422   struct PendingReply *replies_pending;
423
424   /**
425    * Pending transmission request with the core service for the target
426    * peer (for processing of 'replies_pending') or Handle for a
427    * pending query-request for P2P-transmission with the core service.
428    * If non-NULL, this request must be cancelled should this struct be
429    * destroyed!
430    */
431   struct GNUNET_CORE_TransmitHandle *cth;
432
433   /**
434    * Pending transmission request for the target client (for processing of
435    * 'replies_pending').
436    */
437   struct GNUNET_CONNECTION_TransmitHandle *th;
438
439   /**
440    * Hash code of all replies that we have seen so far (only valid
441    * if client is not NULL since we only track replies like this for
442    * our own clients).
443    */
444   GNUNET_HashCode *replies_seen;
445
446   /**
447    * When did we first see this request (form this peer), or, if our
448    * client is initiating, when did we last initiate a search?
449    */
450   struct GNUNET_TIME_Absolute start_time;
451
452   /**
453    * The query that this request is for.
454    */
455   GNUNET_HashCode query;
456
457   /**
458    * The task responsible for transmitting queries
459    * for this request.
460    */
461   GNUNET_SCHEDULER_TaskIdentifier task;
462
463   /**
464    * (Interned) Peer identifier (only valid if "client" is NULL)
465    * that identifies a peer that gave us this request.
466    */
467   GNUNET_PEER_Id source_pid;
468
469   /**
470    * (Interned) Peer identifier that identifies a preferred target
471    * for requests.
472    */
473   GNUNET_PEER_Id target_pid;
474
475   /**
476    * (Interned) Peer identifiers of peers that have already
477    * received our query for this content.
478    */
479   GNUNET_PEER_Id *used_pids;
480
481   /**
482    * Size of the 'bf' (in bytes).
483    */
484   size_t bf_size;
485
486   /**
487    * Desired anonymity level; only valid for requests from a local client.
488    */
489   uint32_t anonymity_level;
490
491   /**
492    * How many entries in "used_pids" are actually valid?
493    */
494   unsigned int used_pids_off;
495
496   /**
497    * How long is the "used_pids" array?
498    */
499   unsigned int used_pids_size;
500
501   /**
502    * How many entries in "replies_seen" are actually valid?
503    */
504   unsigned int replies_seen_off;
505
506   /**
507    * How long is the "replies_seen" array?
508    */
509   unsigned int replies_seen_size;
510   
511   /**
512    * Priority with which this request was made.  If one of our clients
513    * made the request, then this is the current priority that we are
514    * using when initiating the request.  This value is used when
515    * we decide to reward other peers with trust for providing a reply.
516    */
517   uint32_t priority;
518
519   /**
520    * Priority points left for us to spend when forwarding this request
521    * to other peers.
522    */
523   uint32_t remaining_priority;
524
525   /**
526    * Number to mingle hashes for bloom-filter
527    * tests with.
528    */
529   int32_t mingle;
530
531   /**
532    * TTL with which we saw this request (or, if we initiated, TTL that
533    * we used for the request).
534    */
535   int32_t ttl;
536   
537   /**
538    * Type of the content that this request is for.
539    */
540   uint32_t type;
541
542 };
543
544
545 /**
546  * All requests from a client are kept in a doubly-linked list.
547  */
548 struct ClientRequestList
549 {
550   /**
551    * This is a doubly-linked list.
552    */
553   struct ClientRequestList *next;
554
555   /**
556    * This is a doubly-linked list.
557    */ 
558   struct ClientRequestList *prev;
559
560   /**
561    * A request from this client.
562    */
563   struct PendingRequest *req;
564
565   /**
566    * Client list with the head and tail of this DLL.
567    */
568   struct ClientList *cl;
569 };
570
571
572 /**
573  * Linked list of all clients that we are 
574  * currently processing requests for.
575  */
576 struct ClientList
577 {
578
579   /**
580    * This is a linked list.
581    */
582   struct ClientList *next;
583
584   /**
585    * What client is this entry for?
586    */
587   struct GNUNET_SERVER_Client* client;
588
589   /**
590    * Head of the DLL of requests from this client.
591    */
592   struct ClientRequestList *head;
593
594   /**
595    * Tail of the DLL of requests from this client.
596    */
597   struct ClientRequestList *tail;
598
599 };
600
601
602 /**
603  * Closure for "process_reply" function.
604  */
605 struct ProcessReplyClosure
606 {
607   /**
608    * The data for the reply.
609    */
610   const void *data;
611
612   /**
613    * When the reply expires.
614    */
615   struct GNUNET_TIME_Absolute expiration;
616
617   /**
618    * Size of data.
619    */
620   size_t size;
621
622   /**
623    * Namespace that this reply belongs to
624    * (if it is of type SBLOCK).
625    */
626   GNUNET_HashCode namespace;
627
628   /**
629    * Type of the block.
630    */
631   uint32_t type;
632
633   /**
634    * How much was this reply worth to us?
635    */
636   uint32_t priority;
637 };
638
639
640 /**
641  * Information about a peer that we are connected to.
642  * We track data that is useful for determining which
643  * peers should receive our requests.
644  */
645 struct ConnectedPeer
646 {
647
648   /**
649    * List of the last clients for which this peer
650    * successfully answered a query. 
651    */
652   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
653
654   /**
655    * List of the last PIDs for which
656    * this peer successfully answered a query;
657    * We use 0 to indicate no successful reply.
658    */
659   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
660
661   /**
662    * Average delay between sending the peer a request and
663    * getting a reply (only calculated over the requests for
664    * which we actually got a reply).   Calculated
665    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
666    */ 
667   struct GNUNET_TIME_Relative avg_delay;
668
669   /**
670    * Average priority of successful replies.  Calculated
671    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
672    */
673   double avg_priority;
674
675   /**
676    * The peer's identity.
677    */
678   GNUNET_PEER_Id pid;  
679
680   /**
681    * Number of requests we have currently pending
682    * with this peer (that is, requests that were
683    * transmitted so recently that we would not retransmit
684    * them right now).
685    */
686   unsigned int pending_requests;
687
688   /**
689    * Which offset in "last_p2p_replies" will be updated next?
690    * (we go round-robin).
691    */
692   unsigned int last_p2p_replies_woff;
693
694   /**
695    * Which offset in "last_client_replies" will be updated next?
696    * (we go round-robin).
697    */
698   unsigned int last_client_replies_woff;
699
700 };
701
702
703 /**
704  * Our connection to the datastore.
705  */
706 static struct GNUNET_DATASTORE_Handle *dsh;
707
708 /**
709  * Our scheduler.
710  */
711 static struct GNUNET_SCHEDULER_Handle *sched;
712
713 /**
714  * Our configuration.
715  */
716 const struct GNUNET_CONFIGURATION_Handle *cfg;
717
718 /**
719  * Handle to the core service (NULL until we've connected to it).
720  */
721 struct GNUNET_CORE_Handle *core;
722
723 /**
724  * Head of doubly-linked LGC list.
725  */
726 static struct LocalGetContext *lgc_head;
727
728 /**
729  * Tail of doubly-linked LGC list.
730  */
731 static struct LocalGetContext *lgc_tail;
732
733 /**
734  * Head of request queue for the datastore, sorted by timeout.
735  */
736 static struct DatastoreRequestQueue *drq_head;
737
738 /**
739  * Tail of request queue for the datastore.
740  */
741 static struct DatastoreRequestQueue *drq_tail;
742
743 /**
744  * Linked list of indexed files.
745  */
746 static struct IndexInfo *indexed_files;
747
748 /**
749  * Maps hash over content of indexed files to the respective filename.
750  * The filenames are pointers into the indexed_files linked list and
751  * do not need to be freed.
752  */
753 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
754
755 /**
756  * Map of query hash codes to requests.
757  */
758 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
759
760 /**
761  * Map of peer IDs to requests (for those requests coming
762  * from other peers).
763  */
764 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
765
766 /**
767  * Linked list of all of our clients and their requests.
768  */
769 static struct ClientList *clients;
770
771 /**
772  * Heap with the request that will expire next at the top.  Contains
773  * pointers of type "struct PendingRequest*"; these will *also* be
774  * aliased from the "requests_by_peer" data structures and the
775  * "requests_by_query" table.  Note that requests from our clients
776  * don't expire and are thus NOT in the "requests_by_expiration"
777  * (or the "requests_by_peer" tables).
778  */
779 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
780
781 /**
782  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
783  */
784 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
785
786 /**
787  * Maximum number of requests (from other peers) that we're
788  * willing to have pending at any given point in time.
789  * FIXME: set from configuration (and 32 is a tiny value for testing only).
790  */
791 static uint64_t max_pending_requests = 32;
792
793 /**
794  * Write the current index information list to disk.
795  */ 
796 static void
797 write_index_list ()
798 {
799   struct GNUNET_BIO_WriteHandle *wh;
800   char *fn;
801   struct IndexInfo *pos;  
802
803   if (GNUNET_OK !=
804       GNUNET_CONFIGURATION_get_value_filename (cfg,
805                                                "FS",
806                                                "INDEXDB",
807                                                &fn))
808     {
809       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
810                   _("Configuration option `%s' in section `%s' missing.\n"),
811                   "INDEXDB",
812                   "FS");
813       return;
814     }
815   wh = GNUNET_BIO_write_open (fn);
816   if (NULL == wh)
817     {
818       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
819                   _("Could not open `%s'.\n"),
820                   fn);
821       GNUNET_free (fn);
822       return;
823     }
824   pos = indexed_files;
825   while (pos != NULL)
826     {
827       if ( (GNUNET_OK !=
828             GNUNET_BIO_write (wh,
829                               &pos->file_id,
830                               sizeof (GNUNET_HashCode))) ||
831            (GNUNET_OK !=
832             GNUNET_BIO_write_string (wh,
833                                      pos->filename)) )
834         break;
835       pos = pos->next;
836     }
837   if (GNUNET_OK != 
838       GNUNET_BIO_write_close (wh))
839     {
840       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
841                   _("Error writing `%s'.\n"),
842                   fn);
843       GNUNET_free (fn);
844       return;
845     }
846   GNUNET_free (fn);
847 }
848
849
850 /**
851  * Read index information from disk.
852  */
853 static void
854 read_index_list ()
855 {
856   struct GNUNET_BIO_ReadHandle *rh;
857   char *fn;
858   struct IndexInfo *pos;  
859   char *fname;
860   GNUNET_HashCode hc;
861   size_t slen;
862   char *emsg;
863
864   if (GNUNET_OK !=
865       GNUNET_CONFIGURATION_get_value_filename (cfg,
866                                                "FS",
867                                                "INDEXDB",
868                                                &fn))
869     {
870       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
871                   _("Configuration option `%s' in section `%s' missing.\n"),
872                   "INDEXDB",
873                   "FS");
874       return;
875     }
876   rh = GNUNET_BIO_read_open (fn);
877   if (NULL == rh)
878     {
879       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
880                   _("Could not open `%s'.\n"),
881                   fn);
882       GNUNET_free (fn);
883       return;
884     }
885
886   while ( (GNUNET_OK ==
887            GNUNET_BIO_read (rh,
888                             "Hash of indexed file",
889                             &hc,
890                             sizeof (GNUNET_HashCode))) &&
891           (GNUNET_OK ==
892            GNUNET_BIO_read_string (rh, 
893                                    "Name of indexed file",
894                                    &fname,
895                                    1024 * 16)) )
896     {
897       slen = strlen (fname) + 1;
898       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
899       pos->file_id = hc;
900       pos->filename = (const char *) &pos[1];
901       memcpy (&pos[1], fname, slen);
902       if (GNUNET_SYSERR ==
903           GNUNET_CONTAINER_multihashmap_put (ifm,
904                                              &hc,
905                                              (void*) pos->filename,
906                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
907         {
908           GNUNET_free (pos);
909         }
910       else
911         {
912           pos->next = indexed_files;
913           indexed_files = pos;
914         }
915     }
916   if (GNUNET_OK != 
917       GNUNET_BIO_read_close (rh, &emsg))
918     GNUNET_free (emsg);
919   GNUNET_free (fn);
920 }
921
922
923 /**
924  * We've validated the hash of the file we're about to
925  * index.  Signal success to the client and update
926  * our internal data structures.
927  *
928  * @param ii the index info entry for the request
929  */
930 static void
931 signal_index_ok (struct IndexInfo *ii)
932 {
933   if (GNUNET_SYSERR ==
934       GNUNET_CONTAINER_multihashmap_put (ifm,
935                                          &ii->file_id,
936                                          (void*) ii->filename,
937                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
938     {
939       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
940                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
941                   ii->filename,
942                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
943                                                                    &ii->file_id));
944       GNUNET_SERVER_transmit_context_append (ii->tc,
945                                              NULL, 0,
946                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
947       GNUNET_SERVER_transmit_context_run (ii->tc,
948                                           GNUNET_TIME_UNIT_MINUTES);
949       GNUNET_free (ii);
950       return;
951     }
952   ii->next = indexed_files;
953   indexed_files = ii;
954   write_index_list ();
955   GNUNET_SERVER_transmit_context_append (ii->tc,
956                                          NULL, 0,
957                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
958   GNUNET_SERVER_transmit_context_run (ii->tc,
959                                       GNUNET_TIME_UNIT_MINUTES);
960   ii->tc = NULL;
961 }
962
963
964 /**
965  * Function called once the hash computation over an
966  * indexed file has completed.
967  *
968  * @param cls closure, our publishing context
969  * @param res resulting hash, NULL on error
970  */
971 static void 
972 hash_for_index_val (void *cls,
973                     const GNUNET_HashCode *
974                     res)
975 {
976   struct IndexInfo *ii = cls;
977   
978   if ( (res == NULL) ||
979        (0 != memcmp (res,
980                      &ii->file_id,
981                      sizeof(GNUNET_HashCode))) )
982     {
983       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
984                   _("Hash mismatch trying to index file `%s'\n"),
985                   ii->filename);
986       GNUNET_SERVER_transmit_context_append (ii->tc,
987                                              NULL, 0,
988                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
989       GNUNET_SERVER_transmit_context_run (ii->tc,
990                                           GNUNET_TIME_UNIT_MINUTES);
991       GNUNET_free (ii);
992       return;
993     }
994   signal_index_ok (ii);
995 }
996
997
998 /**
999  * Handle INDEX_START-message.
1000  *
1001  * @param cls closure
1002  * @param client identification of the client
1003  * @param message the actual message
1004  */
1005 static void
1006 handle_index_start (void *cls,
1007                     struct GNUNET_SERVER_Client *client,
1008                     const struct GNUNET_MessageHeader *message)
1009 {
1010   const struct IndexStartMessage *ism;
1011   const char *fn;
1012   uint16_t msize;
1013   struct IndexInfo *ii;
1014   size_t slen;
1015   uint32_t dev;
1016   uint64_t ino;
1017   uint32_t mydev;
1018   uint64_t myino;
1019
1020   msize = ntohs(message->size);
1021   if ( (msize <= sizeof (struct IndexStartMessage)) ||
1022        ( ((const char *)message)[msize-1] != '\0') )
1023     {
1024       GNUNET_break (0);
1025       GNUNET_SERVER_receive_done (client,
1026                                   GNUNET_SYSERR);
1027       return;
1028     }
1029   ism = (const struct IndexStartMessage*) message;
1030   fn = (const char*) &ism[1];
1031   dev = ntohl (ism->device);
1032   ino = GNUNET_ntohll (ism->inode);
1033   ism = (const struct IndexStartMessage*) message;
1034   slen = strlen (fn) + 1;
1035   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
1036   ii->filename = (const char*) &ii[1];
1037   memcpy (&ii[1], fn, slen);
1038   ii->file_id = ism->file_id;  
1039   ii->tc = GNUNET_SERVER_transmit_context_create (client);
1040   if ( ( (dev != 0) ||
1041          (ino != 0) ) &&
1042        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
1043                                                        &mydev,
1044                                                        &myino)) &&
1045        ( (dev == mydev) &&
1046          (ino == myino) ) )
1047     {      
1048       /* fast validation OK! */
1049       signal_index_ok (ii);
1050       return;
1051     }
1052   /* slow validation, need to hash full file (again) */
1053   GNUNET_CRYPTO_hash_file (sched,
1054                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1055                            GNUNET_NO,
1056                            fn,
1057                            HASHING_BLOCKSIZE,
1058                            &hash_for_index_val,
1059                            ii);
1060 }
1061
1062
1063 /**
1064  * Handle INDEX_LIST_GET-message.
1065  *
1066  * @param cls closure
1067  * @param client identification of the client
1068  * @param message the actual message
1069  */
1070 static void
1071 handle_index_list_get (void *cls,
1072                        struct GNUNET_SERVER_Client *client,
1073                        const struct GNUNET_MessageHeader *message)
1074 {
1075   struct GNUNET_SERVER_TransmitContext *tc;
1076   struct IndexInfoMessage *iim;
1077   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1078   size_t slen;
1079   const char *fn;
1080   struct GNUNET_MessageHeader *msg;
1081   struct IndexInfo *pos;
1082
1083   tc = GNUNET_SERVER_transmit_context_create (client);
1084   iim = (struct IndexInfoMessage*) buf;
1085   msg = &iim->header;
1086   pos = indexed_files;
1087   while (NULL != pos)
1088     {
1089       iim->reserved = 0;
1090       iim->file_id = pos->file_id;
1091       fn = pos->filename;
1092       slen = strlen (fn) + 1;
1093       if (slen + sizeof (struct IndexInfoMessage) > 
1094           GNUNET_SERVER_MAX_MESSAGE_SIZE)
1095         {
1096           GNUNET_break (0);
1097           break;
1098         }
1099       memcpy (&iim[1], fn, slen);
1100       GNUNET_SERVER_transmit_context_append
1101         (tc,
1102          &msg[1],
1103          sizeof (struct IndexInfoMessage) 
1104          - sizeof (struct GNUNET_MessageHeader) + slen,
1105          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1106       pos = pos->next;
1107     }
1108   GNUNET_SERVER_transmit_context_append (tc,
1109                                          NULL, 0,
1110                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1111   GNUNET_SERVER_transmit_context_run (tc,
1112                                       GNUNET_TIME_UNIT_MINUTES);
1113 }
1114
1115
1116 /**
1117  * Handle UNINDEX-message.
1118  *
1119  * @param cls closure
1120  * @param client identification of the client
1121  * @param message the actual message
1122  */
1123 static void
1124 handle_unindex (void *cls,
1125                 struct GNUNET_SERVER_Client *client,
1126                 const struct GNUNET_MessageHeader *message)
1127 {
1128   const struct UnindexMessage *um;
1129   struct IndexInfo *pos;
1130   struct IndexInfo *prev;
1131   struct IndexInfo *next;
1132   struct GNUNET_SERVER_TransmitContext *tc;
1133   int found;
1134   
1135   um = (const struct UnindexMessage*) message;
1136   found = GNUNET_NO;
1137   prev = NULL;
1138   pos = indexed_files;
1139   while (NULL != pos)
1140     {
1141       next = pos->next;
1142       if (0 == memcmp (&pos->file_id,
1143                        &um->file_id,
1144                        sizeof (GNUNET_HashCode)))
1145         {
1146           if (prev == NULL)
1147             indexed_files = pos->next;
1148           else
1149             prev->next = pos->next;
1150           GNUNET_free (pos);
1151           found = GNUNET_YES;
1152         }
1153       else
1154         {
1155           prev = pos;
1156         }
1157       pos = next;
1158     }
1159   if (GNUNET_YES == found)
1160     write_index_list ();
1161   tc = GNUNET_SERVER_transmit_context_create (client);
1162   GNUNET_SERVER_transmit_context_append (tc,
1163                                          NULL, 0,
1164                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1165   GNUNET_SERVER_transmit_context_run (tc,
1166                                       GNUNET_TIME_UNIT_MINUTES);
1167 }
1168
1169
1170 /**
1171  * Run the next DS request in our
1172  * queue, we're done with the current one.
1173  */
1174 static void
1175 next_ds_request ()
1176 {
1177   struct DatastoreRequestQueue *e;
1178   
1179   while (NULL != (e = drq_head))
1180     {
1181       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1182         break;
1183       if (e->task != GNUNET_SCHEDULER_NO_TASK)
1184         GNUNET_SCHEDULER_cancel (sched, e->task);
1185       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1186       e->req (e->req_cls, GNUNET_NO);
1187       GNUNET_free (e);  
1188     }
1189   if (e == NULL)
1190     return;
1191   if (e->task != GNUNET_SCHEDULER_NO_TASK)
1192     GNUNET_SCHEDULER_cancel (sched, e->task);
1193   e->task = GNUNET_SCHEDULER_NO_TASK;
1194   e->req (e->req_cls, GNUNET_YES);
1195   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1196   GNUNET_free (e);  
1197 }
1198
1199
1200 /**
1201  * A datastore request had to be timed out. 
1202  *
1203  * @param cls closure (of type "struct DatastoreRequestQueue*")
1204  * @param tc task context, unused
1205  */
1206 static void
1207 timeout_ds_request (void *cls,
1208                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1209 {
1210   struct DatastoreRequestQueue *e = cls;
1211
1212   e->task = GNUNET_SCHEDULER_NO_TASK;
1213   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1214   e->req (e->req_cls, GNUNET_NO);
1215   GNUNET_free (e);  
1216 }
1217
1218
1219 /**
1220  * Queue a request for the datastore.
1221  *
1222  * @param deadline by when the request should run
1223  * @param fun function to call once the request can be run
1224  * @param fun_cls closure for fun
1225  */
1226 static struct DatastoreRequestQueue *
1227 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1228                   RequestFunction fun,
1229                   void *fun_cls)
1230 {
1231   struct DatastoreRequestQueue *e;
1232   struct DatastoreRequestQueue *bef;
1233
1234   if (drq_head == NULL)
1235     {
1236       /* no other requests pending, run immediately */
1237       fun (fun_cls, GNUNET_OK);
1238       return NULL;
1239     }
1240   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1241   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1242   e->req = fun;
1243   e->req_cls = fun_cls;
1244   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1245     {
1246       /* local request, highest prio, put at head of queue
1247          regardless of deadline */
1248       bef = NULL;
1249     }
1250   else
1251     {
1252       bef = drq_tail;
1253       while ( (NULL != bef) &&
1254               (e->timeout.value < bef->timeout.value) )
1255         bef = bef->prev;
1256     }
1257   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1258   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1259     return e;
1260   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1261                                           GNUNET_NO,
1262                                           GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1263                                           GNUNET_SCHEDULER_NO_TASK,
1264                                           deadline,
1265                                           &timeout_ds_request,
1266                                           e);
1267   return e;                                    
1268 }
1269
1270
1271 /**
1272  * Free the state associated with a local get context.
1273  *
1274  * @param lgc the lgc to free
1275  */
1276 static void
1277 local_get_context_free (struct LocalGetContext *lgc) 
1278 {
1279   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1280   GNUNET_SERVER_client_drop (lgc->client); 
1281   GNUNET_free_non_null (lgc->results);
1282   if (lgc->results_bf != NULL)
1283     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1284   if (lgc->req != NULL)
1285     {
1286       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1287         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1288       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1289       GNUNET_free (lgc->req);
1290     }
1291   GNUNET_free (lgc);
1292 }
1293
1294
1295 /**
1296  * We're able to transmit the next (local) result to the client.
1297  * Do it and ask the datastore for more.  Or, on error, tell
1298  * the datastore to stop giving us more.
1299  *
1300  * @param cls our closure (struct LocalGetContext)
1301  * @param max maximum number of bytes we can transmit
1302  * @param buf where to copy our message
1303  * @return number of bytes copied to buf
1304  */
1305 static size_t
1306 transmit_local_result (void *cls,
1307                        size_t max,
1308                        void *buf)
1309 {
1310   struct LocalGetContext *lgc = cls;  
1311   uint16_t msize;
1312
1313   if (NULL == buf)
1314     {
1315 #if DEBUG_FS
1316       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1317                   "Failed to transmit result to local client, aborting datastore iteration.\n");
1318 #endif
1319       /* error, abort! */
1320       GNUNET_free (lgc->result);
1321       lgc->result = NULL;
1322       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1323       return 0;
1324     }
1325   msize = ntohs (lgc->result->header.size);
1326 #if DEBUG_FS
1327   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1328               "Transmitting %u bytes of result to local client.\n",
1329               msize);
1330 #endif
1331   GNUNET_assert (max >= msize);
1332   memcpy (buf, lgc->result, msize);
1333   GNUNET_free (lgc->result);
1334   lgc->result = NULL;
1335   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1336   return msize;
1337 }
1338
1339
1340 /**
1341  * Continuation called from datastore's remove
1342  * function.
1343  *
1344  * @param cls unused
1345  * @param success did the deletion work?
1346  * @param msg error message
1347  */
1348 static void
1349 remove_cont (void *cls,
1350              int success,
1351              const char *msg)
1352 {
1353   if (GNUNET_OK != success)
1354     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1355                 _("Failed to delete bogus block: %s\n"),
1356                 msg);
1357   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1358 }
1359
1360
1361 /**
1362  * Mingle hash with the mingle_number to
1363  * produce different bits.
1364  */
1365 static void
1366 mingle_hash (const GNUNET_HashCode * in,
1367              int32_t mingle_number, 
1368              GNUNET_HashCode * hc)
1369 {
1370   GNUNET_HashCode m;
1371
1372   GNUNET_CRYPTO_hash (&mingle_number, 
1373                       sizeof (int32_t), 
1374                       &m);
1375   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1376 }
1377
1378
1379 /**
1380  * We've received an on-demand encoded block
1381  * from the datastore.  Attempt to do on-demand
1382  * encoding and (if successful), call the 
1383  * continuation with the resulting block.  On
1384  * error, clean up and ask the datastore for
1385  * more results.
1386  *
1387  * @param key key for the content
1388  * @param size number of bytes in data
1389  * @param data content stored
1390  * @param type type of the content
1391  * @param priority priority of the content
1392  * @param anonymity anonymity-level for the content
1393  * @param expiration expiration time for the content
1394  * @param uid unique identifier for the datum;
1395  *        maybe 0 if no unique identifier is available
1396  * @param cont function to call with the actual block
1397  * @param cont_cls closure for cont
1398  */
1399 static void
1400 handle_on_demand_block (const GNUNET_HashCode * key,
1401                         uint32_t size,
1402                         const void *data,
1403                         uint32_t type,
1404                         uint32_t priority,
1405                         uint32_t anonymity,
1406                         struct GNUNET_TIME_Absolute
1407                         expiration, uint64_t uid,
1408                         GNUNET_DATASTORE_Iterator cont,
1409                         void *cont_cls)
1410 {
1411   const struct OnDemandBlock *odb;
1412   GNUNET_HashCode nkey;
1413   struct GNUNET_CRYPTO_AesSessionKey skey;
1414   struct GNUNET_CRYPTO_AesInitializationVector iv;
1415   GNUNET_HashCode query;
1416   ssize_t nsize;
1417   char ndata[DBLOCK_SIZE];
1418   char edata[DBLOCK_SIZE];
1419   const char *fn;
1420   struct GNUNET_DISK_FileHandle *fh;
1421   uint64_t off;
1422
1423   if (size != sizeof (struct OnDemandBlock))
1424     {
1425       GNUNET_break (0);
1426       GNUNET_DATASTORE_remove (dsh, 
1427                                key,
1428                                size,
1429                                data,
1430                                &remove_cont,
1431                                NULL,
1432                                GNUNET_TIME_UNIT_FOREVER_REL);     
1433       return;
1434     }
1435   odb = (const struct OnDemandBlock*) data;
1436   off = GNUNET_ntohll (odb->offset);
1437   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1438                                                         &odb->file_id);
1439   fh = NULL;
1440   if ( (NULL == fn) ||
1441        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1442                                              GNUNET_DISK_OPEN_READ,
1443                                              GNUNET_DISK_PERM_NONE))) ||
1444        (off !=
1445         GNUNET_DISK_file_seek (fh,
1446                                off,
1447                                GNUNET_DISK_SEEK_SET)) ||
1448        (-1 ==
1449         (nsize = GNUNET_DISK_file_read (fh,
1450                                         ndata,
1451                                         sizeof (ndata)))) )
1452     {
1453       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1454                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1455                   GNUNET_h2s (&odb->file_id),
1456                   (unsigned long long) off,
1457                   STRERROR (errno));
1458       if (fh != NULL)
1459         GNUNET_DISK_file_close (fh);
1460       /* FIXME: if this happens often, we need
1461          to remove the OnDemand block from the DS! */
1462       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1463       return;
1464     }
1465   GNUNET_DISK_file_close (fh);
1466   GNUNET_CRYPTO_hash (ndata,
1467                       nsize,
1468                       &nkey);
1469   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1470   GNUNET_CRYPTO_aes_encrypt (ndata,
1471                              nsize,
1472                              &skey,
1473                              &iv,
1474                              edata);
1475   GNUNET_CRYPTO_hash (edata,
1476                       nsize,
1477                       &query);
1478   if (0 != memcmp (&query, 
1479                    key,
1480                    sizeof (GNUNET_HashCode)))
1481     {
1482       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1483                   _("Indexed file `%s' changed at offset %llu\n"),
1484                   fn,
1485                   (unsigned long long) off);
1486       /* FIXME: if this happens often, we need
1487          to remove the OnDemand block from the DS! */
1488       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1489       return;
1490     }
1491   cont (cont_cls,
1492         key,
1493         nsize,
1494         edata,
1495         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1496         priority,
1497         anonymity,
1498         expiration,
1499         uid);
1500 }
1501
1502
1503 /**
1504  * How many bytes should a bloomfilter be if we have already seen
1505  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1506  * of bits set per entry.  Furthermore, we should not re-size the
1507  * filter too often (to keep it cheap).
1508  *
1509  * Since other peers will also add entries but not resize the filter,
1510  * we should generally pick a slightly larger size than what the
1511  * strict math would suggest.
1512  *
1513  * @return must be a power of two and smaller or equal to 2^15.
1514  */
1515 static size_t
1516 compute_bloomfilter_size (unsigned int entry_count)
1517 {
1518   size_t size;
1519   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1520   uint16_t max = 1 << 15;
1521
1522   if (entry_count > max)
1523     return max;
1524   size = 8;
1525   while ((size < max) && (size < ideal))
1526     size *= 2;
1527   if (size > max)
1528     return max;
1529   return size;
1530 }
1531
1532
1533 /**
1534  * Recalculate our bloom filter for filtering replies.
1535  *
1536  * @param count number of entries we are filtering right now
1537  * @param mingle set to our new mingling value
1538  * @param bf_size set to the size of the bloomfilter
1539  * @param entries the entries to filter
1540  * @return updated bloomfilter, NULL for none
1541  */
1542 static struct GNUNET_CONTAINER_BloomFilter *
1543 refresh_bloomfilter (unsigned int count,
1544                      int32_t * mingle,
1545                      size_t *bf_size,
1546                      const GNUNET_HashCode *entries)
1547 {
1548   struct GNUNET_CONTAINER_BloomFilter *bf;
1549   size_t nsize;
1550   unsigned int i;
1551   GNUNET_HashCode mhash;
1552
1553   if (0 == count)
1554     return NULL;
1555   nsize = compute_bloomfilter_size (count);
1556   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1557   *bf_size = nsize;
1558   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1559                                           nsize,
1560                                           BLOOMFILTER_K);
1561   for (i=0;i<count;i++)
1562     {
1563       mingle_hash (&entries[i], *mingle, &mhash);
1564       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1565     }
1566   return bf;
1567 }
1568
1569
1570 /**
1571  * Closure used for "target_peer_select_cb".
1572  */
1573 struct PeerSelectionContext 
1574 {
1575   /**
1576    * The request for which we are selecting
1577    * peers.
1578    */
1579   struct PendingRequest *pr;
1580
1581   /**
1582    * Current "prime" target.
1583    */
1584   struct GNUNET_PeerIdentity target;
1585
1586   /**
1587    * How much do we like this target?
1588    */
1589   double target_score;
1590
1591 };
1592
1593
1594 /**
1595  * Function called for each connected peer to determine
1596  * which one(s) would make good targets for forwarding.
1597  *
1598  * @param cls closure (struct PeerSelectionContext)
1599  * @param key current key code (peer identity)
1600  * @param value value in the hash map (struct ConnectedPeer)
1601  * @return GNUNET_YES if we should continue to
1602  *         iterate,
1603  *         GNUNET_NO if not.
1604  */
1605 static int
1606 target_peer_select_cb (void *cls,
1607                        const GNUNET_HashCode * key,
1608                        void *value)
1609 {
1610   struct PeerSelectionContext *psc = cls;
1611   struct ConnectedPeer *cp = value;
1612   struct PendingRequest *pr = psc->pr;
1613   double score;
1614   unsigned int i;
1615
1616   /* 1) check if we have already (recently) forwarded to this peer */
1617   for (i=0;i<pr->used_pids_off;i++)
1618     if (pr->used_pids[i] == cp->pid)
1619       return GNUNET_YES; /* skip */
1620   // 2) calculate how much we'd like to forward to this peer
1621   score = 0; // FIXME!
1622   
1623   /* store best-fit in closure */
1624   if (score > psc->target_score)
1625     {
1626       psc->target_score = score;
1627       psc->target.hashPubKey = *key; 
1628     }
1629   return GNUNET_YES;
1630 }
1631
1632
1633 /**
1634  * We use a random delay to make the timing of requests
1635  * less predictable.  This function returns such a random
1636  * delay.
1637  *
1638  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1639  */
1640 static struct GNUNET_TIME_Relative
1641 get_processing_delay ()
1642 {
1643   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1644                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1645                                                                   TTL_DECREMENT));
1646 }
1647
1648
1649 /**
1650  * Task that is run for each request with the
1651  * goal of forwarding the associated query to
1652  * other peers.  The task should re-schedule
1653  * itself to be re-run once the TTL has expired.
1654  * (or at a later time if more peers should
1655  * be queried earlier).
1656  *
1657  * @param cls the requests "struct PendingRequest*"
1658  * @param tc task context (unused)
1659  */
1660 static void
1661 forward_request_task (void *cls,
1662                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1663
1664
1665 /**
1666  * We've selected a peer for forwarding of a query.
1667  * Construct the message and then re-schedule the
1668  * task to forward again to (other) peers.
1669  *
1670  * @param cls closure
1671  * @param size number of bytes available in buf
1672  * @param buf where the callee should write the message
1673  * @return number of bytes written to buf
1674  */
1675 static size_t
1676 transmit_request_cb (void *cls,
1677                      size_t size, 
1678                      void *buf)
1679 {
1680   struct PendingRequest *pr = cls;
1681   struct GetMessage *gm;
1682   GNUNET_HashCode *ext;
1683   char *bfdata;
1684   size_t msize;
1685   unsigned int k;
1686
1687   pr->cth = NULL;
1688   /* (1) check for timeout */
1689   if (NULL == buf)
1690     {
1691       /* timeout, try another peer immediately again */
1692       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1693                                                GNUNET_NO,
1694                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1695                                                GNUNET_SCHEDULER_NO_TASK,
1696                                                GNUNET_TIME_UNIT_ZERO,
1697                                                &forward_request_task,
1698                                                pr);
1699       return 0;
1700     }
1701   /* (2) build query message */
1702   k = 0; // FIXME: count hash codes!
1703   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1704   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1705   gm = (struct GetMessage*) buf;
1706   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1707   gm->header.size = htons (msize);
1708   gm->type = htonl (pr->type);
1709   pr->remaining_priority /= 2;
1710   gm->priority = htonl (pr->remaining_priority);
1711   gm->ttl = htonl (pr->ttl);
1712   gm->filter_mutator = htonl(pr->mingle);
1713   gm->hash_bitmap = htonl (42);
1714   gm->query = pr->query;
1715   ext = (GNUNET_HashCode*) &gm[1];
1716   // FIXME: setup "ext[0]..[k-1]"
1717   bfdata = (char *) &ext[k];
1718   if (pr->bf != NULL)
1719     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1720                                                bfdata,
1721                                                pr->bf_size);
1722   
1723   /* (3) schedule job to do it again (or another peer, etc.) */
1724   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1725                                            GNUNET_NO,
1726                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1727                                            GNUNET_SCHEDULER_NO_TASK,
1728                                            get_processing_delay (), // FIXME!
1729                                            &forward_request_task,
1730                                            pr);
1731
1732   return msize;
1733 }
1734
1735
1736 /**
1737  * Function called after we've tried to reserve
1738  * a certain amount of bandwidth for a reply.
1739  * Check if we succeeded and if so send our query.
1740  *
1741  * @param cls the requests "struct PendingRequest*"
1742  * @param peer identifies the peer
1743  * @param latency current latency estimate, "FOREVER" if we have been
1744  *                disconnected
1745  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1746  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1747  * @param amount set to the amount that was actually reserved or unreserved
1748  * @param preference current traffic preference for the given peer
1749  */
1750 static void
1751 target_reservation_cb (void *cls,
1752                        const struct
1753                        GNUNET_PeerIdentity * peer,
1754                        unsigned int bpm_in,
1755                        unsigned int bpm_out,
1756                        struct GNUNET_TIME_Relative
1757                        latency, int amount,
1758                        unsigned long long preference)
1759 {
1760   struct PendingRequest *pr = cls;
1761   uint32_t priority;
1762   uint16_t size;
1763   struct GNUNET_TIME_Relative maxdelay;
1764
1765   GNUNET_assert (peer != NULL);
1766   if ( (amount != DBLOCK_SIZE) ||
1767        (pr->cth != NULL) )
1768     {
1769       /* try again later; FIXME: we may need to un-reserve "amount"? */
1770       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1771                                                GNUNET_NO,
1772                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1773                                                GNUNET_SCHEDULER_NO_TASK,
1774                                                get_processing_delay (), // FIXME: longer?
1775                                                &forward_request_task,
1776                                                pr);
1777       return;
1778     }
1779   // (2) transmit, update ttl/priority
1780   // FIXME: calculate priority, maxdelay, size properly!
1781   priority = 0;
1782   size = 60000;
1783   maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
1784   pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1785                                                priority,
1786                                                maxdelay,
1787                                                peer,
1788                                                size,
1789                                                &transmit_request_cb,
1790                                                pr);
1791   if (pr->cth == NULL)
1792     {
1793       /* try again later */
1794       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1795                                                GNUNET_NO,
1796                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1797                                                GNUNET_SCHEDULER_NO_TASK,
1798                                                get_processing_delay (), // FIXME: longer?
1799                                                &forward_request_task,
1800                                                pr);
1801     }
1802 }
1803
1804
1805 /**
1806  * Task that is run for each request with the
1807  * goal of forwarding the associated query to
1808  * other peers.  The task should re-schedule
1809  * itself to be re-run once the TTL has expired.
1810  * (or at a later time if more peers should
1811  * be queried earlier).
1812  *
1813  * @param cls the requests "struct PendingRequest*"
1814  * @param tc task context (unused)
1815  */
1816 static void
1817 forward_request_task (void *cls,
1818                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1819 {
1820   struct PendingRequest *pr = cls;
1821   struct PeerSelectionContext psc;
1822
1823   pr->task = GNUNET_SCHEDULER_NO_TASK;
1824   if (pr->cth != NULL) 
1825     {
1826       /* we're busy transmitting a result, wait a bit */
1827       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1828                                                GNUNET_NO,
1829                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1830                                                GNUNET_SCHEDULER_NO_TASK,
1831                                                get_processing_delay (), 
1832                                                &forward_request_task,
1833                                                pr);
1834       return;
1835     }
1836   /* (1) select target */
1837   psc.pr = pr;
1838   psc.target_score = DBL_MIN;
1839   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1840                                          &target_peer_select_cb,
1841                                          &psc);
1842   if (psc.target_score == DBL_MIN)
1843     {
1844       /* no possible target found, wait some time */
1845       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1846                                                GNUNET_NO,
1847                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1848                                                GNUNET_SCHEDULER_NO_TASK,
1849                                                get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1850                                                &forward_request_task,
1851                                                pr);
1852       return;
1853     }
1854   /* (2) reserve reply bandwidth */
1855   // FIXME: need a way to cancel; this
1856   // async operation is problematic (segv-problematic)
1857   // if "pr" is destroyed while it happens!
1858   GNUNET_CORE_peer_configure (core,
1859                               &psc.target,
1860                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1861                               -1,
1862                               DBLOCK_SIZE, // FIXME: make dependent on type?
1863                               0,
1864                               &target_reservation_cb,
1865                               pr);
1866 }
1867
1868
1869 /**
1870  * We're processing (local) results for a search request
1871  * from a (local) client.  Pass applicable results to the
1872  * client and if we are done either clean up (operation
1873  * complete) or switch to P2P search (more results possible).
1874  *
1875  * @param cls our closure (struct LocalGetContext)
1876  * @param key key for the content
1877  * @param size number of bytes in data
1878  * @param data content stored
1879  * @param type type of the content
1880  * @param priority priority of the content
1881  * @param anonymity anonymity-level for the content
1882  * @param expiration expiration time for the content
1883  * @param uid unique identifier for the datum;
1884  *        maybe 0 if no unique identifier is available
1885  */
1886 static void
1887 process_local_get_result (void *cls,
1888                           const GNUNET_HashCode * key,
1889                           uint32_t size,
1890                           const void *data,
1891                           uint32_t type,
1892                           uint32_t priority,
1893                           uint32_t anonymity,
1894                           struct GNUNET_TIME_Absolute
1895                           expiration, 
1896                           uint64_t uid)
1897 {
1898   struct LocalGetContext *lgc = cls;
1899   struct PendingRequest *pr;
1900   struct ClientRequestList *crl;
1901   struct ClientList *cl;
1902   size_t msize;
1903   unsigned int i;
1904
1905   if (key == NULL)
1906     {
1907 #if DEBUG_FS
1908       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1909                   "Received last result for `%s' from local datastore, deciding what to do next.\n",
1910                   GNUNET_h2s (&lgc->query));
1911 #endif
1912       /* no further results from datastore; continue
1913          processing further requests from the client and
1914          allow the next task to use the datastore; also,
1915          switch to P2P requests or clean up our state. */
1916       next_ds_request ();
1917       GNUNET_SERVER_receive_done (lgc->client,
1918                                   GNUNET_OK);
1919       if ( (lgc->results_used == 0) ||
1920            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1921            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1922            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1923         {
1924 #if DEBUG_FS
1925           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1926                       "Forwarding query for `%s' to network.\n",
1927                       GNUNET_h2s (&lgc->query));
1928 #endif
1929           cl = clients;
1930           while ( (NULL != cl) &&
1931                   (cl->client != lgc->client) )
1932             cl = cl->next;
1933           if (cl == NULL)
1934             {
1935               cl = GNUNET_malloc (sizeof (struct ClientList));
1936               cl->client = lgc->client;
1937               cl->next = clients;
1938               clients = cl;
1939             }
1940           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1941           crl->cl = cl;
1942           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1943           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1944           pr->client = lgc->client;
1945           GNUNET_SERVER_client_keep (pr->client);
1946           pr->crl_entry = crl;
1947           crl->req = pr;
1948           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1949             {
1950               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1951               *pr->namespace = lgc->namespace;
1952             }
1953           pr->replies_seen = lgc->results;
1954           lgc->results = NULL;
1955           pr->start_time = GNUNET_TIME_absolute_get ();
1956           pr->query = lgc->query;
1957           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1958           pr->replies_seen_off = lgc->results_used;
1959           pr->replies_seen_size = lgc->results_size;
1960           lgc->results_size = 0;
1961           pr->type = lgc->type;
1962           pr->anonymity_level = lgc->anonymity_level;
1963           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1964                                         &pr->mingle,
1965                                         &pr->bf_size,
1966                                         pr->replies_seen);
1967           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1968                                              &pr->query,
1969                                              pr,
1970                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1971           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1972                                                    GNUNET_NO,
1973                                                    GNUNET_SCHEDULER_PRIORITY_IDLE,
1974                                                    GNUNET_SCHEDULER_NO_TASK,
1975                                                    get_processing_delay (),
1976                                                    &forward_request_task,
1977                                                    pr);
1978           local_get_context_free (lgc);
1979           return;
1980         }
1981       /* got all possible results, clean up! */
1982 #if DEBUG_FS
1983       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1984                   "Found all possible results for query for `%s', done!\n",
1985                   GNUNET_h2s (&lgc->query));
1986 #endif
1987       local_get_context_free (lgc);
1988       return;
1989     }
1990   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1991     {
1992 #if DEBUG_FS
1993       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1994                   "Received on-demand block for `%s' from local datastore, fetching data.\n",
1995                   GNUNET_h2s (&lgc->query));
1996 #endif
1997       handle_on_demand_block (key, size, data, type, priority, 
1998                               anonymity, expiration, uid,
1999                               &process_local_get_result,
2000                               lgc);
2001       return;
2002     }
2003   if (type != lgc->type)
2004     {
2005       /* this should be virtually impossible to reach (DBLOCK 
2006          query hash being identical to KBLOCK/SBLOCK query hash);
2007          nevertheless, if it happens, the correct thing is to
2008          simply skip the result. */
2009 #if DEBUG_FS
2010       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2011                   "Received block of unexpected type for `%s' from local datastore, ignoring.\n",
2012                   GNUNET_h2s (&lgc->query));
2013 #endif
2014       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
2015       return;
2016     }
2017   /* check if this is a result we've alredy
2018      received */
2019   for (i=0;i<lgc->results_used;i++)
2020     if (0 == memcmp (key,
2021                      &lgc->results[i],
2022                      sizeof (GNUNET_HashCode)))
2023       {
2024 #if DEBUG_FS
2025         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2026                     "Received duplicate result for `%s' from local datastore, ignoring.\n",
2027                     GNUNET_h2s (&lgc->query));
2028 #endif
2029         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2030         return; 
2031       }
2032   if (lgc->results_used == lgc->results_size)
2033     GNUNET_array_grow (lgc->results,
2034                        lgc->results_size,
2035                        lgc->results_size * 2 + 2);
2036   GNUNET_CRYPTO_hash (data, 
2037                       size, 
2038                       &lgc->results[lgc->results_used++]);    
2039   msize = size + sizeof (struct ContentMessage);
2040   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
2041   lgc->result = GNUNET_malloc (msize);
2042   lgc->result->header.size = htons (msize);
2043   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2044   lgc->result->type = htonl (type);
2045   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
2046   memcpy (&lgc->result[1],
2047           data,
2048           size);
2049 #if DEBUG_FS
2050   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2051               "Received new result for `%s' from local datastore, passing to client.\n",
2052               GNUNET_h2s (&lgc->query));
2053 #endif
2054   GNUNET_SERVER_notify_transmit_ready (lgc->client,
2055                                        msize,
2056                                        GNUNET_TIME_UNIT_FOREVER_REL,
2057                                        &transmit_local_result,
2058                                        lgc);
2059 }
2060
2061
2062 /**
2063  * We're processing a search request from a local
2064  * client.  Now it is our turn to query the datastore.
2065  * 
2066  * @param cls our closure (struct LocalGetContext)
2067  * @param tc unused
2068  */
2069 static void
2070 transmit_local_get (void *cls,
2071                     const struct GNUNET_SCHEDULER_TaskContext *tc)
2072 {
2073   struct LocalGetContext *lgc = cls;
2074   uint32_t type;
2075   
2076   type = lgc->type;
2077   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2078     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2079   GNUNET_DATASTORE_get (dsh,
2080                         &lgc->query,
2081                         type,
2082                         &process_local_get_result,
2083                         lgc,
2084                         GNUNET_TIME_UNIT_FOREVER_REL);
2085 }
2086
2087
2088 /**
2089  * We're processing a search request from a local
2090  * client.  Now it is our turn to query the datastore.
2091  * 
2092  * @param cls our closure (struct LocalGetContext)
2093  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
2094  */
2095 static void 
2096 transmit_local_get_ready (void *cls,
2097                           int ok)
2098 {
2099   struct LocalGetContext *lgc = cls;
2100
2101   GNUNET_assert (GNUNET_OK == ok);
2102   GNUNET_SCHEDULER_add_continuation (sched,
2103                                      GNUNET_NO,
2104                                      &transmit_local_get,
2105                                      lgc,
2106                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2107 }
2108
2109
2110 /**
2111  * Handle START_SEARCH-message (search request from client).
2112  *
2113  * @param cls closure
2114  * @param client identification of the client
2115  * @param message the actual message
2116  */
2117 static void
2118 handle_start_search (void *cls,
2119                      struct GNUNET_SERVER_Client *client,
2120                      const struct GNUNET_MessageHeader *message)
2121 {
2122   const struct SearchMessage *sm;
2123   struct LocalGetContext *lgc;
2124   uint16_t msize;
2125   unsigned int sc;
2126   
2127   msize = ntohs (message->size);
2128   if ( (msize < sizeof (struct SearchMessage)) ||
2129        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2130     {
2131       GNUNET_break (0);
2132       GNUNET_SERVER_receive_done (client,
2133                                   GNUNET_SYSERR);
2134       return;
2135     }
2136   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2137   sm = (const struct SearchMessage*) message;
2138   GNUNET_SERVER_client_keep (client);
2139   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
2140   if  (sc > 0)
2141     {
2142       lgc->results_used = sc;
2143       GNUNET_array_grow (lgc->results,
2144                          lgc->results_size,
2145                          sc * 2);
2146       memcpy (lgc->results,
2147               &sm[1],
2148               sc * sizeof (GNUNET_HashCode));
2149     }
2150   lgc->client = client;
2151   lgc->type = ntohl (sm->type);
2152   lgc->anonymity_level = ntohl (sm->anonymity_level);
2153   switch (lgc->type)
2154     {
2155     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2156     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2157       lgc->target.hashPubKey = sm->target;
2158       break;
2159     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2160       lgc->namespace = sm->target;
2161       break;
2162     default:
2163       break;
2164     }
2165   lgc->query = sm->query;
2166   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
2167   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
2168                                &transmit_local_get_ready,
2169                                lgc);
2170 }
2171
2172
2173 /**
2174  * List of handlers for the messages understood by this
2175  * service.
2176  */
2177 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2178   {&handle_index_start, NULL, 
2179    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2180   {&handle_index_list_get, NULL, 
2181    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2182   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2183    sizeof (struct UnindexMessage) },
2184   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2185    0 },
2186   {NULL, NULL, 0, 0}
2187 };
2188
2189
2190 /**
2191  * Clean up the memory used by the PendingRequest structure (except
2192  * for the client or peer list that the request may be part of).
2193  *
2194  * @param pr request to clean up
2195  */
2196 static void
2197 destroy_pending_request (struct PendingRequest *pr)
2198 {
2199   struct PendingReply *reply;
2200   struct ClientList *cl;
2201
2202   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
2203                                         &pr->query,
2204                                         pr);
2205   // FIXME: not sure how this can work (efficiently)
2206   // also, what does the return value mean?
2207   if (pr->client == NULL)
2208     {
2209       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
2210                                          pr);
2211     }
2212   else
2213     {
2214       cl = pr->crl_entry->cl;
2215       GNUNET_CONTAINER_DLL_remove (cl->head,
2216                                    cl->tail,
2217                                    pr->crl_entry);
2218     }
2219   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
2220     GNUNET_SCHEDULER_cancel (sched, pr->task);
2221   if (NULL != pr->cth)
2222     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
2223   if (NULL != pr->bf)
2224     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2225   if (NULL != pr->th)
2226     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
2227   while (NULL != (reply = pr->replies_pending))
2228     {
2229       pr->replies_pending = reply->next;
2230       GNUNET_free (reply);
2231     }
2232   GNUNET_PEER_change_rc (pr->source_pid, -1);
2233   GNUNET_PEER_change_rc (pr->target_pid, -1);
2234   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
2235   GNUNET_free_non_null (pr->used_pids);
2236   GNUNET_free_non_null (pr->replies_seen);
2237   GNUNET_free_non_null (pr->namespace);
2238   GNUNET_free (pr);
2239 }
2240
2241
2242 /**
2243  * A client disconnected.  Remove all of its pending queries.
2244  *
2245  * @param cls closure, NULL
2246  * @param client identification of the client
2247  */
2248 static void
2249 handle_client_disconnect (void *cls,
2250                           struct GNUNET_SERVER_Client
2251                           * client)
2252 {
2253   struct LocalGetContext *lgc;
2254   struct ClientList *cpos;
2255   struct ClientList *cprev;
2256   struct ClientRequestList *rl;
2257
2258   lgc = lgc_head;
2259   while ( (NULL != lgc) &&
2260           (lgc->client != client) )
2261     lgc = lgc->next;
2262   if (lgc != NULL)
2263     local_get_context_free (lgc);
2264   cprev = NULL;
2265   cpos = clients;
2266   while ( (NULL != cpos) &&
2267           (clients->client != client) )
2268     {
2269       cprev = cpos;
2270       cpos = cpos->next;
2271     }
2272   if (cpos != NULL)
2273     {
2274       if (cprev == NULL)
2275         clients = cpos->next;
2276       else
2277         cprev->next = cpos->next;
2278       while (NULL != (rl = cpos->head))
2279         {
2280           cpos->head = rl->next;
2281           destroy_pending_request (rl->req);
2282           GNUNET_free (rl);
2283         }
2284       GNUNET_free (cpos);
2285     }
2286 }
2287
2288
2289 /**
2290  * Iterator over entries in the "requests_by_query" map
2291  * that frees all the entries.
2292  *
2293  * @param cls closure, NULL
2294  * @param key current key code (the query, unused) 
2295  * @param value value in the hash map, of type "struct PendingRequest*"
2296  * @return GNUNET_YES (we should continue to  iterate)
2297  */
2298 static int 
2299 destroy_pending_request_cb (void *cls,
2300                             const GNUNET_HashCode * key,
2301                             void *value)
2302 {
2303   struct PendingRequest *pr = value;
2304
2305   destroy_pending_request (pr);
2306   return GNUNET_YES;
2307 }
2308
2309
2310 /**
2311  * Task run during shutdown.
2312  *
2313  * @param cls unused
2314  * @param tc unused
2315  */
2316 static void
2317 shutdown_task (void *cls,
2318                const struct GNUNET_SCHEDULER_TaskContext *tc)
2319 {
2320   struct IndexInfo *pos;  
2321
2322   if (NULL != core)
2323     GNUNET_CORE_disconnect (core);
2324   GNUNET_DATASTORE_disconnect (dsh,
2325                                GNUNET_NO);
2326   dsh = NULL;
2327   GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
2328                                          &destroy_pending_request_cb,
2329                                          NULL);
2330   while (clients != NULL)
2331     handle_client_disconnect (NULL,
2332                               clients->client);
2333   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
2334   requests_by_query = NULL;
2335   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
2336   requests_by_peer = NULL;
2337   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
2338   requests_by_expiration = NULL;
2339   // FIXME: iterate over entries and free individually?
2340   // (or do we get disconnect notifications?)
2341   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2342   connected_peers = NULL;
2343   GNUNET_CONTAINER_multihashmap_destroy (ifm);
2344   ifm = NULL;
2345   while (NULL != (pos = indexed_files))
2346     {
2347       indexed_files = pos->next;
2348       GNUNET_free (pos);
2349     }
2350 }
2351
2352
2353 /**
2354  * Free (each) request made by the peer.
2355  *
2356  * @param cls closure, points to peer that the request belongs to
2357  * @param key current key code
2358  * @param value value in the hash map
2359  * @return GNUNET_YES (we should continue to iterate)
2360  */
2361 static int
2362 destroy_request (void *cls,
2363                  const GNUNET_HashCode * key,
2364                  void *value)
2365 {
2366   const struct GNUNET_PeerIdentity * peer = cls;
2367   struct PendingRequest *pr = value;
2368   
2369   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2370                                         &peer->hashPubKey,
2371                                         pr);
2372   destroy_pending_request (pr);
2373   return GNUNET_YES;
2374 }
2375
2376
2377
2378 /**
2379  * Method called whenever a given peer connects.
2380  *
2381  * @param cls closure, not used
2382  * @param peer peer identity this notification is about
2383  */
2384 static void 
2385 peer_connect_handler (void *cls,
2386                       const struct
2387                       GNUNET_PeerIdentity * peer)
2388 {
2389   struct ConnectedPeer *cp;
2390
2391   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
2392   cp->pid = GNUNET_PEER_intern (peer);
2393   GNUNET_CONTAINER_multihashmap_put (connected_peers,
2394                                      &peer->hashPubKey,
2395                                      cp,
2396                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2397 }
2398
2399
2400 /**
2401  * Method called whenever a peer disconnects.
2402  *
2403  * @param cls closure, not used
2404  * @param peer peer identity this notification is about
2405  */
2406 static void
2407 peer_disconnect_handler (void *cls,
2408                          const struct
2409                          GNUNET_PeerIdentity * peer)
2410 {
2411   struct ConnectedPeer *cp;
2412
2413   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2414                                           &peer->hashPubKey);
2415   GNUNET_PEER_change_rc (cp->pid, -1);
2416   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
2417   GNUNET_free (cp);
2418   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
2419                                               &peer->hashPubKey,
2420                                               &destroy_request,
2421                                               (void*) peer);
2422 }
2423
2424
2425 /**
2426  * We're processing a GET request from
2427  * another peer and have decided to forward
2428  * it to other peers.
2429  *
2430  * @param cls our "struct ProcessGetContext *"
2431  * @param tc unused
2432  */
2433 static void
2434 forward_get_request (void *cls,
2435                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2436 {
2437   struct ProcessGetContext *pgc = cls;
2438   struct PendingRequest *pr;
2439   struct PendingRequest *eer;
2440   struct GNUNET_PeerIdentity target;
2441
2442   pr = GNUNET_malloc (sizeof (struct PendingRequest));
2443   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
2444     {
2445       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
2446       *pr->namespace = pgc->namespace;
2447     }
2448   pr->bf = pgc->bf;
2449   pr->bf_size = pgc->bf_size;
2450   pgc->bf = NULL;
2451   pr->start_time = pgc->start_time;
2452   pr->query = pgc->query;
2453   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
2454   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
2455     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
2456   pr->anonymity_level = 1; /* default */
2457   pr->priority = pgc->priority;
2458   pr->remaining_priority = pr->priority;
2459   pr->mingle = pgc->mingle;
2460   pr->ttl = pgc->ttl; 
2461   pr->type = pgc->type;
2462   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
2463                                      &pr->query,
2464                                      pr,
2465                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2466   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
2467                                      &pgc->reply_to.hashPubKey,
2468                                      pr,
2469                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2470   GNUNET_CONTAINER_heap_insert (requests_by_expiration,
2471                                 pr,
2472                                 pr->start_time.value + pr->ttl);
2473   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
2474     {
2475       /* expire oldest request! */
2476       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2477       GNUNET_PEER_resolve (eer->source_pid,
2478                            &target);    
2479       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2480                                             &target.hashPubKey,
2481                                             eer);
2482       destroy_pending_request (eer);     
2483     }
2484   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2485                                            GNUNET_NO,
2486                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
2487                                            GNUNET_SCHEDULER_NO_TASK,
2488                                            get_processing_delay (),
2489                                            &forward_request_task,
2490                                            pr);
2491   GNUNET_free (pgc); 
2492 }
2493 /**
2494  * Transmit the given message by copying it to
2495  * the target buffer "buf".  "buf" will be
2496  * NULL and "size" zero if the socket was closed for
2497  * writing in the meantime.  In that case, only
2498
2499  * free the message
2500  *
2501  * @param cls closure, pointer to the message
2502  * @param size number of bytes available in buf
2503  * @param buf where the callee should write the message
2504  * @return number of bytes written to buf
2505  */
2506 static size_t
2507 transmit_message (void *cls,
2508                   size_t size, void *buf)
2509 {
2510   struct GNUNET_MessageHeader *msg = cls;
2511   uint16_t msize;
2512   
2513   if (NULL == buf)
2514     {
2515 #if DEBUG_FS
2516       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2517                   "Dropping reply, core too busy.\n");
2518 #endif
2519       GNUNET_free (msg);
2520       return 0;
2521     }
2522   msize = ntohs (msg->size);
2523   GNUNET_assert (size >= msize);
2524   memcpy (buf, msg, msize);
2525   GNUNET_free (msg);
2526   return msize;
2527 }
2528
2529
2530 /**
2531  * Test if the load on this peer is too high
2532  * to even consider processing the query at
2533  * all.
2534  * 
2535  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2536  */
2537 static int
2538 test_load_too_high ()
2539 {
2540   return GNUNET_NO; // FIXME
2541 }
2542
2543
2544 /**
2545  * We're processing (local) results for a search request
2546  * from another peer.  Pass applicable results to the
2547  * peer and if we are done either clean up (operation
2548  * complete) or forward to other peers (more results possible).
2549  *
2550  * @param cls our closure (struct LocalGetContext)
2551  * @param key key for the content
2552  * @param size number of bytes in data
2553  * @param data content stored
2554  * @param type type of the content
2555  * @param priority priority of the content
2556  * @param anonymity anonymity-level for the content
2557  * @param expiration expiration time for the content
2558  * @param uid unique identifier for the datum;
2559  *        maybe 0 if no unique identifier is available
2560  */
2561 static void
2562 process_p2p_get_result (void *cls,
2563                         const GNUNET_HashCode * key,
2564                         uint32_t size,
2565                         const void *data,
2566                         uint32_t type,
2567                         uint32_t priority,
2568                         uint32_t anonymity,
2569                         struct GNUNET_TIME_Absolute
2570                         expiration, 
2571                         uint64_t uid)
2572 {
2573   struct ProcessGetContext *pgc = cls;
2574   GNUNET_HashCode dhash;
2575   GNUNET_HashCode mhash;
2576   struct PutMessage *reply;
2577   
2578   if (NULL == key)
2579     {
2580       /* no more results */
2581       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2582            ( (0 == pgc->results_found) ||
2583              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2584              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2585              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2586         {
2587           GNUNET_SCHEDULER_add_continuation (sched,
2588                                              GNUNET_NO,
2589                                              &forward_get_request,
2590                                              pgc,
2591                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2592         }
2593       else
2594         {
2595           if (pgc->bf != NULL)
2596             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2597           GNUNET_free (pgc); 
2598         }
2599       next_ds_request ();
2600       return;
2601     }
2602   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2603     {
2604       handle_on_demand_block (key, size, data, type, priority, 
2605                               anonymity, expiration, uid,
2606                               &process_p2p_get_result,
2607                               pgc);
2608       return;
2609     }
2610   /* check for duplicates */
2611   GNUNET_CRYPTO_hash (data, size, &dhash);
2612   mingle_hash (&dhash, 
2613                pgc->mingle,
2614                &mhash);
2615   if ( (pgc->bf != NULL) &&
2616        (GNUNET_YES ==
2617         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2618                                            &mhash)) )
2619     {      
2620 #if DEBUG_FS
2621       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2622                   "Result from datastore filtered by bloomfilter.\n");
2623 #endif
2624       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2625       return;
2626     }
2627   pgc->results_found++;
2628   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2629        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2630        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2631     {
2632       if (pgc->bf == NULL)
2633         {
2634           pgc->bf_size = 32;
2635           pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2636                                                        pgc->bf_size, 
2637                                                        BLOOMFILTER_K);
2638         }
2639       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2640                                         &mhash);
2641     }
2642
2643   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2644   reply->header.size = htons (sizeof (struct PutMessage) + size);
2645   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2646   reply->type = htonl (type);
2647   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2648   memcpy (&reply[1], data, size);
2649   GNUNET_CORE_notify_transmit_ready (core,
2650                                      pgc->priority,
2651                                      ACCEPTABLE_REPLY_DELAY,
2652                                      &pgc->reply_to,
2653                                      sizeof (struct PutMessage) + size,
2654                                      &transmit_message,
2655                                      reply);
2656   if ( (GNUNET_YES == test_load_too_high()) ||
2657        (pgc->results_found > 5 + 2 * pgc->priority) )
2658     {
2659       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2660       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2661       return;
2662     }
2663   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2664 }
2665   
2666
2667 /**
2668  * We're processing a GET request from another peer.  Give it to our
2669  * local datastore.
2670  *
2671  * @param cls our "struct ProcessGetContext"
2672  * @param ok did we get a datastore slice or not?
2673  */
2674 static void
2675 ds_get_request (void *cls, 
2676                 int ok)
2677 {
2678   struct ProcessGetContext *pgc = cls;
2679   uint32_t type;
2680   struct GNUNET_TIME_Relative timeout;
2681
2682   if (GNUNET_OK != ok)
2683     {
2684       /* no point in doing P2P stuff if we can't even do local */
2685       GNUNET_free (dsh);
2686       return;
2687     }
2688   type = pgc->type;
2689   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2690     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2691   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2692                                            (pgc->priority + 1));
2693   GNUNET_DATASTORE_get (dsh,
2694                         &pgc->query,
2695                         type,
2696                         &process_p2p_get_result,
2697                         pgc,
2698                         timeout);
2699 }
2700
2701
2702 /**
2703  * The priority level imposes a bound on the maximum
2704  * value for the ttl that can be requested.
2705  *
2706  * @param ttl_in requested ttl
2707  * @param prio given priority
2708  * @return ttl_in if ttl_in is below the limit,
2709  *         otherwise the ttl-limit for the given priority
2710  */
2711 static int32_t
2712 bound_ttl (int32_t ttl_in, uint32_t prio)
2713 {
2714   unsigned long long allowed;
2715
2716   if (ttl_in <= 0)
2717     return ttl_in;
2718   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2719   if (ttl_in > allowed)      
2720     {
2721       if (allowed >= (1 << 30))
2722         return 1 << 30;
2723       return allowed;
2724     }
2725   return ttl_in;
2726 }
2727
2728
2729 /**
2730  * We've received a request with the specified
2731  * priority.  Bound it according to how much
2732  * we trust the given peer.
2733  * 
2734  * @param prio_in requested priority
2735  * @param peer the peer making the request
2736  * @return effective priority
2737  */
2738 static uint32_t
2739 bound_priority (uint32_t prio_in,
2740                 const struct GNUNET_PeerIdentity *peer)
2741 {
2742   return 0; // FIXME!
2743 }
2744
2745
2746 /**
2747  * Handle P2P "GET" request.
2748  *
2749  * @param cls closure, always NULL
2750  * @param other the other peer involved (sender or receiver, NULL
2751  *        for loopback messages where we are both sender and receiver)
2752  * @param message the actual message
2753  * @return GNUNET_OK to keep the connection open,
2754  *         GNUNET_SYSERR to close it (signal serious error)
2755  */
2756 static int
2757 handle_p2p_get (void *cls,
2758                 const struct GNUNET_PeerIdentity *other,
2759                 const struct GNUNET_MessageHeader *message)
2760 {
2761   uint16_t msize;
2762   const struct GetMessage *gm;
2763   unsigned int bits;
2764   const GNUNET_HashCode *opt;
2765   struct ProcessGetContext *pgc;
2766   uint32_t bm;
2767   size_t bfsize;
2768   uint32_t ttl_decrement;
2769   double preference;
2770   int net_load_up;
2771   int net_load_down;
2772
2773   msize = ntohs(message->size);
2774   if (msize < sizeof (struct GetMessage))
2775     {
2776       GNUNET_break_op (0);
2777       return GNUNET_SYSERR;
2778     }
2779   gm = (const struct GetMessage*) message;
2780   bm = ntohl (gm->hash_bitmap);
2781   bits = 0;
2782   while (bm > 0)
2783     {
2784       if (1 == (bm & 1))
2785         bits++;
2786       bm >>= 1;
2787     }
2788   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2789     {
2790       GNUNET_break_op (0);
2791       return GNUNET_SYSERR;
2792     }  
2793   opt = (const GNUNET_HashCode*) &gm[1];
2794   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2795   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2796   if (bfsize > 0)
2797     {
2798       pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2799                                                    bfsize,
2800                                                    BLOOMFILTER_K);
2801       pgc->bf_size = bfsize;
2802     }
2803   pgc->type = ntohl (gm->type);
2804   pgc->bm = ntohl (gm->hash_bitmap);
2805   pgc->mingle = gm->filter_mutator;
2806   bits = 0;
2807   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2808     pgc->reply_to.hashPubKey = opt[bits++];
2809   else
2810     pgc->reply_to = *other;
2811   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2812     pgc->namespace = opt[bits++];
2813   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2814     {
2815       GNUNET_break_op (0);
2816       GNUNET_free (pgc);
2817       return GNUNET_SYSERR;
2818     }
2819   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2820     pgc->prime_target.hashPubKey = opt[bits++];
2821   /* note that we can really only check load here since otherwise
2822      peers could find out that we are overloaded by being disconnected
2823      after sending us a malformed query... */
2824   if (GNUNET_YES == test_load_too_high ())
2825     {
2826       if (NULL != pgc->bf)
2827         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2828       GNUNET_free (pgc);
2829 #if DEBUG_FS
2830       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2831                   "Dropping query from `%s', this peer is too busy.\n",
2832                   GNUNET_h2s (other));
2833 #endif
2834       return GNUNET_OK;
2835     }
2836   net_load_up = 50; // FIXME
2837   net_load_down = 50; // FIXME
2838   pgc->policy = ROUTING_POLICY_NONE;
2839   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2840        (net_load_down < IDLE_LOAD_THRESHOLD) )
2841     {
2842       pgc->policy |= ROUTING_POLICY_ALL;
2843       pgc->priority = 0; /* no charge */
2844     }
2845   else
2846     {
2847       pgc->priority = bound_priority (ntohl (gm->priority), other);
2848       if ( (net_load_up < 
2849             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2850            (net_load_down < 
2851             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2852         {
2853           pgc->policy |= ROUTING_POLICY_ALL;
2854         }
2855       else
2856         {
2857           // FIXME: is this sound?
2858           if (net_load_up < 90 + 10 * pgc->priority)
2859             pgc->policy |= ROUTING_POLICY_FORWARD;
2860           if (net_load_down < 90 + 10 * pgc->priority)
2861             pgc->policy |= ROUTING_POLICY_ANSWER;
2862         }
2863     }
2864   if (pgc->policy == ROUTING_POLICY_NONE)
2865     {
2866 #if DEBUG_FS
2867       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2868                   "Dropping query from `%s', network saturated.\n",
2869                   GNUNET_h2s (other));
2870 #endif
2871       if (NULL != pgc->bf)
2872         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2873       GNUNET_free (pgc);
2874       return GNUNET_OK;     /* drop */
2875     }
2876   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2877     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2878   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2879   /* decrement ttl (always) */
2880   ttl_decrement = 2 * TTL_DECREMENT +
2881     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2882                               TTL_DECREMENT);
2883   if ( (pgc->ttl < 0) &&
2884        (pgc->ttl - ttl_decrement > 0) )
2885     {
2886 #if DEBUG_FS
2887       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2888                   "Dropping query from `%s' due to TTL underflow.\n",
2889                   GNUNET_h2s (other));
2890 #endif
2891       /* integer underflow => drop (should be very rare)! */
2892       if (NULL != pgc->bf)
2893         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2894       GNUNET_free (pgc);
2895       return GNUNET_OK;
2896     }
2897   pgc->ttl -= ttl_decrement;
2898   pgc->start_time = GNUNET_TIME_absolute_get ();
2899   preference = (double) pgc->priority;
2900   if (preference < QUERY_BANDWIDTH_VALUE)
2901     preference = QUERY_BANDWIDTH_VALUE;
2902   // FIXME: also reserve bandwidth for reply?
2903   GNUNET_CORE_peer_configure (core,
2904                               other,
2905                               GNUNET_TIME_UNIT_FOREVER_REL,
2906                               0, 0, preference, NULL, NULL);
2907   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2908     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2909                                  &ds_get_request,
2910                                  pgc);
2911   else
2912     GNUNET_SCHEDULER_add_continuation (sched,
2913                                        GNUNET_NO,
2914                                        &forward_get_request,
2915                                        pgc,
2916                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2917   return GNUNET_OK;
2918 }
2919
2920
2921 /**
2922  * Function called to notify us that we can now transmit a reply to a
2923  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2924  * closed for writing in the meantime.
2925  *
2926  * @param cls closure, points to a "struct PendingRequest*" with
2927  *            one or more pending replies
2928  * @param size number of bytes available in buf
2929  * @param buf where the callee should write the message
2930  * @return number of bytes written to buf
2931  */
2932 static size_t
2933 transmit_result (void *cls,
2934                  size_t size, 
2935                  void *buf)
2936 {
2937   struct PendingRequest *pr = cls;
2938   char *cbuf = buf;
2939   struct PendingReply *reply;
2940   size_t ret;
2941
2942   ret = 0;
2943   while (NULL != (reply = pr->replies_pending))
2944     {
2945       if ( (reply->msize + ret < ret) ||
2946            (reply->msize + ret > size) )
2947         break;
2948       pr->replies_pending = reply->next;
2949       memcpy (&cbuf[ret], &reply[1], reply->msize);
2950       ret += reply->msize;
2951       GNUNET_free (pr);
2952     }
2953   return 0;
2954 }
2955
2956
2957 /**
2958  * Iterator over pending requests.
2959  *
2960  * @param cls response (struct ProcessReplyClosure)
2961  * @param key our query
2962  * @param value value in the hash map (meta-info about the query)
2963  * @return GNUNET_YES (we should continue to iterate)
2964  */
2965 static int
2966 process_reply (void *cls,
2967                const GNUNET_HashCode * key,
2968                void *value)
2969 {
2970   struct ProcessReplyClosure *prq = cls;
2971   struct PendingRequest *pr = value;
2972   struct PendingRequest *eer;
2973   struct PendingReply *reply;
2974   struct PutMessage *pm;
2975   struct ContentMessage *cm;
2976   GNUNET_HashCode chash;
2977   GNUNET_HashCode mhash;
2978   struct GNUNET_PeerIdentity target;
2979   size_t msize;
2980   uint32_t prio;
2981   struct GNUNET_TIME_Relative max_delay;
2982   
2983   GNUNET_CRYPTO_hash (prq->data,
2984                       prq->size,
2985                       &chash);
2986   switch (prq->type)
2987     {
2988     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2989     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2990       break;
2991     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2992       /* FIXME: does prq->namespace match our expectations? */
2993       /* then: fall-through??? */
2994     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2995       if (pr->bf != NULL) 
2996         {
2997           mingle_hash (&chash, pr->mingle, &mhash);
2998           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2999                                                                &mhash))
3000             return GNUNET_YES; /* duplicate */
3001           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
3002                                             &mhash);
3003         }
3004       break;
3005     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3006       // FIXME: any checks against duplicates for SKBlocks?
3007       break;
3008     }
3009   prio = pr->priority;
3010   prq->priority += pr->remaining_priority;
3011   pr->remaining_priority = 0;
3012   if (pr->client != NULL)
3013     {
3014       if (pr->replies_seen_size == pr->replies_seen_off)
3015         GNUNET_array_grow (pr->replies_seen,
3016                            pr->replies_seen_size,
3017                            pr->replies_seen_size * 2 + 4);
3018       pr->replies_seen[pr->replies_seen_off++] = chash;
3019       // FIXME: possibly recalculate BF!
3020     }
3021   if (pr->client == NULL)
3022     {
3023       msize = sizeof (struct ContentMessage) + prq->size;
3024       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3025       reply->msize = msize;
3026       cm = (struct ContentMessage*) &reply[1];
3027       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
3028       cm->header.size = htons (msize);
3029       cm->type = htonl (prq->type);
3030       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
3031       reply->next = pr->replies_pending;
3032       pr->replies_pending = reply;
3033       memcpy (&reply[1], prq->data, prq->size);
3034       if (pr->cth != NULL)
3035         return GNUNET_YES;
3036       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
3037       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
3038         {
3039           /* estimate expiration time from time difference between
3040              first request that will be discarded and this request */
3041           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
3042           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
3043                                                            eer->start_time);
3044         }
3045       GNUNET_PEER_resolve (pr->source_pid,
3046                            &target);
3047       pr->cth = GNUNET_CORE_notify_transmit_ready (core,
3048                                                    prio,
3049                                                    max_delay,
3050                                                    &target,
3051                                                    msize,
3052                                                    &transmit_result,
3053                                                    pr);
3054       if (NULL == pr->cth)
3055         {
3056           // FIXME: now what? discard?
3057         }
3058     }
3059   else
3060     {
3061       msize = sizeof (struct PutMessage) + prq->size;
3062       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
3063       reply->msize = msize;
3064       reply->next = pr->replies_pending;
3065       pm = (struct PutMessage*) &reply[1];
3066       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
3067       pm->header.size = htons (msize);
3068       pm->type = htonl (prq->type);
3069       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
3070       pr->replies_pending = reply;
3071       memcpy (&reply[1], prq->data, prq->size);
3072       if (pr->th != NULL)
3073         return GNUNET_YES;
3074       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
3075                                                     msize,
3076                                                     GNUNET_TIME_UNIT_FOREVER_REL,
3077                                                     &transmit_result,
3078                                                     pr);
3079       if (pr->th == NULL)
3080         {
3081           // FIXME: need to try again later (not much
3082           // to do here specifically, but we need to
3083           // check somewhere else to handle this case!)
3084         }
3085     }
3086   // FIXME: implement hot-path routing statistics keeping!
3087   return GNUNET_YES;
3088 }
3089
3090
3091 /**
3092  * Check if the given KBlock is well-formed.
3093  *
3094  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
3095  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
3096  * @param query where to store the query that this block answers
3097  * @return GNUNET_OK if this is actually a well-formed KBlock
3098  */
3099 static int
3100 check_kblock (const struct KBlock *kb,
3101               size_t dsize,
3102               GNUNET_HashCode *query)
3103 {
3104   if (dsize < sizeof (struct KBlock))
3105     {
3106       GNUNET_break_op (0);
3107       return GNUNET_SYSERR;
3108     }
3109   if (dsize - sizeof (struct KBlock) !=
3110       ntohs (kb->purpose.size) 
3111       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
3112       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
3113     {
3114       GNUNET_break_op (0);
3115       return GNUNET_SYSERR;
3116     }
3117   if (GNUNET_OK !=
3118       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
3119                                 &kb->purpose,
3120                                 &kb->signature,
3121                                 &kb->keyspace)) 
3122     {
3123       GNUNET_break_op (0);
3124       return GNUNET_SYSERR;
3125     }
3126   if (query != NULL)
3127     GNUNET_CRYPTO_hash (&kb->keyspace,
3128                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3129                         query);
3130   return GNUNET_OK;
3131 }
3132
3133
3134 /**
3135  * Check if the given SBlock is well-formed.
3136  *
3137  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
3138  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
3139  * @param query where to store the query that this block answers
3140  * @param namespace where to store the namespace that this block belongs to
3141  * @return GNUNET_OK if this is actually a well-formed SBlock
3142  */
3143 static int
3144 check_sblock (const struct SBlock *sb,
3145               size_t dsize,
3146               GNUNET_HashCode *query,   
3147               GNUNET_HashCode *namespace)
3148 {
3149   if (dsize < sizeof (struct SBlock))
3150     {
3151       GNUNET_break_op (0);
3152       return GNUNET_SYSERR;
3153     }
3154   if (dsize !=
3155       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
3156     {
3157       GNUNET_break_op (0);
3158       return GNUNET_SYSERR;
3159     }
3160   if (GNUNET_OK !=
3161       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
3162                                 &sb->purpose,
3163                                 &sb->signature,
3164                                 &sb->subspace)) 
3165     {
3166       GNUNET_break_op (0);
3167       return GNUNET_SYSERR;
3168     }
3169   if (query != NULL)
3170     *query = sb->identifier;
3171   if (namespace != NULL)
3172     GNUNET_CRYPTO_hash (&sb->subspace,
3173                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3174                         namespace);
3175   return GNUNET_OK;
3176 }
3177
3178
3179 /**
3180  * Handle P2P "PUT" request.
3181  *
3182  * @param cls closure, always NULL
3183  * @param other the other peer involved (sender or receiver, NULL
3184  *        for loopback messages where we are both sender and receiver)
3185  * @param message the actual message
3186  * @return GNUNET_OK to keep the connection open,
3187  *         GNUNET_SYSERR to close it (signal serious error)
3188  */
3189 static int
3190 handle_p2p_put (void *cls,
3191                 const struct GNUNET_PeerIdentity *other,
3192                 const struct GNUNET_MessageHeader *message)
3193 {
3194   const struct PutMessage *put;
3195   uint16_t msize;
3196   size_t dsize;
3197   uint32_t type;
3198   struct GNUNET_TIME_Absolute expiration;
3199   GNUNET_HashCode query;
3200   struct ProcessReplyClosure prq;
3201
3202   msize = ntohs (message->size);
3203   if (msize < sizeof (struct PutMessage))
3204     {
3205       GNUNET_break_op(0);
3206       return GNUNET_SYSERR;
3207     }
3208   put = (const struct PutMessage*) message;
3209   dsize = msize - sizeof (struct PutMessage);
3210   type = ntohl (put->type);
3211   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
3212
3213   /* first, validate! */
3214   switch (type)
3215     {
3216     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3217     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3218       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
3219       break;
3220     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3221       if (GNUNET_OK !=
3222           check_kblock ((const struct KBlock*) &put[1],
3223                         dsize,
3224                         &query))
3225         return GNUNET_SYSERR;
3226       break;
3227     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3228       if (GNUNET_OK !=
3229           check_sblock ((const struct SBlock*) &put[1],
3230                         dsize,
3231                         &query,
3232                         &prq.namespace))
3233         return GNUNET_SYSERR;
3234       break;
3235     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3236       // FIXME -- validate SKBLOCK!
3237       GNUNET_break (0);
3238       return GNUNET_OK;
3239     default:
3240       /* unknown block type */
3241       GNUNET_break_op (0);
3242       return GNUNET_SYSERR;
3243     }
3244
3245   /* now, lookup 'query' */
3246   prq.data = (const void*) &put[1];
3247   prq.size = dsize;
3248   prq.type = type;
3249   prq.expiration = expiration;
3250   prq.priority = 0;
3251   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
3252                                               &query,
3253                                               &process_reply,
3254                                               &prq);
3255   // FIXME: if migration is on and load is low,
3256   // queue to store data in datastore;
3257   // use "prq.priority" for that!
3258   return GNUNET_OK;
3259 }
3260
3261
3262 /**
3263  * List of handlers for P2P messages
3264  * that we care about.
3265  */
3266 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3267   {
3268     { &handle_p2p_get, 
3269       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3270     { &handle_p2p_put, 
3271       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3272     { NULL, 0, 0 }
3273   };
3274
3275
3276 /**
3277  * Task that will try to initiate a connection with the
3278  * core service.
3279  * 
3280  * @param cls unused
3281  * @param tc unused
3282  */
3283 static void
3284 core_connect_task (void *cls,
3285                    const struct GNUNET_SCHEDULER_TaskContext *tc);
3286
3287
3288 /**
3289  * Function called by the core after we've
3290  * connected.
3291  *
3292  * @param cls closure, unused
3293  * @param server handle to the core service
3294  * @param my_identity our peer identity (unused)
3295  * @param publicKey our public key (unused)
3296  */
3297 static void
3298 core_start_cb (void *cls,
3299                struct GNUNET_CORE_Handle * server,
3300                const struct GNUNET_PeerIdentity *
3301                my_identity,
3302                const struct
3303                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
3304                publicKey)
3305 {
3306   if (server == NULL)
3307     {
3308       GNUNET_SCHEDULER_add_delayed (sched,
3309                                     GNUNET_NO,
3310                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
3311                                     GNUNET_SCHEDULER_NO_TASK,
3312                                     GNUNET_TIME_UNIT_SECONDS,
3313                                     &core_connect_task,
3314                                     NULL);
3315       return;
3316     }
3317   core = server;
3318 }
3319
3320
3321 /**
3322  * Task that will try to initiate a connection with the
3323  * core service.
3324  * 
3325  * @param cls unused
3326  * @param tc unused
3327  */
3328 static void
3329 core_connect_task (void *cls,
3330                    const struct GNUNET_SCHEDULER_TaskContext *tc)
3331 {
3332   GNUNET_CORE_connect (sched,
3333                        cfg,
3334                        GNUNET_TIME_UNIT_FOREVER_REL,
3335                        NULL,
3336                        &core_start_cb,
3337                        &peer_connect_handler,
3338                        &peer_disconnect_handler,
3339                        NULL, 
3340                        NULL, GNUNET_NO,
3341                        NULL, GNUNET_NO,
3342                        p2p_handlers);
3343 }
3344
3345
3346 /**
3347  * Process fs requests.
3348  *
3349  * @param cls closure
3350  * @param s scheduler to use
3351  * @param server the initialized server
3352  * @param c configuration to use
3353  */
3354 static void
3355 run (void *cls,
3356      struct GNUNET_SCHEDULER_Handle *s,
3357      struct GNUNET_SERVER_Handle *server,
3358      const struct GNUNET_CONFIGURATION_Handle *c)
3359 {
3360   sched = s;
3361   cfg = c;
3362
3363   ifm = GNUNET_CONTAINER_multihashmap_create (128);
3364   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3365   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3366   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
3367   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3368   read_index_list ();
3369   dsh = GNUNET_DATASTORE_connect (cfg,
3370                                   sched);
3371   if (NULL == dsh)
3372     {
3373       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3374                   _("Failed to connect to datastore service.\n"));
3375       return;
3376     }
3377   GNUNET_SERVER_disconnect_notify (server, 
3378                                    &handle_client_disconnect,
3379                                    NULL);
3380   GNUNET_SERVER_add_handlers (server, handlers);
3381   core_connect_task (NULL, NULL);
3382   GNUNET_SCHEDULER_add_delayed (sched,
3383                                 GNUNET_YES,
3384                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3385                                 GNUNET_SCHEDULER_NO_TASK,
3386                                 GNUNET_TIME_UNIT_FOREVER_REL,
3387                                 &shutdown_task,
3388                                 NULL);
3389 }
3390
3391
3392 /**
3393  * The main function for the fs service.
3394  *
3395  * @param argc number of arguments from the command line
3396  * @param argv command line arguments
3397  * @return 0 ok, 1 on error
3398  */
3399 int
3400 main (int argc, char *const *argv)
3401 {
3402   return (GNUNET_OK ==
3403           GNUNET_SERVICE_run (argc,
3404                               argv,
3405                               "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
3406 }
3407
3408 /* end of gnunet-service-fs.c */