more work on fs service
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief program that provides the file-sharing service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - actually DO P2P search upon P2P/CS requests (pass appropriate handler to core
28  *   and work through request list there; also update ttl/priority for our client's requests)
29  * - possible major issue: we may queue "gazillions" of (K|S)Blocks for the
30  *   core to transmit to another peer; need to make sure this is bounded overall...
31  * - randomly delay processing for improved anonymity (can wait)
32  * - content migration (put in local DS) (can wait)
33  * - handle some special cases when forwarding replies based on tracked requests (can wait)
34  * - tracking of success correlations for hot-path routing (can wait)
35  * - various load-based actions (can wait)
36  * - validation of KSBLOCKS (can wait)
37  * - remove on-demand blocks if they keep failing (can wait)
38  * - check that we decrement PIDs always where necessary (can wait)
39  */
40 #include "platform.h"
41 #include <values.h>
42 #include "gnunet_core_service.h"
43 #include "gnunet_datastore_service.h"
44 #include "gnunet_peer_lib.h"
45 #include "gnunet_protocols.h"
46 #include "gnunet_signatures.h"
47 #include "gnunet_util_lib.h"
48 #include "fs.h"
49
50
51 /**
52  * In-memory information about indexed files (also available
53  * on-disk).
54  */
55 struct IndexInfo
56 {
57   
58   /**
59    * This is a linked list.
60    */
61   struct IndexInfo *next;
62
63   /**
64    * Name of the indexed file.  Memory allocated
65    * at the end of this struct (do not free).
66    */
67   const char *filename;
68
69   /**
70    * Context for transmitting confirmation to client,
71    * NULL if we've done this already.
72    */
73   struct GNUNET_SERVER_TransmitContext *tc;
74   
75   /**
76    * Hash of the contents of the file.
77    */
78   GNUNET_HashCode file_id;
79
80 };
81
82
83 /**
84  * Signature of a function that is called whenever a datastore
85  * request can be processed (or an entry put on the queue times out).
86  *
87  * @param cls closure
88  * @param ok GNUNET_OK if DS is ready, GNUNET_SYSERR on timeout
89  */
90 typedef void (*RequestFunction)(void *cls,
91                                 int ok);
92
93
94 /**
95  * Doubly-linked list of our requests for the datastore.
96  */
97 struct DatastoreRequestQueue
98 {
99
100   /**
101    * This is a doubly-linked list.
102    */
103   struct DatastoreRequestQueue *next;
104
105   /**
106    * This is a doubly-linked list.
107    */
108   struct DatastoreRequestQueue *prev;
109
110   /**
111    * Function to call (will issue the request).
112    */
113   RequestFunction req;
114
115   /**
116    * Closure for req.
117    */
118   void *req_cls;
119
120   /**
121    * When should this request time-out because we don't care anymore?
122    */
123   struct GNUNET_TIME_Absolute timeout;
124     
125   /**
126    * ID of task used for signaling timeout.
127    */
128   GNUNET_SCHEDULER_TaskIdentifier task;
129
130 };
131
132
133 /**
134  * Closure for processing START_SEARCH messages from a client.
135  */
136 struct LocalGetContext
137 {
138
139   /**
140    * This is a doubly-linked list.
141    */
142   struct LocalGetContext *next;
143
144   /**
145    * This is a doubly-linked list.
146    */
147   struct LocalGetContext *prev;
148
149   /**
150    * Client that initiated the search.
151    */
152   struct GNUNET_SERVER_Client *client;
153
154   /**
155    * Array of results that we've already received 
156    * (can be NULL).
157    */
158   GNUNET_HashCode *results; 
159
160   /**
161    * Bloomfilter over all results (for fast query construction);
162    * NULL if we don't have any results.
163    */
164   struct GNUNET_CONTAINER_BloomFilter *results_bf; 
165
166   /**
167    * DS request associated with this operation.
168    */
169   struct DatastoreRequestQueue *req;
170
171   /**
172    * Current result message to transmit to client (or NULL).
173    */
174   struct ContentMessage *result;
175   
176   /**
177    * Type of the content that we're looking for.
178    * 0 for any.
179    */
180   uint32_t type;
181
182   /**
183    * Desired anonymity level.
184    */
185   uint32_t anonymity_level;
186
187   /**
188    * Number of results actually stored in the results array.
189    */
190   unsigned int results_used;
191   
192   /**
193    * Size of the results array in memory.
194    */
195   unsigned int results_size;
196
197   /**
198    * If the request is for a DBLOCK or IBLOCK, this is the identity of
199    * the peer that is known to have a response.  Set to all-zeros if
200    * such a target is not known (note that even if OUR anonymity
201    * level is >0 we may happen to know the responder's identity;
202    * nevertheless, we should probably not use it for a DHT-lookup
203    * or similar blunt actions in order to avoid exposing ourselves).
204    */
205   struct GNUNET_PeerIdentity target;
206
207   /**
208    * If the request is for an SBLOCK, this is the identity of the
209    * pseudonym to which the SBLOCK belongs. 
210    */
211   GNUNET_HashCode namespace;
212
213   /**
214    * Hash of the keyword (aka query) for KBLOCKs; Hash of
215    * the CHK-encoded block for DBLOCKS and IBLOCKS (aka query)
216    * and hash of the identifier XORed with the target for
217    * SBLOCKS (aka query).
218    */
219   GNUNET_HashCode query;
220
221 };
222
223
224 /**
225  * Possible routing policies for an FS-GET request.
226  */
227 enum RoutingPolicy
228   {
229     /**
230      * Simply drop the request.
231      */
232     ROUTING_POLICY_NONE = 0,
233     
234     /**
235      * Answer it if we can from local datastore.
236      */
237     ROUTING_POLICY_ANSWER = 1,
238
239     /**
240      * Forward the request to other peers (if possible).
241      */
242     ROUTING_POLICY_FORWARD = 2,
243
244     /**
245      * Forward to other peers, and ask them to route
246      * the response via ourselves.
247      */
248     ROUTING_POLICY_INDIRECT = 6,
249     
250     /**
251      * Do everything we could possibly do (that would
252      * make sense).
253      */
254     ROUTING_POLICY_ALL = 7
255   };
256
257
258 /**
259  * Internal context we use for our initial processing
260  * of a GET request.
261  */
262 struct ProcessGetContext
263 {
264   /**
265    * The search query (used for datastore lookup).
266    */
267   GNUNET_HashCode query;
268   
269   /**
270    * Which peer we should forward the response to.
271    */
272   struct GNUNET_PeerIdentity reply_to;
273
274   /**
275    * Namespace for the result (only set for SKS requests)
276    */
277   GNUNET_HashCode namespace;
278
279   /**
280    * Peer that we should forward the query to if possible
281    * (since that peer likely has the content).
282    */
283   struct GNUNET_PeerIdentity prime_target;
284
285   /**
286    * When did we receive this request?
287    */
288   struct GNUNET_TIME_Absolute start_time;
289
290   /**
291    * Our entry in the DRQ (non-NULL while we wait for our
292    * turn to interact with the local database).
293    */
294   struct DatastoreRequestQueue *drq;
295
296   /**
297    * Filter used to eliminate duplicate
298    * results.   Can be NULL if we are
299    * not yet filtering any results.
300    */
301   struct GNUNET_CONTAINER_BloomFilter *bf;
302
303   /**
304    * Bitmap describing which of the optional
305    * hash codes / peer identities were given to us.
306    */
307   uint32_t bm;
308
309   /**
310    * Desired block type.
311    */
312   uint32_t type;
313
314   /**
315    * Priority of the request.
316    */
317   uint32_t priority;
318
319   /**
320    * In what ways are we going to process
321    * the request?
322    */
323   enum RoutingPolicy policy;
324
325   /**
326    * Time-to-live for the request (value
327    * we use).
328    */
329   int32_t ttl;
330
331   /**
332    * Number to mingle hashes for bloom-filter
333    * tests with.
334    */
335   int32_t mingle;
336
337   /**
338    * Number of results that were found so far.
339    */
340   unsigned int results_found;
341 };
342
343
344 /**
345  * Information we keep for each pending reply.
346  */
347 struct PendingReply
348 {
349   /**
350    * This is a linked list.
351    */
352   struct PendingReply *next;
353
354   /**
355    * Size of the reply; actual reply message follows
356    * at the end of this struct.
357    */
358   size_t msize;
359
360 };
361
362
363 /**
364  * All requests from a client are kept in a doubly-linked list.
365  */
366 struct ClientRequestList;
367
368
369 /**
370  * Information we keep for each pending request.  We should try to
371  * keep this struct as small as possible since its memory consumption
372  * is key to how many requests we can have pending at once.
373  */
374 struct PendingRequest
375 {
376
377   /**
378    * ID of a client making a request, NULL if this entry is for a
379    * peer.
380    */
381   struct GNUNET_SERVER_Client *client;
382
383   /**
384    * If this request was made by a client,
385    * this is our entry in the client request
386    * list; otherwise NULL.
387    */
388   struct ClientRequestList *crl_entry;
389
390   /**
391    * If this is a namespace query, pointer to the hash of the public
392    * key of the namespace; otherwise NULL.
393    */
394   GNUNET_HashCode *namespace;
395
396   /**
397    * Bloomfilter we use to filter out replies that we don't care about
398    * (anymore).  NULL as long as we are interested in all replies.
399    */
400   struct GNUNET_CONTAINER_BloomFilter *bf;
401
402   /**
403    * Replies that we have received but were unable to forward yet
404    * (typically non-null only if we have a pending transmission
405    * request with the client or the respective peer).
406    */
407   struct PendingReply *replies_pending;
408
409   /**
410    * Pending transmission request with the core service for the target
411    * peer (for processing of 'replies_pending') or Handle for a
412    * pending query-request for P2P-transmission with the core service.
413    * If non-NULL, this request must be cancelled should this struct be
414    * destroyed!
415    */
416   struct GNUNET_CORE_TransmitHandle *cth;
417
418   /**
419    * Pending transmission request for the target client (for processing of
420    * 'replies_pending').
421    */
422   struct GNUNET_CONNECTION_TransmitHandle *th;
423
424   /**
425    * Hash code of all replies that we have seen so far (only valid
426    * if client is not NULL since we only track replies like this for
427    * our own clients).
428    */
429   GNUNET_HashCode *replies_seen;
430
431   /**
432    * When did we first see this request (form this peer), or, if our
433    * client is initiating, when did we last initiate a search?
434    */
435   struct GNUNET_TIME_Absolute start_time;
436
437   /**
438    * The query that this request is for.
439    */
440   GNUNET_HashCode query;
441
442   /**
443    * The task responsible for transmitting queries
444    * for this request.
445    */
446   GNUNET_SCHEDULER_TaskIdentifier task;
447
448   /**
449    * (Interned) Peer identifier (only valid if "client" is NULL)
450    * that identifies a peer that gave us this request.
451    */
452   GNUNET_PEER_Id source_pid;
453
454   /**
455    * (Interned) Peer identifier that identifies a preferred target
456    * for requests.
457    */
458   GNUNET_PEER_Id target_pid;
459
460   /**
461    * (Interned) Peer identifiers of peers that have already
462    * received our query for this content.
463    */
464   GNUNET_PEER_Id *used_pids;
465
466   /**
467    * Desired anonymity level; only valid for requests from a local client.
468    */
469   uint32_t anonymity_level;
470
471   /**
472    * How many entries in "used_pids" are actually valid?
473    */
474   unsigned int used_pids_off;
475
476   /**
477    * How long is the "used_pids" array?
478    */
479   unsigned int used_pids_size;
480
481   /**
482    * How many entries in "replies_seen" are actually valid?
483    */
484   unsigned int replies_seen_off;
485
486   /**
487    * How long is the "replies_seen" array?
488    */
489   unsigned int replies_seen_size;
490   
491   /**
492    * Priority with which this request was made.  If one of our clients
493    * made the request, then this is the current priority that we are
494    * using when initiating the request.  This value is used when
495    * we decide to reward other peers with trust for providing a reply.
496    */
497   uint32_t priority;
498
499   /**
500    * Priority points left for us to spend when forwarding this request
501    * to other peers.
502    */
503   uint32_t remaining_priority;
504
505   /**
506    * Number to mingle hashes for bloom-filter
507    * tests with.
508    */
509   int32_t mingle;
510
511   /**
512    * TTL with which we saw this request (or, if we initiated, TTL that
513    * we used for the request).
514    */
515   int32_t ttl;
516   
517   /**
518    * Type of the content that this request is for.
519    */
520   uint32_t type;
521
522 };
523
524
525 /**
526  * All requests from a client are kept in a doubly-linked list.
527  */
528 struct ClientRequestList
529 {
530   /**
531    * This is a doubly-linked list.
532    */
533   struct ClientRequestList *next;
534
535   /**
536    * This is a doubly-linked list.
537    */ 
538   struct ClientRequestList *prev;
539
540   /**
541    * A request from this client.
542    */
543   struct PendingRequest *req;
544
545   /**
546    * Client list with the head and tail of this DLL.
547    */
548   struct ClientList *cl;
549 };
550
551
552 /**
553  * Linked list of all clients that we are 
554  * currently processing requests for.
555  */
556 struct ClientList
557 {
558
559   /**
560    * This is a linked list.
561    */
562   struct ClientList *next;
563
564   /**
565    * What client is this entry for?
566    */
567   struct GNUNET_SERVER_Client* client;
568
569   /**
570    * Head of the DLL of requests from this client.
571    */
572   struct ClientRequestList *head;
573
574   /**
575    * Tail of the DLL of requests from this client.
576    */
577   struct ClientRequestList *tail;
578
579 };
580
581
582 /**
583  * Closure for "process_reply" function.
584  */
585 struct ProcessReplyClosure
586 {
587   /**
588    * The data for the reply.
589    */
590   const void *data;
591
592   /**
593    * When the reply expires.
594    */
595   struct GNUNET_TIME_Absolute expiration;
596
597   /**
598    * Size of data.
599    */
600   size_t size;
601
602   /**
603    * Namespace that this reply belongs to
604    * (if it is of type SBLOCK).
605    */
606   GNUNET_HashCode namespace;
607
608   /**
609    * Type of the block.
610    */
611   uint32_t type;
612
613   /**
614    * How much was this reply worth to us?
615    */
616   uint32_t priority;
617 };
618
619
620 /**
621  * Information about a peer that we are connected to.
622  * We track data that is useful for determining which
623  * peers should receive our requests.
624  */
625 struct ConnectedPeer
626 {
627
628   /**
629    * List of the last clients for which this peer
630    * successfully answered a query. 
631    */
632   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
633
634   /**
635    * List of the last PIDs for which
636    * this peer successfully answered a query;
637    * We use 0 to indicate no successful reply.
638    */
639   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
640
641   /**
642    * Average delay between sending the peer a request and
643    * getting a reply (only calculated over the requests for
644    * which we actually got a reply).   Calculated
645    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
646    */ 
647   struct GNUNET_TIME_Relative avg_delay;
648
649   /**
650    * Average priority of successful replies.  Calculated
651    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
652    */
653   double avg_priority;
654
655   /**
656    * The peer's identity.
657    */
658   GNUNET_PEER_Id pid;  
659
660   /**
661    * Number of requests we have currently pending
662    * with this peer (that is, requests that were
663    * transmitted so recently that we would not retransmit
664    * them right now).
665    */
666   unsigned int pending_requests;
667
668   /**
669    * Which offset in "last_p2p_replies" will be updated next?
670    * (we go round-robin).
671    */
672   unsigned int last_p2p_replies_woff;
673
674   /**
675    * Which offset in "last_client_replies" will be updated next?
676    * (we go round-robin).
677    */
678   unsigned int last_client_replies_woff;
679
680 };
681
682
683 /**
684  * Our connection to the datastore.
685  */
686 static struct GNUNET_DATASTORE_Handle *dsh;
687
688 /**
689  * Our scheduler.
690  */
691 static struct GNUNET_SCHEDULER_Handle *sched;
692
693 /**
694  * Our configuration.
695  */
696 const struct GNUNET_CONFIGURATION_Handle *cfg;
697
698 /**
699  * Handle to the core service (NULL until we've connected to it).
700  */
701 struct GNUNET_CORE_Handle *core;
702
703 /**
704  * Head of doubly-linked LGC list.
705  */
706 static struct LocalGetContext *lgc_head;
707
708 /**
709  * Tail of doubly-linked LGC list.
710  */
711 static struct LocalGetContext *lgc_tail;
712
713 /**
714  * Head of request queue for the datastore, sorted by timeout.
715  */
716 static struct DatastoreRequestQueue *drq_head;
717
718 /**
719  * Tail of request queue for the datastore.
720  */
721 static struct DatastoreRequestQueue *drq_tail;
722
723 /**
724  * Linked list of indexed files.
725  */
726 static struct IndexInfo *indexed_files;
727
728 /**
729  * Maps hash over content of indexed files to the respective filename.
730  * The filenames are pointers into the indexed_files linked list and
731  * do not need to be freed.
732  */
733 static struct GNUNET_CONTAINER_MultiHashMap *ifm;
734
735 /**
736  * Map of query hash codes to requests.
737  */
738 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_query;
739
740 /**
741  * Map of peer IDs to requests (for those requests coming
742  * from other peers).
743  */
744 static struct GNUNET_CONTAINER_MultiHashMap *requests_by_peer;
745
746 /**
747  * Linked list of all of our clients and their requests.
748  */
749 static struct ClientList *clients;
750
751 /**
752  * Heap with the request that will expire next at the top.  Contains
753  * pointers of type "struct PendingRequest*"; these will *also* be
754  * aliased from the "requests_by_peer" data structures and the
755  * "requests_by_query" table.  Note that requests from our clients
756  * don't expire and are thus NOT in the "requests_by_expiration"
757  * (or the "requests_by_peer" tables).
758  */
759 static struct GNUNET_CONTAINER_Heap *requests_by_expiration;
760
761 /**
762  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
763  */
764 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
765
766 /**
767  * Maximum number of requests (from other peers) that we're
768  * willing to have pending at any given point in time.
769  * FIXME: set from configuration (and 32 is a tiny value for testing only).
770  */
771 static uint64_t max_pending_requests = 32;
772
773 /**
774  * Write the current index information list to disk.
775  */ 
776 static void
777 write_index_list ()
778 {
779   struct GNUNET_BIO_WriteHandle *wh;
780   char *fn;
781   struct IndexInfo *pos;  
782
783   if (GNUNET_OK !=
784       GNUNET_CONFIGURATION_get_value_filename (cfg,
785                                                "FS",
786                                                "INDEXDB",
787                                                &fn))
788     {
789       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
790                   _("Configuration option `%s' in section `%s' missing.\n"),
791                   "INDEXDB",
792                   "FS");
793       return;
794     }
795   wh = GNUNET_BIO_write_open (fn);
796   if (NULL == wh)
797     {
798       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
799                   _("Could not open `%s'.\n"),
800                   fn);
801       GNUNET_free (fn);
802       return;
803     }
804   pos = indexed_files;
805   while (pos != NULL)
806     {
807       if ( (GNUNET_OK !=
808             GNUNET_BIO_write (wh,
809                               &pos->file_id,
810                               sizeof (GNUNET_HashCode))) ||
811            (GNUNET_OK !=
812             GNUNET_BIO_write_string (wh,
813                                      pos->filename)) )
814         break;
815       pos = pos->next;
816     }
817   if (GNUNET_OK != 
818       GNUNET_BIO_write_close (wh))
819     {
820       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
821                   _("Error writing `%s'.\n"),
822                   fn);
823       GNUNET_free (fn);
824       return;
825     }
826   GNUNET_free (fn);
827 }
828
829
830 /**
831  * Read index information from disk.
832  */
833 static void
834 read_index_list ()
835 {
836   struct GNUNET_BIO_ReadHandle *rh;
837   char *fn;
838   struct IndexInfo *pos;  
839   char *fname;
840   GNUNET_HashCode hc;
841   size_t slen;
842   char *emsg;
843
844   if (GNUNET_OK !=
845       GNUNET_CONFIGURATION_get_value_filename (cfg,
846                                                "FS",
847                                                "INDEXDB",
848                                                &fn))
849     {
850       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
851                   _("Configuration option `%s' in section `%s' missing.\n"),
852                   "INDEXDB",
853                   "FS");
854       return;
855     }
856   rh = GNUNET_BIO_read_open (fn);
857   if (NULL == rh)
858     {
859       GNUNET_log (GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
860                   _("Could not open `%s'.\n"),
861                   fn);
862       GNUNET_free (fn);
863       return;
864     }
865
866   while ( (GNUNET_OK ==
867            GNUNET_BIO_read (rh,
868                             "Hash of indexed file",
869                             &hc,
870                             sizeof (GNUNET_HashCode))) &&
871           (GNUNET_OK ==
872            GNUNET_BIO_read_string (rh, 
873                                    "Name of indexed file",
874                                    &fname,
875                                    1024 * 16)) )
876     {
877       slen = strlen (fname) + 1;
878       pos = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
879       pos->file_id = hc;
880       pos->filename = (const char *) &pos[1];
881       memcpy (&pos[1], fname, slen);
882       if (GNUNET_SYSERR ==
883           GNUNET_CONTAINER_multihashmap_put (ifm,
884                                              &hc,
885                                              (void*) pos->filename,
886                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
887         {
888           GNUNET_free (pos);
889         }
890       else
891         {
892           pos->next = indexed_files;
893           indexed_files = pos;
894         }
895     }
896   if (GNUNET_OK != 
897       GNUNET_BIO_read_close (rh, &emsg))
898     GNUNET_free (emsg);
899   GNUNET_free (fn);
900 }
901
902
903 /**
904  * We've validated the hash of the file we're about to
905  * index.  Signal success to the client and update
906  * our internal data structures.
907  *
908  * @param ii the index info entry for the request
909  */
910 static void
911 signal_index_ok (struct IndexInfo *ii)
912 {
913   if (GNUNET_SYSERR ==
914       GNUNET_CONTAINER_multihashmap_put (ifm,
915                                          &ii->file_id,
916                                          (void*) ii->filename,
917                                          GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
918     {
919       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
920                   _("Index request received for file `%s' is indexed as `%s'.  Permitting anyway.\n"),
921                   ii->filename,
922                   (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
923                                                                    &ii->file_id));
924       GNUNET_SERVER_transmit_context_append (ii->tc,
925                                              NULL, 0,
926                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
927       GNUNET_SERVER_transmit_context_run (ii->tc,
928                                           GNUNET_TIME_UNIT_MINUTES);
929       GNUNET_free (ii);
930       return;
931     }
932   ii->next = indexed_files;
933   indexed_files = ii;
934   write_index_list ();
935   GNUNET_SERVER_transmit_context_append (ii->tc,
936                                          NULL, 0,
937                                          GNUNET_MESSAGE_TYPE_FS_INDEX_START_OK);
938   GNUNET_SERVER_transmit_context_run (ii->tc,
939                                       GNUNET_TIME_UNIT_MINUTES);
940   ii->tc = NULL;
941 }
942
943
944 /**
945  * Function called once the hash computation over an
946  * indexed file has completed.
947  *
948  * @param cls closure, our publishing context
949  * @param res resulting hash, NULL on error
950  */
951 static void 
952 hash_for_index_val (void *cls,
953                     const GNUNET_HashCode *
954                     res)
955 {
956   struct IndexInfo *ii = cls;
957   
958   if ( (res == NULL) ||
959        (0 != memcmp (res,
960                      &ii->file_id,
961                      sizeof(GNUNET_HashCode))) )
962     {
963       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
964                   _("Hash mismatch trying to index file `%s'\n"),
965                   ii->filename);
966       GNUNET_SERVER_transmit_context_append (ii->tc,
967                                              NULL, 0,
968                                              GNUNET_MESSAGE_TYPE_FS_INDEX_START_FAILED);
969       GNUNET_SERVER_transmit_context_run (ii->tc,
970                                           GNUNET_TIME_UNIT_MINUTES);
971       GNUNET_free (ii);
972       return;
973     }
974   signal_index_ok (ii);
975 }
976
977
978 /**
979  * Handle INDEX_START-message.
980  *
981  * @param cls closure
982  * @param client identification of the client
983  * @param message the actual message
984  */
985 static void
986 handle_index_start (void *cls,
987                     struct GNUNET_SERVER_Client *client,
988                     const struct GNUNET_MessageHeader *message)
989 {
990   const struct IndexStartMessage *ism;
991   const char *fn;
992   uint16_t msize;
993   struct IndexInfo *ii;
994   size_t slen;
995   uint32_t dev;
996   uint64_t ino;
997   uint32_t mydev;
998   uint64_t myino;
999
1000   msize = ntohs(message->size);
1001   if ( (msize <= sizeof (struct IndexStartMessage)) ||
1002        ( ((const char *)message)[msize-1] != '\0') )
1003     {
1004       GNUNET_break (0);
1005       GNUNET_SERVER_receive_done (client,
1006                                   GNUNET_SYSERR);
1007       return;
1008     }
1009   ism = (const struct IndexStartMessage*) message;
1010   fn = (const char*) &ism[1];
1011   dev = ntohl (ism->device);
1012   ino = GNUNET_ntohll (ism->inode);
1013   ism = (const struct IndexStartMessage*) message;
1014   slen = strlen (fn) + 1;
1015   ii = GNUNET_malloc (sizeof (struct IndexInfo) + slen);
1016   ii->filename = (const char*) &ii[1];
1017   memcpy (&ii[1], fn, slen);
1018   ii->file_id = ism->file_id;  
1019   ii->tc = GNUNET_SERVER_transmit_context_create (client);
1020   if ( ( (dev != 0) ||
1021          (ino != 0) ) &&
1022        (GNUNET_OK == GNUNET_DISK_file_get_identifiers (fn,
1023                                                        &mydev,
1024                                                        &myino)) &&
1025        ( (dev == mydev) &&
1026          (ino == myino) ) )
1027     {      
1028       /* fast validation OK! */
1029       signal_index_ok (ii);
1030       return;
1031     }
1032   /* slow validation, need to hash full file (again) */
1033   GNUNET_CRYPTO_hash_file (sched,
1034                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1035                            GNUNET_NO,
1036                            fn,
1037                            HASHING_BLOCKSIZE,
1038                            &hash_for_index_val,
1039                            ii);
1040 }
1041
1042
1043 /**
1044  * Handle INDEX_LIST_GET-message.
1045  *
1046  * @param cls closure
1047  * @param client identification of the client
1048  * @param message the actual message
1049  */
1050 static void
1051 handle_index_list_get (void *cls,
1052                        struct GNUNET_SERVER_Client *client,
1053                        const struct GNUNET_MessageHeader *message)
1054 {
1055   struct GNUNET_SERVER_TransmitContext *tc;
1056   struct IndexInfoMessage *iim;
1057   char buf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
1058   size_t slen;
1059   const char *fn;
1060   struct GNUNET_MessageHeader *msg;
1061   struct IndexInfo *pos;
1062
1063   tc = GNUNET_SERVER_transmit_context_create (client);
1064   iim = (struct IndexInfoMessage*) buf;
1065   msg = &iim->header;
1066   pos = indexed_files;
1067   while (NULL != pos)
1068     {
1069       iim->reserved = 0;
1070       iim->file_id = pos->file_id;
1071       fn = pos->filename;
1072       slen = strlen (fn) + 1;
1073       if (slen + sizeof (struct IndexInfoMessage) > 
1074           GNUNET_SERVER_MAX_MESSAGE_SIZE)
1075         {
1076           GNUNET_break (0);
1077           break;
1078         }
1079       memcpy (&iim[1], fn, slen);
1080       GNUNET_SERVER_transmit_context_append
1081         (tc,
1082          &msg[1],
1083          sizeof (struct IndexInfoMessage) 
1084          - sizeof (struct GNUNET_MessageHeader) + slen,
1085          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_ENTRY);
1086       pos = pos->next;
1087     }
1088   GNUNET_SERVER_transmit_context_append (tc,
1089                                          NULL, 0,
1090                                          GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_END);
1091   GNUNET_SERVER_transmit_context_run (tc,
1092                                       GNUNET_TIME_UNIT_MINUTES);
1093 }
1094
1095
1096 /**
1097  * Handle UNINDEX-message.
1098  *
1099  * @param cls closure
1100  * @param client identification of the client
1101  * @param message the actual message
1102  */
1103 static void
1104 handle_unindex (void *cls,
1105                 struct GNUNET_SERVER_Client *client,
1106                 const struct GNUNET_MessageHeader *message)
1107 {
1108   const struct UnindexMessage *um;
1109   struct IndexInfo *pos;
1110   struct IndexInfo *prev;
1111   struct IndexInfo *next;
1112   struct GNUNET_SERVER_TransmitContext *tc;
1113   int found;
1114   
1115   um = (const struct UnindexMessage*) message;
1116   found = GNUNET_NO;
1117   prev = NULL;
1118   pos = indexed_files;
1119   while (NULL != pos)
1120     {
1121       next = pos->next;
1122       if (0 == memcmp (&pos->file_id,
1123                        &um->file_id,
1124                        sizeof (GNUNET_HashCode)))
1125         {
1126           if (prev == NULL)
1127             indexed_files = pos->next;
1128           else
1129             prev->next = pos->next;
1130           GNUNET_free (pos);
1131           found = GNUNET_YES;
1132         }
1133       else
1134         {
1135           prev = pos;
1136         }
1137       pos = next;
1138     }
1139   if (GNUNET_YES == found)
1140     write_index_list ();
1141   tc = GNUNET_SERVER_transmit_context_create (client);
1142   GNUNET_SERVER_transmit_context_append (tc,
1143                                          NULL, 0,
1144                                          GNUNET_MESSAGE_TYPE_FS_UNINDEX_OK);
1145   GNUNET_SERVER_transmit_context_run (tc,
1146                                       GNUNET_TIME_UNIT_MINUTES);
1147 }
1148
1149
1150 /**
1151  * Run the next DS request in our
1152  * queue, we're done with the current one.
1153  */
1154 static void
1155 next_ds_request ()
1156 {
1157   struct DatastoreRequestQueue *e;
1158   
1159   while (NULL != (e = drq_head))
1160     {
1161       if (0 != GNUNET_TIME_absolute_get_remaining (e->timeout).value)
1162         break;
1163       if (e->task != GNUNET_SCHEDULER_NO_TASK)
1164         GNUNET_SCHEDULER_cancel (sched, e->task);
1165       GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1166       e->req (e->req_cls, GNUNET_NO);
1167       GNUNET_free (e);  
1168     }
1169   if (e == NULL)
1170     return;
1171   if (e->task != GNUNET_SCHEDULER_NO_TASK)
1172     GNUNET_SCHEDULER_cancel (sched, e->task);
1173   e->task = GNUNET_SCHEDULER_NO_TASK;
1174   e->req (e->req_cls, GNUNET_YES);
1175   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1176   GNUNET_free (e);  
1177 }
1178
1179
1180 /**
1181  * A datastore request had to be timed out. 
1182  *
1183  * @param cls closure (of type "struct DatastoreRequestQueue*")
1184  * @param tc task context, unused
1185  */
1186 static void
1187 timeout_ds_request (void *cls,
1188                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1189 {
1190   struct DatastoreRequestQueue *e = cls;
1191
1192   e->task = GNUNET_SCHEDULER_NO_TASK;
1193   GNUNET_CONTAINER_DLL_remove (drq_head, drq_tail, e);
1194   e->req (e->req_cls, GNUNET_NO);
1195   GNUNET_free (e);  
1196 }
1197
1198
1199 /**
1200  * Queue a request for the datastore.
1201  *
1202  * @param deadline by when the request should run
1203  * @param fun function to call once the request can be run
1204  * @param fun_cls closure for fun
1205  */
1206 static struct DatastoreRequestQueue *
1207 queue_ds_request (struct GNUNET_TIME_Relative deadline,
1208                   RequestFunction fun,
1209                   void *fun_cls)
1210 {
1211   struct DatastoreRequestQueue *e;
1212   struct DatastoreRequestQueue *bef;
1213
1214   if (drq_head == NULL)
1215     {
1216       /* no other requests pending, run immediately */
1217       fun (fun_cls, GNUNET_OK);
1218       return NULL;
1219     }
1220   e = GNUNET_malloc (sizeof (struct DatastoreRequestQueue));
1221   e->timeout = GNUNET_TIME_relative_to_absolute (deadline);
1222   e->req = fun;
1223   e->req_cls = fun_cls;
1224   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1225     {
1226       /* local request, highest prio, put at head of queue
1227          regardless of deadline */
1228       bef = NULL;
1229     }
1230   else
1231     {
1232       bef = drq_tail;
1233       while ( (NULL != bef) &&
1234               (e->timeout.value < bef->timeout.value) )
1235         bef = bef->prev;
1236     }
1237   GNUNET_CONTAINER_DLL_insert_after (drq_head, drq_tail, bef, e);
1238   if (deadline.value == GNUNET_TIME_UNIT_FOREVER_REL.value)
1239     return e;
1240   e->task = GNUNET_SCHEDULER_add_delayed (sched,
1241                                           GNUNET_NO,
1242                                           GNUNET_SCHEDULER_PRIORITY_BACKGROUND,
1243                                           GNUNET_SCHEDULER_NO_TASK,
1244                                           deadline,
1245                                           &timeout_ds_request,
1246                                           e);
1247   return e;                                    
1248 }
1249
1250
1251 /**
1252  * Free the state associated with a local get context.
1253  *
1254  * @param lgc the lgc to free
1255  */
1256 static void
1257 local_get_context_free (struct LocalGetContext *lgc) 
1258 {
1259   GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1260   GNUNET_SERVER_client_drop (lgc->client); 
1261   GNUNET_free_non_null (lgc->results);
1262   if (lgc->results_bf != NULL)
1263     GNUNET_CONTAINER_bloomfilter_free (lgc->results_bf);
1264   if (lgc->req != NULL)
1265     {
1266       if (lgc->req->task != GNUNET_SCHEDULER_NO_TASK)
1267         GNUNET_SCHEDULER_cancel (sched, lgc->req->task);
1268       GNUNET_CONTAINER_DLL_remove (lgc_head, lgc_tail, lgc);
1269       GNUNET_free (lgc->req);
1270     }
1271   GNUNET_free (lgc);
1272 }
1273
1274
1275 /**
1276  * We're able to transmit the next (local) result to the client.
1277  * Do it and ask the datastore for more.  Or, on error, tell
1278  * the datastore to stop giving us more.
1279  *
1280  * @param cls our closure (struct LocalGetContext)
1281  * @param max maximum number of bytes we can transmit
1282  * @param buf where to copy our message
1283  * @return number of bytes copied to buf
1284  */
1285 static size_t
1286 transmit_local_result (void *cls,
1287                        size_t max,
1288                        void *buf)
1289 {
1290   struct LocalGetContext *lgc = cls;  
1291   uint16_t msize;
1292
1293   if (NULL == buf)
1294     {
1295       /* error, abort! */
1296       GNUNET_free (lgc->result);
1297       lgc->result = NULL;
1298       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
1299       return 0;
1300     }
1301   msize = ntohs (lgc->result->header.size);
1302   GNUNET_assert (max >= msize);
1303   memcpy (buf, lgc->result, msize);
1304   GNUNET_free (lgc->result);
1305   lgc->result = NULL;
1306   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1307   return msize;
1308 }
1309
1310
1311 /**
1312  * Continuation called from datastore's remove
1313  * function.
1314  *
1315  * @param cls unused
1316  * @param success did the deletion work?
1317  * @param msg error message
1318  */
1319 static void
1320 remove_cont (void *cls,
1321              int success,
1322              const char *msg)
1323 {
1324   if (GNUNET_OK != success)
1325     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1326                 _("Failed to delete bogus block: %s\n"),
1327                 msg);
1328   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1329 }
1330
1331
1332 /**
1333  * Mingle hash with the mingle_number to
1334  * produce different bits.
1335  */
1336 static void
1337 mingle_hash (const GNUNET_HashCode * in,
1338              int32_t mingle_number, 
1339              GNUNET_HashCode * hc)
1340 {
1341   GNUNET_HashCode m;
1342
1343   GNUNET_CRYPTO_hash (&mingle_number, 
1344                       sizeof (int32_t), 
1345                       &m);
1346   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1347 }
1348
1349
1350 /**
1351  * We've received an on-demand encoded block
1352  * from the datastore.  Attempt to do on-demand
1353  * encoding and (if successful), call the 
1354  * continuation with the resulting block.  On
1355  * error, clean up and ask the datastore for
1356  * more results.
1357  *
1358  * @param key key for the content
1359  * @param size number of bytes in data
1360  * @param data content stored
1361  * @param type type of the content
1362  * @param priority priority of the content
1363  * @param anonymity anonymity-level for the content
1364  * @param expiration expiration time for the content
1365  * @param uid unique identifier for the datum;
1366  *        maybe 0 if no unique identifier is available
1367  * @param cont function to call with the actual block
1368  * @param cont_cls closure for cont
1369  */
1370 static void
1371 handle_on_demand_block (const GNUNET_HashCode * key,
1372                         uint32_t size,
1373                         const void *data,
1374                         uint32_t type,
1375                         uint32_t priority,
1376                         uint32_t anonymity,
1377                         struct GNUNET_TIME_Absolute
1378                         expiration, uint64_t uid,
1379                         GNUNET_DATASTORE_Iterator cont,
1380                         void *cont_cls)
1381 {
1382   const struct OnDemandBlock *odb;
1383   GNUNET_HashCode nkey;
1384   struct GNUNET_CRYPTO_AesSessionKey skey;
1385   struct GNUNET_CRYPTO_AesInitializationVector iv;
1386   GNUNET_HashCode query;
1387   ssize_t nsize;
1388   char ndata[DBLOCK_SIZE];
1389   char edata[DBLOCK_SIZE];
1390   const char *fn;
1391   struct GNUNET_DISK_FileHandle *fh;
1392   uint64_t off;
1393
1394   if (size != sizeof (struct OnDemandBlock))
1395     {
1396       GNUNET_break (0);
1397       GNUNET_DATASTORE_remove (dsh, 
1398                                key,
1399                                size,
1400                                data,
1401                                &remove_cont,
1402                                NULL,
1403                                GNUNET_TIME_UNIT_FOREVER_REL);     
1404       return;
1405     }
1406   odb = (const struct OnDemandBlock*) data;
1407   off = GNUNET_ntohll (odb->offset);
1408   fn = (const char*) GNUNET_CONTAINER_multihashmap_get (ifm,
1409                                                         &odb->file_id);
1410   fh = NULL;
1411   if ( (NULL == fn) ||
1412        (NULL == (fh = GNUNET_DISK_file_open (fn, 
1413                                              GNUNET_DISK_OPEN_READ))) ||
1414        (off !=
1415         GNUNET_DISK_file_seek (fh,
1416                                off,
1417                                GNUNET_DISK_SEEK_SET)) ||
1418        (-1 ==
1419         (nsize = GNUNET_DISK_file_read (fh,
1420                                         ndata,
1421                                         sizeof (ndata)))) )
1422     {
1423       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1424                   _("Could not access indexed file `%s' at offset %llu: %s\n"),
1425                   GNUNET_h2s (&odb->file_id),
1426                   (unsigned long long) off,
1427                   STRERROR (errno));
1428       if (fh != NULL)
1429         GNUNET_DISK_file_close (fh);
1430       /* FIXME: if this happens often, we need
1431          to remove the OnDemand block from the DS! */
1432       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1433       return;
1434     }
1435   GNUNET_DISK_file_close (fh);
1436   GNUNET_CRYPTO_hash (ndata,
1437                       nsize,
1438                       &nkey);
1439   GNUNET_CRYPTO_hash_to_aes_key (&nkey, &skey, &iv);
1440   GNUNET_CRYPTO_aes_encrypt (ndata,
1441                              nsize,
1442                              &skey,
1443                              &iv,
1444                              edata);
1445   GNUNET_CRYPTO_hash (edata,
1446                       nsize,
1447                       &query);
1448   if (0 != memcmp (&query, 
1449                    key,
1450                    sizeof (GNUNET_HashCode)))
1451     {
1452       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1453                   _("Indexed file `%s' changed at offset %llu\n"),
1454                   fn,
1455                   (unsigned long long) off);
1456       /* FIXME: if this happens often, we need
1457          to remove the OnDemand block from the DS! */
1458       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1459       return;
1460     }
1461   cont (cont_cls,
1462         key,
1463         nsize,
1464         edata,
1465         GNUNET_DATASTORE_BLOCKTYPE_DBLOCK,
1466         priority,
1467         anonymity,
1468         expiration,
1469         uid);
1470 }
1471
1472
1473 /**
1474  * How many bytes should a bloomfilter be if we have already seen
1475  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1476  * of bits set per entry.  Furthermore, we should not re-size the
1477  * filter too often (to keep it cheap).
1478  *
1479  * Since other peers will also add entries but not resize the filter,
1480  * we should generally pick a slightly larger size than what the
1481  * strict math would suggest.
1482  *
1483  * @return must be a power of two and smaller or equal to 2^15.
1484  */
1485 static unsigned int
1486 compute_bloomfilter_size (unsigned int entry_count)
1487 {
1488   unsigned int size;
1489   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1490   uint16_t max = 1 << 15;
1491
1492   if (entry_count > max)
1493     return max;
1494   size = 8;
1495   while ((size < max) && (size < ideal))
1496     size *= 2;
1497   if (size > max)
1498     return max;
1499   return size;
1500 }
1501
1502
1503 /**
1504  * Recalculate our bloom filter for filtering replies.
1505  *
1506  * @param count number of entries we are filtering right now
1507  * @param mingle set to our new mingling value
1508  * @param entries the entries to filter
1509  * @return updated bloomfilter, NULL for none
1510  */
1511 static struct GNUNET_CONTAINER_BloomFilter *
1512 refresh_bloomfilter (unsigned int count,
1513                      int32_t * mingle,
1514                      const GNUNET_HashCode *entries)
1515 {
1516   struct GNUNET_CONTAINER_BloomFilter *bf;
1517   unsigned int nsize;
1518   unsigned int i;
1519   GNUNET_HashCode mhash;
1520
1521   if (0 == count)
1522     return NULL;
1523   nsize = compute_bloomfilter_size (count);
1524   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1525   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1526                                           nsize,
1527                                           BLOOMFILTER_K);
1528   for (i=0;i<count;i++)
1529     {
1530       mingle_hash (&entries[i], *mingle, &mhash);
1531       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1532     }
1533   return bf;
1534 }
1535
1536
1537 /**
1538  * Closure used for "target_peer_select_cb".
1539  */
1540 struct PeerSelectionContext 
1541 {
1542   /**
1543    * The request for which we are selecting
1544    * peers.
1545    */
1546   struct PendingRequest *pr;
1547
1548   /**
1549    * Current "prime" target.
1550    */
1551   struct GNUNET_PeerIdentity target;
1552
1553   /**
1554    * How much do we like this target?
1555    */
1556   double target_score;
1557
1558 };
1559
1560
1561 /**
1562  * Function called for each connected peer to determine
1563  * which one(s) would make good targets for forwarding.
1564  *
1565  * @param cls closure (struct PeerSelectionContext)
1566  * @param key current key code (peer identity)
1567  * @param value value in the hash map (struct ConnectedPeer)
1568  * @return GNUNET_YES if we should continue to
1569  *         iterate,
1570  *         GNUNET_NO if not.
1571  */
1572 static int
1573 target_peer_select_cb (void *cls,
1574                        const GNUNET_HashCode * key,
1575                        void *value)
1576 {
1577   struct PeerSelectionContext *psc = cls;
1578   // struct ConnectedPeer *cp = value;
1579   double score;
1580   // FIXME (CRITICAL: would  always sent to same peer without this!)
1581   // 1) check if we have already (recently) forwarded to this peer, if so, skip
1582   // 2) calculate how much we'd like to forward to this peer
1583   score = 0;
1584   
1585   // 3) store in closure
1586   if (score > psc->target_score)
1587     {
1588       psc->target_score = score;
1589       psc->target.hashPubKey = *key; 
1590     }
1591   return GNUNET_YES;
1592 }
1593
1594
1595
1596
1597 /**
1598  * We use a random delay to make the timing of requests
1599  * less predictable.  This function returns such a random
1600  * delay.
1601  *
1602  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1603  */
1604 static struct GNUNET_TIME_Relative
1605 get_processing_delay ()
1606 {
1607   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1608                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1609                                                                   TTL_DECREMENT));
1610 }
1611
1612
1613 /**
1614  * Task that is run for each request with the
1615  * goal of forwarding the associated query to
1616  * other peers.  The task should re-schedule
1617  * itself to be re-run once the TTL has expired.
1618  * (or at a later time if more peers should
1619  * be queried earlier).
1620  *
1621  * @param cls the requests "struct PendingRequest*"
1622  * @param tc task context (unused)
1623  */
1624 static void
1625 forward_request_task (void *cls,
1626                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1627
1628
1629 /**
1630  * We've selected a peer for forwarding of a query.
1631  * Construct the message and then re-schedule the
1632  * task to forward again to (other) peers.
1633  *
1634  * @param cls closure
1635  * @param size number of bytes available in buf
1636  * @param buf where the callee should write the message
1637  * @return number of bytes written to buf
1638  */
1639 static size_t
1640 transmit_request_cb (void *cls,
1641                      size_t size, 
1642                      void *buf)
1643 {
1644   struct PendingRequest *pr = cls;
1645   uint16_t msize;
1646
1647   pr->cth = NULL;
1648   /* (1) check for timeout */
1649   if (NULL == buf)
1650     {
1651       /* timeout, try another peer immediately again */
1652       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1653                                                GNUNET_NO,
1654                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1655                                                GNUNET_SCHEDULER_NO_TASK,
1656                                                GNUNET_TIME_UNIT_ZERO,
1657                                                &forward_request_task,
1658                                                pr);
1659       return 0;
1660     }
1661   /* (2) build query message */
1662   msize = 0;
1663   // CRITICAL-FIXME! (nothing goes without this!)
1664   /* (3) schedule job to do it again (or another peer, etc.) */
1665   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1666                                            GNUNET_NO,
1667                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
1668                                            GNUNET_SCHEDULER_NO_TASK,
1669                                            get_processing_delay (), // FIXME!
1670                                            &forward_request_task,
1671                                            pr);
1672
1673   return msize;
1674 }
1675
1676
1677 /**
1678  * Function called after we've tried to reserve
1679  * a certain amount of bandwidth for a reply.
1680  * Check if we succeeded and if so send our query.
1681  *
1682  * @param cls the requests "struct PendingRequest*"
1683  * @param peer identifies the peer
1684  * @param latency current latency estimate, "FOREVER" if we have been
1685  *                disconnected
1686  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1687  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1688  * @param amount set to the amount that was actually reserved or unreserved
1689  * @param preference current traffic preference for the given peer
1690  */
1691 static void
1692 target_reservation_cb (void *cls,
1693                        const struct
1694                        GNUNET_PeerIdentity * peer,
1695                        unsigned int bpm_in,
1696                        unsigned int bpm_out,
1697                        struct GNUNET_TIME_Relative
1698                        latency, int amount,
1699                        unsigned long long preference)
1700 {
1701   struct PendingRequest *pr = cls;
1702   uint32_t priority;
1703   uint16_t size;
1704   struct GNUNET_TIME_Relative maxdelay;
1705
1706   GNUNET_assert (peer != NULL);
1707   if ( (amount != DBLOCK_SIZE) ||
1708        (pr->cth != NULL) )
1709     {
1710       /* try again later; FIXME: we may need to un-reserve "amount"? */
1711       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1712                                                GNUNET_NO,
1713                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1714                                                GNUNET_SCHEDULER_NO_TASK,
1715                                                get_processing_delay (), // FIXME: longer?
1716                                                &forward_request_task,
1717                                                pr);
1718       return;
1719     }
1720   // (2) transmit, update ttl/priority
1721   // FIXME: calculate priority, maxdelay, size properly!
1722   priority = 0;
1723   size = 60000;
1724   maxdelay = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
1725   pr->cth = GNUNET_CORE_notify_transmit_ready (core,
1726                                                priority,
1727                                                maxdelay,
1728                                                peer,
1729                                                size,
1730                                                &transmit_request_cb,
1731                                                pr);
1732   if (pr->cth == NULL)
1733     {
1734       /* try again later */
1735       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1736                                                GNUNET_NO,
1737                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1738                                                GNUNET_SCHEDULER_NO_TASK,
1739                                                get_processing_delay (), // FIXME: longer?
1740                                                &forward_request_task,
1741                                                pr);
1742     }
1743 }
1744
1745
1746 /**
1747  * Task that is run for each request with the
1748  * goal of forwarding the associated query to
1749  * other peers.  The task should re-schedule
1750  * itself to be re-run once the TTL has expired.
1751  * (or at a later time if more peers should
1752  * be queried earlier).
1753  *
1754  * @param cls the requests "struct PendingRequest*"
1755  * @param tc task context (unused)
1756  */
1757 static void
1758 forward_request_task (void *cls,
1759                       const struct GNUNET_SCHEDULER_TaskContext *tc)
1760 {
1761   struct PendingRequest *pr = cls;
1762   struct PeerSelectionContext psc;
1763
1764   pr->task = GNUNET_SCHEDULER_NO_TASK;
1765   if (pr->cth != NULL) 
1766     {
1767       /* we're busy transmitting a result, wait a bit */
1768       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1769                                                GNUNET_NO,
1770                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1771                                                GNUNET_SCHEDULER_NO_TASK,
1772                                                get_processing_delay (), 
1773                                                &forward_request_task,
1774                                                pr);
1775       return;
1776     }
1777   /* (1) select target */
1778   psc.pr = pr;
1779   psc.target_score = MINDOUBLE;
1780   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1781                                          &target_peer_select_cb,
1782                                          &psc);
1783   if (psc.target_score == MINDOUBLE)
1784     {
1785       /* no possible target found, wait some time */
1786       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1787                                                GNUNET_NO,
1788                                                GNUNET_SCHEDULER_PRIORITY_IDLE,
1789                                                GNUNET_SCHEDULER_NO_TASK,
1790                                                get_processing_delay (), // FIXME: exponential back-off? or at least wait longer...
1791                                                &forward_request_task,
1792                                                pr);
1793       return;
1794     }
1795   /* (2) reserve reply bandwidth */
1796   // FIXME: need a way to cancel; this
1797   // async operation is problematic (segv-problematic)
1798   // if "pr" is destroyed while it happens!
1799   GNUNET_CORE_peer_configure (core,
1800                               &psc.target,
1801                               GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1802                               -1,
1803                               DBLOCK_SIZE, // FIXME: make dependent on type?
1804                               0,
1805                               &target_reservation_cb,
1806                               pr);
1807 }
1808
1809
1810 /**
1811  * We're processing (local) results for a search request
1812  * from a (local) client.  Pass applicable results to the
1813  * client and if we are done either clean up (operation
1814  * complete) or switch to P2P search (more results possible).
1815  *
1816  * @param cls our closure (struct LocalGetContext)
1817  * @param key key for the content
1818  * @param size number of bytes in data
1819  * @param data content stored
1820  * @param type type of the content
1821  * @param priority priority of the content
1822  * @param anonymity anonymity-level for the content
1823  * @param expiration expiration time for the content
1824  * @param uid unique identifier for the datum;
1825  *        maybe 0 if no unique identifier is available
1826  */
1827 static void
1828 process_local_get_result (void *cls,
1829                           const GNUNET_HashCode * key,
1830                           uint32_t size,
1831                           const void *data,
1832                           uint32_t type,
1833                           uint32_t priority,
1834                           uint32_t anonymity,
1835                           struct GNUNET_TIME_Absolute
1836                           expiration, 
1837                           uint64_t uid)
1838 {
1839   struct LocalGetContext *lgc = cls;
1840   struct PendingRequest *pr;
1841   struct ClientRequestList *crl;
1842   struct ClientList *cl;
1843   size_t msize;
1844   unsigned int i;
1845
1846   if (key == NULL)
1847     {
1848       /* no further results from datastore; continue
1849          processing further requests from the client and
1850          allow the next task to use the datastore; also,
1851          switch to P2P requests or clean up our state. */
1852       next_ds_request ();
1853       GNUNET_SERVER_receive_done (lgc->client,
1854                                   GNUNET_OK);
1855       if ( (lgc->results_used == 0) ||
1856            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
1857            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
1858            (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
1859         {
1860           cl = clients;
1861           while ( (NULL != cl) &&
1862                   (cl->client != lgc->client) )
1863             cl = cl->next;
1864           if (cl == NULL)
1865             {
1866               cl = GNUNET_malloc (sizeof (struct ClientList));
1867               cl->client = lgc->client;
1868               cl->next = clients;
1869               clients = cl;
1870             }
1871           crl = GNUNET_malloc (sizeof (struct ClientRequestList));
1872           crl->cl = cl;
1873           GNUNET_CONTAINER_DLL_insert (cl->head, cl->tail, crl);
1874           pr = GNUNET_malloc (sizeof (struct PendingRequest));
1875           pr->client = lgc->client;
1876           GNUNET_SERVER_client_keep (pr->client);
1877           pr->crl_entry = crl;
1878           crl->req = pr;
1879           if (lgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
1880             {
1881               pr->namespace = GNUNET_malloc (sizeof (GNUNET_HashCode));
1882               *pr->namespace = lgc->namespace;
1883             }
1884           pr->replies_seen = lgc->results;
1885           lgc->results = NULL;
1886           pr->start_time = GNUNET_TIME_absolute_get ();
1887           pr->query = lgc->query;
1888           pr->target_pid = GNUNET_PEER_intern (&lgc->target);
1889           pr->replies_seen_off = lgc->results_used;
1890           pr->replies_seen_size = lgc->results_size;
1891           lgc->results_size = 0;
1892           pr->type = lgc->type;
1893           pr->anonymity_level = lgc->anonymity_level;
1894           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1895                                         &pr->mingle,
1896                                         pr->replies_seen);
1897           GNUNET_CONTAINER_multihashmap_put (requests_by_query,
1898                                              &pr->query,
1899                                              pr,
1900                                              GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1901           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1902                                                    GNUNET_NO,
1903                                                    GNUNET_SCHEDULER_PRIORITY_IDLE,
1904                                                    GNUNET_SCHEDULER_NO_TASK,
1905                                                    get_processing_delay (),
1906                                                    &forward_request_task,
1907                                                    pr);
1908           local_get_context_free (lgc);
1909           return;
1910         }
1911       /* got all possible results, clean up! */
1912       local_get_context_free (lgc);
1913       return;
1914     }
1915   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
1916     {
1917       handle_on_demand_block (key, size, data, type, priority, 
1918                               anonymity, expiration, uid,
1919                               &process_local_get_result,
1920                               lgc);
1921       return;
1922     }
1923   if (type != lgc->type)
1924     {
1925       /* this should be virtually impossible to reach (DBLOCK 
1926          query hash being identical to KBLOCK/SBLOCK query hash);
1927          nevertheless, if it happens, the correct thing is to
1928          simply skip the result. */
1929       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);        
1930       return;
1931     }
1932   /* check if this is a result we've alredy
1933      received */
1934   for (i=0;i<lgc->results_used;i++)
1935     if (0 == memcmp (key,
1936                      &lgc->results[i],
1937                      sizeof (GNUNET_HashCode)))
1938       {
1939         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
1940         return; 
1941       }
1942   if (lgc->results_used == lgc->results_size)
1943     GNUNET_array_grow (lgc->results,
1944                        lgc->results_size,
1945                        lgc->results_size * 2 + 2);
1946   GNUNET_CRYPTO_hash (data, 
1947                       size, 
1948                       &lgc->results[lgc->results_used++]);    
1949   msize = size + sizeof (struct ContentMessage);
1950   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1951   lgc->result = GNUNET_malloc (msize);
1952   lgc->result->header.size = htons (msize);
1953   lgc->result->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
1954   lgc->result->type = htonl (type);
1955   lgc->result->expiration = GNUNET_TIME_absolute_hton (expiration);
1956   memcpy (&lgc->result[1],
1957           data,
1958           size);
1959   GNUNET_SERVER_notify_transmit_ready (lgc->client,
1960                                        msize,
1961                                        GNUNET_TIME_UNIT_FOREVER_REL,
1962                                        &transmit_local_result,
1963                                        lgc);
1964 }
1965
1966
1967 /**
1968  * We're processing a search request from a local
1969  * client.  Now it is our turn to query the datastore.
1970  * 
1971  * @param cls our closure (struct LocalGetContext)
1972  * @param tc unused
1973  */
1974 static void
1975 transmit_local_get (void *cls,
1976                     const struct GNUNET_SCHEDULER_TaskContext *tc)
1977 {
1978   struct LocalGetContext *lgc = cls;
1979   uint32_t type;
1980   
1981   type = lgc->type;
1982   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
1983     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
1984   GNUNET_DATASTORE_get (dsh,
1985                         &lgc->query,
1986                         type,
1987                         &process_local_get_result,
1988                         lgc,
1989                         GNUNET_TIME_UNIT_FOREVER_REL);
1990 }
1991
1992
1993 /**
1994  * We're processing a search request from a local
1995  * client.  Now it is our turn to query the datastore.
1996  * 
1997  * @param cls our closure (struct LocalGetContext)
1998  * @param ok did we succeed to queue for datastore access, should always be GNUNET_OK
1999  */
2000 static void 
2001 transmit_local_get_ready (void *cls,
2002                           int ok)
2003 {
2004   struct LocalGetContext *lgc = cls;
2005
2006   GNUNET_assert (GNUNET_OK == ok);
2007   GNUNET_SCHEDULER_add_continuation (sched,
2008                                      GNUNET_NO,
2009                                      &transmit_local_get,
2010                                      lgc,
2011                                      GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2012 }
2013
2014
2015 /**
2016  * Handle START_SEARCH-message (search request from client).
2017  *
2018  * @param cls closure
2019  * @param client identification of the client
2020  * @param message the actual message
2021  */
2022 static void
2023 handle_start_search (void *cls,
2024                      struct GNUNET_SERVER_Client *client,
2025                      const struct GNUNET_MessageHeader *message)
2026 {
2027   const struct SearchMessage *sm;
2028   struct LocalGetContext *lgc;
2029   uint16_t msize;
2030   unsigned int sc;
2031   
2032   msize = ntohs (message->size);
2033   if ( (msize < sizeof (struct SearchMessage)) ||
2034        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2035     {
2036       GNUNET_break (0);
2037       GNUNET_SERVER_receive_done (client,
2038                                   GNUNET_SYSERR);
2039       return;
2040     }
2041   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2042   sm = (const struct SearchMessage*) message;
2043   GNUNET_SERVER_client_keep (client);
2044   lgc = GNUNET_malloc (sizeof (struct LocalGetContext));
2045   if  (sc > 0)
2046     {
2047       lgc->results_used = sc;
2048       GNUNET_array_grow (lgc->results,
2049                          lgc->results_size,
2050                          sc * 2);
2051       memcpy (lgc->results,
2052               &sm[1],
2053               sc * sizeof (GNUNET_HashCode));
2054     }
2055   lgc->client = client;
2056   lgc->type = ntohl (sm->type);
2057   lgc->anonymity_level = ntohl (sm->anonymity_level);
2058   switch (lgc->type)
2059     {
2060     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2061     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2062       lgc->target.hashPubKey = sm->target;
2063       break;
2064     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2065       lgc->namespace = sm->target;
2066       break;
2067     default:
2068       break;
2069     }
2070   lgc->query = sm->query;
2071   GNUNET_CONTAINER_DLL_insert (lgc_head, lgc_tail, lgc);
2072   lgc->req = queue_ds_request (GNUNET_TIME_UNIT_FOREVER_REL,
2073                                &transmit_local_get_ready,
2074                                lgc);
2075 }
2076
2077
2078 /**
2079  * List of handlers for the messages understood by this
2080  * service.
2081  */
2082 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2083   {&handle_index_start, NULL, 
2084    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2085   {&handle_index_list_get, NULL, 
2086    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2087   {&handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2088    sizeof (struct UnindexMessage) },
2089   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2090    0 },
2091   {NULL, NULL, 0, 0}
2092 };
2093
2094
2095 /**
2096  * Clean up the memory used by the PendingRequest structure (except
2097  * for the client or peer list that the request may be part of).
2098  *
2099  * @param pr request to clean up
2100  */
2101 static void
2102 destroy_pending_request (struct PendingRequest *pr)
2103 {
2104   struct PendingReply *reply;
2105   struct ClientList *cl;
2106
2107   GNUNET_CONTAINER_multihashmap_remove (requests_by_query,
2108                                         &pr->query,
2109                                         pr);
2110   // FIXME: not sure how this can work (efficiently)
2111   // also, what does the return value mean?
2112   if (pr->client == NULL)
2113     {
2114       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration,
2115                                          pr);
2116     }
2117   else
2118     {
2119       cl = pr->crl_entry->cl;
2120       GNUNET_CONTAINER_DLL_remove (cl->head,
2121                                    cl->tail,
2122                                    pr->crl_entry);
2123     }
2124   if (GNUNET_SCHEDULER_NO_TASK != pr->task)
2125     GNUNET_SCHEDULER_cancel (sched, pr->task);
2126   if (NULL != pr->cth)
2127     GNUNET_CORE_notify_transmit_ready_cancel (pr->cth);
2128   if (NULL != pr->bf)
2129     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2130   if (NULL != pr->th)
2131     GNUNET_CONNECTION_notify_transmit_ready_cancel (pr->th);
2132   while (NULL != (reply = pr->replies_pending))
2133     {
2134       pr->replies_pending = reply->next;
2135       GNUNET_free (reply);
2136     }
2137   GNUNET_PEER_change_rc (pr->source_pid, -1);
2138   GNUNET_PEER_change_rc (pr->target_pid, -1);
2139   GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
2140   GNUNET_free_non_null (pr->used_pids);
2141   GNUNET_free_non_null (pr->replies_seen);
2142   GNUNET_free_non_null (pr->namespace);
2143   GNUNET_free (pr);
2144 }
2145
2146
2147 /**
2148  * A client disconnected.  Remove all of its pending queries.
2149  *
2150  * @param cls closure, NULL
2151  * @param client identification of the client
2152  */
2153 static void
2154 handle_client_disconnect (void *cls,
2155                           struct GNUNET_SERVER_Client
2156                           * client)
2157 {
2158   struct LocalGetContext *lgc;
2159   struct ClientList *cpos;
2160   struct ClientList *cprev;
2161   struct ClientRequestList *rl;
2162
2163   lgc = lgc_head;
2164   while ( (NULL != lgc) &&
2165           (lgc->client != client) )
2166     lgc = lgc->next;
2167   if (lgc != NULL)
2168     local_get_context_free (lgc);
2169   cprev = NULL;
2170   cpos = clients;
2171   while ( (NULL != cpos) &&
2172           (clients->client != client) )
2173     {
2174       cprev = cpos;
2175       cpos = cpos->next;
2176     }
2177   if (cpos != NULL)
2178     {
2179       if (cprev == NULL)
2180         clients = cpos->next;
2181       else
2182         cprev->next = cpos->next;
2183       while (NULL != (rl = cpos->head))
2184         {
2185           cpos->head = rl->next;
2186           destroy_pending_request (rl->req);
2187           GNUNET_free (rl);
2188         }
2189       GNUNET_free (cpos);
2190     }
2191 }
2192
2193
2194 /**
2195  * Iterator over entries in the "requests_by_query" map
2196  * that frees all the entries.
2197  *
2198  * @param cls closure, NULL
2199  * @param key current key code (the query, unused) 
2200  * @param value value in the hash map, of type "struct PendingRequest*"
2201  * @return GNUNET_YES (we should continue to  iterate)
2202  */
2203 static int 
2204 destroy_pending_request_cb (void *cls,
2205                             const GNUNET_HashCode * key,
2206                             void *value)
2207 {
2208   struct PendingRequest *pr = value;
2209
2210   destroy_pending_request (pr);
2211   return GNUNET_YES;
2212 }
2213
2214
2215 /**
2216  * Task run during shutdown.
2217  *
2218  * @param cls unused
2219  * @param tc unused
2220  */
2221 static void
2222 shutdown_task (void *cls,
2223                const struct GNUNET_SCHEDULER_TaskContext *tc)
2224 {
2225   struct IndexInfo *pos;  
2226
2227   if (NULL != core)
2228     GNUNET_CORE_disconnect (core);
2229   GNUNET_DATASTORE_disconnect (dsh,
2230                                GNUNET_NO);
2231   dsh = NULL;
2232   GNUNET_CONTAINER_multihashmap_iterate (requests_by_query,
2233                                          &destroy_pending_request_cb,
2234                                          NULL);
2235   while (clients != NULL)
2236     handle_client_disconnect (NULL,
2237                               clients->client);
2238   GNUNET_CONTAINER_multihashmap_destroy (requests_by_query);
2239   requests_by_query = NULL;
2240   GNUNET_CONTAINER_multihashmap_destroy (requests_by_peer);
2241   requests_by_peer = NULL;
2242   GNUNET_CONTAINER_heap_destroy (requests_by_expiration);
2243   requests_by_expiration = NULL;
2244   // FIXME: iterate over entries and free individually?
2245   // (or do we get disconnect notifications?)
2246   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2247   connected_peers = NULL;
2248   GNUNET_CONTAINER_multihashmap_destroy (ifm);
2249   ifm = NULL;
2250   while (NULL != (pos = indexed_files))
2251     {
2252       indexed_files = pos->next;
2253       GNUNET_free (pos);
2254     }
2255 }
2256
2257
2258 /**
2259  * Free (each) request made by the peer.
2260  *
2261  * @param cls closure, points to peer that the request belongs to
2262  * @param key current key code
2263  * @param value value in the hash map
2264  * @return GNUNET_YES (we should continue to iterate)
2265  */
2266 static int
2267 destroy_request (void *cls,
2268                  const GNUNET_HashCode * key,
2269                  void *value)
2270 {
2271   const struct GNUNET_PeerIdentity * peer = cls;
2272   struct PendingRequest *pr = value;
2273   
2274   GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2275                                         &peer->hashPubKey,
2276                                         pr);
2277   destroy_pending_request (pr);
2278   return GNUNET_YES;
2279 }
2280
2281
2282
2283 /**
2284  * Method called whenever a given peer connects.
2285  *
2286  * @param cls closure, not used
2287  * @param peer peer identity this notification is about
2288  */
2289 static void 
2290 peer_connect_handler (void *cls,
2291                       const struct
2292                       GNUNET_PeerIdentity * peer)
2293 {
2294   struct ConnectedPeer *cp;
2295
2296   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
2297   cp->pid = GNUNET_PEER_intern (peer);
2298   GNUNET_CONTAINER_multihashmap_put (connected_peers,
2299                                      &peer->hashPubKey,
2300                                      cp,
2301                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
2302 }
2303
2304
2305 /**
2306  * Method called whenever a peer disconnects.
2307  *
2308  * @param cls closure, not used
2309  * @param peer peer identity this notification is about
2310  */
2311 static void
2312 peer_disconnect_handler (void *cls,
2313                          const struct
2314                          GNUNET_PeerIdentity * peer)
2315 {
2316   struct ConnectedPeer *cp;
2317
2318   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2319                                           &peer->hashPubKey);
2320   GNUNET_PEER_change_rc (cp->pid, -1);
2321   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
2322   GNUNET_free (cp);
2323   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_peer,
2324                                               &peer->hashPubKey,
2325                                               &destroy_request,
2326                                               (void*) peer);
2327 }
2328
2329
2330 /**
2331  * We're processing a GET request from
2332  * another peer and have decided to forward
2333  * it to other peers.
2334  *
2335  * @param cls our "struct ProcessGetContext *"
2336  * @param tc unused
2337  */
2338 static void
2339 forward_get_request (void *cls,
2340                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2341 {
2342   struct ProcessGetContext *pgc = cls;
2343   struct PendingRequest *pr;
2344   struct PendingRequest *eer;
2345   struct GNUNET_PeerIdentity target;
2346
2347   pr = GNUNET_malloc (sizeof (struct PendingRequest));
2348   if (GET_MESSAGE_BIT_SKS_NAMESPACE == (GET_MESSAGE_BIT_SKS_NAMESPACE & pgc->bm))
2349     {
2350       pr->namespace = GNUNET_malloc (sizeof(GNUNET_HashCode));
2351       *pr->namespace = pgc->namespace;
2352     }
2353   pr->bf = pgc->bf;
2354   pgc->bf = NULL;
2355   pr->start_time = pgc->start_time;
2356   pr->query = pgc->query;
2357   pr->source_pid = GNUNET_PEER_intern (&pgc->reply_to);
2358   if (GET_MESSAGE_BIT_TRANSMIT_TO == (GET_MESSAGE_BIT_TRANSMIT_TO & pgc->bm))
2359     pr->target_pid = GNUNET_PEER_intern (&pgc->prime_target);
2360   pr->anonymity_level = 1; /* default */
2361   pr->priority = pgc->priority;
2362   pr->remaining_priority = pr->priority;
2363   pr->mingle = pgc->mingle;
2364   pr->ttl = pgc->ttl; 
2365   pr->type = pgc->type;
2366   GNUNET_CONTAINER_multihashmap_put (requests_by_query,
2367                                      &pr->query,
2368                                      pr,
2369                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2370   GNUNET_CONTAINER_multihashmap_put (requests_by_peer,
2371                                      &pgc->reply_to.hashPubKey,
2372                                      pr,
2373                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2374   GNUNET_CONTAINER_heap_insert (requests_by_expiration,
2375                                 pr,
2376                                 pr->start_time.value + pr->ttl);
2377   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) > max_pending_requests)
2378     {
2379       /* expire oldest request! */
2380       eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2381       GNUNET_PEER_resolve (eer->source_pid,
2382                            &target);    
2383       GNUNET_CONTAINER_multihashmap_remove (requests_by_peer,
2384                                             &target.hashPubKey,
2385                                             eer);
2386       destroy_pending_request (eer);     
2387     }
2388   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2389                                            GNUNET_NO,
2390                                            GNUNET_SCHEDULER_PRIORITY_IDLE,
2391                                            GNUNET_SCHEDULER_NO_TASK,
2392                                            get_processing_delay (),
2393                                            &forward_request_task,
2394                                            pr);
2395   GNUNET_free (pgc); 
2396 }
2397 /**
2398  * Transmit the given message by copying it to
2399  * the target buffer "buf".  "buf" will be
2400  * NULL and "size" zero if the socket was closed for
2401  * writing in the meantime.  In that case, only
2402
2403  * free the message
2404  *
2405  * @param cls closure, pointer to the message
2406  * @param size number of bytes available in buf
2407  * @param buf where the callee should write the message
2408  * @return number of bytes written to buf
2409  */
2410 static size_t
2411 transmit_message (void *cls,
2412                   size_t size, void *buf)
2413 {
2414   struct GNUNET_MessageHeader *msg = cls;
2415   uint16_t msize;
2416   
2417   if (NULL == buf)
2418     {
2419 #if DEBUG_FS
2420       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2421                   "Dropping reply, core too busy.\n");
2422 #endif
2423       GNUNET_free (msg);
2424       return 0;
2425     }
2426   msize = ntohs (msg->size);
2427   GNUNET_assert (size >= msize);
2428   memcpy (buf, msg, msize);
2429   GNUNET_free (msg);
2430   return msize;
2431 }
2432
2433
2434 /**
2435  * Test if the load on this peer is too high
2436  * to even consider processing the query at
2437  * all.
2438  * 
2439  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
2440  */
2441 static int
2442 test_load_too_high ()
2443 {
2444   return GNUNET_NO; // FIXME
2445 }
2446
2447
2448 /**
2449  * We're processing (local) results for a search request
2450  * from another peer.  Pass applicable results to the
2451  * peer and if we are done either clean up (operation
2452  * complete) or forward to other peers (more results possible).
2453  *
2454  * @param cls our closure (struct LocalGetContext)
2455  * @param key key for the content
2456  * @param size number of bytes in data
2457  * @param data content stored
2458  * @param type type of the content
2459  * @param priority priority of the content
2460  * @param anonymity anonymity-level for the content
2461  * @param expiration expiration time for the content
2462  * @param uid unique identifier for the datum;
2463  *        maybe 0 if no unique identifier is available
2464  */
2465 static void
2466 process_p2p_get_result (void *cls,
2467                         const GNUNET_HashCode * key,
2468                         uint32_t size,
2469                         const void *data,
2470                         uint32_t type,
2471                         uint32_t priority,
2472                         uint32_t anonymity,
2473                         struct GNUNET_TIME_Absolute
2474                         expiration, 
2475                         uint64_t uid)
2476 {
2477   struct ProcessGetContext *pgc = cls;
2478   GNUNET_HashCode dhash;
2479   GNUNET_HashCode mhash;
2480   struct PutMessage *reply;
2481   
2482   if (NULL == key)
2483     {
2484       /* no more results */
2485       if ( ( (pgc->policy & ROUTING_POLICY_FORWARD) ==  ROUTING_POLICY_FORWARD) &&
2486            ( (0 == pgc->results_found) ||
2487              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2488              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2489              (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) ) )
2490         {
2491           GNUNET_SCHEDULER_add_continuation (sched,
2492                                              GNUNET_NO,
2493                                              &forward_get_request,
2494                                              pgc,
2495                                              GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2496         }
2497       else
2498         {
2499           if (pgc->bf != NULL)
2500             GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2501           GNUNET_free (pgc); 
2502         }
2503       next_ds_request ();
2504       return;
2505     }
2506   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2507     {
2508       handle_on_demand_block (key, size, data, type, priority, 
2509                               anonymity, expiration, uid,
2510                               &process_p2p_get_result,
2511                               pgc);
2512       return;
2513     }
2514   /* check for duplicates */
2515   GNUNET_CRYPTO_hash (data, size, &dhash);
2516   mingle_hash (&dhash, 
2517                pgc->mingle,
2518                &mhash);
2519   if ( (pgc->bf != NULL) &&
2520        (GNUNET_YES ==
2521         GNUNET_CONTAINER_bloomfilter_test (pgc->bf,
2522                                            &mhash)) )
2523     {      
2524 #if DEBUG_FS
2525       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2526                   "Result from datastore filtered by bloomfilter.\n");
2527 #endif
2528       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2529       return;
2530     }
2531   pgc->results_found++;
2532   if ( (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2533        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2534        (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2535     {
2536       if (pgc->bf == NULL)
2537         pgc->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2538                                                      32, 
2539                                                      BLOOMFILTER_K);
2540       GNUNET_CONTAINER_bloomfilter_add (pgc->bf, 
2541                                         &mhash);
2542     }
2543
2544   reply = GNUNET_malloc (sizeof (struct PutMessage) + size);
2545   reply->header.size = htons (sizeof (struct PutMessage) + size);
2546   reply->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2547   reply->type = htonl (type);
2548   reply->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (expiration));
2549   memcpy (&reply[1], data, size);
2550   GNUNET_CORE_notify_transmit_ready (core,
2551                                      pgc->priority,
2552                                      ACCEPTABLE_REPLY_DELAY,
2553                                      &pgc->reply_to,
2554                                      sizeof (struct PutMessage) + size,
2555                                      &transmit_message,
2556                                      reply);
2557   if ( (GNUNET_YES == test_load_too_high()) ||
2558        (pgc->results_found > 5 + 2 * pgc->priority) )
2559     {
2560       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2561       pgc->policy &= ~ ROUTING_POLICY_FORWARD;
2562       return;
2563     }
2564   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2565 }
2566   
2567
2568 /**
2569  * We're processing a GET request from another peer.  Give it to our
2570  * local datastore.
2571  *
2572  * @param cls our "struct ProcessGetContext"
2573  * @param ok did we get a datastore slice or not?
2574  */
2575 static void
2576 ds_get_request (void *cls, 
2577                 int ok)
2578 {
2579   struct ProcessGetContext *pgc = cls;
2580   uint32_t type;
2581   struct GNUNET_TIME_Relative timeout;
2582
2583   if (GNUNET_OK != ok)
2584     {
2585       /* no point in doing P2P stuff if we can't even do local */
2586       GNUNET_free (dsh);
2587       return;
2588     }
2589   type = pgc->type;
2590   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2591     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2592   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2593                                            (pgc->priority + 1));
2594   GNUNET_DATASTORE_get (dsh,
2595                         &pgc->query,
2596                         type,
2597                         &process_p2p_get_result,
2598                         pgc,
2599                         timeout);
2600 }
2601
2602
2603 /**
2604  * The priority level imposes a bound on the maximum
2605  * value for the ttl that can be requested.
2606  *
2607  * @param ttl_in requested ttl
2608  * @param priority given priority
2609  * @return ttl_in if ttl_in is below the limit,
2610  *         otherwise the ttl-limit for the given priority
2611  */
2612 static int32_t
2613 bound_ttl (int32_t ttl_in, uint32_t prio)
2614 {
2615   unsigned long long allowed;
2616
2617   if (ttl_in <= 0)
2618     return ttl_in;
2619   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2620   if (ttl_in > allowed)      
2621     {
2622       if (allowed >= (1 << 30))
2623         return 1 << 30;
2624       return allowed;
2625     }
2626   return ttl_in;
2627 }
2628
2629
2630 /**
2631  * We've received a request with the specified
2632  * priority.  Bound it according to how much
2633  * we trust the given peer.
2634  * 
2635  * @param prio_in requested priority
2636  * @param peer the peer making the request
2637  * @return effective priority
2638  */
2639 static uint32_t
2640 bound_priority (uint32_t prio_in,
2641                 const struct GNUNET_PeerIdentity *peer)
2642 {
2643   return 0; // FIXME!
2644 }
2645
2646
2647 /**
2648  * Handle P2P "GET" request.
2649  *
2650  * @param cls closure, always NULL
2651  * @param peer the other peer involved (sender or receiver, NULL
2652  *        for loopback messages where we are both sender and receiver)
2653  * @param message the actual message
2654  * @return GNUNET_OK to keep the connection open,
2655  *         GNUNET_SYSERR to close it (signal serious error)
2656  */
2657 static int
2658 handle_p2p_get (void *cls,
2659                 const struct GNUNET_PeerIdentity *other,
2660                 const struct GNUNET_MessageHeader *message)
2661 {
2662   uint16_t msize;
2663   const struct GetMessage *gm;
2664   unsigned int bits;
2665   const GNUNET_HashCode *opt;
2666   struct ProcessGetContext *pgc;
2667   uint32_t bm;
2668   size_t bfsize;
2669   uint32_t ttl_decrement;
2670   double preference;
2671   int net_load_up;
2672   int net_load_down;
2673
2674   msize = ntohs(message->size);
2675   if (msize < sizeof (struct GetMessage))
2676     {
2677       GNUNET_break_op (0);
2678       return GNUNET_SYSERR;
2679     }
2680   gm = (const struct GetMessage*) message;
2681   bm = ntohl (gm->hash_bitmap);
2682   bits = 0;
2683   while (bm > 0)
2684     {
2685       if (1 == (bm & 1))
2686         bits++;
2687       bm >>= 1;
2688     }
2689   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2690     {
2691       GNUNET_break_op (0);
2692       return GNUNET_SYSERR;
2693     }  
2694   opt = (const GNUNET_HashCode*) &gm[1];
2695   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2696   pgc = GNUNET_malloc (sizeof (struct ProcessGetContext));
2697   if (bfsize > 0)
2698     pgc->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &pgc[1],
2699                                                  bfsize,
2700                                                  BLOOMFILTER_K);
2701   pgc->type = ntohl (gm->type);
2702   pgc->bm = ntohl (gm->hash_bitmap);
2703   pgc->mingle = gm->filter_mutator;
2704   bits = 0;
2705   if (0 != (pgc->bm & GET_MESSAGE_BIT_RETURN_TO))
2706     pgc->reply_to.hashPubKey = opt[bits++];
2707   else
2708     pgc->reply_to = *other;
2709   if (0 != (pgc->bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2710     pgc->namespace = opt[bits++];
2711   else if (pgc->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2712     {
2713       GNUNET_break_op (0);
2714       GNUNET_free (pgc);
2715       return GNUNET_SYSERR;
2716     }
2717   if (0 != (pgc->bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2718     pgc->prime_target.hashPubKey = opt[bits++];
2719   /* note that we can really only check load here since otherwise
2720      peers could find out that we are overloaded by being disconnected
2721      after sending us a malformed query... */
2722   if (GNUNET_YES == test_load_too_high ())
2723     {
2724       if (NULL != pgc->bf)
2725         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2726       GNUNET_free (pgc);
2727 #if DEBUG_FS
2728       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2729                   "Dropping query from `%s', this peer is too busy.\n",
2730                   GNUNET_h2s (other));
2731 #endif
2732       return GNUNET_OK;
2733     }
2734   net_load_up = 50; // FIXME
2735   net_load_down = 50; // FIXME
2736   pgc->policy = ROUTING_POLICY_NONE;
2737   if ( (net_load_up < IDLE_LOAD_THRESHOLD) &&
2738        (net_load_down < IDLE_LOAD_THRESHOLD) )
2739     {
2740       pgc->policy |= ROUTING_POLICY_ALL;
2741       pgc->priority = 0; /* no charge */
2742     }
2743   else
2744     {
2745       pgc->priority = bound_priority (ntohl (gm->priority), other);
2746       if ( (net_load_up < 
2747             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) &&
2748            (net_load_down < 
2749             IDLE_LOAD_THRESHOLD + pgc->priority * pgc->priority) )
2750         {
2751           pgc->policy |= ROUTING_POLICY_ALL;
2752         }
2753       else
2754         {
2755           // FIXME: is this sound?
2756           if (net_load_up < 90 + 10 * pgc->priority)
2757             pgc->policy |= ROUTING_POLICY_FORWARD;
2758           if (net_load_down < 90 + 10 * pgc->priority)
2759             pgc->policy |= ROUTING_POLICY_ANSWER;
2760         }
2761     }
2762   if (pgc->policy == ROUTING_POLICY_NONE)
2763     {
2764 #if DEBUG_FS
2765       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2766                   "Dropping query from `%s', network saturated.\n",
2767                   GNUNET_h2s (other));
2768 #endif
2769       if (NULL != pgc->bf)
2770         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2771       GNUNET_free (pgc);
2772       return GNUNET_OK;     /* drop */
2773     }
2774   if ((pgc->policy & ROUTING_POLICY_INDIRECT) != ROUTING_POLICY_INDIRECT)
2775     pgc->priority = 0;  /* kill the priority (we cannot benefit) */
2776   pgc->ttl = bound_ttl (ntohl (gm->ttl), pgc->priority);
2777   /* decrement ttl (always) */
2778   ttl_decrement = 2 * TTL_DECREMENT +
2779     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2780                               TTL_DECREMENT);
2781   if ( (pgc->ttl < 0) &&
2782        (pgc->ttl - ttl_decrement > 0) )
2783     {
2784 #if DEBUG_FS
2785       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2786                   "Dropping query from `%s' due to TTL underflow.\n",
2787                   GNUNET_h2s (other));
2788 #endif
2789       /* integer underflow => drop (should be very rare)! */
2790       if (NULL != pgc->bf)
2791         GNUNET_CONTAINER_bloomfilter_free (pgc->bf);
2792       GNUNET_free (pgc);
2793       return GNUNET_OK;
2794     }
2795   pgc->ttl -= ttl_decrement;
2796   pgc->start_time = GNUNET_TIME_absolute_get ();
2797   preference = (double) pgc->priority;
2798   if (preference < QUERY_BANDWIDTH_VALUE)
2799     preference = QUERY_BANDWIDTH_VALUE;
2800   // FIXME: also reserve bandwidth for reply?
2801   GNUNET_CORE_peer_configure (core,
2802                               other,
2803                               GNUNET_TIME_UNIT_FOREVER_REL,
2804                               0, 0, preference, NULL, NULL);
2805   if (0 != (pgc->policy & ROUTING_POLICY_ANSWER))
2806     pgc->drq = queue_ds_request (BASIC_DATASTORE_REQUEST_DELAY,
2807                                  &ds_get_request,
2808                                  pgc);
2809   else
2810     GNUNET_SCHEDULER_add_continuation (sched,
2811                                        GNUNET_NO,
2812                                        &forward_get_request,
2813                                        pgc,
2814                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
2815   return GNUNET_OK;
2816 }
2817
2818
2819 /**
2820  * Function called to notify us that we can now transmit a reply to a
2821  * client or peer.  "buf" will be NULL and "size" zero if the socket was
2822  * closed for writing in the meantime.
2823  *
2824  * @param cls closure, points to a "struct PendingRequest*" with
2825  *            one or more pending replies
2826  * @param size number of bytes available in buf
2827  * @param buf where the callee should write the message
2828  * @return number of bytes written to buf
2829  */
2830 static size_t
2831 transmit_result (void *cls,
2832                  size_t size, 
2833                  void *buf)
2834 {
2835   struct PendingRequest *pr = cls;
2836   char *cbuf = buf;
2837   struct PendingReply *reply;
2838   size_t ret;
2839
2840   ret = 0;
2841   while (NULL != (reply = pr->replies_pending))
2842     {
2843       if ( (reply->msize + ret < ret) ||
2844            (reply->msize + ret > size) )
2845         break;
2846       pr->replies_pending = reply->next;
2847       memcpy (&cbuf[ret], &reply[1], reply->msize);
2848       ret += reply->msize;
2849       GNUNET_free (pr);
2850     }
2851   return 0;
2852 }
2853
2854
2855 /**
2856  * Iterator over pending requests.
2857  *
2858  * @param cls response (struct ProcessReplyClosure)
2859  * @param key our query
2860  * @param value value in the hash map (meta-info about the query)
2861  * @return GNUNET_YES (we should continue to iterate)
2862  */
2863 static int
2864 process_reply (void *cls,
2865                const GNUNET_HashCode * key,
2866                void *value)
2867 {
2868   struct ProcessReplyClosure *prq = cls;
2869   struct PendingRequest *pr = value;
2870   struct PendingRequest *eer;
2871   struct PendingReply *reply;
2872   struct PutMessage *pm;
2873   struct ContentMessage *cm;
2874   GNUNET_HashCode chash;
2875   GNUNET_HashCode mhash;
2876   struct GNUNET_PeerIdentity target;
2877   size_t msize;
2878   uint32_t prio;
2879   struct GNUNET_TIME_Relative max_delay;
2880   
2881   GNUNET_CRYPTO_hash (prq->data,
2882                       prq->size,
2883                       &chash);
2884   switch (prq->type)
2885     {
2886     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2887     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2888       break;
2889     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2890       /* FIXME: does prq->namespace match our expectations? */
2891       /* then: fall-through??? */
2892     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2893       if (pr->bf != NULL) 
2894         {
2895           mingle_hash (&chash, pr->mingle, &mhash);
2896           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2897                                                                &mhash))
2898             return GNUNET_YES; /* duplicate */
2899           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2900                                             &mhash);
2901         }
2902       break;
2903     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
2904       // FIXME: any checks against duplicates for SKBlocks?
2905       break;
2906     }
2907   prio = pr->priority;
2908   prq->priority += pr->remaining_priority;
2909   pr->remaining_priority = 0;
2910   if (pr->client != NULL)
2911     {
2912       if (pr->replies_seen_size == pr->replies_seen_off)
2913         GNUNET_array_grow (pr->replies_seen,
2914                            pr->replies_seen_size,
2915                            pr->replies_seen_size * 2 + 4);
2916       pr->replies_seen[pr->replies_seen_off++] = chash;
2917       // FIXME: possibly recalculate BF!
2918     }
2919   if (pr->client == NULL)
2920     {
2921       msize = sizeof (struct ContentMessage) + prq->size;
2922       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
2923       reply->msize = msize;
2924       cm = (struct ContentMessage*) &reply[1];
2925       cm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CONTENT);
2926       cm->header.size = htons (msize);
2927       cm->type = htonl (prq->type);
2928       cm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2929       reply->next = pr->replies_pending;
2930       pr->replies_pending = reply;
2931       memcpy (&reply[1], prq->data, prq->size);
2932       if (pr->cth != NULL)
2933         return GNUNET_YES;
2934       max_delay = GNUNET_TIME_UNIT_FOREVER_REL;
2935       if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration) >= max_pending_requests)
2936         {
2937           /* estimate expiration time from time difference between
2938              first request that will be discarded and this request */
2939           eer = GNUNET_CONTAINER_heap_peek (requests_by_expiration);
2940           max_delay = GNUNET_TIME_absolute_get_difference (pr->start_time,
2941                                                            eer->start_time);
2942         }
2943       GNUNET_PEER_resolve (pr->source_pid,
2944                            &target);
2945       pr->cth = GNUNET_CORE_notify_transmit_ready (core,
2946                                                    prio,
2947                                                    max_delay,
2948                                                    &target,
2949                                                    msize,
2950                                                    &transmit_result,
2951                                                    pr);
2952       if (NULL == pr->cth)
2953         {
2954           // FIXME: now what? discard?
2955         }
2956     }
2957   else
2958     {
2959       msize = sizeof (struct PutMessage) + prq->size;
2960       reply = GNUNET_malloc (msize + sizeof (struct PendingReply));
2961       reply->msize = msize;
2962       reply->next = pr->replies_pending;
2963       pm = (struct PutMessage*) &reply[1];
2964       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2965       pm->header.size = htons (msize);
2966       pm->type = htonl (prq->type);
2967       pm->expiration = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (prq->expiration));
2968       pr->replies_pending = reply;
2969       memcpy (&reply[1], prq->data, prq->size);
2970       if (pr->th != NULL)
2971         return GNUNET_YES;
2972       pr->th = GNUNET_SERVER_notify_transmit_ready (pr->client,
2973                                                     msize,
2974                                                     GNUNET_TIME_UNIT_FOREVER_REL,
2975                                                     &transmit_result,
2976                                                     pr);
2977       if (pr->th == NULL)
2978         {
2979           // FIXME: need to try again later (not much
2980           // to do here specifically, but we need to
2981           // check somewhere else to handle this case!)
2982         }
2983     }
2984   // FIXME: implement hot-path routing statistics keeping!
2985   return GNUNET_YES;
2986 }
2987
2988
2989 /**
2990  * Check if the given KBlock is well-formed.
2991  *
2992  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
2993  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
2994  * @param query where to store the query that this block answers
2995  * @return GNUNET_OK if this is actually a well-formed KBlock
2996  */
2997 static int
2998 check_kblock (const struct KBlock *kb,
2999               size_t dsize,
3000               GNUNET_HashCode *query)
3001 {
3002   if (dsize < sizeof (struct KBlock))
3003     {
3004       GNUNET_break_op (0);
3005       return GNUNET_SYSERR;
3006     }
3007   if (dsize - sizeof (struct KBlock) !=
3008       ntohs (kb->purpose.size) 
3009       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
3010       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
3011     {
3012       GNUNET_break_op (0);
3013       return GNUNET_SYSERR;
3014     }
3015   if (GNUNET_OK !=
3016       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
3017                                 &kb->purpose,
3018                                 &kb->signature,
3019                                 &kb->keyspace)) 
3020     {
3021       GNUNET_break_op (0);
3022       return GNUNET_SYSERR;
3023     }
3024   if (query != NULL)
3025     GNUNET_CRYPTO_hash (&kb->keyspace,
3026                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3027                         query);
3028   return GNUNET_OK;
3029 }
3030
3031
3032 /**
3033  * Check if the given SBlock is well-formed.
3034  *
3035  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
3036  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
3037  * @param query where to store the query that this block answers
3038  * @param namespace where to store the namespace that this block belongs to
3039  * @return GNUNET_OK if this is actually a well-formed SBlock
3040  */
3041 static int
3042 check_sblock (const struct SBlock *sb,
3043               size_t dsize,
3044               GNUNET_HashCode *query,   
3045               GNUNET_HashCode *namespace)
3046 {
3047   if (dsize < sizeof (struct SBlock))
3048     {
3049       GNUNET_break_op (0);
3050       return GNUNET_SYSERR;
3051     }
3052   if (dsize !=
3053       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
3054     {
3055       GNUNET_break_op (0);
3056       return GNUNET_SYSERR;
3057     }
3058   if (GNUNET_OK !=
3059       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
3060                                 &sb->purpose,
3061                                 &sb->signature,
3062                                 &sb->subspace)) 
3063     {
3064       GNUNET_break_op (0);
3065       return GNUNET_SYSERR;
3066     }
3067   if (query != NULL)
3068     *query = sb->identifier;
3069   if (namespace != NULL)
3070     GNUNET_CRYPTO_hash (&sb->subspace,
3071                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
3072                         namespace);
3073   return GNUNET_OK;
3074 }
3075
3076
3077 /**
3078  * Handle P2P "PUT" request.
3079  *
3080  * @param cls closure, always NULL
3081  * @param peer the other peer involved (sender or receiver, NULL
3082  *        for loopback messages where we are both sender and receiver)
3083  * @param message the actual message
3084  * @return GNUNET_OK to keep the connection open,
3085  *         GNUNET_SYSERR to close it (signal serious error)
3086  */
3087 static int
3088 handle_p2p_put (void *cls,
3089                 const struct GNUNET_PeerIdentity *other,
3090                 const struct GNUNET_MessageHeader *message)
3091 {
3092   const struct PutMessage *put;
3093   uint16_t msize;
3094   size_t dsize;
3095   uint32_t type;
3096   struct GNUNET_TIME_Absolute expiration;
3097   GNUNET_HashCode query;
3098   struct ProcessReplyClosure prq;
3099
3100   msize = ntohs (message->size);
3101   if (msize < sizeof (struct PutMessage))
3102     {
3103       GNUNET_break_op(0);
3104       return GNUNET_SYSERR;
3105     }
3106   put = (const struct PutMessage*) message;
3107   dsize = msize - sizeof (struct PutMessage);
3108   type = ntohl (put->type);
3109   expiration = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_ntoh (put->expiration));
3110
3111   /* first, validate! */
3112   switch (type)
3113     {
3114     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
3115     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
3116       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
3117       break;
3118     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
3119       if (GNUNET_OK !=
3120           check_kblock ((const struct KBlock*) &put[1],
3121                         dsize,
3122                         &query))
3123         return GNUNET_SYSERR;
3124       break;
3125     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
3126       if (GNUNET_OK !=
3127           check_sblock ((const struct SBlock*) &put[1],
3128                         dsize,
3129                         &query,
3130                         &prq.namespace))
3131         return GNUNET_SYSERR;
3132       break;
3133     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
3134       // FIXME -- validate SKBLOCK!
3135       GNUNET_break (0);
3136       return GNUNET_OK;
3137     default:
3138       /* unknown block type */
3139       GNUNET_break_op (0);
3140       return GNUNET_SYSERR;
3141     }
3142
3143   /* now, lookup 'query' */
3144   prq.data = (const void*) &put[1];
3145   prq.size = dsize;
3146   prq.type = type;
3147   prq.expiration = expiration;
3148   prq.priority = 0;
3149   GNUNET_CONTAINER_multihashmap_get_multiple (requests_by_query,
3150                                               &query,
3151                                               &process_reply,
3152                                               &prq);
3153   // FIXME: if migration is on and load is low,
3154   // queue to store data in datastore;
3155   // use "prq.priority" for that!
3156   return GNUNET_OK;
3157 }
3158
3159
3160 /**
3161  * List of handlers for P2P messages
3162  * that we care about.
3163  */
3164 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3165   {
3166     { &handle_p2p_get, 
3167       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3168     { &handle_p2p_put, 
3169       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3170     { NULL, 0, 0 }
3171   };
3172
3173
3174 /**
3175  * Task that will try to initiate a connection with the
3176  * core service.
3177  * 
3178  * @param cls unused
3179  * @param tc unused
3180  */
3181 static void
3182 core_connect_task (void *cls,
3183                    const struct GNUNET_SCHEDULER_TaskContext *tc);
3184
3185
3186 /**
3187  * Function called by the core after we've
3188  * connected.
3189  */
3190 static void
3191 core_start_cb (void *cls,
3192                struct GNUNET_CORE_Handle * server,
3193                const struct GNUNET_PeerIdentity *
3194                my_identity,
3195                const struct
3196                GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded *
3197                publicKey)
3198 {
3199   if (server == NULL)
3200     {
3201       GNUNET_SCHEDULER_add_delayed (sched,
3202                                     GNUNET_NO,
3203                                     GNUNET_SCHEDULER_PRIORITY_HIGH,
3204                                     GNUNET_SCHEDULER_NO_TASK,
3205                                     GNUNET_TIME_UNIT_SECONDS,
3206                                     &core_connect_task,
3207                                     NULL);
3208       return;
3209     }
3210   core = server;
3211 }
3212
3213
3214 /**
3215  * Task that will try to initiate a connection with the
3216  * core service.
3217  * 
3218  * @param cls unused
3219  * @param tc unused
3220  */
3221 static void
3222 core_connect_task (void *cls,
3223                    const struct GNUNET_SCHEDULER_TaskContext *tc)
3224 {
3225   GNUNET_CORE_connect (sched,
3226                        cfg,
3227                        GNUNET_TIME_UNIT_FOREVER_REL,
3228                        NULL,
3229                        &core_start_cb,
3230                        &peer_connect_handler,
3231                        &peer_disconnect_handler,
3232                        NULL, 
3233                        NULL, GNUNET_NO,
3234                        NULL, GNUNET_NO,
3235                        p2p_handlers);
3236 }
3237
3238
3239 /**
3240  * Process fs requests.
3241  *
3242  * @param cls closure
3243  * @param sched scheduler to use
3244  * @param server the initialized server
3245  * @param cfg configuration to use
3246  */
3247 static void
3248 run (void *cls,
3249      struct GNUNET_SCHEDULER_Handle *s,
3250      struct GNUNET_SERVER_Handle *server,
3251      const struct GNUNET_CONFIGURATION_Handle *c)
3252 {
3253   sched = s;
3254   cfg = c;
3255
3256   ifm = GNUNET_CONTAINER_multihashmap_create (128);
3257   requests_by_query = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3258   requests_by_peer = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3259   connected_peers = GNUNET_CONTAINER_multihashmap_create (64);
3260   requests_by_expiration = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3261   read_index_list ();
3262   dsh = GNUNET_DATASTORE_connect (cfg,
3263                                   sched);
3264   if (NULL == dsh)
3265     {
3266       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3267                   _("Failed to connect to datastore service.\n"));
3268       return;
3269     }
3270   GNUNET_SERVER_disconnect_notify (server, 
3271                                    &handle_client_disconnect,
3272                                    NULL);
3273   GNUNET_SERVER_add_handlers (server, handlers);
3274   core_connect_task (NULL, NULL);
3275   GNUNET_SCHEDULER_add_delayed (sched,
3276                                 GNUNET_YES,
3277                                 GNUNET_SCHEDULER_PRIORITY_IDLE,
3278                                 GNUNET_SCHEDULER_NO_TASK,
3279                                 GNUNET_TIME_UNIT_FOREVER_REL,
3280                                 &shutdown_task,
3281                                 NULL);
3282 }
3283
3284
3285 /**
3286  * The main function for the fs service.
3287  *
3288  * @param argc number of arguments from the command line
3289  * @param argv command line arguments
3290  * @return 0 ok, 1 on error
3291  */
3292 int
3293 main (int argc, char *const *argv)
3294 {
3295   return (GNUNET_OK ==
3296           GNUNET_SERVICE_run (argc,
3297                               argv,
3298                               "fs", &run, NULL, NULL, NULL)) ? 0 : 1;
3299 }
3300
3301 /* end of gnunet-service-fs.c */