nitpicks
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * FIXME:
27  * - TTL/priority calculations are absent!
28  * TODO:
29  * - have non-zero preference / priority for requests we initiate!
30  * - implement hot-path routing decision procedure
31  * - implement: bound_priority, test_load_too_high
32  * - statistics
33  */
34 #include "platform.h"
35 #include <float.h>
36 #include "gnunet_constants.h"
37 #include "gnunet_core_service.h"
38 #include "gnunet_datastore_service.h"
39 #include "gnunet_peer_lib.h"
40 #include "gnunet_protocols.h"
41 #include "gnunet_signatures.h"
42 #include "gnunet_statistics_service.h"
43 #include "gnunet_util_lib.h"
44 #include "gnunet-service-fs_indexing.h"
45 #include "fs.h"
46
47 #define DEBUG_FS GNUNET_NO
48
49 /**
50  * Maximum number of outgoing messages we queue per peer.
51  * FIXME: make configurable?
52  */
53 #define MAX_QUEUE_PER_PEER 16
54
55 /**
56  * Inverse of the probability that we will submit the same query
57  * to the same peer again.  If the same peer already got the query
58  * repeatedly recently, the probability is multiplied by the inverse
59  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
60  * plus MAX_CORK_DELAY (so roughly every 3.5s).
61  */
62 #define RETRY_PROBABILITY_INV 3
63
64 /**
65  * What is the maximum delay for a P2P FS message (in our interaction
66  * with core)?  FS-internal delays are another story.  The value is
67  * chosen based on the 32k block size.  Given that peers typcially
68  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
69  * transmit one message even to the lowest-bandwidth peers.
70  */
71 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
72
73
74
75 /**
76  * Maximum number of requests (from other peers) that we're
77  * willing to have pending at any given point in time.
78  * FIXME: set from configuration.
79  */
80 static uint64_t max_pending_requests = (32 * 1024);
81
82
83 /**
84  * Information we keep for each pending reply.  The
85  * actual message follows at the end of this struct.
86  */
87 struct PendingMessage;
88
89 /**
90  * Our connection to the datastore.
91  */
92 static struct GNUNET_DATASTORE_Handle *dsh;
93
94
95 /**
96  * Function called upon completion of a transmission.
97  *
98  * @param cls closure
99  * @param pid ID of receiving peer, 0 on transmission error
100  */
101 typedef void (*TransmissionContinuation)(void * cls, 
102                                          GNUNET_PEER_Id tpid);
103
104
105 /**
106  * Information we keep for each pending message (GET/PUT).  The
107  * actual message follows at the end of this struct.
108  */
109 struct PendingMessage
110 {
111   /**
112    * This is a doubly-linked list of messages to the same peer.
113    */
114   struct PendingMessage *next;
115
116   /**
117    * This is a doubly-linked list of messages to the same peer.
118    */
119   struct PendingMessage *prev;
120
121   /**
122    * Entry in pending message list for this pending message.
123    */ 
124   struct PendingMessageList *pml;  
125
126   /**
127    * Function to call immediately once we have transmitted this
128    * message.
129    */
130   TransmissionContinuation cont;
131
132   /**
133    * Closure for cont.
134    */
135   void *cont_cls;
136
137   /**
138    * Size of the reply; actual reply message follows
139    * at the end of this struct.
140    */
141   size_t msize;
142   
143   /**
144    * How important is this message for us?
145    */
146   uint32_t priority;
147  
148 };
149
150
151 /**
152  * Information about a peer that we are connected to.
153  * We track data that is useful for determining which
154  * peers should receive our requests.  We also keep
155  * a list of messages to transmit to this peer.
156  */
157 struct ConnectedPeer
158 {
159
160   /**
161    * List of the last clients for which this peer successfully
162    * answered a query.
163    */
164   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
165
166   /**
167    * List of the last PIDs for which
168    * this peer successfully answered a query;
169    * We use 0 to indicate no successful reply.
170    */
171   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
172
173   /**
174    * Average delay between sending the peer a request and
175    * getting a reply (only calculated over the requests for
176    * which we actually got a reply).   Calculated
177    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
178    */ 
179   struct GNUNET_TIME_Relative avg_delay;
180
181   /**
182    * Handle for an active request for transmission to this
183    * peer, or NULL.
184    */
185   struct GNUNET_CORE_TransmitHandle *cth;
186
187   /**
188    * Messages (replies, queries, content migration) we would like to
189    * send to this peer in the near future.  Sorted by priority, head.
190    */
191   struct PendingMessage *pending_messages_head;
192
193   /**
194    * Messages (replies, queries, content migration) we would like to
195    * send to this peer in the near future.  Sorted by priority, tail.
196    */
197   struct PendingMessage *pending_messages_tail;
198
199   /**
200    * Average priority of successful replies.  Calculated
201    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
202    */
203   double avg_priority;
204
205   /**
206    * Increase in traffic preference still to be submitted
207    * to the core service for this peer. FIXME: double or 'uint64_t'?
208    */
209   double inc_preference;
210
211   /**
212    * The peer's identity.
213    */
214   GNUNET_PEER_Id pid;  
215
216   /**
217    * Size of the linked list of 'pending_messages'.
218    */
219   unsigned int pending_requests;
220
221   /**
222    * Which offset in "last_p2p_replies" will be updated next?
223    * (we go round-robin).
224    */
225   unsigned int last_p2p_replies_woff;
226
227   /**
228    * Which offset in "last_client_replies" will be updated next?
229    * (we go round-robin).
230    */
231   unsigned int last_client_replies_woff;
232
233 };
234
235
236 /**
237  * Information we keep for each pending request.  We should try to
238  * keep this struct as small as possible since its memory consumption
239  * is key to how many requests we can have pending at once.
240  */
241 struct PendingRequest;
242
243
244 /**
245  * Doubly-linked list of requests we are performing
246  * on behalf of the same client.
247  */
248 struct ClientRequestList
249 {
250
251   /**
252    * This is a doubly-linked list.
253    */
254   struct ClientRequestList *next;
255
256   /**
257    * This is a doubly-linked list.
258    */
259   struct ClientRequestList *prev;
260
261   /**
262    * Request this entry represents.
263    */
264   struct PendingRequest *req;
265
266   /**
267    * Client list this request belongs to.
268    */
269   struct ClientList *client_list;
270
271 };
272
273
274 /**
275  * Replies to be transmitted to the client.  The actual
276  * response message is allocated after this struct.
277  */
278 struct ClientResponseMessage
279 {
280   /**
281    * This is a doubly-linked list.
282    */
283   struct ClientResponseMessage *next;
284
285   /**
286    * This is a doubly-linked list.
287    */
288   struct ClientResponseMessage *prev;
289
290   /**
291    * Client list entry this response belongs to.
292    */
293   struct ClientList *client_list;
294
295   /**
296    * Number of bytes in the response.
297    */
298   size_t msize;
299 };
300
301
302 /**
303  * Linked list of clients we are performing requests
304  * for right now.
305  */
306 struct ClientList
307 {
308   /**
309    * This is a linked list.
310    */
311   struct ClientList *next;
312
313   /**
314    * ID of a client making a request, NULL if this entry is for a
315    * peer.
316    */
317   struct GNUNET_SERVER_Client *client;
318
319   /**
320    * Head of list of requests performed on behalf
321    * of this client right now.
322    */
323   struct ClientRequestList *rl_head;
324
325   /**
326    * Tail of list of requests performed on behalf
327    * of this client right now.
328    */
329   struct ClientRequestList *rl_tail;
330
331   /**
332    * Head of linked list of responses.
333    */
334   struct ClientResponseMessage *res_head;
335
336   /**
337    * Tail of linked list of responses.
338    */
339   struct ClientResponseMessage *res_tail;
340
341   /**
342    * Context for sending replies.
343    */
344   struct GNUNET_CONNECTION_TransmitHandle *th;
345
346 };
347
348
349 /**
350  * Doubly-linked list of messages we are performing
351  * due to a pending request.
352  */
353 struct PendingMessageList
354 {
355
356   /**
357    * This is a doubly-linked list of messages on behalf of the same request.
358    */
359   struct PendingMessageList *next;
360
361   /**
362    * This is a doubly-linked list of messages on behalf of the same request.
363    */
364   struct PendingMessageList *prev;
365
366   /**
367    * Message this entry represents.
368    */
369   struct PendingMessage *pm;
370
371   /**
372    * Request this entry belongs to.
373    */
374   struct PendingRequest *req;
375
376   /**
377    * Peer this message is targeted for.
378    */
379   struct ConnectedPeer *target;
380
381 };
382
383
384 /**
385  * Information we keep for each pending request.  We should try to
386  * keep this struct as small as possible since its memory consumption
387  * is key to how many requests we can have pending at once.
388  */
389 struct PendingRequest
390 {
391
392   /**
393    * If this request was made by a client, this is our entry in the
394    * client request list; otherwise NULL.
395    */
396   struct ClientRequestList *client_request_list;
397
398   /**
399    * Entry of peer responsible for this entry (if this request
400    * was made by a peer).
401    */
402   struct ConnectedPeer *cp;
403
404   /**
405    * If this is a namespace query, pointer to the hash of the public
406    * key of the namespace; otherwise NULL.  Pointer will be to the 
407    * end of this struct (so no need to free it).
408    */
409   const GNUNET_HashCode *namespace;
410
411   /**
412    * Bloomfilter we use to filter out replies that we don't care about
413    * (anymore).  NULL as long as we are interested in all replies.
414    */
415   struct GNUNET_CONTAINER_BloomFilter *bf;
416
417   /**
418    * Context of our GNUNET_CORE_peer_change_preference call.
419    */
420   struct GNUNET_CORE_InformationRequestContext *irc;
421
422   /**
423    * Hash code of all replies that we have seen so far (only valid
424    * if client is not NULL since we only track replies like this for
425    * our own clients).
426    */
427   GNUNET_HashCode *replies_seen;
428
429   /**
430    * Node in the heap representing this entry; NULL
431    * if we have no heap node.
432    */
433   struct GNUNET_CONTAINER_HeapNode *hnode;
434
435   /**
436    * Head of list of messages being performed on behalf of this
437    * request.
438    */
439   struct PendingMessageList *pending_head;
440
441   /**
442    * Tail of list of messages being performed on behalf of this
443    * request.
444    */
445   struct PendingMessageList *pending_tail;
446
447   /**
448    * When did we first see this request (form this peer), or, if our
449    * client is initiating, when did we last initiate a search?
450    */
451   struct GNUNET_TIME_Absolute start_time;
452
453   /**
454    * The query that this request is for.
455    */
456   GNUNET_HashCode query;
457
458   /**
459    * The task responsible for transmitting queries
460    * for this request.
461    */
462   GNUNET_SCHEDULER_TaskIdentifier task;
463
464   /**
465    * (Interned) Peer identifier that identifies a preferred target
466    * for requests.
467    */
468   GNUNET_PEER_Id target_pid;
469
470   /**
471    * (Interned) Peer identifiers of peers that have already
472    * received our query for this content.
473    */
474   GNUNET_PEER_Id *used_pids;
475   
476   /**
477    * Our entry in the queue (non-NULL while we wait for our
478    * turn to interact with the local database).
479    */
480   struct GNUNET_DATASTORE_QueueEntry *qe;
481
482   /**
483
484    * Size of the 'bf' (in bytes).
485    */
486   size_t bf_size;
487
488   /**
489    * Desired anonymity level; only valid for requests from a local client.
490    */
491   uint32_t anonymity_level;
492
493   /**
494    * How many entries in "used_pids" are actually valid?
495    */
496   unsigned int used_pids_off;
497
498   /**
499    * How long is the "used_pids" array?
500    */
501   unsigned int used_pids_size;
502
503   /**
504    * Number of results found for this request.
505    */
506   unsigned int results_found;
507
508   /**
509    * How many entries in "replies_seen" are actually valid?
510    */
511   unsigned int replies_seen_off;
512
513   /**
514    * How long is the "replies_seen" array?
515    */
516   unsigned int replies_seen_size;
517   
518   /**
519    * Priority with which this request was made.  If one of our clients
520    * made the request, then this is the current priority that we are
521    * using when initiating the request.  This value is used when
522    * we decide to reward other peers with trust for providing a reply.
523    */
524   uint32_t priority;
525
526   /**
527    * Priority points left for us to spend when forwarding this request
528    * to other peers.
529    */
530   uint32_t remaining_priority;
531
532   /**
533    * Number to mingle hashes for bloom-filter tests with.
534    */
535   int32_t mingle;
536
537   /**
538    * TTL with which we saw this request (or, if we initiated, TTL that
539    * we used for the request).
540    */
541   int32_t ttl;
542   
543   /**
544    * Type of the content that this request is for.
545    */
546   enum GNUNET_BLOCK_Type type;
547
548   /**
549    * Remove this request after transmission of the current response.
550    */
551   int16_t do_remove;
552
553   /**
554    * GNUNET_YES if we should not forward this request to other peers.
555    */
556   int16_t local_only;
557
558 };
559
560
561 /**
562  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
563  */
564 struct MigrationReadyBlock
565 {
566
567   /**
568    * This is a doubly-linked list.
569    */
570   struct MigrationReadyBlock *next;
571
572   /**
573    * This is a doubly-linked list.
574    */
575   struct MigrationReadyBlock *prev;
576
577   /**
578    * Query for the block.
579    */
580   GNUNET_HashCode query;
581
582   /**
583    * When does this block expire? 
584    */
585   struct GNUNET_TIME_Absolute expiration;
586
587   /**
588    * Peers we would consider forwarding this
589    * block to.  Zero for empty entries.
590    */
591   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
592
593   /**
594    * Size of the block.
595    */
596   size_t size;
597
598   /**
599    *  Number of targets already used.
600    */
601   unsigned int used_targets;
602
603   /**
604    * Type of the block.
605    */
606   enum GNUNET_BLOCK_Type type;
607 };
608
609
610 /**
611  * Our scheduler.
612  */
613 static struct GNUNET_SCHEDULER_Handle *sched;
614
615 /**
616  * Our configuration.
617  */
618 static const struct GNUNET_CONFIGURATION_Handle *cfg;
619
620 /**
621  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
622  */
623 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
624
625 /**
626  * Map of peer identifiers to "struct PendingRequest" (for that peer).
627  */
628 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
629
630 /**
631  * Map of query identifiers to "struct PendingRequest" (for that query).
632  */
633 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
634
635 /**
636  * Heap with the request that will expire next at the top.  Contains
637  * pointers of type "struct PendingRequest*"; these will *also* be
638  * aliased from the "requests_by_peer" data structures and the
639  * "requests_by_query" table.  Note that requests from our clients
640  * don't expire and are thus NOT in the "requests_by_expiration"
641  * (or the "requests_by_peer" tables).
642  */
643 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
644
645 /**
646  * Handle for reporting statistics.
647  */
648 static struct GNUNET_STATISTICS_Handle *stats;
649
650 /**
651  * Linked list of clients we are currently processing requests for.
652  */
653 static struct ClientList *client_list;
654
655 /**
656  * Pointer to handle to the core service (points to NULL until we've
657  * connected to it).
658  */
659 static struct GNUNET_CORE_Handle *core;
660
661 /**
662  * Head of linked list of blocks that can be migrated.
663  */
664 static struct MigrationReadyBlock *mig_head;
665
666 /**
667  * Tail of linked list of blocks that can be migrated.
668  */
669 static struct MigrationReadyBlock *mig_tail;
670
671 /**
672  * Request to datastore for migration (or NULL).
673  */
674 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
675
676 /**
677  * ID of task that collects blocks for migration.
678  */
679 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
680
681 /**
682  * What is the maximum frequency at which we are allowed to
683  * poll the datastore for migration content?
684  */
685 static struct GNUNET_TIME_Relative min_migration_delay;
686
687 /**
688  * Size of the doubly-linked list of migration blocks.
689  */
690 static unsigned int mig_size;
691
692 /**
693  * Are we allowed to migrate content to this peer.
694  */
695 static int active_migration;
696
697
698 /**
699  * Transmit messages by copying it to the target buffer
700  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
701  * for writing in the meantime.  In that case, do nothing
702  * (the disconnect or shutdown handler will take care of the rest).
703  * If we were able to transmit messages and there are still more
704  * pending, ask core again for further calls to this function.
705  *
706  * @param cls closure, pointer to the 'struct ConnectedPeer*'
707  * @param size number of bytes available in buf
708  * @param buf where the callee should write the message
709  * @return number of bytes written to buf
710  */
711 static size_t
712 transmit_to_peer (void *cls,
713                   size_t size, void *buf);
714
715
716 /* ******************* clean up functions ************************ */
717
718
719 /**
720  * Delete the given migration block.
721  *
722  * @param mb block to delete
723  */
724 static void
725 delete_migration_block (struct MigrationReadyBlock *mb)
726 {
727   GNUNET_CONTAINER_DLL_remove (mig_head,
728                                mig_tail,
729                                mb);
730   GNUNET_PEER_decrement_rcs (mb->target_list,
731                              MIGRATION_LIST_SIZE);
732   mig_size--;
733   GNUNET_free (mb);
734 }
735
736
737 /**
738  * Compare the distance of two peers to a key.
739  *
740  * @param key key
741  * @param p1 first peer
742  * @param p2 second peer
743  * @return GNUNET_YES if P1 is closer to key than P2
744  */
745 static int
746 is_closer (const GNUNET_HashCode *key,
747            const struct GNUNET_PeerIdentity *p1,
748            const struct GNUNET_PeerIdentity *p2)
749 {
750   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
751                                     &p2->hashPubKey,
752                                     key);
753 }
754
755
756 /**
757  * Consider migrating content to a given peer.
758  *
759  * @param cls 'struct MigrationReadyBlock*' to select
760  *            targets for (or NULL for none)
761  * @param key ID of the peer 
762  * @param value 'struct ConnectedPeer' of the peer
763  * @return GNUNET_YES (always continue iteration)2
764  */
765 static int
766 consider_migration (void *cls,
767                     const GNUNET_HashCode *key,
768                     void *value)
769 {
770   struct MigrationReadyBlock *mb = cls;
771   struct ConnectedPeer *cp = value;
772   struct MigrationReadyBlock *pos;
773   struct GNUNET_PeerIdentity cppid;
774   struct GNUNET_PeerIdentity otherpid;
775   struct GNUNET_PeerIdentity worstpid;
776   size_t msize;
777   unsigned int i;
778   unsigned int repl;
779   
780   /* consider 'cp' as a migration target for mb */
781   if (mb != NULL)
782     {
783       GNUNET_PEER_resolve (cp->pid,
784                            &cppid);
785       repl = MIGRATION_LIST_SIZE;
786       for (i=0;i<MIGRATION_LIST_SIZE;i++)
787         {
788           if (mb->target_list[i] == 0)
789             {
790               mb->target_list[i] = cp->pid;
791               GNUNET_PEER_change_rc (mb->target_list[i], 1);
792               repl = MIGRATION_LIST_SIZE;
793               break;
794             }
795           GNUNET_PEER_resolve (mb->target_list[i],
796                                &otherpid);
797           if ( (repl == MIGRATION_LIST_SIZE) &&
798                is_closer (&mb->query,
799                           &cppid,
800                           &otherpid)) 
801             {
802               repl = i;
803               worstpid = otherpid;
804             }
805           else if ( (repl != MIGRATION_LIST_SIZE) &&
806                     (is_closer (&mb->query,
807                                 &worstpid,
808                                 &otherpid) ) )
809             {
810               repl = i;
811               worstpid = otherpid;
812             }       
813         }
814       if (repl != MIGRATION_LIST_SIZE) 
815         {
816           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
817           mb->target_list[repl] = cp->pid;
818           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
819         }
820     }
821
822   /* consider scheduling transmission to cp for content migration */
823   if (cp->cth != NULL)
824     return GNUNET_YES; 
825   msize = 0;
826   pos = mig_head;
827   while (pos != NULL)
828     {
829       for (i=0;i<MIGRATION_LIST_SIZE;i++)
830         {
831           if (cp->pid == pos->target_list[i])
832             {
833               if (msize == 0)
834                 msize = pos->size;
835               else
836                 msize = GNUNET_MIN (msize,
837                                     pos->size);
838               break;
839             }
840         }
841       pos = pos->next;
842     }
843   if (msize == 0)
844     return GNUNET_YES; /* no content available */
845   cp->cth 
846     = GNUNET_CORE_notify_transmit_ready (core,
847                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
848                                          (const struct GNUNET_PeerIdentity*) key,
849                                          msize + sizeof (struct PutMessage),
850                                          &transmit_to_peer,
851                                          cp);
852   return GNUNET_YES;
853 }
854
855
856 /**
857  * Task that is run periodically to obtain blocks for content
858  * migration
859  * 
860  * @param cls unused
861  * @param tc scheduler context (also unused)
862  */
863 static void
864 gather_migration_blocks (void *cls,
865                          const struct GNUNET_SCHEDULER_TaskContext *tc);
866
867
868 /**
869  * If the migration task is not currently running, consider
870  * (re)scheduling it with the appropriate delay.
871  */
872 static void
873 consider_migration_gathering ()
874 {
875   struct GNUNET_TIME_Relative delay;
876
877   if (mig_qe != NULL)
878     return;
879   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
880     return;
881   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
882                                          mig_size);
883   delay = GNUNET_TIME_relative_divide (delay,
884                                        MAX_MIGRATION_QUEUE);
885   delay = GNUNET_TIME_relative_max (delay,
886                                     min_migration_delay);
887   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
888                                            delay,
889                                            &gather_migration_blocks,
890                                            NULL);
891 }
892
893
894 /**
895  * Process content offered for migration.
896  *
897  * @param cls closure
898  * @param key key for the content
899  * @param size number of bytes in data
900  * @param data content stored
901  * @param type type of the content
902  * @param priority priority of the content
903  * @param anonymity anonymity-level for the content
904  * @param expiration expiration time for the content
905  * @param uid unique identifier for the datum;
906  *        maybe 0 if no unique identifier is available
907  */
908 static void
909 process_migration_content (void *cls,
910                            const GNUNET_HashCode * key,
911                            uint32_t size,
912                            const void *data,
913                            enum GNUNET_BLOCK_Type type,
914                            uint32_t priority,
915                            uint32_t anonymity,
916                            struct GNUNET_TIME_Absolute
917                            expiration, uint64_t uid)
918 {
919   struct MigrationReadyBlock *mb;
920   
921   if (key == NULL)
922     {
923       mig_qe = NULL;
924       if (mig_size < MAX_MIGRATION_QUEUE)  
925         consider_migration_gathering ();
926       return;
927     }
928   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
929     {
930       if (GNUNET_OK !=
931           GNUNET_FS_handle_on_demand_block (key, size, data,
932                                             type, priority, anonymity,
933                                             expiration, uid, 
934                                             &process_migration_content,
935                                             NULL))
936         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
937       return;
938     }
939 #if DEBUG_FS
940   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
941               "Retrieved block `%s' of type %u for migration\n",
942               GNUNET_h2s (key),
943               type);
944 #endif
945   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
946   mb->query = *key;
947   mb->expiration = expiration;
948   mb->size = size;
949   mb->type = type;
950   memcpy (&mb[1], data, size);
951   GNUNET_CONTAINER_DLL_insert_after (mig_head,
952                                      mig_tail,
953                                      mig_tail,
954                                      mb);
955   mig_size++;
956   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
957                                          &consider_migration,
958                                          mb);
959   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
960 }
961
962
963 /**
964  * Task that is run periodically to obtain blocks for content
965  * migration
966  * 
967  * @param cls unused
968  * @param tc scheduler context (also unused)
969  */
970 static void
971 gather_migration_blocks (void *cls,
972                          const struct GNUNET_SCHEDULER_TaskContext *tc)
973 {
974   mig_task = GNUNET_SCHEDULER_NO_TASK;
975   mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
976                                         GNUNET_TIME_UNIT_FOREVER_REL,
977                                         &process_migration_content, NULL);
978   GNUNET_assert (mig_qe != NULL);
979 }
980
981
982 /**
983  * We're done with a particular message list entry.
984  * Free all associated resources.
985  * 
986  * @param pml entry to destroy
987  */
988 static void
989 destroy_pending_message_list_entry (struct PendingMessageList *pml)
990 {
991   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
992                                pml->req->pending_tail,
993                                pml);
994   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
995                                pml->target->pending_messages_tail,
996                                pml->pm);
997   pml->target->pending_requests--;
998   GNUNET_free (pml->pm);
999   GNUNET_free (pml);
1000 }
1001
1002
1003 /**
1004  * Destroy the given pending message (and call the respective
1005  * continuation).
1006  *
1007  * @param pm message to destroy
1008  * @param tpid id of peer that the message was delivered to, or 0 for none
1009  */
1010 static void
1011 destroy_pending_message (struct PendingMessage *pm,
1012                          GNUNET_PEER_Id tpid)
1013 {
1014   struct PendingMessageList *pml = pm->pml;
1015   TransmissionContinuation cont;
1016   void *cont_cls;
1017
1018   GNUNET_assert (pml->pm == pm);
1019   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1020   cont = pm->cont;
1021   cont_cls = pm->cont_cls;
1022   destroy_pending_message_list_entry (pml);
1023   cont (cont_cls, tpid);  
1024 }
1025
1026
1027 /**
1028  * We're done processing a particular request.
1029  * Free all associated resources.
1030  *
1031  * @param pr request to destroy
1032  */
1033 static void
1034 destroy_pending_request (struct PendingRequest *pr)
1035 {
1036   struct GNUNET_PeerIdentity pid;
1037
1038   if (pr->hnode != NULL)
1039     {
1040       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1041                                          pr->hnode);
1042       pr->hnode = NULL;
1043     }
1044   if (NULL == pr->client_request_list)
1045     {
1046       GNUNET_STATISTICS_update (stats,
1047                                 gettext_noop ("# P2P searches active"),
1048                                 -1,
1049                                 GNUNET_NO);
1050     }
1051   else
1052     {
1053       GNUNET_STATISTICS_update (stats,
1054                                 gettext_noop ("# client searches active"),
1055                                 -1,
1056                                 GNUNET_NO);
1057     }
1058   /* might have already been removed from map in 'process_reply' (if
1059      there was a unique reply) or never inserted if it was a
1060      duplicate; hence ignore the return value here */
1061   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1062                                                &pr->query,
1063                                                pr);
1064   if (pr->qe != NULL)
1065      {
1066       GNUNET_DATASTORE_cancel (pr->qe);
1067       pr->qe = NULL;
1068     }
1069   if (pr->client_request_list != NULL)
1070     {
1071       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1072                                    pr->client_request_list->client_list->rl_tail,
1073                                    pr->client_request_list);
1074       GNUNET_free (pr->client_request_list);
1075       pr->client_request_list = NULL;
1076     }
1077   if (pr->cp != NULL)
1078     {
1079       GNUNET_PEER_resolve (pr->cp->pid,
1080                            &pid);
1081       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1082                                                    &pid.hashPubKey,
1083                                                    pr);
1084       pr->cp = NULL;
1085     }
1086   if (pr->bf != NULL)
1087     {
1088       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1089       pr->bf = NULL;
1090     }
1091   if (pr->irc != NULL)
1092     {
1093       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1094       pr->irc = NULL;
1095     }
1096   if (pr->replies_seen != NULL)
1097     {
1098       GNUNET_free (pr->replies_seen);
1099       pr->replies_seen = NULL;
1100     }
1101   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1102     {
1103       GNUNET_SCHEDULER_cancel (sched,
1104                                pr->task);
1105       pr->task = GNUNET_SCHEDULER_NO_TASK;
1106     }
1107   while (NULL != pr->pending_head)    
1108     destroy_pending_message_list_entry (pr->pending_head);
1109   GNUNET_PEER_change_rc (pr->target_pid, -1);
1110   if (pr->used_pids != NULL)
1111     {
1112       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1113       GNUNET_free (pr->used_pids);
1114       pr->used_pids_off = 0;
1115       pr->used_pids_size = 0;
1116       pr->used_pids = NULL;
1117     }
1118   GNUNET_free (pr);
1119 }
1120
1121
1122 /**
1123  * Method called whenever a given peer connects.
1124  *
1125  * @param cls closure, not used
1126  * @param peer peer identity this notification is about
1127  * @param latency reported latency of the connection with 'other'
1128  * @param distance reported distance (DV) to 'other' 
1129  */
1130 static void 
1131 peer_connect_handler (void *cls,
1132                       const struct
1133                       GNUNET_PeerIdentity * peer,
1134                       struct GNUNET_TIME_Relative latency,
1135                       uint32_t distance)
1136 {
1137   struct ConnectedPeer *cp;
1138   struct MigrationReadyBlock *pos;
1139   
1140   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1141   cp->pid = GNUNET_PEER_intern (peer);
1142   GNUNET_break (GNUNET_OK ==
1143                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1144                                                    &peer->hashPubKey,
1145                                                    cp,
1146                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1147
1148   pos = mig_head;
1149   while (NULL != pos)
1150     {
1151       (void) consider_migration (pos, &peer->hashPubKey, cp);
1152       pos = pos->next;
1153     }
1154 }
1155
1156
1157
1158 /**
1159  * Free (each) request made by the peer.
1160  *
1161  * @param cls closure, points to peer that the request belongs to
1162  * @param key current key code
1163  * @param value value in the hash map
1164  * @return GNUNET_YES (we should continue to iterate)
1165  */
1166 static int
1167 destroy_request (void *cls,
1168                  const GNUNET_HashCode * key,
1169                  void *value)
1170 {
1171   const struct GNUNET_PeerIdentity * peer = cls;
1172   struct PendingRequest *pr = value;
1173   
1174   GNUNET_break (GNUNET_YES ==
1175                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1176                                                       &peer->hashPubKey,
1177                                                       pr));
1178   destroy_pending_request (pr);
1179   return GNUNET_YES;
1180 }
1181
1182
1183 /**
1184  * Method called whenever a peer disconnects.
1185  *
1186  * @param cls closure, not used
1187  * @param peer peer identity this notification is about
1188  */
1189 static void
1190 peer_disconnect_handler (void *cls,
1191                          const struct
1192                          GNUNET_PeerIdentity * peer)
1193 {
1194   struct ConnectedPeer *cp;
1195   struct PendingMessage *pm;
1196   unsigned int i;
1197   struct MigrationReadyBlock *pos;
1198   struct MigrationReadyBlock *next;
1199
1200   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1201                                               &peer->hashPubKey,
1202                                               &destroy_request,
1203                                               (void*) peer);
1204   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1205                                           &peer->hashPubKey);
1206   if (cp == NULL)
1207     return;
1208   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1209     {
1210       if (NULL != cp->last_client_replies[i])
1211         {
1212           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1213           cp->last_client_replies[i] = NULL;
1214         }
1215     }
1216   GNUNET_break (GNUNET_YES ==
1217                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1218                                                       &peer->hashPubKey,
1219                                                       cp));
1220   /* remove this peer from migration considerations; schedule
1221      alternatives */
1222   next = mig_head;
1223   while (NULL != (pos = next))
1224     {
1225       next = pos->next;
1226       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1227         {
1228           if (pos->target_list[i] == cp->pid)
1229             {
1230               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1231               pos->target_list[i] = 0;
1232             }
1233          }
1234       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1235         {
1236           delete_migration_block (pos);
1237           consider_migration_gathering ();
1238           continue;
1239         }
1240       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1241                                              &consider_migration,
1242                                              pos);
1243     }
1244
1245   GNUNET_PEER_change_rc (cp->pid, -1);
1246   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1247   if (NULL != cp->cth)
1248     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1249   while (NULL != (pm = cp->pending_messages_head))
1250     destroy_pending_message (pm, 0 /* delivery failed */);
1251   GNUNET_break (0 == cp->pending_requests);
1252   GNUNET_free (cp);
1253 }
1254
1255
1256 /**
1257  * Iterator over hash map entries that removes all occurences
1258  * of the given 'client' from the 'last_client_replies' of the
1259  * given connected peer.
1260  *
1261  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1262  * @param key current key code (unused)
1263  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1264  * @return GNUNET_YES (we should continue to iterate)
1265  */
1266 static int
1267 remove_client_from_last_client_replies (void *cls,
1268                                         const GNUNET_HashCode * key,
1269                                         void *value)
1270 {
1271   struct GNUNET_SERVER_Client *client = cls;
1272   struct ConnectedPeer *cp = value;
1273   unsigned int i;
1274
1275   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1276     {
1277       if (cp->last_client_replies[i] == client)
1278         {
1279           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1280           cp->last_client_replies[i] = NULL;
1281         }
1282     }  
1283   return GNUNET_YES;
1284 }
1285
1286
1287 /**
1288  * A client disconnected.  Remove all of its pending queries.
1289  *
1290  * @param cls closure, NULL
1291  * @param client identification of the client
1292  */
1293 static void
1294 handle_client_disconnect (void *cls,
1295                           struct GNUNET_SERVER_Client
1296                           * client)
1297 {
1298   struct ClientList *pos;
1299   struct ClientList *prev;
1300   struct ClientRequestList *rcl;
1301   struct ClientResponseMessage *creply;
1302
1303   if (client == NULL)
1304     return;
1305   prev = NULL;
1306   pos = client_list;
1307   while ( (NULL != pos) &&
1308           (pos->client != client) )
1309     {
1310       prev = pos;
1311       pos = pos->next;
1312     }
1313   if (pos == NULL)
1314     return; /* no requests pending for this client */
1315   while (NULL != (rcl = pos->rl_head))
1316     {
1317       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1318                   "Destroying pending request `%s' on disconnect\n",
1319                   GNUNET_h2s (&rcl->req->query));
1320       destroy_pending_request (rcl->req);
1321     }
1322   if (prev == NULL)
1323     client_list = pos->next;
1324   else
1325     prev->next = pos->next;
1326   if (pos->th != NULL)
1327     {
1328       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1329       pos->th = NULL;
1330     }
1331   while (NULL != (creply = pos->res_head))
1332     {
1333       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1334                                    pos->res_tail,
1335                                    creply);
1336       GNUNET_free (creply);
1337     }    
1338   GNUNET_SERVER_client_drop (pos->client);
1339   GNUNET_free (pos);
1340   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1341                                          &remove_client_from_last_client_replies,
1342                                          client);
1343 }
1344
1345
1346 /**
1347  * Iterator to free peer entries.
1348  *
1349  * @param cls closure, unused
1350  * @param key current key code
1351  * @param value value in the hash map (peer entry)
1352  * @return GNUNET_YES (we should continue to iterate)
1353  */
1354 static int 
1355 clean_peer (void *cls,
1356             const GNUNET_HashCode * key,
1357             void *value)
1358 {
1359   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1360   return GNUNET_YES;
1361 }
1362
1363
1364 /**
1365  * Task run during shutdown.
1366  *
1367  * @param cls unused
1368  * @param tc unused
1369  */
1370 static void
1371 shutdown_task (void *cls,
1372                const struct GNUNET_SCHEDULER_TaskContext *tc)
1373 {
1374   if (mig_qe != NULL)
1375     {
1376       GNUNET_DATASTORE_cancel (mig_qe);
1377       mig_qe = NULL;
1378     }
1379   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1380     {
1381       GNUNET_SCHEDULER_cancel (sched, mig_task);
1382       mig_task = GNUNET_SCHEDULER_NO_TASK;
1383     }
1384   while (client_list != NULL)
1385     handle_client_disconnect (NULL,
1386                               client_list->client);
1387   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1388                                          &clean_peer,
1389                                          NULL);
1390   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1391   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1392   requests_by_expiration_heap = 0;
1393   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1394   connected_peers = NULL;
1395   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1396   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1397   query_request_map = NULL;
1398   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1399   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1400   peer_request_map = NULL;
1401   GNUNET_assert (NULL != core);
1402   GNUNET_CORE_disconnect (core);
1403   core = NULL;
1404   if (stats != NULL)
1405     {
1406       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1407       stats = NULL;
1408     }
1409   GNUNET_DATASTORE_disconnect (dsh,
1410                                GNUNET_NO);
1411   while (mig_head != NULL)
1412     delete_migration_block (mig_head);
1413   GNUNET_assert (0 == mig_size);
1414   dsh = NULL;
1415   sched = NULL;
1416   cfg = NULL;  
1417 }
1418
1419
1420 /* ******************* Utility functions  ******************** */
1421
1422
1423 /**
1424  * Transmit messages by copying it to the target buffer
1425  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1426  * for writing in the meantime.  In that case, do nothing
1427  * (the disconnect or shutdown handler will take care of the rest).
1428  * If we were able to transmit messages and there are still more
1429  * pending, ask core again for further calls to this function.
1430  *
1431  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1432  * @param size number of bytes available in buf
1433  * @param buf where the callee should write the message
1434  * @return number of bytes written to buf
1435  */
1436 static size_t
1437 transmit_to_peer (void *cls,
1438                   size_t size, void *buf)
1439 {
1440   struct ConnectedPeer *cp = cls;
1441   char *cbuf = buf;
1442   struct GNUNET_PeerIdentity pid;
1443   struct PendingMessage *pm;
1444   struct MigrationReadyBlock *mb;
1445   struct MigrationReadyBlock *next;
1446   struct PutMessage migm;
1447   size_t msize;
1448   unsigned int i;
1449  
1450   cp->cth = NULL;
1451   if (NULL == buf)
1452     {
1453 #if DEBUG_FS
1454       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1455                   "Dropping message, core too busy.\n");
1456 #endif
1457       return 0;
1458     }
1459   msize = 0;
1460   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1461           (pm->msize <= size) )
1462     {
1463       memcpy (&cbuf[msize], &pm[1], pm->msize);
1464       msize += pm->msize;
1465       size -= pm->msize;
1466       destroy_pending_message (pm, cp->pid);
1467     }
1468   if (NULL != pm)
1469     {
1470       GNUNET_PEER_resolve (cp->pid,
1471                            &pid);
1472       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1473                                                    pm->priority,
1474                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1475                                                    &pid,
1476                                                    pm->msize,
1477                                                    &transmit_to_peer,
1478                                                    cp);
1479     }
1480   else
1481     {      
1482       next = mig_head;
1483       while (NULL != (mb = next))
1484         {
1485           next = mb->next;
1486           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1487             {
1488               if ( (cp->pid == mb->target_list[i]) &&
1489                    (mb->size + sizeof (migm) <= size) )
1490                 {
1491                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1492                   mb->target_list[i] = 0;
1493                   mb->used_targets++;
1494                   migm.header.size = htons (sizeof (migm) + mb->size);
1495                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1496                   migm.type = htonl (mb->type);
1497                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1498                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1499                   msize += sizeof (migm);
1500                   size -= sizeof (migm);
1501                   memcpy (&cbuf[msize], &mb[1], mb->size);
1502                   msize += mb->size;
1503                   size -= mb->size;               
1504                   break;
1505                 }
1506             }
1507           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1508                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1509             {
1510               delete_migration_block (mb);
1511               consider_migration_gathering ();
1512             }
1513         }
1514       consider_migration (NULL, 
1515                           &pid.hashPubKey,
1516                           cp);
1517     }
1518 #if DEBUG_FS > 3
1519   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1520               "Transmitting %u bytes to peer %u\n",
1521               msize,
1522               cp->pid);
1523 #endif
1524   return msize;
1525 }
1526
1527
1528 /**
1529  * Add a message to the set of pending messages for the given peer.
1530  *
1531  * @param cp peer to send message to
1532  * @param pm message to queue
1533  * @param pr request on which behalf this message is being queued
1534  */
1535 static void
1536 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1537                                   struct PendingMessage *pm,
1538                                   struct PendingRequest *pr)
1539 {
1540   struct PendingMessage *pos;
1541   struct PendingMessageList *pml;
1542   struct GNUNET_PeerIdentity pid;
1543
1544   GNUNET_assert (pm->next == NULL);
1545   GNUNET_assert (pm->pml == NULL);    
1546   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1547   pml->req = pr;
1548   pml->target = cp;
1549   pml->pm = pm;
1550   pm->pml = pml;  
1551   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1552                                pr->pending_tail,
1553                                pml);
1554   pos = cp->pending_messages_head;
1555   while ( (pos != NULL) &&
1556           (pm->priority < pos->priority) )
1557     pos = pos->next;    
1558   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1559                                      cp->pending_messages_tail,
1560                                      pos,
1561                                      pm);
1562   cp->pending_requests++;
1563   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1564     destroy_pending_message (cp->pending_messages_tail, 0);  
1565   GNUNET_PEER_resolve (cp->pid, &pid);
1566   if (NULL != cp->cth)
1567     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1568   /* need to schedule transmission */
1569   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1570                                                cp->pending_messages_head->priority,
1571                                                MAX_TRANSMIT_DELAY,
1572                                                &pid,
1573                                                cp->pending_messages_head->msize,
1574                                                &transmit_to_peer,
1575                                                cp);
1576   if (cp->cth == NULL)
1577     {
1578 #if DEBUG_FS
1579       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1580                   "Failed to schedule transmission with core!\n");
1581 #endif
1582       /* FIXME: call stats (rare, bad case) */
1583     }
1584 }
1585
1586
1587 /**
1588  * Mingle hash with the mingle_number to produce different bits.
1589  */
1590 static void
1591 mingle_hash (const GNUNET_HashCode * in,
1592              int32_t mingle_number, 
1593              GNUNET_HashCode * hc)
1594 {
1595   GNUNET_HashCode m;
1596
1597   GNUNET_CRYPTO_hash (&mingle_number, 
1598                       sizeof (int32_t), 
1599                       &m);
1600   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1601 }
1602
1603
1604 /**
1605  * Test if the load on this peer is too high
1606  * to even consider processing the query at
1607  * all.
1608  * 
1609  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1610  */
1611 static int
1612 test_load_too_high ()
1613 {
1614   return GNUNET_NO; // FIXME
1615 }
1616
1617
1618 /* ******************* Pending Request Refresh Task ******************** */
1619
1620
1621
1622 /**
1623  * We use a random delay to make the timing of requests less
1624  * predictable.  This function returns such a random delay.  We add a base
1625  * delay of MAX_CORK_DELAY (1s).
1626  *
1627  * FIXME: make schedule dependent on the specifics of the request?
1628  * Or bandwidth and number of connected peers and load?
1629  *
1630  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1631  */
1632 static struct GNUNET_TIME_Relative
1633 get_processing_delay ()
1634 {
1635   return 
1636     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1637                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1638                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1639                                                                                        TTL_DECREMENT)));
1640 }
1641
1642
1643 /**
1644  * We're processing a GET request from another peer and have decided
1645  * to forward it to other peers.  This function is called periodically
1646  * and should forward the request to other peers until we have all
1647  * possible replies.  If we have transmitted the *only* reply to
1648  * the initiator we should destroy the pending request.  If we have
1649  * many replies in the queue to the initiator, we should delay sending
1650  * out more queries until the reply queue has shrunk some.
1651  *
1652  * @param cls our "struct ProcessGetContext *"
1653  * @param tc unused
1654  */
1655 static void
1656 forward_request_task (void *cls,
1657                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1658
1659
1660 /**
1661  * Function called after we either failed or succeeded
1662  * at transmitting a query to a peer.  
1663  *
1664  * @param cls the requests "struct PendingRequest*"
1665  * @param tpid ID of receiving peer, 0 on transmission error
1666  */
1667 static void
1668 transmit_query_continuation (void *cls,
1669                              GNUNET_PEER_Id tpid)
1670 {
1671   struct PendingRequest *pr = cls;
1672
1673   GNUNET_STATISTICS_update (stats,
1674                             gettext_noop ("# queries scheduled for forwarding"),
1675                             -1,
1676                             GNUNET_NO);
1677   if (tpid == 0)   
1678     {
1679 #if DEBUG_FS
1680       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1681                   "Transmission of request failed, will try again later.\n");
1682 #endif
1683       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1684         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1685                                                  get_processing_delay (),
1686                                                  &forward_request_task,
1687                                                  pr); 
1688       return;    
1689     }
1690   GNUNET_STATISTICS_update (stats,
1691                             gettext_noop ("# queries forwarded"),
1692                             1,
1693                             GNUNET_NO);
1694   GNUNET_PEER_change_rc (tpid, 1);
1695   if (pr->used_pids_off == pr->used_pids_size)
1696     GNUNET_array_grow (pr->used_pids,
1697                        pr->used_pids_size,
1698                        pr->used_pids_size * 2 + 2);
1699   pr->used_pids[pr->used_pids_off++] = tpid;
1700   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1701     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1702                                              get_processing_delay (),
1703                                              &forward_request_task,
1704                                              pr);
1705 }
1706
1707
1708 /**
1709  * How many bytes should a bloomfilter be if we have already seen
1710  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1711  * of bits set per entry.  Furthermore, we should not re-size the
1712  * filter too often (to keep it cheap).
1713  *
1714  * Since other peers will also add entries but not resize the filter,
1715  * we should generally pick a slightly larger size than what the
1716  * strict math would suggest.
1717  *
1718  * @return must be a power of two and smaller or equal to 2^15.
1719  */
1720 static size_t
1721 compute_bloomfilter_size (unsigned int entry_count)
1722 {
1723   size_t size;
1724   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1725   uint16_t max = 1 << 15;
1726
1727   if (entry_count > max)
1728     return max;
1729   size = 8;
1730   while ((size < max) && (size < ideal))
1731     size *= 2;
1732   if (size > max)
1733     return max;
1734   return size;
1735 }
1736
1737
1738 /**
1739  * Recalculate our bloom filter for filtering replies.  This function
1740  * will create a new bloom filter from scratch, so it should only be
1741  * called if we have no bloomfilter at all (and hence can create a
1742  * fresh one of minimal size without problems) OR if our peer is the
1743  * initiator (in which case we may resize to larger than mimimum size).
1744  *
1745  * @param pr request for which the BF is to be recomputed
1746  */
1747 static void
1748 refresh_bloomfilter (struct PendingRequest *pr)
1749 {
1750   unsigned int i;
1751   size_t nsize;
1752   GNUNET_HashCode mhash;
1753
1754   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1755   if (nsize == pr->bf_size)
1756     return; /* size not changed */
1757   if (pr->bf != NULL)
1758     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1759   pr->bf_size = nsize;
1760   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1761   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1762                                               pr->bf_size,
1763                                               BLOOMFILTER_K);
1764   for (i=0;i<pr->replies_seen_off;i++)
1765     {
1766       mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
1767       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
1768     }
1769 }
1770
1771
1772 /**
1773  * Function called after we've tried to reserve a certain amount of
1774  * bandwidth for a reply.  Check if we succeeded and if so send our
1775  * query.
1776  *
1777  * @param cls the requests "struct PendingRequest*"
1778  * @param peer identifies the peer
1779  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1780  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1781  * @param amount set to the amount that was actually reserved or unreserved
1782  * @param preference current traffic preference for the given peer
1783  */
1784 static void
1785 target_reservation_cb (void *cls,
1786                        const struct
1787                        GNUNET_PeerIdentity * peer,
1788                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1789                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1790                        int amount,
1791                        uint64_t preference)
1792 {
1793   struct PendingRequest *pr = cls;
1794   struct ConnectedPeer *cp;
1795   struct PendingMessage *pm;
1796   struct GetMessage *gm;
1797   GNUNET_HashCode *ext;
1798   char *bfdata;
1799   size_t msize;
1800   unsigned int k;
1801   int no_route;
1802   uint32_t bm;
1803
1804   pr->irc = NULL;
1805   if (peer == NULL)
1806     {
1807       /* error in communication with core, try again later */
1808       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1809         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1810                                                  get_processing_delay (),
1811                                                  &forward_request_task,
1812                                                  pr);
1813       return;
1814     }
1815   // (3) transmit, update ttl/priority
1816   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1817                                           &peer->hashPubKey);
1818   if (cp == NULL)
1819     {
1820       /* Peer must have just left */
1821 #if DEBUG_FS
1822       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1823                   "Selected peer disconnected!\n");
1824 #endif
1825       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1826         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1827                                                  get_processing_delay (),
1828                                                  &forward_request_task,
1829                                                  pr);
1830       return;
1831     }
1832   no_route = GNUNET_NO;
1833   /* FIXME: check against DBLOCK_SIZE and possibly return
1834      amount to reserve; however, this also needs to work
1835      with testcases which currently start out with a far
1836      too low per-peer bw limit, so they would never send
1837      anything.  Big issue. */
1838   if (amount == 0)
1839     {
1840       if (pr->cp == NULL)
1841         {
1842 #if DEBUG_FS > 1
1843           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1844                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
1845                       amount,
1846                       DBLOCK_SIZE);
1847 #endif
1848           GNUNET_STATISTICS_update (stats,
1849                                     gettext_noop ("# reply bandwidth reservation requests failed"),
1850                                     1,
1851                                     GNUNET_NO);
1852           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1853             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1854                                                      get_processing_delay (),
1855                                                      &forward_request_task,
1856                                                      pr);
1857           return;  /* this target round failed */
1858         }
1859       /* FIXME: if we are "quite" busy, we may still want to skip
1860          this round; need more load detection code! */
1861       no_route = GNUNET_YES;
1862     }
1863   
1864   GNUNET_STATISTICS_update (stats,
1865                             gettext_noop ("# queries scheduled for forwarding"),
1866                             1,
1867                             GNUNET_NO);
1868   /* build message and insert message into priority queue */
1869 #if DEBUG_FS
1870   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1871               "Forwarding request `%s' to `%4s'!\n",
1872               GNUNET_h2s (&pr->query),
1873               GNUNET_i2s (peer));
1874 #endif
1875   k = 0;
1876   bm = 0;
1877   if (GNUNET_YES == no_route)
1878     {
1879       bm |= GET_MESSAGE_BIT_RETURN_TO;
1880       k++;      
1881     }
1882   if (pr->namespace != NULL)
1883     {
1884       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
1885       k++;
1886     }
1887   if (pr->target_pid != 0)
1888     {
1889       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
1890       k++;
1891     }
1892   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1893   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1894   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1895   pm->msize = msize;
1896   gm = (struct GetMessage*) &pm[1];
1897   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1898   gm->header.size = htons (msize);
1899   gm->type = htonl (pr->type);
1900   pr->remaining_priority /= 2;
1901   gm->priority = htonl (pr->remaining_priority);
1902   gm->ttl = htonl (pr->ttl);
1903   gm->filter_mutator = htonl(pr->mingle); 
1904   gm->hash_bitmap = htonl (bm);
1905   gm->query = pr->query;
1906   ext = (GNUNET_HashCode*) &gm[1];
1907   k = 0;
1908   if (GNUNET_YES == no_route)
1909     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1910   if (pr->namespace != NULL)
1911     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
1912   if (pr->target_pid != 0)
1913     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1914   bfdata = (char *) &ext[k];
1915   if (pr->bf != NULL)
1916     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1917                                                bfdata,
1918                                                pr->bf_size);
1919   pm->cont = &transmit_query_continuation;
1920   pm->cont_cls = pr;
1921   add_to_pending_messages_for_peer (cp, pm, pr);
1922 }
1923
1924
1925 /**
1926  * Closure used for "target_peer_select_cb".
1927  */
1928 struct PeerSelectionContext 
1929 {
1930   /**
1931    * The request for which we are selecting
1932    * peers.
1933    */
1934   struct PendingRequest *pr;
1935
1936   /**
1937    * Current "prime" target.
1938    */
1939   struct GNUNET_PeerIdentity target;
1940
1941   /**
1942    * How much do we like this target?
1943    */
1944   double target_score;
1945
1946 };
1947
1948
1949 /**
1950  * Function called for each connected peer to determine
1951  * which one(s) would make good targets for forwarding.
1952  *
1953  * @param cls closure (struct PeerSelectionContext)
1954  * @param key current key code (peer identity)
1955  * @param value value in the hash map (struct ConnectedPeer)
1956  * @return GNUNET_YES if we should continue to
1957  *         iterate,
1958  *         GNUNET_NO if not.
1959  */
1960 static int
1961 target_peer_select_cb (void *cls,
1962                        const GNUNET_HashCode * key,
1963                        void *value)
1964 {
1965   struct PeerSelectionContext *psc = cls;
1966   struct ConnectedPeer *cp = value;
1967   struct PendingRequest *pr = psc->pr;
1968   double score;
1969   unsigned int i;
1970   unsigned int pc;
1971
1972   /* 1) check that this peer is not the initiator */
1973   if (cp == pr->cp)
1974     return GNUNET_YES; /* skip */          
1975
1976   /* 2) check if we have already (recently) forwarded to this peer */
1977   pc = 0;
1978   for (i=0;i<pr->used_pids_off;i++)
1979     if (pr->used_pids[i] == cp->pid) 
1980       {
1981         pc++;
1982         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1983                                            RETRY_PROBABILITY_INV))
1984           {
1985 #if DEBUG_FS
1986             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1987                         "NOT re-trying query that was previously transmitted %u times\n",
1988                         (unsigned int) pr->used_pids_off);
1989 #endif
1990             return GNUNET_YES; /* skip */
1991           }
1992       }
1993 #if DEBUG_FS
1994   if (0 < pc)
1995     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1996                 "Re-trying query that was previously transmitted %u times to this peer\n",
1997                 (unsigned int) pc);
1998 #endif
1999   // 3) calculate how much we'd like to forward to this peer
2000   score = 42; // FIXME!
2001   // FIXME: also need API to gather data on responsiveness
2002   // of this peer (we have fields for that in 'cp', but
2003   // they are never set!)
2004   
2005   /* store best-fit in closure */
2006   if (score > psc->target_score)
2007     {
2008       psc->target_score = score;
2009       psc->target.hashPubKey = *key; 
2010     }
2011   return GNUNET_YES;
2012 }
2013   
2014
2015 /**
2016  * The priority level imposes a bound on the maximum
2017  * value for the ttl that can be requested.
2018  *
2019  * @param ttl_in requested ttl
2020  * @param prio given priority
2021  * @return ttl_in if ttl_in is below the limit,
2022  *         otherwise the ttl-limit for the given priority
2023  */
2024 static int32_t
2025 bound_ttl (int32_t ttl_in, uint32_t prio)
2026 {
2027   unsigned long long allowed;
2028
2029   if (ttl_in <= 0)
2030     return ttl_in;
2031   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2032   if (ttl_in > allowed)      
2033     {
2034       if (allowed >= (1 << 30))
2035         return 1 << 30;
2036       return allowed;
2037     }
2038   return ttl_in;
2039 }
2040
2041
2042 /**
2043  * We're processing a GET request from another peer and have decided
2044  * to forward it to other peers.  This function is called periodically
2045  * and should forward the request to other peers until we have all
2046  * possible replies.  If we have transmitted the *only* reply to
2047  * the initiator we should destroy the pending request.  If we have
2048  * many replies in the queue to the initiator, we should delay sending
2049  * out more queries until the reply queue has shrunk some.
2050  *
2051  * @param cls our "struct ProcessGetContext *"
2052  * @param tc unused
2053  */
2054 static void
2055 forward_request_task (void *cls,
2056                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2057 {
2058   struct PendingRequest *pr = cls;
2059   struct PeerSelectionContext psc;
2060   struct ConnectedPeer *cp; 
2061   struct GNUNET_TIME_Relative delay;
2062
2063   pr->task = GNUNET_SCHEDULER_NO_TASK;
2064   if (pr->irc != NULL)
2065     {
2066 #if DEBUG_FS
2067       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2068                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2069                   GNUNET_h2s (&pr->query));
2070 #endif
2071       return; /* already pending */
2072     }
2073   if (GNUNET_YES == pr->local_only)
2074     return; /* configured to not do P2P search */
2075   /* (1) select target */
2076   psc.pr = pr;
2077   psc.target_score = DBL_MIN;
2078   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2079                                          &target_peer_select_cb,
2080                                          &psc);  
2081   if (psc.target_score == DBL_MIN)
2082     {
2083       delay = get_processing_delay ();
2084 #if DEBUG_FS 
2085       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2086                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2087                   GNUNET_h2s (&pr->query),
2088                   delay.value);
2089 #endif
2090       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2091                                                delay,
2092                                                &forward_request_task,
2093                                                pr);
2094       return; /* nobody selected */
2095     }
2096   /* (3) update TTL/priority */
2097   
2098   if (pr->client_request_list != NULL)
2099     {
2100       /* FIXME: use better algorithm!? */
2101       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2102                                          4))
2103         pr->priority++;
2104       /* FIXME: bound priority by "customary" priority used by other peers
2105          at this time! */
2106       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2107                            pr->priority);
2108 #if DEBUG_FS
2109       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2110                   "Trying query `%s' with priority %u and TTL %d.\n",
2111                   GNUNET_h2s (&pr->query),
2112                   pr->priority,
2113                   pr->ttl);
2114 #endif
2115     }
2116   else
2117     {
2118       /* FIXME: should we do something here as well!? */
2119     }
2120
2121   /* (3) reserve reply bandwidth */
2122   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2123                                           &psc.target.hashPubKey);
2124   GNUNET_assert (NULL != cp);
2125   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2126                                                 &psc.target,
2127                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2128                                                 GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
2129                                                 DBLOCK_SIZE * 2, 
2130                                                 (uint64_t) cp->inc_preference,
2131                                                 &target_reservation_cb,
2132                                                 pr);
2133   cp->inc_preference = 0.0;
2134 }
2135
2136
2137 /* **************************** P2P PUT Handling ************************ */
2138
2139
2140 /**
2141  * Function called after we either failed or succeeded
2142  * at transmitting a reply to a peer.  
2143  *
2144  * @param cls the requests "struct PendingRequest*"
2145  * @param tpid ID of receiving peer, 0 on transmission error
2146  */
2147 static void
2148 transmit_reply_continuation (void *cls,
2149                              GNUNET_PEER_Id tpid)
2150 {
2151   struct PendingRequest *pr = cls;
2152   
2153   switch (pr->type)
2154     {
2155     case GNUNET_BLOCK_TYPE_DBLOCK:
2156     case GNUNET_BLOCK_TYPE_IBLOCK:
2157       /* only one reply expected, done with the request! */
2158       destroy_pending_request (pr);
2159       break;
2160     case GNUNET_BLOCK_TYPE_ANY:
2161     case GNUNET_BLOCK_TYPE_KBLOCK:
2162     case GNUNET_BLOCK_TYPE_SBLOCK:
2163       break;
2164     default:
2165       GNUNET_break (0);
2166       break;
2167     }
2168 }
2169
2170
2171 /**
2172  * Transmit the given message by copying it to the target buffer
2173  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2174  * for writing in the meantime.  In that case, do nothing
2175  * (the disconnect or shutdown handler will take care of the rest).
2176  * If we were able to transmit messages and there are still more
2177  * pending, ask core again for further calls to this function.
2178  *
2179  * @param cls closure, pointer to the 'struct ClientList*'
2180  * @param size number of bytes available in buf
2181  * @param buf where the callee should write the message
2182  * @return number of bytes written to buf
2183  */
2184 static size_t
2185 transmit_to_client (void *cls,
2186                   size_t size, void *buf)
2187 {
2188   struct ClientList *cl = cls;
2189   char *cbuf = buf;
2190   struct ClientResponseMessage *creply;
2191   size_t msize;
2192   
2193   cl->th = NULL;
2194   if (NULL == buf)
2195     {
2196 #if DEBUG_FS
2197       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2198                   "Not sending reply, client communication problem.\n");
2199 #endif
2200       return 0;
2201     }
2202   msize = 0;
2203   while ( (NULL != (creply = cl->res_head) ) &&
2204           (creply->msize <= size) )
2205     {
2206       memcpy (&cbuf[msize], &creply[1], creply->msize);
2207       msize += creply->msize;
2208       size -= creply->msize;
2209       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2210                                    cl->res_tail,
2211                                    creply);
2212       GNUNET_free (creply);
2213     }
2214   if (NULL != creply)
2215     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2216                                                   creply->msize,
2217                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2218                                                   &transmit_to_client,
2219                                                   cl);
2220 #if DEBUG_FS
2221   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2222               "Transmitted %u bytes to client\n",
2223               (unsigned int) msize);
2224 #endif
2225   return msize;
2226 }
2227
2228
2229 /**
2230  * Closure for "process_reply" function.
2231  */
2232 struct ProcessReplyClosure
2233 {
2234   /**
2235    * The data for the reply.
2236    */
2237   const void *data;
2238
2239   /**
2240    * Who gave us this reply? NULL for local host.
2241    */
2242   struct ConnectedPeer *sender;
2243
2244   /**
2245    * When the reply expires.
2246    */
2247   struct GNUNET_TIME_Absolute expiration;
2248
2249   /**
2250    * Size of data.
2251    */
2252   size_t size;
2253
2254   /**
2255    * Namespace that this reply belongs to
2256    * (if it is of type SBLOCK).
2257    */
2258   GNUNET_HashCode namespace;
2259
2260   /**
2261    * Type of the block.
2262    */
2263   enum GNUNET_BLOCK_Type type;
2264
2265   /**
2266    * How much was this reply worth to us?
2267    */
2268   uint32_t priority;
2269
2270   /**
2271    * Did we finish processing the associated request?
2272    */ 
2273   int finished;
2274 };
2275
2276
2277 /**
2278  * We have received a reply; handle it!
2279  *
2280  * @param cls response (struct ProcessReplyClosure)
2281  * @param key our query
2282  * @param value value in the hash map (info about the query)
2283  * @return GNUNET_YES (we should continue to iterate)
2284  */
2285 static int
2286 process_reply (void *cls,
2287                const GNUNET_HashCode * key,
2288                void *value)
2289 {
2290   struct ProcessReplyClosure *prq = cls;
2291   struct PendingRequest *pr = value;
2292   struct PendingMessage *reply;
2293   struct ClientResponseMessage *creply;
2294   struct ClientList *cl;
2295   struct PutMessage *pm;
2296   struct ConnectedPeer *cp;
2297   struct GNUNET_TIME_Relative cur_delay;
2298   GNUNET_HashCode chash;
2299   GNUNET_HashCode mhash;
2300   size_t msize;
2301
2302 #if DEBUG_FS
2303   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2304               "Matched result (type %u) for query `%s' with pending request\n",
2305               (unsigned int) prq->type,
2306               GNUNET_h2s (key));
2307 #endif  
2308   GNUNET_STATISTICS_update (stats,
2309                             gettext_noop ("# replies received and matched"),
2310                             1,
2311                             GNUNET_NO);
2312   if (prq->sender != NULL)
2313     {
2314       /* FIXME: should we be more precise here and not use
2315          "start_time" but a peer-specific time stamp? */
2316       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2317       prq->sender->avg_delay.value
2318         = (prq->sender->avg_delay.value * 
2319            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2320       prq->sender->avg_priority
2321         = (prq->sender->avg_priority * 
2322            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2323       if (pr->cp != NULL)
2324         {
2325           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2326                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], -1);
2327           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2328           prq->sender->last_p2p_replies
2329             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2330             = pr->cp->pid;
2331         }
2332       else
2333         {
2334           if (NULL != prq->sender->last_client_replies
2335               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2336             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2337                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2338           prq->sender->last_client_replies
2339             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2340             = pr->client_request_list->client_list->client;
2341           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2342         }
2343     }
2344   GNUNET_CRYPTO_hash (prq->data,
2345                       prq->size,
2346                       &chash);
2347   switch (prq->type)
2348     {
2349     case GNUNET_BLOCK_TYPE_DBLOCK:
2350     case GNUNET_BLOCK_TYPE_IBLOCK:
2351       /* only possible reply, stop requesting! */
2352       while (NULL != pr->pending_head)
2353         destroy_pending_message_list_entry (pr->pending_head);
2354       if (pr->qe != NULL)
2355         {
2356           if (pr->client_request_list != NULL)
2357             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2358                                         GNUNET_YES);
2359           GNUNET_DATASTORE_cancel (pr->qe);
2360           pr->qe = NULL;
2361         }
2362       pr->do_remove = GNUNET_YES;
2363       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2364         {
2365           GNUNET_SCHEDULER_cancel (sched,
2366                                    pr->task);
2367           pr->task = GNUNET_SCHEDULER_NO_TASK;
2368         }
2369       GNUNET_break (GNUNET_YES ==
2370                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2371                                                           key,
2372                                                           pr));
2373       break;
2374     case GNUNET_BLOCK_TYPE_SBLOCK:
2375       if (pr->namespace == NULL)
2376         {
2377           GNUNET_break (0);
2378           return GNUNET_YES;
2379         }
2380       if (0 != memcmp (pr->namespace,
2381                        &prq->namespace,
2382                        sizeof (GNUNET_HashCode)))
2383         {
2384           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2385                       _("Reply mismatched in terms of namespace.  Discarded.\n"));
2386           return GNUNET_YES; /* wrong namespace */      
2387         }
2388       /* then: fall-through! */
2389     case GNUNET_BLOCK_TYPE_KBLOCK:
2390     case GNUNET_BLOCK_TYPE_NBLOCK:
2391       if (pr->bf != NULL) 
2392         {
2393           mingle_hash (&chash, pr->mingle, &mhash);
2394           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2395                                                                &mhash))
2396             {
2397               GNUNET_STATISTICS_update (stats,
2398                                         gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2399                                         1,
2400                                         GNUNET_NO);
2401 #if DEBUG_FS
2402               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2403                           "Duplicate response `%s', discarding.\n",
2404                           GNUNET_h2s (&mhash));
2405 #endif
2406               return GNUNET_YES; /* duplicate */
2407             }
2408 #if DEBUG_FS
2409           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2410                       "New response `%s', adding to filter.\n",
2411                       GNUNET_h2s (&mhash));
2412 #endif
2413         }
2414       if (pr->client_request_list != NULL)
2415         {
2416           if (pr->replies_seen_size == pr->replies_seen_off)
2417             GNUNET_array_grow (pr->replies_seen,
2418                                pr->replies_seen_size,
2419                                pr->replies_seen_size * 2 + 4);  
2420             pr->replies_seen[pr->replies_seen_off++] = chash;         
2421         }
2422       if ( (pr->bf == NULL) ||
2423            (pr->client_request_list != NULL) )
2424         refresh_bloomfilter (pr);
2425       GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2426                                         &mhash);
2427       break;
2428     default:
2429       GNUNET_break (0);
2430       return GNUNET_YES;
2431     }
2432   prq->priority += pr->remaining_priority;
2433   pr->remaining_priority = 0;
2434   if (NULL != pr->client_request_list)
2435     {
2436       GNUNET_STATISTICS_update (stats,
2437                                 gettext_noop ("# replies received for local clients"),
2438                                 1,
2439                                 GNUNET_NO);
2440       cl = pr->client_request_list->client_list;
2441       msize = sizeof (struct PutMessage) + prq->size;
2442       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2443       creply->msize = msize;
2444       creply->client_list = cl;
2445       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2446                                          cl->res_tail,
2447                                          cl->res_tail,
2448                                          creply);      
2449       pm = (struct PutMessage*) &creply[1];
2450       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2451       pm->header.size = htons (msize);
2452       pm->type = htonl (prq->type);
2453       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2454       memcpy (&pm[1], prq->data, prq->size);      
2455       if (NULL == cl->th)
2456         {
2457 #if DEBUG_FS
2458           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2459                       "Transmitting result for query `%s' to client\n",
2460                       GNUNET_h2s (key));
2461 #endif  
2462           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2463                                                         msize,
2464                                                         GNUNET_TIME_UNIT_FOREVER_REL,
2465                                                         &transmit_to_client,
2466                                                         cl);
2467         }
2468       GNUNET_break (cl->th != NULL);
2469       if (pr->do_remove)                
2470         {
2471           prq->finished = GNUNET_YES;
2472           destroy_pending_request (pr);         
2473         }
2474     }
2475   else
2476     {
2477       cp = pr->cp;
2478 #if DEBUG_FS
2479       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2480                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
2481                   GNUNET_h2s (key),
2482                   (unsigned int) cp->pid);
2483 #endif  
2484       GNUNET_STATISTICS_update (stats,
2485                                 gettext_noop ("# replies received for other peers"),
2486                                 1,
2487                                 GNUNET_NO);
2488       msize = sizeof (struct PutMessage) + prq->size;
2489       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2490       reply->cont = &transmit_reply_continuation;
2491       reply->cont_cls = pr;
2492       reply->msize = msize;
2493       reply->priority = (uint32_t) -1; /* send replies first! */
2494       pm = (struct PutMessage*) &reply[1];
2495       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2496       pm->header.size = htons (msize);
2497       pm->type = htonl (prq->type);
2498       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2499       memcpy (&pm[1], prq->data, prq->size);
2500       add_to_pending_messages_for_peer (cp, reply, pr);
2501     }
2502   return GNUNET_YES;
2503 }
2504
2505
2506 /**
2507  * Continuation called to notify client about result of the
2508  * operation.
2509  *
2510  * @param cls closure
2511  * @param success GNUNET_SYSERR on failure
2512  * @param msg NULL on success, otherwise an error message
2513  */
2514 static void 
2515 put_migration_continuation (void *cls,
2516                             int success,
2517                             const char *msg)
2518 {
2519   /* FIXME */
2520 }
2521
2522
2523 /**
2524  * Handle P2P "PUT" message.
2525  *
2526  * @param cls closure, always NULL
2527  * @param other the other peer involved (sender or receiver, NULL
2528  *        for loopback messages where we are both sender and receiver)
2529  * @param message the actual message
2530  * @param latency reported latency of the connection with 'other'
2531  * @param distance reported distance (DV) to 'other' 
2532  * @return GNUNET_OK to keep the connection open,
2533  *         GNUNET_SYSERR to close it (signal serious error)
2534  */
2535 static int
2536 handle_p2p_put (void *cls,
2537                 const struct GNUNET_PeerIdentity *other,
2538                 const struct GNUNET_MessageHeader *message,
2539                 struct GNUNET_TIME_Relative latency,
2540                 uint32_t distance)
2541 {
2542   const struct PutMessage *put;
2543   uint16_t msize;
2544   size_t dsize;
2545   enum GNUNET_BLOCK_Type type;
2546   struct GNUNET_TIME_Absolute expiration;
2547   GNUNET_HashCode query;
2548   struct ProcessReplyClosure prq;
2549   const struct SBlock *sb;
2550
2551   msize = ntohs (message->size);
2552   if (msize < sizeof (struct PutMessage))
2553     {
2554       GNUNET_break_op(0);
2555       return GNUNET_SYSERR;
2556     }
2557   put = (const struct PutMessage*) message;
2558   dsize = msize - sizeof (struct PutMessage);
2559   type = ntohl (put->type);
2560   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2561
2562   if (GNUNET_OK !=
2563       GNUNET_BLOCK_check_block (type,
2564                                 &put[1],
2565                                 dsize,
2566                                 &query))
2567     {
2568       GNUNET_break_op (0);
2569       return GNUNET_SYSERR;
2570     }
2571   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2572     return GNUNET_SYSERR;
2573   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2574     { 
2575       sb = (const struct SBlock*) &put[1];
2576       GNUNET_CRYPTO_hash (&sb->subspace,
2577                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2578                           &prq.namespace);
2579     }
2580
2581 #if DEBUG_FS
2582   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2583               "Received result for query `%s' from peer `%4s'\n",
2584               GNUNET_h2s (&query),
2585               GNUNET_i2s (other));
2586 #endif
2587   GNUNET_STATISTICS_update (stats,
2588                             gettext_noop ("# replies received (overall)"),
2589                             1,
2590                             GNUNET_NO);
2591   /* now, lookup 'query' */
2592   prq.data = (const void*) &put[1];
2593   if (other != NULL)
2594     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2595                                                     &other->hashPubKey);
2596   prq.size = dsize;
2597   prq.type = type;
2598   prq.expiration = expiration;
2599   prq.priority = 0;
2600   prq.finished = GNUNET_NO;
2601   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2602                                               &query,
2603                                               &process_reply,
2604                                               &prq);
2605   if (GNUNET_YES == active_migration)
2606     {
2607 #if DEBUG_FS
2608       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2609                   "Replicating result for query `%s' with priority %u\n",
2610                   GNUNET_h2s (&query),
2611                   prq.priority);
2612 #endif
2613       GNUNET_DATASTORE_put (dsh,
2614                             0, &query, dsize, &put[1],
2615                             type, prq.priority, 1 /* anonymity */, 
2616                             expiration, 
2617                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2618                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2619                             &put_migration_continuation, 
2620                             NULL);
2621     }
2622   return GNUNET_OK;
2623 }
2624
2625
2626 /* **************************** P2P GET Handling ************************ */
2627
2628
2629 /**
2630  * Closure for 'check_duplicate_request_{peer,client}'.
2631  */
2632 struct CheckDuplicateRequestClosure
2633 {
2634   /**
2635    * The new request we should check if it already exists.
2636    */
2637   const struct PendingRequest *pr;
2638
2639   /**
2640    * Existing request found by the checker, NULL if none.
2641    */
2642   struct PendingRequest *have;
2643 };
2644
2645
2646 /**
2647  * Iterator over entries in the 'query_request_map' that
2648  * tries to see if we have the same request pending from
2649  * the same client already.
2650  *
2651  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2652  * @param key current key code (query, ignored, must match)
2653  * @param value value in the hash map (a 'struct PendingRequest' 
2654  *              that already exists)
2655  * @return GNUNET_YES if we should continue to
2656  *         iterate (no match yet)
2657  *         GNUNET_NO if not (match found).
2658  */
2659 static int
2660 check_duplicate_request_client (void *cls,
2661                                 const GNUNET_HashCode * key,
2662                                 void *value)
2663 {
2664   struct CheckDuplicateRequestClosure *cdc = cls;
2665   struct PendingRequest *have = value;
2666
2667   if (have->client_request_list == NULL)
2668     return GNUNET_YES;
2669   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2670        (cdc->pr != have) )
2671     {
2672       cdc->have = have;
2673       return GNUNET_NO;
2674     }
2675   return GNUNET_YES;
2676 }
2677
2678
2679 /**
2680  * We're processing (local) results for a search request
2681  * from another peer.  Pass applicable results to the
2682  * peer and if we are done either clean up (operation
2683  * complete) or forward to other peers (more results possible).
2684  *
2685  * @param cls our closure (struct LocalGetContext)
2686  * @param key key for the content
2687  * @param size number of bytes in data
2688  * @param data content stored
2689  * @param type type of the content
2690  * @param priority priority of the content
2691  * @param anonymity anonymity-level for the content
2692  * @param expiration expiration time for the content
2693  * @param uid unique identifier for the datum;
2694  *        maybe 0 if no unique identifier is available
2695  */
2696 static void
2697 process_local_reply (void *cls,
2698                      const GNUNET_HashCode * key,
2699                      uint32_t size,
2700                      const void *data,
2701                      enum GNUNET_BLOCK_Type type,
2702                      uint32_t priority,
2703                      uint32_t anonymity,
2704                      struct GNUNET_TIME_Absolute
2705                      expiration, 
2706                      uint64_t uid)
2707 {
2708   struct PendingRequest *pr = cls;
2709   struct ProcessReplyClosure prq;
2710   struct CheckDuplicateRequestClosure cdrc;
2711   const struct SBlock *sb;
2712   GNUNET_HashCode dhash;
2713   GNUNET_HashCode mhash;
2714   GNUNET_HashCode query;
2715   
2716   if (NULL == key)
2717     {
2718 #if DEBUG_FS > 1
2719       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2720                   "Done processing local replies, forwarding request to other peers.\n");
2721 #endif
2722       pr->qe = NULL;
2723       if (pr->client_request_list != NULL)
2724         {
2725           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2726                                       GNUNET_YES);
2727           /* Figure out if this is a duplicate request and possibly
2728              merge 'struct PendingRequest' entries */
2729           cdrc.have = NULL;
2730           cdrc.pr = pr;
2731           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2732                                                       &pr->query,
2733                                                       &check_duplicate_request_client,
2734                                                       &cdrc);
2735           if (cdrc.have != NULL)
2736             {
2737 #if DEBUG_FS
2738               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2739                           "Received request for block `%s' twice from client, will only request once.\n",
2740                           GNUNET_h2s (&pr->query));
2741 #endif
2742               
2743               destroy_pending_request (pr);
2744               return;
2745             }
2746         }
2747
2748       /* no more results */
2749       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2750         pr->task = GNUNET_SCHEDULER_add_now (sched,
2751                                              &forward_request_task,
2752                                              pr);      
2753       return;
2754     }
2755 #if DEBUG_FS
2756   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2757               "New local response to `%s' of type %u.\n",
2758               GNUNET_h2s (key),
2759               type);
2760 #endif
2761   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2762     {
2763 #if DEBUG_FS
2764       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2765                   "Found ONDEMAND block, performing on-demand encoding\n");
2766 #endif
2767       GNUNET_STATISTICS_update (stats,
2768                                 gettext_noop ("# on-demand blocks matched requests"),
2769                                 1,
2770                                 GNUNET_NO);
2771       if (GNUNET_OK != 
2772           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2773                                             anonymity, expiration, uid, 
2774                                             &process_local_reply,
2775                                             pr))
2776       if (pr->qe != NULL)
2777         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2778       return;
2779     }
2780   /* check for duplicates */
2781   GNUNET_CRYPTO_hash (data, size, &dhash);
2782   mingle_hash (&dhash, 
2783                pr->mingle,
2784                &mhash);
2785   if ( (pr->bf != NULL) &&
2786        (GNUNET_YES ==
2787         GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2788                                            &mhash)) )
2789     {      
2790 #if DEBUG_FS
2791       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2792                   "Result from datastore filtered by bloomfilter (duplicate).\n");
2793 #endif
2794       GNUNET_STATISTICS_update (stats,
2795                                 gettext_noop ("# results filtered by query bloomfilter"),
2796                                 1,
2797                                 GNUNET_NO);
2798       if (pr->qe != NULL)
2799         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2800       return;
2801     }
2802 #if DEBUG_FS
2803   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2804               "Found result for query `%s' in local datastore\n",
2805               GNUNET_h2s (key));
2806 #endif
2807   GNUNET_STATISTICS_update (stats,
2808                             gettext_noop ("# results found locally"),
2809                             1,
2810                             GNUNET_NO);
2811   pr->results_found++;
2812   memset (&prq, 0, sizeof (prq));
2813   prq.data = data;
2814   prq.expiration = expiration;
2815   prq.size = size;  
2816   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2817     { 
2818       sb = (const struct SBlock*) data;
2819       GNUNET_CRYPTO_hash (&sb->subspace,
2820                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2821                           &prq.namespace);
2822     }
2823   if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
2824                                              data,
2825                                              size,
2826                                              &query))
2827     {
2828       GNUNET_break (0);
2829       GNUNET_DATASTORE_remove (dsh,
2830                                key,
2831                                size, data,
2832                                -1, -1, 
2833                                GNUNET_TIME_UNIT_FOREVER_REL,
2834                                NULL, NULL);
2835       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2836       return;
2837     }
2838   prq.type = type;
2839   prq.priority = priority;  
2840   prq.finished = GNUNET_NO;
2841   process_reply (&prq, key, pr);
2842   if (prq.finished == GNUNET_YES)
2843     return;
2844   if (pr->qe == NULL)
2845     return; /* done here */
2846   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
2847        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
2848     {
2849       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2850       return;
2851     }
2852   if ( (pr->client_request_list == NULL) &&
2853        ( (GNUNET_YES == test_load_too_high()) ||
2854          (pr->results_found > 5 + 2 * pr->priority) ) )
2855     {
2856 #if DEBUG_FS > 2
2857       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2858                   "Load too high, done with request\n");
2859 #endif
2860       GNUNET_STATISTICS_update (stats,
2861                                 gettext_noop ("# processing result set cut short due to load"),
2862                                 1,
2863                                 GNUNET_NO);
2864       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2865       return;
2866     }
2867   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2868 }
2869
2870
2871 /**
2872  * We've received a request with the specified priority.  Bound it
2873  * according to how much we trust the given peer.
2874  * 
2875  * @param prio_in requested priority
2876  * @param cp the peer making the request
2877  * @return effective priority
2878  */
2879 static uint32_t
2880 bound_priority (uint32_t prio_in,
2881                 struct ConnectedPeer *cp)
2882 {
2883   return 0; // FIXME!
2884 }
2885
2886
2887 /**
2888  * Iterator over entries in the 'query_request_map' that
2889  * tries to see if we have the same request pending from
2890  * the same peer already.
2891  *
2892  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2893  * @param key current key code (query, ignored, must match)
2894  * @param value value in the hash map (a 'struct PendingRequest' 
2895  *              that already exists)
2896  * @return GNUNET_YES if we should continue to
2897  *         iterate (no match yet)
2898  *         GNUNET_NO if not (match found).
2899  */
2900 static int
2901 check_duplicate_request_peer (void *cls,
2902                               const GNUNET_HashCode * key,
2903                               void *value)
2904 {
2905   struct CheckDuplicateRequestClosure *cdc = cls;
2906   struct PendingRequest *have = value;
2907
2908   if (cdc->pr->target_pid == have->target_pid)
2909     {
2910       cdc->have = have;
2911       return GNUNET_NO;
2912     }
2913   return GNUNET_YES;
2914 }
2915
2916
2917 /**
2918  * Handle P2P "GET" request.
2919  *
2920  * @param cls closure, always NULL
2921  * @param other the other peer involved (sender or receiver, NULL
2922  *        for loopback messages where we are both sender and receiver)
2923  * @param message the actual message
2924  * @param latency reported latency of the connection with 'other'
2925  * @param distance reported distance (DV) to 'other' 
2926  * @return GNUNET_OK to keep the connection open,
2927  *         GNUNET_SYSERR to close it (signal serious error)
2928  */
2929 static int
2930 handle_p2p_get (void *cls,
2931                 const struct GNUNET_PeerIdentity *other,
2932                 const struct GNUNET_MessageHeader *message,
2933                 struct GNUNET_TIME_Relative latency,
2934                 uint32_t distance)
2935 {
2936   struct PendingRequest *pr;
2937   struct ConnectedPeer *cp;
2938   struct ConnectedPeer *cps;
2939   struct CheckDuplicateRequestClosure cdc;
2940   struct GNUNET_TIME_Relative timeout;
2941   uint16_t msize;
2942   const struct GetMessage *gm;
2943   unsigned int bits;
2944   const GNUNET_HashCode *opt;
2945   uint32_t bm;
2946   size_t bfsize;
2947   uint32_t ttl_decrement;
2948   enum GNUNET_BLOCK_Type type;
2949   double preference;
2950   int have_ns;
2951
2952   msize = ntohs(message->size);
2953   if (msize < sizeof (struct GetMessage))
2954     {
2955       GNUNET_break_op (0);
2956       return GNUNET_SYSERR;
2957     }
2958   gm = (const struct GetMessage*) message;
2959   type = ntohl (gm->type);
2960   switch (type)
2961     {
2962     case GNUNET_BLOCK_TYPE_ANY:
2963     case GNUNET_BLOCK_TYPE_DBLOCK:
2964     case GNUNET_BLOCK_TYPE_IBLOCK:
2965     case GNUNET_BLOCK_TYPE_KBLOCK:
2966     case GNUNET_BLOCK_TYPE_SBLOCK:
2967       break;
2968     default:
2969       GNUNET_break_op (0);
2970       return GNUNET_SYSERR;
2971     }
2972   bm = ntohl (gm->hash_bitmap);
2973   bits = 0;
2974   while (bm > 0)
2975     {
2976       if (1 == (bm & 1))
2977         bits++;
2978       bm >>= 1;
2979     }
2980   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2981     {
2982       GNUNET_break_op (0);
2983       return GNUNET_SYSERR;
2984     }  
2985   opt = (const GNUNET_HashCode*) &gm[1];
2986   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2987   bm = ntohl (gm->hash_bitmap);
2988   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
2989        (type != GNUNET_BLOCK_TYPE_SBLOCK) )
2990     {
2991       GNUNET_break_op (0);
2992       return GNUNET_SYSERR;      
2993     }
2994   bits = 0;
2995   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2996                                            &other->hashPubKey);
2997   if (NULL == cps)
2998     {
2999       /* peer must have just disconnected */
3000       GNUNET_STATISTICS_update (stats,
3001                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3002                                 1,
3003                                 GNUNET_NO);
3004       return GNUNET_SYSERR;
3005     }
3006   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3007     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3008                                             &opt[bits++]);
3009   else
3010     cp = cps;
3011   if (cp == NULL)
3012     {
3013 #if DEBUG_FS
3014       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3015         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3016                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3017                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3018       
3019       else
3020         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3021                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3022                     GNUNET_i2s (other));
3023 #endif
3024       GNUNET_STATISTICS_update (stats,
3025                                 gettext_noop ("# requests dropped due to missing reverse route"),
3026                                 1,
3027                                 GNUNET_NO);
3028      /* FIXME: try connect? */
3029       return GNUNET_OK;
3030     }
3031   /* note that we can really only check load here since otherwise
3032      peers could find out that we are overloaded by not being
3033      disconnected after sending us a malformed query... */
3034   if (GNUNET_YES == test_load_too_high ())
3035     {
3036 #if DEBUG_FS
3037       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3038                   "Dropping query from `%s', this peer is too busy.\n",
3039                   GNUNET_i2s (other));
3040 #endif
3041       GNUNET_STATISTICS_update (stats,
3042                                 gettext_noop ("# requests dropped due to high load"),
3043                                 1,
3044                                 GNUNET_NO);
3045       return GNUNET_OK;
3046     }
3047
3048 #if DEBUG_FS 
3049   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3050               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3051               GNUNET_h2s (&gm->query),
3052               (unsigned int) type,
3053               GNUNET_i2s (other),
3054               (unsigned int) bm);
3055 #endif
3056   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3057   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3058                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3059   if (have_ns)
3060     pr->namespace = (GNUNET_HashCode*) &pr[1];
3061   pr->type = type;
3062   pr->mingle = ntohl (gm->filter_mutator);
3063   if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))    
3064     memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3065   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3066     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3067
3068   pr->anonymity_level = 1;
3069   pr->priority = bound_priority (ntohl (gm->priority), cps);
3070   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3071   pr->query = gm->query;
3072   /* decrement ttl (always) */
3073   ttl_decrement = 2 * TTL_DECREMENT +
3074     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3075                               TTL_DECREMENT);
3076   if ( (pr->ttl < 0) &&
3077        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3078     {
3079 #if DEBUG_FS
3080       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3081                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3082                   GNUNET_i2s (other),
3083                   pr->ttl,
3084                   ttl_decrement);
3085 #endif
3086       GNUNET_STATISTICS_update (stats,
3087                                 gettext_noop ("# requests dropped due TTL underflow"),
3088                                 1,
3089                                 GNUNET_NO);
3090       /* integer underflow => drop (should be very rare)! */      
3091       GNUNET_free (pr);
3092       return GNUNET_OK;
3093     } 
3094   pr->ttl -= ttl_decrement;
3095   pr->start_time = GNUNET_TIME_absolute_get ();
3096
3097   /* get bloom filter */
3098   if (bfsize > 0)
3099     {
3100       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3101                                                   bfsize,
3102                                                   BLOOMFILTER_K);
3103       pr->bf_size = bfsize;
3104     }
3105
3106   cdc.have = NULL;
3107   cdc.pr = pr;
3108   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3109                                               &gm->query,
3110                                               &check_duplicate_request_peer,
3111                                               &cdc);
3112   if (cdc.have != NULL)
3113     {
3114       if (cdc.have->start_time.value + cdc.have->ttl >=
3115           pr->start_time.value + pr->ttl)
3116         {
3117           /* existing request has higher TTL, drop new one! */
3118           cdc.have->priority += pr->priority;
3119           destroy_pending_request (pr);
3120 #if DEBUG_FS
3121           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3122                       "Have existing request with higher TTL, dropping new request.\n",
3123                       GNUNET_i2s (other));
3124 #endif
3125           GNUNET_STATISTICS_update (stats,
3126                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3127                                     1,
3128                                     GNUNET_NO);
3129           return GNUNET_OK;
3130         }
3131       else
3132         {
3133           /* existing request has lower TTL, drop old one! */
3134           pr->priority += cdc.have->priority;
3135           /* Possible optimization: if we have applicable pending
3136              replies in 'cdc.have', we might want to move those over
3137              (this is a really rare special-case, so it is not clear
3138              that this would be worth it) */
3139           destroy_pending_request (cdc.have);
3140           /* keep processing 'pr'! */
3141         }
3142     }
3143
3144   pr->cp = cp;
3145   GNUNET_break (GNUNET_OK ==
3146                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3147                                                    &gm->query,
3148                                                    pr,
3149                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3150   GNUNET_break (GNUNET_OK ==
3151                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3152                                                    &other->hashPubKey,
3153                                                    pr,
3154                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3155   
3156   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3157                                             pr,
3158                                             pr->start_time.value + pr->ttl);
3159
3160   GNUNET_STATISTICS_update (stats,
3161                             gettext_noop ("# P2P searches received"),
3162                             1,
3163                             GNUNET_NO);
3164   GNUNET_STATISTICS_update (stats,
3165                             gettext_noop ("# P2P searches active"),
3166                             1,
3167                             GNUNET_NO);
3168
3169   /* calculate change in traffic preference */
3170   preference = (double) pr->priority;
3171   if (preference < QUERY_BANDWIDTH_VALUE)
3172     preference = QUERY_BANDWIDTH_VALUE;
3173   cps->inc_preference += preference;
3174
3175   /* process locally */
3176   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3177     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3178   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3179                                            (pr->priority + 1)); 
3180   pr->qe = GNUNET_DATASTORE_get (dsh,
3181                                  &gm->query,
3182                                  type,                         
3183                                  pr->priority + 1,
3184                                  MAX_DATASTORE_QUEUE,                            
3185                                  timeout,
3186                                  &process_local_reply,
3187                                  pr);
3188
3189   /* Are multiple results possible?  If so, start processing remotely now! */
3190   switch (pr->type)
3191     {
3192     case GNUNET_BLOCK_TYPE_DBLOCK:
3193     case GNUNET_BLOCK_TYPE_IBLOCK:
3194       /* only one result, wait for datastore */
3195       break;
3196     default:
3197       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3198         pr->task = GNUNET_SCHEDULER_add_now (sched,
3199                                              &forward_request_task,
3200                                              pr);
3201     }
3202
3203   /* make sure we don't track too many requests */
3204   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3205     {
3206       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3207       destroy_pending_request (pr);
3208     }
3209   return GNUNET_OK;
3210 }
3211
3212
3213 /* **************************** CS GET Handling ************************ */
3214
3215
3216 /**
3217  * Handle START_SEARCH-message (search request from client).
3218  *
3219  * @param cls closure
3220  * @param client identification of the client
3221  * @param message the actual message
3222  */
3223 static void
3224 handle_start_search (void *cls,
3225                      struct GNUNET_SERVER_Client *client,
3226                      const struct GNUNET_MessageHeader *message)
3227 {
3228   static GNUNET_HashCode all_zeros;
3229   const struct SearchMessage *sm;
3230   struct ClientList *cl;
3231   struct ClientRequestList *crl;
3232   struct PendingRequest *pr;
3233   uint16_t msize;
3234   unsigned int sc;
3235   enum GNUNET_BLOCK_Type type;
3236
3237   msize = ntohs (message->size);
3238   if ( (msize < sizeof (struct SearchMessage)) ||
3239        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3240     {
3241       GNUNET_break (0);
3242       GNUNET_SERVER_receive_done (client,
3243                                   GNUNET_SYSERR);
3244       return;
3245     }
3246   GNUNET_STATISTICS_update (stats,
3247                             gettext_noop ("# client searches received"),
3248                             1,
3249                             GNUNET_NO);
3250   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3251   sm = (const struct SearchMessage*) message;
3252   type = ntohl (sm->type);
3253 #if DEBUG_FS
3254   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3255               "Received request for `%s' of type %u from local client\n",
3256               GNUNET_h2s (&sm->query),
3257               (unsigned int) type);
3258 #endif
3259   switch (type)
3260     {
3261     case GNUNET_BLOCK_TYPE_ANY:
3262     case GNUNET_BLOCK_TYPE_DBLOCK:
3263     case GNUNET_BLOCK_TYPE_IBLOCK:
3264     case GNUNET_BLOCK_TYPE_KBLOCK:
3265     case GNUNET_BLOCK_TYPE_SBLOCK:
3266     case GNUNET_BLOCK_TYPE_NBLOCK:
3267       break;
3268     default:
3269       GNUNET_break (0);
3270       GNUNET_SERVER_receive_done (client,
3271                                   GNUNET_SYSERR);
3272       return;
3273     }  
3274
3275   cl = client_list;
3276   while ( (cl != NULL) &&
3277           (cl->client != client) )
3278     cl = cl->next;
3279   if (cl == NULL)
3280     {
3281       cl = GNUNET_malloc (sizeof (struct ClientList));
3282       cl->client = client;
3283       GNUNET_SERVER_client_keep (client);
3284       cl->next = client_list;
3285       client_list = cl;
3286     }
3287   /* detect duplicate KBLOCK requests */
3288   if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
3289        (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
3290        (type == GNUNET_BLOCK_TYPE_ANY) )
3291     {
3292       crl = cl->rl_head;
3293       while ( (crl != NULL) &&
3294               ( (0 != memcmp (&crl->req->query,
3295                               &sm->query,
3296                               sizeof (GNUNET_HashCode))) ||
3297                 (crl->req->type != type) ) )
3298         crl = crl->next;
3299       if (crl != NULL)  
3300         { 
3301 #if DEBUG_FS
3302           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3303                       "Have existing request, merging content-seen lists.\n");
3304 #endif
3305           pr = crl->req;
3306           /* Duplicate request (used to send long list of
3307              known/blocked results); merge 'pr->replies_seen'
3308              and update bloom filter */
3309           GNUNET_array_grow (pr->replies_seen,
3310                              pr->replies_seen_size,
3311                              pr->replies_seen_off + sc);
3312           memcpy (&pr->replies_seen[pr->replies_seen_off],
3313                   &sm[1],
3314                   sc * sizeof (GNUNET_HashCode));
3315           pr->replies_seen_off += sc;
3316           refresh_bloomfilter (pr);
3317           GNUNET_STATISTICS_update (stats,
3318                                     gettext_noop ("# client searches updated (merged content seen list)"),
3319                                     1,
3320                                     GNUNET_NO);
3321           GNUNET_SERVER_receive_done (client,
3322                                       GNUNET_OK);
3323           return;
3324         }
3325     }
3326   GNUNET_STATISTICS_update (stats,
3327                             gettext_noop ("# client searches active"),
3328                             1,
3329                             GNUNET_NO);
3330   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3331                       ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3332   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3333   memset (crl, 0, sizeof (struct ClientRequestList));
3334   crl->client_list = cl;
3335   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3336                                cl->rl_tail,
3337                                crl);  
3338   crl->req = pr;
3339   pr->type = type;
3340   pr->client_request_list = crl;
3341   GNUNET_array_grow (pr->replies_seen,
3342                      pr->replies_seen_size,
3343                      sc);
3344   memcpy (pr->replies_seen,
3345           &sm[1],
3346           sc * sizeof (GNUNET_HashCode));
3347   pr->replies_seen_off = sc;
3348   pr->anonymity_level = ntohl (sm->anonymity_level); 
3349   refresh_bloomfilter (pr);
3350   pr->query = sm->query;
3351   if (0 == (1 & ntohl (sm->options)))
3352     pr->local_only = GNUNET_NO;
3353   else
3354     pr->local_only = GNUNET_YES;
3355   switch (type)
3356     {
3357     case GNUNET_BLOCK_TYPE_DBLOCK:
3358     case GNUNET_BLOCK_TYPE_IBLOCK:
3359       if (0 != memcmp (&sm->target,
3360                        &all_zeros,
3361                        sizeof (GNUNET_HashCode)))
3362         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3363       break;
3364     case GNUNET_BLOCK_TYPE_SBLOCK:
3365       pr->namespace = (GNUNET_HashCode*) &pr[1];
3366       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3367       break;
3368     default:
3369       break;
3370     }
3371   GNUNET_break (GNUNET_OK ==
3372                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3373                                                    &sm->query,
3374                                                    pr,
3375                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3376   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3377     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3378   pr->qe = GNUNET_DATASTORE_get (dsh,
3379                                  &sm->query,
3380                                  type,
3381                                  -3, -1,
3382                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
3383                                  &process_local_reply,
3384                                  pr);
3385 }
3386
3387
3388 /* **************************** Startup ************************ */
3389
3390
3391 /**
3392  * List of handlers for P2P messages
3393  * that we care about.
3394  */
3395 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3396   {
3397     { &handle_p2p_get, 
3398       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3399     { &handle_p2p_put, 
3400       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3401     { NULL, 0, 0 }
3402   };
3403
3404
3405 /**
3406  * List of handlers for the messages understood by this
3407  * service.
3408  */
3409 static struct GNUNET_SERVER_MessageHandler handlers[] = {
3410   {&GNUNET_FS_handle_index_start, NULL, 
3411    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
3412   {&GNUNET_FS_handle_index_list_get, NULL, 
3413    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
3414   {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
3415    sizeof (struct UnindexMessage) },
3416   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
3417    0 },
3418   {NULL, NULL, 0, 0}
3419 };
3420
3421
3422 /**
3423  * Process fs requests.
3424  *
3425  * @param s scheduler to use
3426  * @param server the initialized server
3427  * @param c configuration to use
3428  */
3429 static int
3430 main_init (struct GNUNET_SCHEDULER_Handle *s,
3431            struct GNUNET_SERVER_Handle *server,
3432            const struct GNUNET_CONFIGURATION_Handle *c)
3433 {
3434   sched = s;
3435   cfg = c;
3436   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
3437   min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
3438   connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3439   query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3440   peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3441   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3442   core = GNUNET_CORE_connect (sched,
3443                               cfg,
3444                               GNUNET_TIME_UNIT_FOREVER_REL,
3445                               NULL,
3446                               NULL,
3447                               &peer_connect_handler,
3448                               &peer_disconnect_handler,
3449                               NULL, GNUNET_NO,
3450                               NULL, GNUNET_NO,
3451                               p2p_handlers);
3452   if (NULL == core)
3453     {
3454       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3455                   _("Failed to connect to `%s' service.\n"),
3456                   "core");
3457       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
3458       connected_peers = NULL;
3459       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
3460       query_request_map = NULL;
3461       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
3462       requests_by_expiration_heap = NULL;
3463       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
3464       peer_request_map = NULL;
3465       if (dsh != NULL)
3466         {
3467           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3468           dsh = NULL;
3469         }
3470       return GNUNET_SYSERR;
3471     }
3472   /* FIXME: distinguish between sending and storing in options? */
3473   if (active_migration) 
3474     {
3475       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3476                   _("Content migration is enabled, will start to gather data\n"));
3477       consider_migration_gathering ();
3478     }
3479   GNUNET_SERVER_disconnect_notify (server, 
3480                                    &handle_client_disconnect,
3481                                    NULL);
3482   GNUNET_SERVER_add_handlers (server, handlers);
3483   GNUNET_SCHEDULER_add_delayed (sched,
3484                                 GNUNET_TIME_UNIT_FOREVER_REL,
3485                                 &shutdown_task,
3486                                 NULL);
3487   return GNUNET_OK;
3488 }
3489
3490
3491 /**
3492  * Process fs requests.
3493  *
3494  * @param cls closure
3495  * @param sched scheduler to use
3496  * @param server the initialized server
3497  * @param cfg configuration to use
3498  */
3499 static void
3500 run (void *cls,
3501      struct GNUNET_SCHEDULER_Handle *sched,
3502      struct GNUNET_SERVER_Handle *server,
3503      const struct GNUNET_CONFIGURATION_Handle *cfg)
3504 {
3505   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
3506                                                            "FS",
3507                                                            "ACTIVEMIGRATION");
3508   dsh = GNUNET_DATASTORE_connect (cfg,
3509                                   sched);
3510   if (dsh == NULL)
3511     {
3512       GNUNET_SCHEDULER_shutdown (sched);
3513       return;
3514     }
3515   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3516        (GNUNET_OK != main_init (sched, server, cfg)) )
3517     {    
3518       GNUNET_SCHEDULER_shutdown (sched);
3519       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3520       dsh = NULL;
3521       return;   
3522     }
3523 }
3524
3525
3526 /**
3527  * The main function for the fs service.
3528  *
3529  * @param argc number of arguments from the command line
3530  * @param argv command line arguments
3531  * @return 0 ok, 1 on error
3532  */
3533 int
3534 main (int argc, char *const *argv)
3535 {
3536   return (GNUNET_OK ==
3537           GNUNET_SERVICE_run (argc,
3538                               argv,
3539                               "fs",
3540                               GNUNET_SERVICE_OPTION_NONE,
3541                               &run, NULL)) ? 0 : 1;
3542 }
3543
3544 /* end of gnunet-service-fs.c */