fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - have non-zero preference / priority for requests we initiate!
28  * - track stats for hot-path routing
29  * - implement hot-path routing decision procedure
30  * - implement: bound_priority, test_load_too_high, validate_skblock
31  * - add content migration support (store locally)
32  * - statistics
33  */
34 #include "platform.h"
35 #include <float.h>
36 #include "gnunet_constants.h"
37 #include "gnunet_core_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_peer_lib.h"
40 #include "gnunet_protocols.h"
41 #include "gnunet_signatures.h"
42 #include "gnunet_util_lib.h"
43 #include "gnunet-service-fs_drq.h"
44 #include "gnunet-service-fs_indexing.h"
45 #include "fs.h"
46
47 #define DEBUG_FS GNUNET_YES
48
49 /**
50  * Maximum number of outgoing messages we queue per peer.
51  * FIXME: set to a tiny value for testing; make configurable.
52  */
53 #define MAX_QUEUE_PER_PEER 2
54
55
56 /**
57  * What is the maximum delay for a P2P FS message (in our interaction
58  * with core)?  FS-internal delays are another story.  The value is
59  * chosen based on the 32k block size.  Given that peers typcially
60  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
61  * transmit one message even to the lowest-bandwidth peers.
62  */
63 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
64
65
66
67 /**
68  * Maximum number of requests (from other peers) that we're
69  * willing to have pending at any given point in time.
70  * FIXME: set from configuration (and 32 is a tiny value for testing only).
71  */
72 static uint64_t max_pending_requests = 32;
73
74
75 /**
76  * Information we keep for each pending reply.  The
77  * actual message follows at the end of this struct.
78  */
79 struct PendingMessage;
80
81
82 /**
83  * Function called upon completion of a transmission.
84  *
85  * @param cls closure
86  * @param pid ID of receiving peer, 0 on transmission error
87  */
88 typedef void (*TransmissionContinuation)(void * cls, 
89                                          GNUNET_PEER_Id tpid);
90
91
92 /**
93  * Information we keep for each pending message (GET/PUT).  The
94  * actual message follows at the end of this struct.
95  */
96 struct PendingMessage
97 {
98   /**
99    * This is a doubly-linked list of messages to the same peer.
100    */
101   struct PendingMessage *next;
102
103   /**
104    * This is a doubly-linked list of messages to the same peer.
105    */
106   struct PendingMessage *prev;
107
108   /**
109    * Entry in pending message list for this pending message.
110    */ 
111   struct PendingMessageList *pml;  
112
113   /**
114    * Function to call immediately once we have transmitted this
115    * message.
116    */
117   TransmissionContinuation cont;
118
119   /**
120    * Closure for cont.
121    */
122   void *cont_cls;
123
124   /**
125    * Size of the reply; actual reply message follows
126    * at the end of this struct.
127    */
128   size_t msize;
129   
130   /**
131    * How important is this message for us?
132    */
133   uint32_t priority;
134  
135 };
136
137
138 /**
139  * Information about a peer that we are connected to.
140  * We track data that is useful for determining which
141  * peers should receive our requests.  We also keep
142  * a list of messages to transmit to this peer.
143  */
144 struct ConnectedPeer
145 {
146
147   /**
148    * List of the last clients for which this peer successfully
149    * answered a query.
150    */
151   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
152
153   /**
154    * List of the last PIDs for which
155    * this peer successfully answered a query;
156    * We use 0 to indicate no successful reply.
157    */
158   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
159
160   /**
161    * Average delay between sending the peer a request and
162    * getting a reply (only calculated over the requests for
163    * which we actually got a reply).   Calculated
164    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
165    */ 
166   struct GNUNET_TIME_Relative avg_delay;
167
168   /**
169    * Handle for an active request for transmission to this
170    * peer, or NULL.
171    */
172   struct GNUNET_CORE_TransmitHandle *cth;
173
174   /**
175    * Messages (replies, queries, content migration) we would like to
176    * send to this peer in the near future.  Sorted by priority, head.
177    */
178   struct PendingMessage *pending_messages_head;
179
180   /**
181    * Messages (replies, queries, content migration) we would like to
182    * send to this peer in the near future.  Sorted by priority, tail.
183    */
184   struct PendingMessage *pending_messages_tail;
185
186   /**
187    * Average priority of successful replies.  Calculated
188    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
189    */
190   double avg_priority;
191
192   /**
193    * Increase in traffic preference still to be submitted
194    * to the core service for this peer. FIXME: double or 'uint64_t'?
195    */
196   double inc_preference;
197
198   /**
199    * The peer's identity.
200    */
201   GNUNET_PEER_Id pid;  
202
203   /**
204    * Size of the linked list of 'pending_messages'.
205    */
206   unsigned int pending_requests;
207
208   /**
209    * Which offset in "last_p2p_replies" will be updated next?
210    * (we go round-robin).
211    */
212   unsigned int last_p2p_replies_woff;
213
214   /**
215    * Which offset in "last_client_replies" will be updated next?
216    * (we go round-robin).
217    */
218   unsigned int last_client_replies_woff;
219
220 };
221
222
223 /**
224  * Information we keep for each pending request.  We should try to
225  * keep this struct as small as possible since its memory consumption
226  * is key to how many requests we can have pending at once.
227  */
228 struct PendingRequest;
229
230
231 /**
232  * Doubly-linked list of requests we are performing
233  * on behalf of the same client.
234  */
235 struct ClientRequestList
236 {
237
238   /**
239    * This is a doubly-linked list.
240    */
241   struct ClientRequestList *next;
242
243   /**
244    * This is a doubly-linked list.
245    */
246   struct ClientRequestList *prev;
247
248   /**
249    * Request this entry represents.
250    */
251   struct PendingRequest *req;
252
253   /**
254    * Client list this request belongs to.
255    */
256   struct ClientList *client_list;
257
258 };
259
260
261 /**
262  * Replies to be transmitted to the client.  The actual
263  * response message is allocated after this struct.
264  */
265 struct ClientResponseMessage
266 {
267   /**
268    * This is a doubly-linked list.
269    */
270   struct ClientResponseMessage *next;
271
272   /**
273    * This is a doubly-linked list.
274    */
275   struct ClientResponseMessage *prev;
276
277   /**
278    * Client list entry this response belongs to.
279    */
280   struct ClientList *client_list;
281
282   /**
283    * Number of bytes in the response.
284    */
285   size_t msize;
286 };
287
288
289 /**
290  * Linked list of clients we are performing requests
291  * for right now.
292  */
293 struct ClientList
294 {
295   /**
296    * This is a linked list.
297    */
298   struct ClientList *next;
299
300   /**
301    * ID of a client making a request, NULL if this entry is for a
302    * peer.
303    */
304   struct GNUNET_SERVER_Client *client;
305
306   /**
307    * Head of list of requests performed on behalf
308    * of this client right now.
309    */
310   struct ClientRequestList *rl_head;
311
312   /**
313    * Tail of list of requests performed on behalf
314    * of this client right now.
315    */
316   struct ClientRequestList *rl_tail;
317
318   /**
319    * Head of linked list of responses.
320    */
321   struct ClientResponseMessage *res_head;
322
323   /**
324    * Tail of linked list of responses.
325    */
326   struct ClientResponseMessage *res_tail;
327
328   /**
329    * Context for sending replies.
330    */
331   struct GNUNET_CONNECTION_TransmitHandle *th;
332
333 };
334
335
336 /**
337  * Doubly-linked list of messages we are performing
338  * due to a pending request.
339  */
340 struct PendingMessageList
341 {
342
343   /**
344    * This is a doubly-linked list of messages on behalf of the same request.
345    */
346   struct PendingMessageList *next;
347
348   /**
349    * This is a doubly-linked list of messages on behalf of the same request.
350    */
351   struct PendingMessageList *prev;
352
353   /**
354    * Message this entry represents.
355    */
356   struct PendingMessage *pm;
357
358   /**
359    * Request this entry belongs to.
360    */
361   struct PendingRequest *req;
362
363   /**
364    * Peer this message is targeted for.
365    */
366   struct ConnectedPeer *target;
367
368 };
369
370
371 /**
372  * Information we keep for each pending request.  We should try to
373  * keep this struct as small as possible since its memory consumption
374  * is key to how many requests we can have pending at once.
375  */
376 struct PendingRequest
377 {
378
379   /**
380    * If this request was made by a client, this is our entry in the
381    * client request list; otherwise NULL.
382    */
383   struct ClientRequestList *client_request_list;
384
385   /**
386    * Entry of peer responsible for this entry (if this request
387    * was made by a peer).
388    */
389   struct ConnectedPeer *cp;
390
391   /**
392    * If this is a namespace query, pointer to the hash of the public
393    * key of the namespace; otherwise NULL.  Pointer will be to the 
394    * end of this struct (so no need to free it).
395    */
396   const GNUNET_HashCode *namespace;
397
398   /**
399    * Bloomfilter we use to filter out replies that we don't care about
400    * (anymore).  NULL as long as we are interested in all replies.
401    */
402   struct GNUNET_CONTAINER_BloomFilter *bf;
403
404   /**
405    * Context of our GNUNET_CORE_peer_change_preference call.
406    */
407   struct GNUNET_CORE_InformationRequestContext *irc;
408
409   /**
410    * Hash code of all replies that we have seen so far (only valid
411    * if client is not NULL since we only track replies like this for
412    * our own clients).
413    */
414   GNUNET_HashCode *replies_seen;
415
416   /**
417    * Node in the heap representing this entry; NULL
418    * if we have no heap node.
419    */
420   struct GNUNET_CONTAINER_HeapNode *hnode;
421
422   /**
423    * Head of list of messages being performed on behalf of this
424    * request.
425    */
426   struct PendingMessageList *pending_head;
427
428   /**
429    * Tail of list of messages being performed on behalf of this
430    * request.
431    */
432   struct PendingMessageList *pending_tail;
433
434   /**
435    * When did we first see this request (form this peer), or, if our
436    * client is initiating, when did we last initiate a search?
437    */
438   struct GNUNET_TIME_Absolute start_time;
439
440   /**
441    * The query that this request is for.
442    */
443   GNUNET_HashCode query;
444
445   /**
446    * The task responsible for transmitting queries
447    * for this request.
448    */
449   GNUNET_SCHEDULER_TaskIdentifier task;
450
451   /**
452    * (Interned) Peer identifier that identifies a preferred target
453    * for requests.
454    */
455   GNUNET_PEER_Id target_pid;
456
457   /**
458    * (Interned) Peer identifiers of peers that have already
459    * received our query for this content.
460    */
461   GNUNET_PEER_Id *used_pids;
462   
463   /**
464    * Our entry in the DRQ (non-NULL while we wait for our
465    * turn to interact with the local database).
466    */
467   struct DatastoreRequestQueue *drq;
468
469   /**
470    * Size of the 'bf' (in bytes).
471    */
472   size_t bf_size;
473
474   /**
475    * Desired anonymity level; only valid for requests from a local client.
476    */
477   uint32_t anonymity_level;
478
479   /**
480    * How many entries in "used_pids" are actually valid?
481    */
482   unsigned int used_pids_off;
483
484   /**
485    * How long is the "used_pids" array?
486    */
487   unsigned int used_pids_size;
488
489   /**
490    * Number of results found for this request.
491    */
492   unsigned int results_found;
493
494   /**
495    * How many entries in "replies_seen" are actually valid?
496    */
497   unsigned int replies_seen_off;
498
499   /**
500    * How long is the "replies_seen" array?
501    */
502   unsigned int replies_seen_size;
503   
504   /**
505    * Priority with which this request was made.  If one of our clients
506    * made the request, then this is the current priority that we are
507    * using when initiating the request.  This value is used when
508    * we decide to reward other peers with trust for providing a reply.
509    */
510   uint32_t priority;
511
512   /**
513    * Priority points left for us to spend when forwarding this request
514    * to other peers.
515    */
516   uint32_t remaining_priority;
517
518   /**
519    * Number to mingle hashes for bloom-filter tests with.
520    */
521   int32_t mingle;
522
523   /**
524    * TTL with which we saw this request (or, if we initiated, TTL that
525    * we used for the request).
526    */
527   int32_t ttl;
528   
529   /**
530    * Type of the content that this request is for.
531    */
532   uint32_t type;
533
534 };
535
536
537 /**
538  * Our scheduler.
539  */
540 static struct GNUNET_SCHEDULER_Handle *sched;
541
542 /**
543  * Our configuration.
544  */
545 static const struct GNUNET_CONFIGURATION_Handle *cfg;
546
547 /**
548  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
549  */
550 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
551
552 /**
553  * Map of peer identifiers to "struct PendingRequest" (for that peer).
554  */
555 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
556
557 /**
558  * Map of query identifiers to "struct PendingRequest" (for that query).
559  */
560 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
561
562 /**
563  * Heap with the request that will expire next at the top.  Contains
564  * pointers of type "struct PendingRequest*"; these will *also* be
565  * aliased from the "requests_by_peer" data structures and the
566  * "requests_by_query" table.  Note that requests from our clients
567  * don't expire and are thus NOT in the "requests_by_expiration"
568  * (or the "requests_by_peer" tables).
569  */
570 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
571
572 /**
573  * Linked list of clients we are currently processing requests for.
574  */
575 struct ClientList *client_list;
576
577 /**
578  * Pointer to handle to the core service (points to NULL until we've
579  * connected to it).
580  */
581 struct GNUNET_CORE_Handle *core;
582
583
584 /* ******************* clean up functions ************************ */
585
586
587 /**
588  * We're done with a particular message list entry.
589  * Free all associated resources.
590  * 
591  * @param pml entry to destroy
592  */
593 static void
594 destroy_pending_message_list_entry (struct PendingMessageList *pml)
595 {
596   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
597                                pml->req->pending_tail,
598                                pml);
599   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
600                                pml->target->pending_messages_tail,
601                                pml->pm);
602   pml->target->pending_requests--;
603   GNUNET_free (pml->pm);
604   GNUNET_free (pml);
605 }
606
607
608 /**
609  * Destroy the given pending message (and call the respective
610  * continuation).
611  *
612  * @param pm message to destroy
613  * @param tpid id of peer that the message was delivered to, or 0 for none
614  */
615 static void
616 destroy_pending_message (struct PendingMessage *pm,
617                          GNUNET_PEER_Id tpid)
618 {
619   struct PendingMessageList *pml = pm->pml;
620   TransmissionContinuation cont;
621   void *cont_cls;
622
623   GNUNET_assert (pml->pm == pm);
624   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
625   cont = pm->cont;
626   cont_cls = pm->cont_cls;
627   destroy_pending_message_list_entry (pml);
628   cont (cont_cls, 0);  
629 }
630
631
632 /**
633  * We're done processing a particular request.
634  * Free all associated resources.
635  *
636  * @param pr request to destroy
637  */
638 static void
639 destroy_pending_request (struct PendingRequest *pr)
640 {
641   struct GNUNET_PeerIdentity pid;
642
643   if (pr->hnode != NULL)
644     {
645       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
646                                          pr->hnode);
647       pr->hnode = NULL;
648     }
649   /* might have already been removed from map in 'process_reply' (if
650      there was a unique reply) or never inserted if it was a
651      duplicate; hence ignore the return value here */
652   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
653                                                &pr->query,
654                                                pr);
655   if (pr->drq != NULL)
656     {
657       GNUNET_FS_drq_get_cancel (pr->drq);
658       pr->drq = NULL;
659     }
660   if (pr->client_request_list != NULL)
661     {
662       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
663                                    pr->client_request_list->client_list->rl_tail,
664                                    pr->client_request_list);
665       GNUNET_free (pr->client_request_list);
666       pr->client_request_list = NULL;
667     }
668   if (pr->cp != NULL)
669     {
670       GNUNET_PEER_resolve (pr->cp->pid,
671                            &pid);
672       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
673                                                    &pid.hashPubKey,
674                                                    pr);
675       pr->cp = NULL;
676     }
677   if (pr->bf != NULL)
678     {
679       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
680       pr->bf = NULL;
681     }
682   if (pr->irc != NULL)
683     {
684       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
685       pr->irc = NULL;
686     }
687   if (pr->replies_seen != NULL)
688     {
689       GNUNET_free (pr->replies_seen);
690       pr->replies_seen = NULL;
691     }
692   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
693     {
694       GNUNET_SCHEDULER_cancel (sched,
695                                pr->task);
696       pr->task = GNUNET_SCHEDULER_NO_TASK;
697     }
698   while (NULL != pr->pending_head)    
699     destroy_pending_message_list_entry (pr->pending_head);
700   GNUNET_PEER_change_rc (pr->target_pid, -1);
701   if (pr->used_pids != NULL)
702     {
703       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
704       GNUNET_free (pr->used_pids);
705       pr->used_pids_off = 0;
706       pr->used_pids_size = 0;
707       pr->used_pids = NULL;
708     }
709   GNUNET_free (pr);
710 }
711
712
713 /**
714  * Method called whenever a given peer connects.
715  *
716  * @param cls closure, not used
717  * @param peer peer identity this notification is about
718  * @param latency reported latency of the connection with 'other'
719  * @param distance reported distance (DV) to 'other' 
720  */
721 static void 
722 peer_connect_handler (void *cls,
723                       const struct
724                       GNUNET_PeerIdentity * peer,
725                       struct GNUNET_TIME_Relative latency,
726                       uint32_t distance)
727 {
728   struct ConnectedPeer *cp;
729
730   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
731   cp->pid = GNUNET_PEER_intern (peer);
732   GNUNET_CONTAINER_multihashmap_put (connected_peers,
733                                      &peer->hashPubKey,
734                                      cp,
735                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
736 }
737
738
739 /**
740  * Free (each) request made by the peer.
741  *
742  * @param cls closure, points to peer that the request belongs to
743  * @param key current key code
744  * @param value value in the hash map
745  * @return GNUNET_YES (we should continue to iterate)
746  */
747 static int
748 destroy_request (void *cls,
749                  const GNUNET_HashCode * key,
750                  void *value)
751 {
752   const struct GNUNET_PeerIdentity * peer = cls;
753   struct PendingRequest *pr = value;
754   
755   GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
756                                         &peer->hashPubKey,
757                                         pr);
758   destroy_pending_request (pr);
759   return GNUNET_YES;
760 }
761
762
763 /**
764  * Method called whenever a peer disconnects.
765  *
766  * @param cls closure, not used
767  * @param peer peer identity this notification is about
768  */
769 static void
770 peer_disconnect_handler (void *cls,
771                          const struct
772                          GNUNET_PeerIdentity * peer)
773 {
774   struct ConnectedPeer *cp;
775   struct PendingMessage *pm;
776   unsigned int i;
777
778   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
779                                               &peer->hashPubKey,
780                                               &destroy_request,
781                                               (void*) peer);
782   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
783                                           &peer->hashPubKey);
784   if (cp == NULL)
785     return;
786   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
787     {
788       if (NULL != cp->last_client_replies[i])
789         {
790           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
791           cp->last_client_replies[i] = NULL;
792         }
793     }
794   GNUNET_CONTAINER_multihashmap_remove (connected_peers,
795                                         &peer->hashPubKey,
796                                         cp);
797   GNUNET_PEER_change_rc (cp->pid, -1);
798   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
799   if (NULL != cp->cth)
800     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
801   while (NULL != (pm = cp->pending_messages_head))
802     destroy_pending_message (pm, 0 /* delivery failed */);
803   GNUNET_break (0 == cp->pending_requests);
804   GNUNET_free (cp);
805 }
806
807
808 /**
809  * Iterator over hash map entries that removes all occurences
810  * of the given 'client' from the 'last_client_replies' of the
811  * given connected peer.
812  *
813  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
814  * @param key current key code (unused)
815  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
816  * @return GNUNET_YES (we should continue to iterate)
817  */
818 static int
819 remove_client_from_last_client_replies (void *cls,
820                                         const GNUNET_HashCode * key,
821                                         void *value)
822 {
823   struct GNUNET_SERVER_Client *client = cls;
824   struct ConnectedPeer *cp = value;
825   unsigned int i;
826
827   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
828     {
829       if (cp->last_client_replies[i] == client)
830         {
831           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
832           cp->last_client_replies[i] = NULL;
833         }
834     }  
835   return GNUNET_YES;
836 }
837
838
839 /**
840  * A client disconnected.  Remove all of its pending queries.
841  *
842  * @param cls closure, NULL
843  * @param client identification of the client
844  */
845 static void
846 handle_client_disconnect (void *cls,
847                           struct GNUNET_SERVER_Client
848                           * client)
849 {
850   struct ClientList *pos;
851   struct ClientList *prev;
852   struct ClientRequestList *rcl;
853   struct ClientResponseMessage *creply;
854
855   if (client == NULL)
856     return;
857   prev = NULL;
858   pos = client_list;
859   while ( (NULL != pos) &&
860           (pos->client != client) )
861     {
862       prev = pos;
863       pos = pos->next;
864     }
865   if (pos == NULL)
866     return; /* no requests pending for this client */
867   while (NULL != (rcl = pos->rl_head))
868     destroy_pending_request (rcl->req);
869   if (prev == NULL)
870     client_list = pos->next;
871   else
872     prev->next = pos->next;
873   if (pos->th != NULL)
874     {
875       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
876       pos->th = NULL;
877     }
878   while (NULL != (creply = pos->res_head))
879     {
880       GNUNET_CONTAINER_DLL_remove (pos->res_head,
881                                    pos->res_tail,
882                                    creply);
883       GNUNET_free (creply);
884     }    
885   GNUNET_SERVER_client_drop (pos->client);
886   GNUNET_free (pos);
887   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
888                                          &remove_client_from_last_client_replies,
889                                          client);
890 }
891
892
893 /**
894  * Iterator to free peer entries.
895  *
896  * @param cls closure, unused
897  * @param key current key code
898  * @param value value in the hash map (peer entry)
899  * @return GNUNET_YES (we should continue to iterate)
900  */
901 static int 
902 clean_peer (void *cls,
903             const GNUNET_HashCode * key,
904             void *value)
905 {
906   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
907   return GNUNET_YES;
908 }
909
910
911 /**
912  * Task run during shutdown.
913  *
914  * @param cls unused
915  * @param tc unused
916  */
917 static void
918 shutdown_task (void *cls,
919                const struct GNUNET_SCHEDULER_TaskContext *tc)
920 {
921   while (client_list != NULL)
922     handle_client_disconnect (NULL,
923                               client_list->client);
924   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
925                                          &clean_peer,
926                                          NULL);
927   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
928   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
929   requests_by_expiration_heap = 0;
930   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
931   connected_peers = NULL;
932   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
933   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
934   query_request_map = NULL;
935   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
936   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
937   peer_request_map = NULL;
938   GNUNET_assert (NULL != core);
939   GNUNET_CORE_disconnect (core);
940   core = NULL;
941   sched = NULL;
942   cfg = NULL;  
943 }
944
945
946 /* ******************* Utility functions  ******************** */
947
948
949 /**
950  * Transmit the given message by copying it to the target buffer
951  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
952  * for writing in the meantime.  In that case, do nothing
953  * (the disconnect or shutdown handler will take care of the rest).
954  * If we were able to transmit messages and there are still more
955  * pending, ask core again for further calls to this function.
956  *
957  * @param cls closure, pointer to the 'struct ConnectedPeer*'
958  * @param size number of bytes available in buf
959  * @param buf where the callee should write the message
960  * @return number of bytes written to buf
961  */
962 static size_t
963 transmit_to_peer (void *cls,
964                   size_t size, void *buf)
965 {
966   struct ConnectedPeer *cp = cls;
967   char *cbuf = buf;
968   struct GNUNET_PeerIdentity pid;
969   struct PendingMessage *pm;
970   size_t msize;
971   
972   cp->cth = NULL;
973   if (NULL == buf)
974     {
975 #if DEBUG_FS
976       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
977                   "Dropping message, core too busy.\n");
978 #endif
979       return 0;
980     }
981   msize = 0;
982   while ( (NULL != (pm = cp->pending_messages_head) ) &&
983           (pm->msize <= size) )
984     {
985       memcpy (&cbuf[msize], &pm[1], pm->msize);
986       msize += pm->msize;
987       size -= pm->msize;
988       destroy_pending_message (pm, cp->pid);
989     }
990   if (NULL != pm)
991     {
992       GNUNET_PEER_resolve (cp->pid,
993                            &pid);
994       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
995                                                    pm->priority,
996                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
997                                                    &pid,
998                                                    pm->msize,
999                                                    &transmit_to_peer,
1000                                                    pm);
1001     }
1002 #if DEBUG_FS > 2
1003   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1004               "Transmitting %u bytes to peer %u.\n",
1005               msize,
1006               cp->pid);
1007 #endif
1008   return msize;
1009 }
1010
1011
1012 /**
1013  * Add a message to the set of pending messages for the given peer.
1014  *
1015  * @param cp peer to send message to
1016  * @param pm message to queue
1017  * @param pr request on which behalf this message is being queued
1018  */
1019 static void
1020 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1021                                   struct PendingMessage *pm,
1022                                   struct PendingRequest *pr)
1023 {
1024   struct PendingMessage *pos;
1025   struct PendingMessageList *pml;
1026   struct GNUNET_PeerIdentity pid;
1027
1028   GNUNET_assert (pm->next == NULL);
1029   GNUNET_assert (pm->pml == NULL);    
1030   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1031   pml->req = pr;
1032   pml->target = cp;
1033   pml->pm = pm;
1034   pm->pml = pml;  
1035   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1036                                pr->pending_tail,
1037                                pml);
1038   pos = cp->pending_messages_head;
1039   while ( (pos != NULL) &&
1040           (pm->priority < pos->priority) )
1041     pos = pos->next;    
1042   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1043                                      cp->pending_messages_tail,
1044                                      pos,
1045                                      pm);
1046   cp->pending_requests++;
1047   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1048     destroy_pending_message (cp->pending_messages_tail, 0);  
1049   if (cp->cth == NULL)
1050     {
1051       /* need to schedule transmission */
1052       GNUNET_PEER_resolve (cp->pid, &pid);
1053       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1054                                                    cp->pending_messages_head->priority,
1055                                                    MAX_TRANSMIT_DELAY,
1056                                                    &pid,
1057                                                    cp->pending_messages_head->msize,
1058                                                    &transmit_to_peer,
1059                                                    cp);
1060     }
1061   if (cp->cth == NULL)
1062     {
1063 #if DEBUG_FS
1064       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1065                   "Failed to schedule transmission with core!\n");
1066 #endif
1067       /* FIXME: call stats (rare, bad case) */
1068     }
1069 }
1070
1071
1072 /**
1073  * Mingle hash with the mingle_number to produce different bits.
1074  */
1075 static void
1076 mingle_hash (const GNUNET_HashCode * in,
1077              int32_t mingle_number, 
1078              GNUNET_HashCode * hc)
1079 {
1080   GNUNET_HashCode m;
1081
1082   GNUNET_CRYPTO_hash (&mingle_number, 
1083                       sizeof (int32_t), 
1084                       &m);
1085   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1086 }
1087
1088
1089 /**
1090  * Test if the load on this peer is too high
1091  * to even consider processing the query at
1092  * all.
1093  * 
1094  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1095  */
1096 static int
1097 test_load_too_high ()
1098 {
1099   return GNUNET_NO; // FIXME
1100 }
1101
1102
1103 /* ******************* Pending Request Refresh Task ******************** */
1104
1105
1106
1107 /**
1108  * We use a random delay to make the timing of requests less
1109  * predictable.  This function returns such a random delay.
1110  *
1111  * FIXME: make schedule dependent on the specifics of the request?
1112  * Or bandwidth and number of connected peers and load?
1113  *
1114  * @return random delay to use for some request, between 0 and TTL_DECREMENT ms
1115  */
1116 static struct GNUNET_TIME_Relative
1117 get_processing_delay ()
1118 {
1119   return GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1120                                         GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1121                                                                   TTL_DECREMENT));
1122 }
1123
1124
1125 /**
1126  * We're processing a GET request from another peer and have decided
1127  * to forward it to other peers.  This function is called periodically
1128  * and should forward the request to other peers until we have all
1129  * possible replies.  If we have transmitted the *only* reply to
1130  * the initiator we should destroy the pending request.  If we have
1131  * many replies in the queue to the initiator, we should delay sending
1132  * out more queries until the reply queue has shrunk some.
1133  *
1134  * @param cls our "struct ProcessGetContext *"
1135  * @param tc unused
1136  */
1137 static void
1138 forward_request_task (void *cls,
1139                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1140
1141
1142 /**
1143  * Function called after we either failed or succeeded
1144  * at transmitting a query to a peer.  
1145  *
1146  * @param cls the requests "struct PendingRequest*"
1147  * @param tpid ID of receiving peer, 0 on transmission error
1148  */
1149 static void
1150 transmit_query_continuation (void *cls,
1151                              GNUNET_PEER_Id tpid)
1152 {
1153   struct PendingRequest *pr = cls;
1154
1155   if (tpid == 0)   
1156     {
1157       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1158                   "Transmission of request failed, will try again later.\n");
1159       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1160                                                get_processing_delay (),
1161                                                &forward_request_task,
1162                                                pr); 
1163       return;    
1164     }
1165   GNUNET_PEER_change_rc (tpid, 1);
1166   if (pr->used_pids_off == pr->used_pids_size)
1167     GNUNET_array_grow (pr->used_pids,
1168                        pr->used_pids_size,
1169                        pr->used_pids_size * 2 + 2);
1170   pr->used_pids[pr->used_pids_off++] = tpid;
1171   pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1172                                            get_processing_delay (),
1173                                            &forward_request_task,
1174                                            pr);
1175 }
1176
1177
1178 /**
1179  * How many bytes should a bloomfilter be if we have already seen
1180  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1181  * of bits set per entry.  Furthermore, we should not re-size the
1182  * filter too often (to keep it cheap).
1183  *
1184  * Since other peers will also add entries but not resize the filter,
1185  * we should generally pick a slightly larger size than what the
1186  * strict math would suggest.
1187  *
1188  * @return must be a power of two and smaller or equal to 2^15.
1189  */
1190 static size_t
1191 compute_bloomfilter_size (unsigned int entry_count)
1192 {
1193   size_t size;
1194   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1195   uint16_t max = 1 << 15;
1196
1197   if (entry_count > max)
1198     return max;
1199   size = 8;
1200   while ((size < max) && (size < ideal))
1201     size *= 2;
1202   if (size > max)
1203     return max;
1204   return size;
1205 }
1206
1207
1208 /**
1209  * Recalculate our bloom filter for filtering replies.
1210  *
1211  * @param count number of entries we are filtering right now
1212  * @param mingle set to our new mingling value
1213  * @param bf_size set to the size of the bloomfilter
1214  * @param entries the entries to filter
1215  * @return updated bloomfilter, NULL for none
1216  */
1217 static struct GNUNET_CONTAINER_BloomFilter *
1218 refresh_bloomfilter (unsigned int count,
1219                      int32_t * mingle,
1220                      size_t *bf_size,
1221                      const GNUNET_HashCode *entries)
1222 {
1223   struct GNUNET_CONTAINER_BloomFilter *bf;
1224   size_t nsize;
1225   unsigned int i;
1226   GNUNET_HashCode mhash;
1227
1228   if (0 == count)
1229     return NULL;
1230   nsize = compute_bloomfilter_size (count);
1231   *mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1232   *bf_size = nsize;
1233   bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1234                                           nsize,
1235                                           BLOOMFILTER_K);
1236   for (i=0;i<count;i++)
1237     {
1238       mingle_hash (&entries[i], *mingle, &mhash);
1239       GNUNET_CONTAINER_bloomfilter_add (bf, &mhash);
1240     }
1241   return bf;
1242 }
1243
1244
1245 /**
1246  * Function called after we've tried to reserve a certain amount of
1247  * bandwidth for a reply.  Check if we succeeded and if so send our
1248  * query.
1249  *
1250  * @param cls the requests "struct PendingRequest*"
1251  * @param peer identifies the peer
1252  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1253  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1254  * @param amount set to the amount that was actually reserved or unreserved
1255  * @param preference current traffic preference for the given peer
1256  */
1257 static void
1258 target_reservation_cb (void *cls,
1259                        const struct
1260                        GNUNET_PeerIdentity * peer,
1261                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1262                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1263                        int amount,
1264                        uint64_t preference)
1265 {
1266   struct PendingRequest *pr = cls;
1267   struct ConnectedPeer *cp;
1268   struct PendingMessage *pm;
1269   struct GetMessage *gm;
1270   GNUNET_HashCode *ext;
1271   char *bfdata;
1272   size_t msize;
1273   unsigned int k;
1274   int no_route;
1275   uint32_t bm;
1276
1277   pr->irc = NULL;
1278   GNUNET_assert (peer != NULL);
1279   // (3) transmit, update ttl/priority
1280   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1281                                           &peer->hashPubKey);
1282   if (cp == NULL)
1283     {
1284       /* Peer must have just left */
1285 #if DEBUG_FS
1286       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1287                   "Selected peer disconnected!\n");
1288 #endif
1289       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1290                                                get_processing_delay (),
1291                                                &forward_request_task,
1292                                                pr);
1293       return;
1294     }
1295   no_route = GNUNET_NO;
1296   /* FIXME: check against DBLOCK_SIZE and possibly return
1297      amount to reserve; however, this also needs to work
1298      with testcases which currently start out with a far
1299      too low per-peer bw limit, so they would never send
1300      anything.  Big issue. */
1301   if (amount == 0)
1302     {
1303       if (pr->cp == NULL)
1304         {
1305 #if DEBUG_FS > 1
1306           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1307                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
1308                       amount,
1309                       DBLOCK_SIZE);
1310 #endif
1311           pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1312                                                    get_processing_delay (),
1313                                                    &forward_request_task,
1314                                                    pr);
1315           return;  /* this target round failed */
1316         }
1317       /* FIXME: if we are "quite" busy, we may still want to skip
1318          this round; need more load detection code! */
1319       no_route = GNUNET_YES;
1320     }
1321   
1322   /* build message and insert message into priority queue */
1323 #if DEBUG_FS
1324   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1325               "Forwarding request `%s' to `%4s'!\n",
1326               GNUNET_h2s (&pr->query),
1327               GNUNET_i2s (peer));
1328 #endif
1329   k = 0;
1330   bm = 0;
1331   if (GNUNET_YES == no_route)
1332     {
1333       bm |= GET_MESSAGE_BIT_RETURN_TO;
1334       k++;      
1335     }
1336   if (pr->namespace != NULL)
1337     {
1338       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
1339       k++;
1340     }
1341   if (pr->target_pid != 0)
1342     {
1343       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
1344       k++;
1345     }
1346   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1347   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1348   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1349   pm->msize = msize;
1350   gm = (struct GetMessage*) &pm[1];
1351   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1352   gm->header.size = htons (msize);
1353   gm->type = htonl (pr->type);
1354   pr->remaining_priority /= 2;
1355   gm->priority = htonl (pr->remaining_priority);
1356   gm->ttl = htonl (pr->ttl);
1357   gm->filter_mutator = htonl(pr->mingle); 
1358   gm->hash_bitmap = htonl (bm);
1359   gm->query = pr->query;
1360   ext = (GNUNET_HashCode*) &gm[1];
1361   k = 0;
1362   if (GNUNET_YES == no_route)
1363     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1364   if (pr->namespace != NULL)
1365     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
1366   if (pr->target_pid != 0)
1367     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1368   bfdata = (char *) &ext[k];
1369   if (pr->bf != NULL)
1370     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1371                                                bfdata,
1372                                                pr->bf_size);
1373   pm->cont = &transmit_query_continuation;
1374   pm->cont_cls = pr;
1375   add_to_pending_messages_for_peer (cp, pm, pr);
1376 }
1377
1378
1379 /**
1380  * Closure used for "target_peer_select_cb".
1381  */
1382 struct PeerSelectionContext 
1383 {
1384   /**
1385    * The request for which we are selecting
1386    * peers.
1387    */
1388   struct PendingRequest *pr;
1389
1390   /**
1391    * Current "prime" target.
1392    */
1393   struct GNUNET_PeerIdentity target;
1394
1395   /**
1396    * How much do we like this target?
1397    */
1398   double target_score;
1399
1400 };
1401
1402
1403 /**
1404  * Function called for each connected peer to determine
1405  * which one(s) would make good targets for forwarding.
1406  *
1407  * @param cls closure (struct PeerSelectionContext)
1408  * @param key current key code (peer identity)
1409  * @param value value in the hash map (struct ConnectedPeer)
1410  * @return GNUNET_YES if we should continue to
1411  *         iterate,
1412  *         GNUNET_NO if not.
1413  */
1414 static int
1415 target_peer_select_cb (void *cls,
1416                        const GNUNET_HashCode * key,
1417                        void *value)
1418 {
1419   struct PeerSelectionContext *psc = cls;
1420   struct ConnectedPeer *cp = value;
1421   struct PendingRequest *pr = psc->pr;
1422   double score;
1423   unsigned int i;
1424   
1425   /* 1) check if we have already (recently) forwarded to this peer */
1426   for (i=0;i<pr->used_pids_off;i++)
1427     if (pr->used_pids[i] == cp->pid)
1428       return GNUNET_YES; /* skip */
1429   // 2) calculate how much we'd like to forward to this peer
1430   score = 42; // FIXME!
1431   // FIXME: also need API to gather data on responsiveness
1432   // of this peer (we have fields for that in 'cp', but
1433   // they are never set!)
1434   
1435   /* store best-fit in closure */
1436   if (score > psc->target_score)
1437     {
1438       psc->target_score = score;
1439       psc->target.hashPubKey = *key; 
1440     }
1441   return GNUNET_YES;
1442 }
1443   
1444
1445 /**
1446  * We're processing a GET request from another peer and have decided
1447  * to forward it to other peers.  This function is called periodically
1448  * and should forward the request to other peers until we have all
1449  * possible replies.  If we have transmitted the *only* reply to
1450  * the initiator we should destroy the pending request.  If we have
1451  * many replies in the queue to the initiator, we should delay sending
1452  * out more queries until the reply queue has shrunk some.
1453  *
1454  * @param cls our "struct ProcessGetContext *"
1455  * @param tc unused
1456  */
1457 static void
1458 forward_request_task (void *cls,
1459                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1460 {
1461   struct PendingRequest *pr = cls;
1462   struct PeerSelectionContext psc;
1463   struct ConnectedPeer *cp; 
1464
1465   pr->task = GNUNET_SCHEDULER_NO_TASK;
1466   GNUNET_assert (pr->irc == NULL);
1467   /* (1) select target */
1468   psc.pr = pr;
1469   psc.target_score = DBL_MIN;
1470   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1471                                          &target_peer_select_cb,
1472                                          &psc);  
1473   if (psc.target_score == DBL_MIN)
1474     {
1475 #if DEBUG_FS
1476       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1477                   "No peer selected for forwarding!\n");
1478 #endif
1479       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1480                                                get_processing_delay (),
1481                                                &forward_request_task,
1482                                                pr);
1483       return; /* nobody selected */
1484     }
1485
1486   /* (2) reserve reply bandwidth */
1487   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1488                                           &psc.target.hashPubKey);
1489   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1490                                                 &psc.target,
1491                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1492                                                 GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
1493                                                 DBLOCK_SIZE, 
1494                                                 (uint64_t) cp->inc_preference,
1495                                                 &target_reservation_cb,
1496                                                 pr);
1497   cp->inc_preference = 0.0;
1498 }
1499
1500
1501 /* **************************** P2P PUT Handling ************************ */
1502
1503
1504 /**
1505  * Function called after we either failed or succeeded
1506  * at transmitting a reply to a peer.  
1507  *
1508  * @param cls the requests "struct PendingRequest*"
1509  * @param tpid ID of receiving peer, 0 on transmission error
1510  */
1511 static void
1512 transmit_reply_continuation (void *cls,
1513                              GNUNET_PEER_Id tpid)
1514 {
1515   struct PendingRequest *pr = cls;
1516   
1517   switch (pr->type)
1518     {
1519     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1520     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1521       /* only one reply expected, done with the request! */
1522       destroy_pending_request (pr);
1523       break;
1524     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
1525     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1526       break;
1527     default:
1528       GNUNET_break (0);
1529       break;
1530     }
1531 }
1532
1533
1534 /**
1535  * Check if the given KBlock is well-formed.
1536  *
1537  * @param kb the kblock data (or at least "dsize" bytes claiming to be one)
1538  * @param dsize size of "kb" in bytes; check for < sizeof(struct KBlock)!
1539  * @param query where to store the query that this block answers
1540  * @return GNUNET_OK if this is actually a well-formed KBlock
1541  */
1542 static int
1543 check_kblock (const struct KBlock *kb,
1544               size_t dsize,
1545               GNUNET_HashCode *query)
1546 {
1547   if (dsize < sizeof (struct KBlock))
1548     {
1549       GNUNET_break_op (0);
1550       return GNUNET_SYSERR;
1551     }
1552   if (dsize - sizeof (struct KBlock) !=
1553       ntohs (kb->purpose.size) 
1554       - sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) 
1555       - sizeof(struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) ) 
1556     {
1557       GNUNET_break_op (0);
1558       return GNUNET_SYSERR;
1559     }
1560   if (GNUNET_OK !=
1561       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_KBLOCK,
1562                                 &kb->purpose,
1563                                 &kb->signature,
1564                                 &kb->keyspace)) 
1565     {
1566       GNUNET_break_op (0);
1567       return GNUNET_SYSERR;
1568     }
1569   if (query != NULL)
1570     GNUNET_CRYPTO_hash (&kb->keyspace,
1571                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1572                         query);
1573   return GNUNET_OK;
1574 }
1575
1576
1577 /**
1578  * Check if the given SBlock is well-formed.
1579  *
1580  * @param sb the sblock data (or at least "dsize" bytes claiming to be one)
1581  * @param dsize size of "kb" in bytes; check for < sizeof(struct SBlock)!
1582  * @param query where to store the query that this block answers
1583  * @param namespace where to store the namespace that this block belongs to
1584  * @return GNUNET_OK if this is actually a well-formed SBlock
1585  */
1586 static int
1587 check_sblock (const struct SBlock *sb,
1588               size_t dsize,
1589               GNUNET_HashCode *query,   
1590               GNUNET_HashCode *namespace)
1591 {
1592   if (dsize < sizeof (struct SBlock))
1593     {
1594       GNUNET_break_op (0);
1595       return GNUNET_SYSERR;
1596     }
1597   if (dsize !=
1598       ntohs (sb->purpose.size) + sizeof (struct GNUNET_CRYPTO_RsaSignature))
1599     {
1600       GNUNET_break_op (0);
1601       return GNUNET_SYSERR;
1602     }
1603   if (GNUNET_OK !=
1604       GNUNET_CRYPTO_rsa_verify (GNUNET_SIGNATURE_PURPOSE_FS_SBLOCK,
1605                                 &sb->purpose,
1606                                 &sb->signature,
1607                                 &sb->subspace)) 
1608     {
1609       GNUNET_break_op (0);
1610       return GNUNET_SYSERR;
1611     }
1612   if (query != NULL)
1613     *query = sb->identifier;
1614   if (namespace != NULL)
1615     GNUNET_CRYPTO_hash (&sb->subspace,
1616                         sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1617                         namespace);
1618   return GNUNET_OK;
1619 }
1620
1621
1622 /**
1623  * Transmit the given message by copying it to the target buffer
1624  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1625  * for writing in the meantime.  In that case, do nothing
1626  * (the disconnect or shutdown handler will take care of the rest).
1627  * If we were able to transmit messages and there are still more
1628  * pending, ask core again for further calls to this function.
1629  *
1630  * @param cls closure, pointer to the 'struct ClientList*'
1631  * @param size number of bytes available in buf
1632  * @param buf where the callee should write the message
1633  * @return number of bytes written to buf
1634  */
1635 static size_t
1636 transmit_to_client (void *cls,
1637                   size_t size, void *buf)
1638 {
1639   struct ClientList *cl = cls;
1640   char *cbuf = buf;
1641   struct ClientResponseMessage *creply;
1642   size_t msize;
1643   
1644   cl->th = NULL;
1645   if (NULL == buf)
1646     {
1647 #if DEBUG_FS
1648       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1649                   "Not sending reply, client communication problem.\n");
1650 #endif
1651       return 0;
1652     }
1653   msize = 0;
1654   while ( (NULL != (creply = cl->res_head) ) &&
1655           (creply->msize <= size) )
1656     {
1657       memcpy (&cbuf[msize], &creply[1], creply->msize);
1658       msize += creply->msize;
1659       size -= creply->msize;
1660       GNUNET_CONTAINER_DLL_remove (cl->res_head,
1661                                    cl->res_tail,
1662                                    creply);
1663       GNUNET_free (creply);
1664     }
1665   if (NULL != creply)
1666     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
1667                                                   creply->msize,
1668                                                   GNUNET_TIME_UNIT_FOREVER_REL,
1669                                                   &transmit_to_client,
1670                                                   cl);
1671 #if DEBUG_FS
1672   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1673               "Transmitted %u bytes to client\n",
1674               (unsigned int) msize);
1675 #endif
1676   return msize;
1677 }
1678
1679
1680 /**
1681  * Closure for "process_reply" function.
1682  */
1683 struct ProcessReplyClosure
1684 {
1685   /**
1686    * The data for the reply.
1687    */
1688   const void *data;
1689
1690   // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
1691
1692   /**
1693    * When the reply expires.
1694    */
1695   struct GNUNET_TIME_Absolute expiration;
1696
1697   /**
1698    * Size of data.
1699    */
1700   size_t size;
1701
1702   /**
1703    * Namespace that this reply belongs to
1704    * (if it is of type SBLOCK).
1705    */
1706   GNUNET_HashCode namespace;
1707
1708   /**
1709    * Type of the block.
1710    */
1711   uint32_t type;
1712
1713   /**
1714    * How much was this reply worth to us?
1715    */
1716   uint32_t priority;
1717 };
1718
1719
1720 /**
1721  * We have received a reply; handle it!
1722  *
1723  * @param cls response (struct ProcessReplyClosure)
1724  * @param key our query
1725  * @param value value in the hash map (info about the query)
1726  * @return GNUNET_YES (we should continue to iterate)
1727  */
1728 static int
1729 process_reply (void *cls,
1730                const GNUNET_HashCode * key,
1731                void *value)
1732 {
1733   struct ProcessReplyClosure *prq = cls;
1734   struct PendingRequest *pr = value;
1735   struct PendingMessage *reply;
1736   struct ClientResponseMessage *creply;
1737   struct ClientList *cl;
1738   struct PutMessage *pm;
1739   struct ConnectedPeer *cp;
1740   GNUNET_HashCode chash;
1741   GNUNET_HashCode mhash;
1742   size_t msize;
1743   int do_remove;
1744
1745 #if DEBUG_FS
1746   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1747               "Matched result (type %u) for query `%s' with pending request\n",
1748               (unsigned int) prq->type,
1749               GNUNET_h2s (key));
1750 #endif  
1751   do_remove = GNUNET_NO;
1752   GNUNET_CRYPTO_hash (prq->data,
1753                       prq->size,
1754                       &chash);
1755   switch (prq->type)
1756     {
1757     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1758     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1759       /* only possible reply, stop requesting! */
1760       while (NULL != pr->pending_head)
1761         destroy_pending_message_list_entry (pr->pending_head);
1762       if (pr->drq != NULL)
1763         {
1764           if (pr->client_request_list != NULL)
1765             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
1766                                         GNUNET_YES);
1767           GNUNET_FS_drq_get_cancel (pr->drq);
1768           pr->drq = NULL;
1769         }
1770       do_remove = GNUNET_YES;
1771       break;
1772     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1773       if (0 != memcmp (pr->namespace,
1774                        &prq->namespace,
1775                        sizeof (GNUNET_HashCode)))
1776         return GNUNET_YES; /* wrong namespace */        
1777       /* then: fall-through! */
1778     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
1779       if (pr->bf != NULL) 
1780         {
1781           mingle_hash (&chash, pr->mingle, &mhash);
1782           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
1783                                                                &mhash))
1784             return GNUNET_YES; /* duplicate */
1785           GNUNET_CONTAINER_bloomfilter_add (pr->bf,
1786                                             &mhash);
1787         }
1788       if (pr->client_request_list != NULL)
1789         {
1790           if (pr->replies_seen_size == pr->replies_seen_off)
1791             {
1792               GNUNET_array_grow (pr->replies_seen,
1793                                  pr->replies_seen_size,
1794                                  pr->replies_seen_size * 2 + 4);
1795               if (pr->bf != NULL)
1796                 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1797               pr->bf = refresh_bloomfilter (pr->replies_seen_off,
1798                                             &pr->mingle,
1799                                             &pr->bf_size,
1800                                             pr->replies_seen);
1801             }
1802             pr->replies_seen[pr->replies_seen_off++] = chash;
1803               
1804         }
1805       break;
1806     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
1807       // FIXME: any checks against duplicates for SKBlocks?
1808       break;
1809     default:
1810       GNUNET_break (0);
1811       return GNUNET_YES;
1812     }
1813   prq->priority += pr->remaining_priority;
1814   pr->remaining_priority = 0;
1815   if (pr->client_request_list != NULL)
1816     {
1817 #if DEBUG_FS
1818       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1819                   "Transmitting result for query `%s' to local client\n",
1820                   GNUNET_h2s (key));
1821 #endif  
1822       cl = pr->client_request_list->client_list;
1823       msize = sizeof (struct PutMessage) + prq->size;
1824       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
1825       creply->msize = msize;
1826       creply->client_list = cl;
1827       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
1828                                          cl->res_tail,
1829                                          cl->res_tail,
1830                                          creply);      
1831       pm = (struct PutMessage*) &creply[1];
1832       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1833       pm->header.size = htons (msize);
1834       pm->type = htonl (prq->type);
1835       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
1836       memcpy (&pm[1], prq->data, prq->size);      
1837       if (NULL == cl->th)
1838         cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
1839                                                       msize,
1840                                                       GNUNET_TIME_UNIT_FOREVER_REL,
1841                                                       &transmit_to_client,
1842                                                       cl);
1843       GNUNET_break (cl->th != NULL);
1844     }
1845   else
1846     {
1847       cp = pr->cp;
1848 #if DEBUG_FS
1849       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1850                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
1851                   GNUNET_h2s (key),
1852                   (unsigned int) cp->pid);
1853 #endif  
1854       msize = sizeof (struct PutMessage) + prq->size;
1855       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
1856       reply->cont = &transmit_reply_continuation;
1857       reply->cont_cls = pr;
1858       reply->msize = msize;
1859       reply->priority = (uint32_t) -1; /* send replies first! */
1860       pm = (struct PutMessage*) &reply[1];
1861       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1862       pm->header.size = htons (msize);
1863       pm->type = htonl (prq->type);
1864       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
1865       memcpy (&pm[1], prq->data, prq->size);
1866       add_to_pending_messages_for_peer (cp, reply, pr);
1867     }
1868   if (GNUNET_YES == do_remove)
1869     {
1870       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1871                   "Removing request `%s' from request map (has been satisfied)\n",
1872                   GNUNET_h2s (key));
1873       GNUNET_break (GNUNET_YES ==
1874                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1875                                                           key,
1876                                                           pr));
1877       // FIXME: request somehow does not fully
1878       // disappear; how to fix? 
1879       // destroy_pending_request (pr); (not like this!)
1880     }
1881
1882   // FIXME: implement hot-path routing statistics keeping!
1883   return GNUNET_YES;
1884 }
1885
1886
1887 /**
1888  * Handle P2P "PUT" message.
1889  *
1890  * @param cls closure, always NULL
1891  * @param other the other peer involved (sender or receiver, NULL
1892  *        for loopback messages where we are both sender and receiver)
1893  * @param message the actual message
1894  * @param latency reported latency of the connection with 'other'
1895  * @param distance reported distance (DV) to 'other' 
1896  * @return GNUNET_OK to keep the connection open,
1897  *         GNUNET_SYSERR to close it (signal serious error)
1898  */
1899 static int
1900 handle_p2p_put (void *cls,
1901                 const struct GNUNET_PeerIdentity *other,
1902                 const struct GNUNET_MessageHeader *message,
1903                 struct GNUNET_TIME_Relative latency,
1904                 uint32_t distance)
1905 {
1906   const struct PutMessage *put;
1907   uint16_t msize;
1908   size_t dsize;
1909   uint32_t type;
1910   struct GNUNET_TIME_Absolute expiration;
1911   GNUNET_HashCode query;
1912   struct ProcessReplyClosure prq;
1913
1914   msize = ntohs (message->size);
1915   if (msize < sizeof (struct PutMessage))
1916     {
1917       GNUNET_break_op(0);
1918       return GNUNET_SYSERR;
1919     }
1920   put = (const struct PutMessage*) message;
1921   dsize = msize - sizeof (struct PutMessage);
1922   type = ntohl (put->type);
1923   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1924
1925   /* first, validate! */
1926   switch (type)
1927     {
1928     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
1929     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
1930       GNUNET_CRYPTO_hash (&put[1], dsize, &query);
1931       break;
1932     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
1933       if (GNUNET_OK !=
1934           check_kblock ((const struct KBlock*) &put[1],
1935                         dsize,
1936                         &query))
1937         return GNUNET_SYSERR;
1938       break;
1939     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
1940       if (GNUNET_OK !=
1941           check_sblock ((const struct SBlock*) &put[1],
1942                         dsize,
1943                         &query,
1944                         &prq.namespace))
1945         return GNUNET_SYSERR;
1946       break;
1947     case GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK:
1948       // FIXME -- validate SKBLOCK!
1949       GNUNET_break (0);
1950       return GNUNET_OK;
1951     default:
1952       /* unknown block type */
1953       GNUNET_break_op (0);
1954       return GNUNET_SYSERR;
1955     }
1956
1957 #if DEBUG_FS
1958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1959               "Received result for query `%s' from peer `%4s'\n",
1960               GNUNET_h2s (&query),
1961               GNUNET_i2s (other));
1962 #endif
1963   /* now, lookup 'query' */
1964   prq.data = (const void*) &put[1];
1965   prq.size = dsize;
1966   prq.type = type;
1967   prq.expiration = expiration;
1968   prq.priority = 0;
1969   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
1970                                               &query,
1971                                               &process_reply,
1972                                               &prq);
1973   // FIXME: if migration is on and load is low,
1974   // queue to store data in datastore;
1975   // use "prq.priority" for that!
1976   return GNUNET_OK;
1977 }
1978
1979
1980 /* **************************** P2P GET Handling ************************ */
1981
1982
1983 /**
1984  * Closure for 'check_duplicate_request_{peer,client}'.
1985  */
1986 struct CheckDuplicateRequestClosure
1987 {
1988   /**
1989    * The new request we should check if it already exists.
1990    */
1991   const struct PendingRequest *pr;
1992
1993   /**
1994    * Existing request found by the checker, NULL if none.
1995    */
1996   struct PendingRequest *have;
1997 };
1998
1999
2000 /**
2001  * Iterator over entries in the 'query_request_map' that
2002  * tries to see if we have the same request pending from
2003  * the same client already.
2004  *
2005  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2006  * @param key current key code (query, ignored, must match)
2007  * @param value value in the hash map (a 'struct PendingRequest' 
2008  *              that already exists)
2009  * @return GNUNET_YES if we should continue to
2010  *         iterate (no match yet)
2011  *         GNUNET_NO if not (match found).
2012  */
2013 static int
2014 check_duplicate_request_client (void *cls,
2015                                 const GNUNET_HashCode * key,
2016                                 void *value)
2017 {
2018   struct CheckDuplicateRequestClosure *cdc = cls;
2019   struct PendingRequest *have = value;
2020
2021   if (have->client_request_list == NULL)
2022     return GNUNET_YES;
2023   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2024        (cdc->pr != have) )
2025     {
2026       cdc->have = have;
2027       return GNUNET_NO;
2028     }
2029   return GNUNET_YES;
2030 }
2031
2032
2033 /**
2034  * We're processing (local) results for a search request
2035  * from another peer.  Pass applicable results to the
2036  * peer and if we are done either clean up (operation
2037  * complete) or forward to other peers (more results possible).
2038  *
2039  * @param cls our closure (struct LocalGetContext)
2040  * @param key key for the content
2041  * @param size number of bytes in data
2042  * @param data content stored
2043  * @param type type of the content
2044  * @param priority priority of the content
2045  * @param anonymity anonymity-level for the content
2046  * @param expiration expiration time for the content
2047  * @param uid unique identifier for the datum;
2048  *        maybe 0 if no unique identifier is available
2049  */
2050 static void
2051 process_local_reply (void *cls,
2052                      const GNUNET_HashCode * key,
2053                      uint32_t size,
2054                      const void *data,
2055                      uint32_t type,
2056                      uint32_t priority,
2057                      uint32_t anonymity,
2058                      struct GNUNET_TIME_Absolute
2059                      expiration, 
2060                      uint64_t uid)
2061 {
2062   struct PendingRequest *pr = cls;
2063   struct ProcessReplyClosure prq;
2064   struct CheckDuplicateRequestClosure cdrc;
2065   GNUNET_HashCode dhash;
2066   GNUNET_HashCode mhash;
2067   GNUNET_HashCode query;
2068   
2069   if (NULL == key)
2070     {
2071 #if DEBUG_FS > 1
2072       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2073                   "Done processing local replies, forwarding request to other peers.\n");
2074 #endif
2075       pr->drq = NULL;
2076       if (pr->client_request_list != NULL)
2077         {
2078           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2079                                       GNUNET_YES);
2080           /* Figure out if this is a duplicate request and possibly
2081              merge 'struct PendingRequest' entries */
2082           cdrc.have = NULL;
2083           cdrc.pr = pr;
2084           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2085                                                       &pr->query,
2086                                                       &check_duplicate_request_client,
2087                                                       &cdrc);
2088           if (cdrc.have != NULL)
2089             {
2090 #if DEBUG_FS
2091               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2092                           "Received request for block `%s' twice from client, will only request once.\n",
2093                           GNUNET_h2s (&pr->query));
2094 #endif
2095               
2096               destroy_pending_request (pr);
2097               return;
2098             }
2099         }
2100
2101       /* no more results */
2102       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2103         pr->task = GNUNET_SCHEDULER_add_now (sched,
2104                                              &forward_request_task,
2105                                              pr);      
2106       return;
2107     }
2108   if (type == GNUNET_DATASTORE_BLOCKTYPE_ONDEMAND)
2109     {
2110 #if DEBUG_FS
2111       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2112                   "Found ONDEMAND block, performing on-demand encoding\n");
2113 #endif
2114       if (GNUNET_OK != 
2115           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2116                                             anonymity, expiration, uid, 
2117                                             &process_local_reply,
2118                                             pr))
2119         GNUNET_FS_drq_get_next (GNUNET_YES);
2120       return;
2121     }
2122   /* check for duplicates */
2123   GNUNET_CRYPTO_hash (data, size, &dhash);
2124   mingle_hash (&dhash, 
2125                pr->mingle,
2126                &mhash);
2127   if ( (pr->bf != NULL) &&
2128        (GNUNET_YES ==
2129         GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2130                                            &mhash)) )
2131     {      
2132 #if DEBUG_FS
2133       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2134                   "Result from datastore filtered by bloomfilter (duplicate).\n");
2135 #endif
2136       GNUNET_FS_drq_get_next (GNUNET_YES);
2137       return;
2138     }
2139 #if DEBUG_FS
2140   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2141               "Found result for query `%s' in local datastore\n",
2142               GNUNET_h2s (key));
2143 #endif
2144   pr->results_found++;
2145   if ( (pr->type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK) ||
2146        (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) ||
2147        (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SKBLOCK) )
2148     {
2149       if (pr->bf == NULL)
2150         {
2151           pr->bf_size = 32;
2152           pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
2153                                                       pr->bf_size, 
2154                                                       BLOOMFILTER_K);
2155         }
2156       GNUNET_CONTAINER_bloomfilter_add (pr->bf, 
2157                                         &mhash);
2158     }
2159   memset (&prq, 0, sizeof (prq));
2160   prq.data = data;
2161   prq.expiration = expiration;
2162   prq.size = size;  
2163   if ( (type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) &&
2164        (GNUNET_OK != check_sblock ((const struct SBlock*) data,
2165                                    size,
2166                                    &query,
2167                                    &prq.namespace)) )
2168     {
2169       GNUNET_break (0);
2170       /* FIXME: consider removing the block? */
2171       GNUNET_FS_drq_get_next (GNUNET_YES);
2172       return;
2173     }
2174   prq.type = type;
2175   prq.priority = priority;  
2176   process_reply (&prq, key, pr);
2177   
2178   if ( ( (pr->client_request_list == NULL) &&
2179          ( (GNUNET_YES == test_load_too_high()) ||
2180            (pr->results_found > 5 + 2 * pr->priority) ) ) ||
2181        (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK) ) 
2182     {
2183 #if DEBUG_FS > 2
2184       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2185                   "Unique reply found or load too high, done with request\n");
2186 #endif
2187       GNUNET_FS_drq_get_next (GNUNET_NO);
2188       return;
2189     }
2190   GNUNET_FS_drq_get_next (GNUNET_YES);
2191 }
2192
2193
2194 /**
2195  * The priority level imposes a bound on the maximum
2196  * value for the ttl that can be requested.
2197  *
2198  * @param ttl_in requested ttl
2199  * @param prio given priority
2200  * @return ttl_in if ttl_in is below the limit,
2201  *         otherwise the ttl-limit for the given priority
2202  */
2203 static int32_t
2204 bound_ttl (int32_t ttl_in, uint32_t prio)
2205 {
2206   unsigned long long allowed;
2207
2208   if (ttl_in <= 0)
2209     return ttl_in;
2210   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2211   if (ttl_in > allowed)      
2212     {
2213       if (allowed >= (1 << 30))
2214         return 1 << 30;
2215       return allowed;
2216     }
2217   return ttl_in;
2218 }
2219
2220
2221 /**
2222  * We've received a request with the specified priority.  Bound it
2223  * according to how much we trust the given peer.
2224  * 
2225  * @param prio_in requested priority
2226  * @param cp the peer making the request
2227  * @return effective priority
2228  */
2229 static uint32_t
2230 bound_priority (uint32_t prio_in,
2231                 struct ConnectedPeer *cp)
2232 {
2233   return 0; // FIXME!
2234 }
2235
2236
2237 /**
2238  * Iterator over entries in the 'query_request_map' that
2239  * tries to see if we have the same request pending from
2240  * the same peer already.
2241  *
2242  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2243  * @param key current key code (query, ignored, must match)
2244  * @param value value in the hash map (a 'struct PendingRequest' 
2245  *              that already exists)
2246  * @return GNUNET_YES if we should continue to
2247  *         iterate (no match yet)
2248  *         GNUNET_NO if not (match found).
2249  */
2250 static int
2251 check_duplicate_request_peer (void *cls,
2252                               const GNUNET_HashCode * key,
2253                               void *value)
2254 {
2255   struct CheckDuplicateRequestClosure *cdc = cls;
2256   struct PendingRequest *have = value;
2257
2258   if (cdc->pr->target_pid == have->target_pid)
2259     {
2260       cdc->have = have;
2261       return GNUNET_NO;
2262     }
2263   return GNUNET_YES;
2264 }
2265
2266
2267 /**
2268  * Handle P2P "GET" request.
2269  *
2270  * @param cls closure, always NULL
2271  * @param other the other peer involved (sender or receiver, NULL
2272  *        for loopback messages where we are both sender and receiver)
2273  * @param message the actual message
2274  * @param latency reported latency of the connection with 'other'
2275  * @param distance reported distance (DV) to 'other' 
2276  * @return GNUNET_OK to keep the connection open,
2277  *         GNUNET_SYSERR to close it (signal serious error)
2278  */
2279 static int
2280 handle_p2p_get (void *cls,
2281                 const struct GNUNET_PeerIdentity *other,
2282                 const struct GNUNET_MessageHeader *message,
2283                 struct GNUNET_TIME_Relative latency,
2284                 uint32_t distance)
2285 {
2286   struct PendingRequest *pr;
2287   struct ConnectedPeer *cp;
2288   struct ConnectedPeer *cps;
2289   struct CheckDuplicateRequestClosure cdc;
2290   struct GNUNET_TIME_Relative timeout;
2291   uint16_t msize;
2292   const struct GetMessage *gm;
2293   unsigned int bits;
2294   const GNUNET_HashCode *opt;
2295   uint32_t bm;
2296   size_t bfsize;
2297   uint32_t ttl_decrement;
2298   uint32_t type;
2299   double preference;
2300   int have_ns;
2301
2302   msize = ntohs(message->size);
2303   if (msize < sizeof (struct GetMessage))
2304     {
2305       GNUNET_break_op (0);
2306       return GNUNET_SYSERR;
2307     }
2308   gm = (const struct GetMessage*) message;
2309   type = ntohl (gm->type);
2310   switch (type)
2311     {
2312     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2313     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2314     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2315     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2316       break;
2317     default:
2318       GNUNET_break_op (0);
2319       return GNUNET_SYSERR;
2320     }
2321   bm = ntohl (gm->hash_bitmap);
2322   bits = 0;
2323   while (bm > 0)
2324     {
2325       if (1 == (bm & 1))
2326         bits++;
2327       bm >>= 1;
2328     }
2329   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2330     {
2331       GNUNET_break_op (0);
2332       return GNUNET_SYSERR;
2333     }  
2334   opt = (const GNUNET_HashCode*) &gm[1];
2335   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2336   bm = ntohl (gm->hash_bitmap);
2337   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
2338        (ntohl (gm->type) == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK) )
2339     {
2340       GNUNET_break_op (0);
2341       return GNUNET_SYSERR;      
2342     }
2343   bits = 0;
2344   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2345                                            &other->hashPubKey);
2346   GNUNET_assert (NULL != cps);
2347   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
2348     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2349                                             &opt[bits++]);
2350   else
2351     cp = cps;
2352   if (cp == NULL)
2353     {
2354 #if DEBUG_FS
2355       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
2356         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2357                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
2358                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
2359       
2360       else
2361         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2362                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
2363                     GNUNET_i2s (other));
2364 #endif
2365      /* FIXME: try connect? */
2366       return GNUNET_OK;
2367     }
2368   /* note that we can really only check load here since otherwise
2369      peers could find out that we are overloaded by not being
2370      disconnected after sending us a malformed query... */
2371   if (GNUNET_YES == test_load_too_high ())
2372     {
2373 #if DEBUG_FS
2374       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2375                   "Dropping query from `%s', this peer is too busy.\n",
2376                   GNUNET_i2s (other));
2377 #endif
2378       return GNUNET_OK;
2379     }
2380
2381 #if DEBUG_FS
2382   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2383               "Received request for `%s' of type %u from peer `%4s'\n",
2384               GNUNET_h2s (&gm->query),
2385               (unsigned int) ntohl (gm->type),
2386               GNUNET_i2s (other));
2387 #endif
2388   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
2389   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
2390                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
2391   if (have_ns)
2392     pr->namespace = (GNUNET_HashCode*) &pr[1];
2393   pr->type = type;
2394   pr->mingle = ntohl (gm->filter_mutator);
2395   if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))
2396     memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
2397   else if (pr->type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)
2398     {
2399       GNUNET_break_op (0);
2400       GNUNET_free (pr);
2401       return GNUNET_SYSERR;
2402     }
2403   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2404     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
2405
2406   pr->anonymity_level = 1;
2407   pr->priority = bound_priority (ntohl (gm->priority), cps);
2408   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
2409   pr->query = gm->query;
2410   /* decrement ttl (always) */
2411   ttl_decrement = 2 * TTL_DECREMENT +
2412     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2413                               TTL_DECREMENT);
2414   if ( (pr->ttl < 0) &&
2415        (pr->ttl - ttl_decrement > 0) )
2416     {
2417 #if DEBUG_FS
2418       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2419                   "Dropping query from `%s' due to TTL underflow.\n",
2420                   GNUNET_i2s (other));
2421 #endif
2422       /* integer underflow => drop (should be very rare)! */
2423       GNUNET_free (pr);
2424       return GNUNET_OK;
2425     } 
2426   pr->ttl -= ttl_decrement;
2427   pr->start_time = GNUNET_TIME_absolute_get ();
2428
2429   /* get bloom filter */
2430   if (bfsize > 0)
2431     {
2432       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
2433                                                   bfsize,
2434                                                   BLOOMFILTER_K);
2435       pr->bf_size = bfsize;
2436     }
2437
2438   cdc.have = NULL;
2439   cdc.pr = pr;
2440   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2441                                               &gm->query,
2442                                               &check_duplicate_request_peer,
2443                                               &cdc);
2444   if (cdc.have != NULL)
2445     {
2446       if (cdc.have->start_time.value + cdc.have->ttl >=
2447           pr->start_time.value + pr->ttl)
2448         {
2449           /* existing request has higher TTL, drop new one! */
2450           cdc.have->priority += pr->priority;
2451           destroy_pending_request (pr);
2452 #if DEBUG_FS
2453           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2454                       "Have existing request with higher TTL, dropping new request.\n",
2455                       GNUNET_i2s (other));
2456 #endif
2457           return GNUNET_OK;
2458         }
2459       else
2460         {
2461           /* existing request has lower TTL, drop old one! */
2462           pr->priority += cdc.have->priority;
2463           /* Possible optimization: if we have applicable pending
2464              replies in 'cdc.have', we might want to move those over
2465              (this is a really rare special-case, so it is not clear
2466              that this would be worth it) */
2467           destroy_pending_request (cdc.have);
2468           /* keep processing 'pr'! */
2469         }
2470     }
2471
2472   pr->cp = cp;
2473   GNUNET_CONTAINER_multihashmap_put (query_request_map,
2474                                      &gm->query,
2475                                      pr,
2476                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2477   GNUNET_CONTAINER_multihashmap_put (peer_request_map,
2478                                      &other->hashPubKey,
2479                                      pr,
2480                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2481   
2482   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
2483                                             pr,
2484                                             pr->start_time.value + pr->ttl);
2485
2486
2487   /* calculate change in traffic preference */
2488   preference = (double) pr->priority;
2489   if (preference < QUERY_BANDWIDTH_VALUE)
2490     preference = QUERY_BANDWIDTH_VALUE;
2491   cps->inc_preference += preference;
2492
2493   /* process locally */
2494   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2495     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* to get on-demand as well */
2496   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2497                                            (pr->priority + 1)); 
2498   pr->drq = GNUNET_FS_drq_get (&gm->query,
2499                                type,                           
2500                                &process_local_reply,
2501                                pr,
2502                                timeout,
2503                                GNUNET_NO);
2504
2505   /* Are multiple results possible?  If so, start processing remotely now! */
2506   switch (pr->type)
2507     {
2508     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2509     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2510       /* only one result, wait for datastore */
2511       break;
2512     default:
2513       pr->task = GNUNET_SCHEDULER_add_now (sched,
2514                                            &forward_request_task,
2515                                            pr);
2516     }
2517
2518   /* make sure we don't track too many requests */
2519   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
2520     {
2521       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
2522       destroy_pending_request (pr);
2523     }
2524   return GNUNET_OK;
2525 }
2526
2527
2528 /* **************************** CS GET Handling ************************ */
2529
2530
2531 /**
2532  * Handle START_SEARCH-message (search request from client).
2533  *
2534  * @param cls closure
2535  * @param client identification of the client
2536  * @param message the actual message
2537  */
2538 static void
2539 handle_start_search (void *cls,
2540                      struct GNUNET_SERVER_Client *client,
2541                      const struct GNUNET_MessageHeader *message)
2542 {
2543   static GNUNET_HashCode all_zeros;
2544   const struct SearchMessage *sm;
2545   struct ClientList *cl;
2546   struct ClientRequestList *crl;
2547   struct PendingRequest *pr;
2548   uint16_t msize;
2549   unsigned int sc;
2550   uint32_t type;
2551
2552   msize = ntohs (message->size);
2553   if ( (msize < sizeof (struct SearchMessage)) ||
2554        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2555     {
2556       GNUNET_break (0);
2557       GNUNET_SERVER_receive_done (client,
2558                                   GNUNET_SYSERR);
2559       return;
2560     }
2561   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2562   sm = (const struct SearchMessage*) message;
2563
2564   cl = client_list;
2565   while ( (cl != NULL) &&
2566           (cl->client != client) )
2567     cl = cl->next;
2568   if (cl == NULL)
2569     {
2570       cl = GNUNET_malloc (sizeof (struct ClientList));
2571       cl->client = client;
2572       GNUNET_SERVER_client_keep (client);
2573       cl->next = client_list;
2574       client_list = cl;
2575     }
2576   type = ntohl (sm->type);
2577   switch (type)
2578     {
2579     case GNUNET_DATASTORE_BLOCKTYPE_ANY:
2580     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2581     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2582     case GNUNET_DATASTORE_BLOCKTYPE_KBLOCK:
2583     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2584       break;
2585     default:
2586       GNUNET_break (0);
2587       GNUNET_SERVER_receive_done (client,
2588                                   GNUNET_SYSERR);
2589       return;
2590     }  
2591 #if DEBUG_FS
2592   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2593               "Received request for `%s' of type %u from local client\n",
2594               GNUNET_h2s (&sm->query),
2595               (unsigned int) type);
2596 #endif
2597
2598   /* detect duplicate KBLOCK requests */
2599   if (type == GNUNET_DATASTORE_BLOCKTYPE_KBLOCK)
2600     {
2601       crl = cl->rl_head;
2602       while ( (crl != NULL) &&
2603               ( (0 != memcmp (&crl->req->query,
2604                               &sm->query,
2605                               sizeof (GNUNET_HashCode))) ||
2606                 (crl->req->type != type) ) )
2607         crl = crl->next;
2608       if (crl != NULL)  
2609         { 
2610 #if DEBUG_FS
2611           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2612                       "Have existing request, merging content-seen lists.\n");
2613 #endif
2614           pr = crl->req;
2615           /* Duplicate request (used to send long list of
2616              known/blocked results); merge 'pr->replies_seen'
2617              and update bloom filter */
2618           GNUNET_array_grow (pr->replies_seen,
2619                              pr->replies_seen_size,
2620                              pr->replies_seen_off + sc);
2621           memcpy (&pr->replies_seen[pr->replies_seen_off],
2622                   &sm[1],
2623                   sc * sizeof (GNUNET_HashCode));
2624           pr->replies_seen_off += sc;
2625           if (pr->bf != NULL)
2626             GNUNET_CONTAINER_bloomfilter_free (pr->bf);
2627           pr->bf = refresh_bloomfilter (pr->replies_seen_off,
2628                                         &pr->mingle,
2629                                         &pr->bf_size,
2630                                         pr->replies_seen);
2631           GNUNET_SERVER_receive_done (client,
2632                                       GNUNET_OK);
2633           return;
2634         }
2635     }
2636   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
2637                       ((type == GNUNET_DATASTORE_BLOCKTYPE_SBLOCK)?sizeof(GNUNET_HashCode):0));
2638   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
2639   memset (crl, 0, sizeof (struct ClientRequestList));
2640   crl->client_list = cl;
2641   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
2642                                cl->rl_tail,
2643                                crl);  
2644   crl->req = pr;
2645   pr->type = type;
2646   pr->client_request_list = crl;
2647   GNUNET_array_grow (pr->replies_seen,
2648                      pr->replies_seen_size,
2649                      sc);
2650   memcpy (pr->replies_seen,
2651           &sm[1],
2652           sc * sizeof (GNUNET_HashCode));
2653   pr->replies_seen_off = sc;
2654   pr->anonymity_level = ntohl (sm->anonymity_level); 
2655   pr->bf = refresh_bloomfilter (pr->replies_seen_off,
2656                                 &pr->mingle,
2657                                 &pr->bf_size,
2658                                 pr->replies_seen);
2659  pr->query = sm->query;
2660   switch (type)
2661     {
2662     case GNUNET_DATASTORE_BLOCKTYPE_DBLOCK:
2663     case GNUNET_DATASTORE_BLOCKTYPE_IBLOCK:
2664       if (0 != memcmp (&sm->target,
2665                        &all_zeros,
2666                        sizeof (GNUNET_HashCode)))
2667         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
2668       break;
2669     case GNUNET_DATASTORE_BLOCKTYPE_SBLOCK:
2670       pr->namespace = (GNUNET_HashCode*) &pr[1];
2671       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
2672       break;
2673     default:
2674       break;
2675     }
2676   GNUNET_CONTAINER_multihashmap_put (query_request_map,
2677                                      &sm->query,
2678                                      pr,
2679                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2680   if (type == GNUNET_DATASTORE_BLOCKTYPE_DBLOCK)
2681     type = GNUNET_DATASTORE_BLOCKTYPE_ANY; /* get on-demand blocks too! */
2682   pr->drq = GNUNET_FS_drq_get (&sm->query,
2683                                type,                           
2684                                &process_local_reply,
2685                                pr,
2686                                GNUNET_TIME_UNIT_FOREVER_REL,
2687                                GNUNET_YES);
2688 }
2689
2690
2691 /* **************************** Startup ************************ */
2692
2693
2694 /**
2695  * List of handlers for P2P messages
2696  * that we care about.
2697  */
2698 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2699   {
2700     { &handle_p2p_get, 
2701       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2702     { &handle_p2p_put, 
2703       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2704     { NULL, 0, 0 }
2705   };
2706
2707
2708 /**
2709  * List of handlers for the messages understood by this
2710  * service.
2711  */
2712 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2713   {&GNUNET_FS_handle_index_start, NULL, 
2714    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2715   {&GNUNET_FS_handle_index_list_get, NULL, 
2716    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2717   {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2718    sizeof (struct UnindexMessage) },
2719   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2720    0 },
2721   {NULL, NULL, 0, 0}
2722 };
2723
2724
2725 /**
2726  * Process fs requests.
2727  *
2728  * @param s scheduler to use
2729  * @param server the initialized server
2730  * @param c configuration to use
2731  */
2732 static int
2733 main_init (struct GNUNET_SCHEDULER_Handle *s,
2734            struct GNUNET_SERVER_Handle *server,
2735            const struct GNUNET_CONFIGURATION_Handle *c)
2736 {
2737   sched = s;
2738   cfg = c;
2739   connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2740   query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2741   peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2742   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
2743   core = GNUNET_CORE_connect (sched,
2744                               cfg,
2745                               GNUNET_TIME_UNIT_FOREVER_REL,
2746                               NULL,
2747                               NULL,
2748                               NULL,
2749                               &peer_connect_handler,
2750                               &peer_disconnect_handler,
2751                               NULL, GNUNET_NO,
2752                               NULL, GNUNET_NO,
2753                               p2p_handlers);
2754   if (NULL == core)
2755     {
2756       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2757                   _("Failed to connect to `%s' service.\n"),
2758                   "core");
2759       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2760       connected_peers = NULL;
2761       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2762       query_request_map = NULL;
2763       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2764       requests_by_expiration_heap = NULL;
2765       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2766       peer_request_map = NULL;
2767
2768       return GNUNET_SYSERR;
2769     }  
2770   GNUNET_SERVER_disconnect_notify (server, 
2771                                    &handle_client_disconnect,
2772                                    NULL);
2773   GNUNET_SERVER_add_handlers (server, handlers);
2774   GNUNET_SCHEDULER_add_delayed (sched,
2775                                 GNUNET_TIME_UNIT_FOREVER_REL,
2776                                 &shutdown_task,
2777                                 NULL);
2778   return GNUNET_OK;
2779 }
2780
2781
2782 /**
2783  * Process fs requests.
2784  *
2785  * @param cls closure
2786  * @param sched scheduler to use
2787  * @param server the initialized server
2788  * @param cfg configuration to use
2789  */
2790 static void
2791 run (void *cls,
2792      struct GNUNET_SCHEDULER_Handle *sched,
2793      struct GNUNET_SERVER_Handle *server,
2794      const struct GNUNET_CONFIGURATION_Handle *cfg)
2795 {
2796   if ( (GNUNET_OK != GNUNET_FS_drq_init (sched, cfg)) ||
2797        (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg)) ||
2798        (GNUNET_OK != main_init (sched, server, cfg)) )
2799     {    
2800       GNUNET_SCHEDULER_shutdown (sched);
2801       return;   
2802     }
2803 }
2804
2805
2806 /**
2807  * The main function for the fs service.
2808  *
2809  * @param argc number of arguments from the command line
2810  * @param argv command line arguments
2811  * @return 0 ok, 1 on error
2812  */
2813 int
2814 main (int argc, char *const *argv)
2815 {
2816   return (GNUNET_OK ==
2817           GNUNET_SERVICE_run (argc,
2818                               argv,
2819                               "fs",
2820                               GNUNET_SERVICE_OPTION_NONE,
2821                               &run, NULL)) ? 0 : 1;
2822 }
2823
2824 /* end of gnunet-service-fs.c */