clenaer
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - trust not properly received and pushed back to peerinfo!
28  * - bound_priority by priorities used by other peers
29  * - have a way to drop queries based on load
30  * - introduce random latency in processing
31  * - consider more precise latency estimation (per-peer & request)
32  * - better algorithm for priority selection for requests we initiate?
33  * - tell other peers to stop migration if our PUTs fail (or if
34  *   we don't support migration per configuration?)
35  * - more statistics
36  */
37 #include "platform.h"
38 #include <float.h>
39 #include "gnunet_constants.h"
40 #include "gnunet_core_service.h"
41 #include "gnunet_datastore_service.h"
42 #include "gnunet_peer_lib.h"
43 #include "gnunet_protocols.h"
44 #include "gnunet_signatures.h"
45 #include "gnunet_statistics_service.h"
46 #include "gnunet_util_lib.h"
47 #include "gnunet-service-fs_indexing.h"
48 #include "fs.h"
49
50 #define DEBUG_FS GNUNET_NO
51
52 /**
53  * Maximum number of outgoing messages we queue per peer.
54  */
55 #define MAX_QUEUE_PER_PEER 16
56
57 /**
58  * Inverse of the probability that we will submit the same query
59  * to the same peer again.  If the same peer already got the query
60  * repeatedly recently, the probability is multiplied by the inverse
61  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
62  * plus MAX_CORK_DELAY (so roughly every 3.5s).
63  */
64 #define RETRY_PROBABILITY_INV 3
65
66 /**
67  * What is the maximum delay for a P2P FS message (in our interaction
68  * with core)?  FS-internal delays are another story.  The value is
69  * chosen based on the 32k block size.  Given that peers typcially
70  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
71  * transmit one message even to the lowest-bandwidth peers.
72  */
73 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
74
75
76
77 /**
78  * Maximum number of requests (from other peers) that we're
79  * willing to have pending at any given point in time.
80  * FIXME: set from configuration.
81  */
82 static uint64_t max_pending_requests = (32 * 1024);
83
84
85 /**
86  * Information we keep for each pending reply.  The
87  * actual message follows at the end of this struct.
88  */
89 struct PendingMessage;
90
91 /**
92  * Our connection to the datastore.
93  */
94 static struct GNUNET_DATASTORE_Handle *dsh;
95
96
97 /**
98  * Function called upon completion of a transmission.
99  *
100  * @param cls closure
101  * @param pid ID of receiving peer, 0 on transmission error
102  */
103 typedef void (*TransmissionContinuation)(void * cls, 
104                                          GNUNET_PEER_Id tpid);
105
106
107 /**
108  * Information we keep for each pending message (GET/PUT).  The
109  * actual message follows at the end of this struct.
110  */
111 struct PendingMessage
112 {
113   /**
114    * This is a doubly-linked list of messages to the same peer.
115    */
116   struct PendingMessage *next;
117
118   /**
119    * This is a doubly-linked list of messages to the same peer.
120    */
121   struct PendingMessage *prev;
122
123   /**
124    * Entry in pending message list for this pending message.
125    */ 
126   struct PendingMessageList *pml;  
127
128   /**
129    * Function to call immediately once we have transmitted this
130    * message.
131    */
132   TransmissionContinuation cont;
133
134   /**
135    * Closure for cont.
136    */
137   void *cont_cls;
138
139   /**
140    * Size of the reply; actual reply message follows
141    * at the end of this struct.
142    */
143   size_t msize;
144   
145   /**
146    * How important is this message for us?
147    */
148   uint32_t priority;
149  
150 };
151
152
153 /**
154  * Information about a peer that we are connected to.
155  * We track data that is useful for determining which
156  * peers should receive our requests.  We also keep
157  * a list of messages to transmit to this peer.
158  */
159 struct ConnectedPeer
160 {
161
162   /**
163    * List of the last clients for which this peer successfully
164    * answered a query.
165    */
166   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
167
168   /**
169    * List of the last PIDs for which
170    * this peer successfully answered a query;
171    * We use 0 to indicate no successful reply.
172    */
173   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
174
175   /**
176    * Average delay between sending the peer a request and
177    * getting a reply (only calculated over the requests for
178    * which we actually got a reply).   Calculated
179    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
180    */ 
181   struct GNUNET_TIME_Relative avg_delay;
182
183   /**
184    * Handle for an active request for transmission to this
185    * peer, or NULL.
186    */
187   struct GNUNET_CORE_TransmitHandle *cth;
188
189   /**
190    * Messages (replies, queries, content migration) we would like to
191    * send to this peer in the near future.  Sorted by priority, head.
192    */
193   struct PendingMessage *pending_messages_head;
194
195   /**
196    * Messages (replies, queries, content migration) we would like to
197    * send to this peer in the near future.  Sorted by priority, tail.
198    */
199   struct PendingMessage *pending_messages_tail;
200
201   /**
202    * Average priority of successful replies.  Calculated
203    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
204    */
205   double avg_priority;
206
207   /**
208    * Increase in traffic preference still to be submitted
209    * to the core service for this peer.
210    */
211   uint64_t inc_preference;
212
213   /**
214    * Trust delta to still commit to the system.
215    */
216   uint32_t trust_delta;
217
218   /**
219    * The peer's identity.
220    */
221   GNUNET_PEER_Id pid;  
222
223   /**
224    * Size of the linked list of 'pending_messages'.
225    */
226   unsigned int pending_requests;
227
228   /**
229    * Which offset in "last_p2p_replies" will be updated next?
230    * (we go round-robin).
231    */
232   unsigned int last_p2p_replies_woff;
233
234   /**
235    * Which offset in "last_client_replies" will be updated next?
236    * (we go round-robin).
237    */
238   unsigned int last_client_replies_woff;
239
240 };
241
242
243 /**
244  * Information we keep for each pending request.  We should try to
245  * keep this struct as small as possible since its memory consumption
246  * is key to how many requests we can have pending at once.
247  */
248 struct PendingRequest;
249
250
251 /**
252  * Doubly-linked list of requests we are performing
253  * on behalf of the same client.
254  */
255 struct ClientRequestList
256 {
257
258   /**
259    * This is a doubly-linked list.
260    */
261   struct ClientRequestList *next;
262
263   /**
264    * This is a doubly-linked list.
265    */
266   struct ClientRequestList *prev;
267
268   /**
269    * Request this entry represents.
270    */
271   struct PendingRequest *req;
272
273   /**
274    * Client list this request belongs to.
275    */
276   struct ClientList *client_list;
277
278 };
279
280
281 /**
282  * Replies to be transmitted to the client.  The actual
283  * response message is allocated after this struct.
284  */
285 struct ClientResponseMessage
286 {
287   /**
288    * This is a doubly-linked list.
289    */
290   struct ClientResponseMessage *next;
291
292   /**
293    * This is a doubly-linked list.
294    */
295   struct ClientResponseMessage *prev;
296
297   /**
298    * Client list entry this response belongs to.
299    */
300   struct ClientList *client_list;
301
302   /**
303    * Number of bytes in the response.
304    */
305   size_t msize;
306 };
307
308
309 /**
310  * Linked list of clients we are performing requests
311  * for right now.
312  */
313 struct ClientList
314 {
315   /**
316    * This is a linked list.
317    */
318   struct ClientList *next;
319
320   /**
321    * ID of a client making a request, NULL if this entry is for a
322    * peer.
323    */
324   struct GNUNET_SERVER_Client *client;
325
326   /**
327    * Head of list of requests performed on behalf
328    * of this client right now.
329    */
330   struct ClientRequestList *rl_head;
331
332   /**
333    * Tail of list of requests performed on behalf
334    * of this client right now.
335    */
336   struct ClientRequestList *rl_tail;
337
338   /**
339    * Head of linked list of responses.
340    */
341   struct ClientResponseMessage *res_head;
342
343   /**
344    * Tail of linked list of responses.
345    */
346   struct ClientResponseMessage *res_tail;
347
348   /**
349    * Context for sending replies.
350    */
351   struct GNUNET_CONNECTION_TransmitHandle *th;
352
353 };
354
355
356 /**
357  * Doubly-linked list of messages we are performing
358  * due to a pending request.
359  */
360 struct PendingMessageList
361 {
362
363   /**
364    * This is a doubly-linked list of messages on behalf of the same request.
365    */
366   struct PendingMessageList *next;
367
368   /**
369    * This is a doubly-linked list of messages on behalf of the same request.
370    */
371   struct PendingMessageList *prev;
372
373   /**
374    * Message this entry represents.
375    */
376   struct PendingMessage *pm;
377
378   /**
379    * Request this entry belongs to.
380    */
381   struct PendingRequest *req;
382
383   /**
384    * Peer this message is targeted for.
385    */
386   struct ConnectedPeer *target;
387
388 };
389
390
391 /**
392  * Information we keep for each pending request.  We should try to
393  * keep this struct as small as possible since its memory consumption
394  * is key to how many requests we can have pending at once.
395  */
396 struct PendingRequest
397 {
398
399   /**
400    * If this request was made by a client, this is our entry in the
401    * client request list; otherwise NULL.
402    */
403   struct ClientRequestList *client_request_list;
404
405   /**
406    * Entry of peer responsible for this entry (if this request
407    * was made by a peer).
408    */
409   struct ConnectedPeer *cp;
410
411   /**
412    * If this is a namespace query, pointer to the hash of the public
413    * key of the namespace; otherwise NULL.  Pointer will be to the 
414    * end of this struct (so no need to free it).
415    */
416   const GNUNET_HashCode *namespace;
417
418   /**
419    * Bloomfilter we use to filter out replies that we don't care about
420    * (anymore).  NULL as long as we are interested in all replies.
421    */
422   struct GNUNET_CONTAINER_BloomFilter *bf;
423
424   /**
425    * Context of our GNUNET_CORE_peer_change_preference call.
426    */
427   struct GNUNET_CORE_InformationRequestContext *irc;
428
429   /**
430    * Hash code of all replies that we have seen so far (only valid
431    * if client is not NULL since we only track replies like this for
432    * our own clients).
433    */
434   GNUNET_HashCode *replies_seen;
435
436   /**
437    * Node in the heap representing this entry; NULL
438    * if we have no heap node.
439    */
440   struct GNUNET_CONTAINER_HeapNode *hnode;
441
442   /**
443    * Head of list of messages being performed on behalf of this
444    * request.
445    */
446   struct PendingMessageList *pending_head;
447
448   /**
449    * Tail of list of messages being performed on behalf of this
450    * request.
451    */
452   struct PendingMessageList *pending_tail;
453
454   /**
455    * When did we first see this request (form this peer), or, if our
456    * client is initiating, when did we last initiate a search?
457    */
458   struct GNUNET_TIME_Absolute start_time;
459
460   /**
461    * The query that this request is for.
462    */
463   GNUNET_HashCode query;
464
465   /**
466    * The task responsible for transmitting queries
467    * for this request.
468    */
469   GNUNET_SCHEDULER_TaskIdentifier task;
470
471   /**
472    * (Interned) Peer identifier that identifies a preferred target
473    * for requests.
474    */
475   GNUNET_PEER_Id target_pid;
476
477   /**
478    * (Interned) Peer identifiers of peers that have already
479    * received our query for this content.
480    */
481   GNUNET_PEER_Id *used_pids;
482   
483   /**
484    * Our entry in the queue (non-NULL while we wait for our
485    * turn to interact with the local database).
486    */
487   struct GNUNET_DATASTORE_QueueEntry *qe;
488
489   /**
490    * Size of the 'bf' (in bytes).
491    */
492   size_t bf_size;
493
494   /**
495    * Desired anonymity level; only valid for requests from a local client.
496    */
497   uint32_t anonymity_level;
498
499   /**
500    * How many entries in "used_pids" are actually valid?
501    */
502   unsigned int used_pids_off;
503
504   /**
505    * How long is the "used_pids" array?
506    */
507   unsigned int used_pids_size;
508
509   /**
510    * Number of results found for this request.
511    */
512   unsigned int results_found;
513
514   /**
515    * How many entries in "replies_seen" are actually valid?
516    */
517   unsigned int replies_seen_off;
518
519   /**
520    * How long is the "replies_seen" array?
521    */
522   unsigned int replies_seen_size;
523   
524   /**
525    * Priority with which this request was made.  If one of our clients
526    * made the request, then this is the current priority that we are
527    * using when initiating the request.  This value is used when
528    * we decide to reward other peers with trust for providing a reply.
529    */
530   uint32_t priority;
531
532   /**
533    * Priority points left for us to spend when forwarding this request
534    * to other peers.
535    */
536   uint32_t remaining_priority;
537
538   /**
539    * Number to mingle hashes for bloom-filter tests with.
540    */
541   int32_t mingle;
542
543   /**
544    * TTL with which we saw this request (or, if we initiated, TTL that
545    * we used for the request).
546    */
547   int32_t ttl;
548   
549   /**
550    * Type of the content that this request is for.
551    */
552   enum GNUNET_BLOCK_Type type;
553
554   /**
555    * Remove this request after transmission of the current response.
556    */
557   int16_t do_remove;
558
559   /**
560    * GNUNET_YES if we should not forward this request to other peers.
561    */
562   int16_t local_only;
563
564 };
565
566
567 /**
568  * Block that is ready for migration to other peers.  Actual data is at the end of the block.
569  */
570 struct MigrationReadyBlock
571 {
572
573   /**
574    * This is a doubly-linked list.
575    */
576   struct MigrationReadyBlock *next;
577
578   /**
579    * This is a doubly-linked list.
580    */
581   struct MigrationReadyBlock *prev;
582
583   /**
584    * Query for the block.
585    */
586   GNUNET_HashCode query;
587
588   /**
589    * When does this block expire? 
590    */
591   struct GNUNET_TIME_Absolute expiration;
592
593   /**
594    * Peers we would consider forwarding this
595    * block to.  Zero for empty entries.
596    */
597   GNUNET_PEER_Id target_list[MIGRATION_LIST_SIZE];
598
599   /**
600    * Size of the block.
601    */
602   size_t size;
603
604   /**
605    *  Number of targets already used.
606    */
607   unsigned int used_targets;
608
609   /**
610    * Type of the block.
611    */
612   enum GNUNET_BLOCK_Type type;
613 };
614
615
616 /**
617  * Our scheduler.
618  */
619 static struct GNUNET_SCHEDULER_Handle *sched;
620
621 /**
622  * Our configuration.
623  */
624 static const struct GNUNET_CONFIGURATION_Handle *cfg;
625
626 /**
627  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
628  */
629 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
630
631 /**
632  * Map of peer identifiers to "struct PendingRequest" (for that peer).
633  */
634 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
635
636 /**
637  * Map of query identifiers to "struct PendingRequest" (for that query).
638  */
639 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
640
641 /**
642  * Heap with the request that will expire next at the top.  Contains
643  * pointers of type "struct PendingRequest*"; these will *also* be
644  * aliased from the "requests_by_peer" data structures and the
645  * "requests_by_query" table.  Note that requests from our clients
646  * don't expire and are thus NOT in the "requests_by_expiration"
647  * (or the "requests_by_peer" tables).
648  */
649 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
650
651 /**
652  * Handle for reporting statistics.
653  */
654 static struct GNUNET_STATISTICS_Handle *stats;
655
656 /**
657  * Linked list of clients we are currently processing requests for.
658  */
659 static struct ClientList *client_list;
660
661 /**
662  * Pointer to handle to the core service (points to NULL until we've
663  * connected to it).
664  */
665 static struct GNUNET_CORE_Handle *core;
666
667 /**
668  * Head of linked list of blocks that can be migrated.
669  */
670 static struct MigrationReadyBlock *mig_head;
671
672 /**
673  * Tail of linked list of blocks that can be migrated.
674  */
675 static struct MigrationReadyBlock *mig_tail;
676
677 /**
678  * Request to datastore for migration (or NULL).
679  */
680 static struct GNUNET_DATASTORE_QueueEntry *mig_qe;
681
682 /**
683  * ID of task that collects blocks for migration.
684  */
685 static GNUNET_SCHEDULER_TaskIdentifier mig_task;
686
687 /**
688  * What is the maximum frequency at which we are allowed to
689  * poll the datastore for migration content?
690  */
691 static struct GNUNET_TIME_Relative min_migration_delay;
692
693 /**
694  * Size of the doubly-linked list of migration blocks.
695  */
696 static unsigned int mig_size;
697
698 /**
699  * Are we allowed to migrate content to this peer.
700  */
701 static int active_migration;
702
703
704 /**
705  * Transmit messages by copying it to the target buffer
706  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
707  * for writing in the meantime.  In that case, do nothing
708  * (the disconnect or shutdown handler will take care of the rest).
709  * If we were able to transmit messages and there are still more
710  * pending, ask core again for further calls to this function.
711  *
712  * @param cls closure, pointer to the 'struct ConnectedPeer*'
713  * @param size number of bytes available in buf
714  * @param buf where the callee should write the message
715  * @return number of bytes written to buf
716  */
717 static size_t
718 transmit_to_peer (void *cls,
719                   size_t size, void *buf);
720
721
722 /* ******************* clean up functions ************************ */
723
724
725 /**
726  * Delete the given migration block.
727  *
728  * @param mb block to delete
729  */
730 static void
731 delete_migration_block (struct MigrationReadyBlock *mb)
732 {
733   GNUNET_CONTAINER_DLL_remove (mig_head,
734                                mig_tail,
735                                mb);
736   GNUNET_PEER_decrement_rcs (mb->target_list,
737                              MIGRATION_LIST_SIZE);
738   mig_size--;
739   GNUNET_free (mb);
740 }
741
742
743 /**
744  * Compare the distance of two peers to a key.
745  *
746  * @param key key
747  * @param p1 first peer
748  * @param p2 second peer
749  * @return GNUNET_YES if P1 is closer to key than P2
750  */
751 static int
752 is_closer (const GNUNET_HashCode *key,
753            const struct GNUNET_PeerIdentity *p1,
754            const struct GNUNET_PeerIdentity *p2)
755 {
756   return GNUNET_CRYPTO_hash_xorcmp (&p1->hashPubKey,
757                                     &p2->hashPubKey,
758                                     key);
759 }
760
761
762 /**
763  * Consider migrating content to a given peer.
764  *
765  * @param cls 'struct MigrationReadyBlock*' to select
766  *            targets for (or NULL for none)
767  * @param key ID of the peer 
768  * @param value 'struct ConnectedPeer' of the peer
769  * @return GNUNET_YES (always continue iteration)
770  */
771 static int
772 consider_migration (void *cls,
773                     const GNUNET_HashCode *key,
774                     void *value)
775 {
776   struct MigrationReadyBlock *mb = cls;
777   struct ConnectedPeer *cp = value;
778   struct MigrationReadyBlock *pos;
779   struct GNUNET_PeerIdentity cppid;
780   struct GNUNET_PeerIdentity otherpid;
781   struct GNUNET_PeerIdentity worstpid;
782   size_t msize;
783   unsigned int i;
784   unsigned int repl;
785   
786   /* consider 'cp' as a migration target for mb */
787   if (mb != NULL)
788     {
789       GNUNET_PEER_resolve (cp->pid,
790                            &cppid);
791       repl = MIGRATION_LIST_SIZE;
792       for (i=0;i<MIGRATION_LIST_SIZE;i++)
793         {
794           if (mb->target_list[i] == 0)
795             {
796               mb->target_list[i] = cp->pid;
797               GNUNET_PEER_change_rc (mb->target_list[i], 1);
798               repl = MIGRATION_LIST_SIZE;
799               break;
800             }
801           GNUNET_PEER_resolve (mb->target_list[i],
802                                &otherpid);
803           if ( (repl == MIGRATION_LIST_SIZE) &&
804                is_closer (&mb->query,
805                           &cppid,
806                           &otherpid)) 
807             {
808               repl = i;
809               worstpid = otherpid;
810             }
811           else if ( (repl != MIGRATION_LIST_SIZE) &&
812                     (is_closer (&mb->query,
813                                 &worstpid,
814                                 &otherpid) ) )
815             {
816               repl = i;
817               worstpid = otherpid;
818             }       
819         }
820       if (repl != MIGRATION_LIST_SIZE) 
821         {
822           GNUNET_PEER_change_rc (mb->target_list[repl], -1);
823           mb->target_list[repl] = cp->pid;
824           GNUNET_PEER_change_rc (mb->target_list[repl], 1);
825         }
826     }
827
828   /* consider scheduling transmission to cp for content migration */
829   if (cp->cth != NULL)
830     return GNUNET_YES; 
831   msize = 0;
832   pos = mig_head;
833   while (pos != NULL)
834     {
835       for (i=0;i<MIGRATION_LIST_SIZE;i++)
836         {
837           if (cp->pid == pos->target_list[i])
838             {
839               if (msize == 0)
840                 msize = pos->size;
841               else
842                 msize = GNUNET_MIN (msize,
843                                     pos->size);
844               break;
845             }
846         }
847       pos = pos->next;
848     }
849   if (msize == 0)
850     return GNUNET_YES; /* no content available */
851 #if DEBUG_FS
852   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
853               "Trying to migrate at least %u bytes to peer `%s'\n",
854               msize,
855               GNUNET_h2s (key));
856 #endif
857   cp->cth 
858     = GNUNET_CORE_notify_transmit_ready (core,
859                                          0, GNUNET_TIME_UNIT_FOREVER_REL,
860                                          (const struct GNUNET_PeerIdentity*) key,
861                                          msize + sizeof (struct PutMessage),
862                                          &transmit_to_peer,
863                                          cp);
864   return GNUNET_YES;
865 }
866
867
868 /**
869  * Task that is run periodically to obtain blocks for content
870  * migration
871  * 
872  * @param cls unused
873  * @param tc scheduler context (also unused)
874  */
875 static void
876 gather_migration_blocks (void *cls,
877                          const struct GNUNET_SCHEDULER_TaskContext *tc);
878
879
880 /**
881  * If the migration task is not currently running, consider
882  * (re)scheduling it with the appropriate delay.
883  */
884 static void
885 consider_migration_gathering ()
886 {
887   struct GNUNET_TIME_Relative delay;
888
889   if (mig_qe != NULL)
890     return;
891   if (mig_task != GNUNET_SCHEDULER_NO_TASK)
892     return;
893   delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
894                                          mig_size);
895   delay = GNUNET_TIME_relative_divide (delay,
896                                        MAX_MIGRATION_QUEUE);
897   delay = GNUNET_TIME_relative_max (delay,
898                                     min_migration_delay);
899   mig_task = GNUNET_SCHEDULER_add_delayed (sched,
900                                            delay,
901                                            &gather_migration_blocks,
902                                            NULL);
903 }
904
905
906 /**
907  * Process content offered for migration.
908  *
909  * @param cls closure
910  * @param key key for the content
911  * @param size number of bytes in data
912  * @param data content stored
913  * @param type type of the content
914  * @param priority priority of the content
915  * @param anonymity anonymity-level for the content
916  * @param expiration expiration time for the content
917  * @param uid unique identifier for the datum;
918  *        maybe 0 if no unique identifier is available
919  */
920 static void
921 process_migration_content (void *cls,
922                            const GNUNET_HashCode * key,
923                            uint32_t size,
924                            const void *data,
925                            enum GNUNET_BLOCK_Type type,
926                            uint32_t priority,
927                            uint32_t anonymity,
928                            struct GNUNET_TIME_Absolute
929                            expiration, uint64_t uid)
930 {
931   struct MigrationReadyBlock *mb;
932   
933   if (key == NULL)
934     {
935       mig_qe = NULL;
936       if (mig_size < MAX_MIGRATION_QUEUE)  
937         consider_migration_gathering ();
938       return;
939     }
940   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
941     {
942       if (GNUNET_OK !=
943           GNUNET_FS_handle_on_demand_block (key, size, data,
944                                             type, priority, anonymity,
945                                             expiration, uid, 
946                                             &process_migration_content,
947                                             NULL))
948         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
949       return;
950     }
951 #if DEBUG_FS
952   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
953               "Retrieved block `%s' of type %u for migration\n",
954               GNUNET_h2s (key),
955               type);
956 #endif
957   mb = GNUNET_malloc (sizeof (struct MigrationReadyBlock) + size);
958   mb->query = *key;
959   mb->expiration = expiration;
960   mb->size = size;
961   mb->type = type;
962   memcpy (&mb[1], data, size);
963   GNUNET_CONTAINER_DLL_insert_after (mig_head,
964                                      mig_tail,
965                                      mig_tail,
966                                      mb);
967   mig_size++;
968   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
969                                          &consider_migration,
970                                          mb);
971   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
972 }
973
974
975 /**
976  * Task that is run periodically to obtain blocks for content
977  * migration
978  * 
979  * @param cls unused
980  * @param tc scheduler context (also unused)
981  */
982 static void
983 gather_migration_blocks (void *cls,
984                          const struct GNUNET_SCHEDULER_TaskContext *tc)
985 {
986   mig_task = GNUNET_SCHEDULER_NO_TASK;
987   mig_qe = GNUNET_DATASTORE_get_random (dsh, 0, -1,
988                                         GNUNET_TIME_UNIT_FOREVER_REL,
989                                         &process_migration_content, NULL);
990   GNUNET_assert (mig_qe != NULL);
991 }
992
993
994 /**
995  * We're done with a particular message list entry.
996  * Free all associated resources.
997  * 
998  * @param pml entry to destroy
999  */
1000 static void
1001 destroy_pending_message_list_entry (struct PendingMessageList *pml)
1002 {
1003   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
1004                                pml->req->pending_tail,
1005                                pml);
1006   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
1007                                pml->target->pending_messages_tail,
1008                                pml->pm);
1009   pml->target->pending_requests--;
1010   GNUNET_free (pml->pm);
1011   GNUNET_free (pml);
1012 }
1013
1014
1015 /**
1016  * Destroy the given pending message (and call the respective
1017  * continuation).
1018  *
1019  * @param pm message to destroy
1020  * @param tpid id of peer that the message was delivered to, or 0 for none
1021  */
1022 static void
1023 destroy_pending_message (struct PendingMessage *pm,
1024                          GNUNET_PEER_Id tpid)
1025 {
1026   struct PendingMessageList *pml = pm->pml;
1027   TransmissionContinuation cont;
1028   void *cont_cls;
1029
1030   GNUNET_assert (pml->pm == pm);
1031   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
1032   cont = pm->cont;
1033   cont_cls = pm->cont_cls;
1034   destroy_pending_message_list_entry (pml);
1035   cont (cont_cls, tpid);  
1036 }
1037
1038
1039 /**
1040  * We're done processing a particular request.
1041  * Free all associated resources.
1042  *
1043  * @param pr request to destroy
1044  */
1045 static void
1046 destroy_pending_request (struct PendingRequest *pr)
1047 {
1048   struct GNUNET_PeerIdentity pid;
1049
1050   if (pr->hnode != NULL)
1051     {
1052       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
1053                                          pr->hnode);
1054       pr->hnode = NULL;
1055     }
1056   if (NULL == pr->client_request_list)
1057     {
1058       GNUNET_STATISTICS_update (stats,
1059                                 gettext_noop ("# P2P searches active"),
1060                                 -1,
1061                                 GNUNET_NO);
1062     }
1063   else
1064     {
1065       GNUNET_STATISTICS_update (stats,
1066                                 gettext_noop ("# client searches active"),
1067                                 -1,
1068                                 GNUNET_NO);
1069     }
1070   /* might have already been removed from map in 'process_reply' (if
1071      there was a unique reply) or never inserted if it was a
1072      duplicate; hence ignore the return value here */
1073   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1074                                                &pr->query,
1075                                                pr);
1076   if (pr->qe != NULL)
1077      {
1078       GNUNET_DATASTORE_cancel (pr->qe);
1079       pr->qe = NULL;
1080     }
1081   if (pr->client_request_list != NULL)
1082     {
1083       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
1084                                    pr->client_request_list->client_list->rl_tail,
1085                                    pr->client_request_list);
1086       GNUNET_free (pr->client_request_list);
1087       pr->client_request_list = NULL;
1088     }
1089   if (pr->cp != NULL)
1090     {
1091       GNUNET_PEER_resolve (pr->cp->pid,
1092                            &pid);
1093       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1094                                                    &pid.hashPubKey,
1095                                                    pr);
1096       pr->cp = NULL;
1097     }
1098   if (pr->bf != NULL)
1099     {
1100       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
1101       pr->bf = NULL;
1102     }
1103   if (pr->irc != NULL)
1104     {
1105       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
1106       pr->irc = NULL;
1107     }
1108   if (pr->replies_seen != NULL)
1109     {
1110       GNUNET_free (pr->replies_seen);
1111       pr->replies_seen = NULL;
1112     }
1113   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1114     {
1115       GNUNET_SCHEDULER_cancel (sched,
1116                                pr->task);
1117       pr->task = GNUNET_SCHEDULER_NO_TASK;
1118     }
1119   while (NULL != pr->pending_head)    
1120     destroy_pending_message_list_entry (pr->pending_head);
1121   GNUNET_PEER_change_rc (pr->target_pid, -1);
1122   if (pr->used_pids != NULL)
1123     {
1124       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
1125       GNUNET_free (pr->used_pids);
1126       pr->used_pids_off = 0;
1127       pr->used_pids_size = 0;
1128       pr->used_pids = NULL;
1129     }
1130   GNUNET_free (pr);
1131 }
1132
1133
1134 /**
1135  * Method called whenever a given peer connects.
1136  *
1137  * @param cls closure, not used
1138  * @param peer peer identity this notification is about
1139  * @param latency reported latency of the connection with 'other'
1140  * @param distance reported distance (DV) to 'other' 
1141  */
1142 static void 
1143 peer_connect_handler (void *cls,
1144                       const struct
1145                       GNUNET_PeerIdentity * peer,
1146                       struct GNUNET_TIME_Relative latency,
1147                       uint32_t distance)
1148 {
1149   struct ConnectedPeer *cp;
1150   struct MigrationReadyBlock *pos;
1151   
1152   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
1153   cp->pid = GNUNET_PEER_intern (peer);
1154   GNUNET_break (GNUNET_OK ==
1155                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
1156                                                    &peer->hashPubKey,
1157                                                    cp,
1158                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1159
1160   pos = mig_head;
1161   while (NULL != pos)
1162     {
1163       (void) consider_migration (pos, &peer->hashPubKey, cp);
1164       pos = pos->next;
1165     }
1166 }
1167
1168
1169
1170 /**
1171  * Free (each) request made by the peer.
1172  *
1173  * @param cls closure, points to peer that the request belongs to
1174  * @param key current key code
1175  * @param value value in the hash map
1176  * @return GNUNET_YES (we should continue to iterate)
1177  */
1178 static int
1179 destroy_request (void *cls,
1180                  const GNUNET_HashCode * key,
1181                  void *value)
1182 {
1183   const struct GNUNET_PeerIdentity * peer = cls;
1184   struct PendingRequest *pr = value;
1185   
1186   GNUNET_break (GNUNET_YES ==
1187                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
1188                                                       &peer->hashPubKey,
1189                                                       pr));
1190   destroy_pending_request (pr);
1191   return GNUNET_YES;
1192 }
1193
1194
1195 /**
1196  * Method called whenever a peer disconnects.
1197  *
1198  * @param cls closure, not used
1199  * @param peer peer identity this notification is about
1200  */
1201 static void
1202 peer_disconnect_handler (void *cls,
1203                          const struct
1204                          GNUNET_PeerIdentity * peer)
1205 {
1206   struct ConnectedPeer *cp;
1207   struct PendingMessage *pm;
1208   unsigned int i;
1209   struct MigrationReadyBlock *pos;
1210   struct MigrationReadyBlock *next;
1211
1212   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
1213                                               &peer->hashPubKey,
1214                                               &destroy_request,
1215                                               (void*) peer);
1216   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1217                                           &peer->hashPubKey);
1218   if (cp == NULL)
1219     return;
1220   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1221     {
1222       if (NULL != cp->last_client_replies[i])
1223         {
1224           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1225           cp->last_client_replies[i] = NULL;
1226         }
1227     }
1228   GNUNET_break (GNUNET_YES ==
1229                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
1230                                                       &peer->hashPubKey,
1231                                                       cp));
1232   /* remove this peer from migration considerations; schedule
1233      alternatives */
1234   next = mig_head;
1235   while (NULL != (pos = next))
1236     {
1237       next = pos->next;
1238       for (i=0;i<MIGRATION_LIST_SIZE;i++)
1239         {
1240           if (pos->target_list[i] == cp->pid)
1241             {
1242               GNUNET_PEER_change_rc (pos->target_list[i], -1);
1243               pos->target_list[i] = 0;
1244             }
1245          }
1246       if (pos->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers))
1247         {
1248           delete_migration_block (pos);
1249           consider_migration_gathering ();
1250           continue;
1251         }
1252       GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1253                                              &consider_migration,
1254                                              pos);
1255     }
1256   if (cp->trust_delta > 0)
1257     {
1258       /* FIXME: push trust back to peerinfo! 
1259          (need better peerinfo API!) */
1260     }
1261   GNUNET_PEER_change_rc (cp->pid, -1);
1262   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
1263   if (NULL != cp->cth)
1264     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1265   while (NULL != (pm = cp->pending_messages_head))
1266     destroy_pending_message (pm, 0 /* delivery failed */);
1267   GNUNET_break (0 == cp->pending_requests);
1268   GNUNET_free (cp);
1269 }
1270
1271
1272 /**
1273  * Iterator over hash map entries that removes all occurences
1274  * of the given 'client' from the 'last_client_replies' of the
1275  * given connected peer.
1276  *
1277  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
1278  * @param key current key code (unused)
1279  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
1280  * @return GNUNET_YES (we should continue to iterate)
1281  */
1282 static int
1283 remove_client_from_last_client_replies (void *cls,
1284                                         const GNUNET_HashCode * key,
1285                                         void *value)
1286 {
1287   struct GNUNET_SERVER_Client *client = cls;
1288   struct ConnectedPeer *cp = value;
1289   unsigned int i;
1290
1291   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
1292     {
1293       if (cp->last_client_replies[i] == client)
1294         {
1295           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
1296           cp->last_client_replies[i] = NULL;
1297         }
1298     }  
1299   return GNUNET_YES;
1300 }
1301
1302
1303 /**
1304  * A client disconnected.  Remove all of its pending queries.
1305  *
1306  * @param cls closure, NULL
1307  * @param client identification of the client
1308  */
1309 static void
1310 handle_client_disconnect (void *cls,
1311                           struct GNUNET_SERVER_Client
1312                           * client)
1313 {
1314   struct ClientList *pos;
1315   struct ClientList *prev;
1316   struct ClientRequestList *rcl;
1317   struct ClientResponseMessage *creply;
1318
1319   if (client == NULL)
1320     return;
1321   prev = NULL;
1322   pos = client_list;
1323   while ( (NULL != pos) &&
1324           (pos->client != client) )
1325     {
1326       prev = pos;
1327       pos = pos->next;
1328     }
1329   if (pos == NULL)
1330     return; /* no requests pending for this client */
1331   while (NULL != (rcl = pos->rl_head))
1332     {
1333       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1334                   "Destroying pending request `%s' on disconnect\n",
1335                   GNUNET_h2s (&rcl->req->query));
1336       destroy_pending_request (rcl->req);
1337     }
1338   if (prev == NULL)
1339     client_list = pos->next;
1340   else
1341     prev->next = pos->next;
1342   if (pos->th != NULL)
1343     {
1344       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
1345       pos->th = NULL;
1346     }
1347   while (NULL != (creply = pos->res_head))
1348     {
1349       GNUNET_CONTAINER_DLL_remove (pos->res_head,
1350                                    pos->res_tail,
1351                                    creply);
1352       GNUNET_free (creply);
1353     }    
1354   GNUNET_SERVER_client_drop (pos->client);
1355   GNUNET_free (pos);
1356   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1357                                          &remove_client_from_last_client_replies,
1358                                          client);
1359 }
1360
1361
1362 /**
1363  * Iterator to free peer entries.
1364  *
1365  * @param cls closure, unused
1366  * @param key current key code
1367  * @param value value in the hash map (peer entry)
1368  * @return GNUNET_YES (we should continue to iterate)
1369  */
1370 static int 
1371 clean_peer (void *cls,
1372             const GNUNET_HashCode * key,
1373             void *value)
1374 {
1375   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
1376   return GNUNET_YES;
1377 }
1378
1379
1380 /**
1381  * Task run during shutdown.
1382  *
1383  * @param cls unused
1384  * @param tc unused
1385  */
1386 static void
1387 shutdown_task (void *cls,
1388                const struct GNUNET_SCHEDULER_TaskContext *tc)
1389 {
1390   if (mig_qe != NULL)
1391     {
1392       GNUNET_DATASTORE_cancel (mig_qe);
1393       mig_qe = NULL;
1394     }
1395   if (GNUNET_SCHEDULER_NO_TASK != mig_task)
1396     {
1397       GNUNET_SCHEDULER_cancel (sched, mig_task);
1398       mig_task = GNUNET_SCHEDULER_NO_TASK;
1399     }
1400   while (client_list != NULL)
1401     handle_client_disconnect (NULL,
1402                               client_list->client);
1403   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1404                                          &clean_peer,
1405                                          NULL);
1406   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
1407   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1408   requests_by_expiration_heap = 0;
1409   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
1410   connected_peers = NULL;
1411   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
1412   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
1413   query_request_map = NULL;
1414   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
1415   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
1416   peer_request_map = NULL;
1417   GNUNET_assert (NULL != core);
1418   GNUNET_CORE_disconnect (core);
1419   core = NULL;
1420   if (stats != NULL)
1421     {
1422       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1423       stats = NULL;
1424     }
1425   GNUNET_DATASTORE_disconnect (dsh,
1426                                GNUNET_NO);
1427   while (mig_head != NULL)
1428     delete_migration_block (mig_head);
1429   GNUNET_assert (0 == mig_size);
1430   dsh = NULL;
1431   sched = NULL;
1432   cfg = NULL;  
1433 }
1434
1435
1436 /* ******************* Utility functions  ******************** */
1437
1438
1439 /**
1440  * Transmit messages by copying it to the target buffer
1441  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1442  * for writing in the meantime.  In that case, do nothing
1443  * (the disconnect or shutdown handler will take care of the rest).
1444  * If we were able to transmit messages and there are still more
1445  * pending, ask core again for further calls to this function.
1446  *
1447  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1448  * @param size number of bytes available in buf
1449  * @param buf where the callee should write the message
1450  * @return number of bytes written to buf
1451  */
1452 static size_t
1453 transmit_to_peer (void *cls,
1454                   size_t size, void *buf)
1455 {
1456   struct ConnectedPeer *cp = cls;
1457   char *cbuf = buf;
1458   struct GNUNET_PeerIdentity pid;
1459   struct PendingMessage *pm;
1460   struct MigrationReadyBlock *mb;
1461   struct MigrationReadyBlock *next;
1462   struct PutMessage migm;
1463   size_t msize;
1464   unsigned int i;
1465  
1466   cp->cth = NULL;
1467   if (NULL == buf)
1468     {
1469 #if DEBUG_FS
1470       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1471                   "Dropping message, core too busy.\n");
1472 #endif
1473       return 0;
1474     }
1475   msize = 0;
1476   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1477           (pm->msize <= size) )
1478     {
1479       memcpy (&cbuf[msize], &pm[1], pm->msize);
1480       msize += pm->msize;
1481       size -= pm->msize;
1482       destroy_pending_message (pm, cp->pid);
1483     }
1484   if (NULL != pm)
1485     {
1486       GNUNET_PEER_resolve (cp->pid,
1487                            &pid);
1488       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1489                                                    pm->priority,
1490                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1491                                                    &pid,
1492                                                    pm->msize,
1493                                                    &transmit_to_peer,
1494                                                    cp);
1495     }
1496   else
1497     {      
1498       next = mig_head;
1499       while (NULL != (mb = next))
1500         {
1501           next = mb->next;
1502           for (i=0;i<MIGRATION_LIST_SIZE;i++)
1503             {
1504               if ( (cp->pid == mb->target_list[i]) &&
1505                    (mb->size + sizeof (migm) <= size) )
1506                 {
1507                   GNUNET_PEER_change_rc (mb->target_list[i], -1);
1508                   mb->target_list[i] = 0;
1509                   mb->used_targets++;
1510                   migm.header.size = htons (sizeof (migm) + mb->size);
1511                   migm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1512                   migm.type = htonl (mb->type);
1513                   migm.expiration = GNUNET_TIME_absolute_hton (mb->expiration);
1514                   memcpy (&cbuf[msize], &migm, sizeof (migm));
1515                   msize += sizeof (migm);
1516                   size -= sizeof (migm);
1517                   memcpy (&cbuf[msize], &mb[1], mb->size);
1518                   msize += mb->size;
1519                   size -= mb->size;
1520 #if DEBUG_FS
1521                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1522                               "Pushing migration block `%s' (%u bytes) to `%s'\n",
1523                               GNUNET_h2s (&mb->query),
1524                               mb->size,
1525                               GNUNET_i2s (&pid));
1526 #endif    
1527                   break;
1528                 }
1529               else
1530                 {
1531 #if DEBUG_FS
1532                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1533                               "Migration block `%s' (%u bytes) is not on migration list for peer `%s'\n",
1534                               GNUNET_h2s (&mb->query),
1535                               mb->size,
1536                               GNUNET_i2s (&pid));
1537 #endif    
1538                 }
1539             }
1540           if ( (mb->used_targets >= MIGRATION_TARGET_COUNT) ||
1541                (mb->used_targets >= GNUNET_CONTAINER_multihashmap_size (connected_peers)) )
1542             {
1543               delete_migration_block (mb);
1544               consider_migration_gathering ();
1545             }
1546         }
1547       consider_migration (NULL, 
1548                           &pid.hashPubKey,
1549                           cp);
1550     }
1551 #if DEBUG_FS
1552   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1553               "Transmitting %u bytes to peer %u\n",
1554               msize,
1555               cp->pid);
1556 #endif
1557   return msize;
1558 }
1559
1560
1561 /**
1562  * Add a message to the set of pending messages for the given peer.
1563  *
1564  * @param cp peer to send message to
1565  * @param pm message to queue
1566  * @param pr request on which behalf this message is being queued
1567  */
1568 static void
1569 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1570                                   struct PendingMessage *pm,
1571                                   struct PendingRequest *pr)
1572 {
1573   struct PendingMessage *pos;
1574   struct PendingMessageList *pml;
1575   struct GNUNET_PeerIdentity pid;
1576
1577   GNUNET_assert (pm->next == NULL);
1578   GNUNET_assert (pm->pml == NULL);    
1579   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1580   pml->req = pr;
1581   pml->target = cp;
1582   pml->pm = pm;
1583   pm->pml = pml;  
1584   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1585                                pr->pending_tail,
1586                                pml);
1587   pos = cp->pending_messages_head;
1588   while ( (pos != NULL) &&
1589           (pm->priority < pos->priority) )
1590     pos = pos->next;    
1591   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1592                                      cp->pending_messages_tail,
1593                                      pos,
1594                                      pm);
1595   cp->pending_requests++;
1596   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1597     destroy_pending_message (cp->pending_messages_tail, 0);  
1598   GNUNET_PEER_resolve (cp->pid, &pid);
1599   if (NULL != cp->cth)
1600     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
1601   /* need to schedule transmission */
1602   cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1603                                                cp->pending_messages_head->priority,
1604                                                MAX_TRANSMIT_DELAY,
1605                                                &pid,
1606                                                cp->pending_messages_head->msize,
1607                                                &transmit_to_peer,
1608                                                cp);
1609   if (cp->cth == NULL)
1610     {
1611 #if DEBUG_FS
1612       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1613                   "Failed to schedule transmission with core!\n");
1614 #endif
1615       GNUNET_STATISTICS_update (stats,
1616                                 gettext_noop ("# CORE transmission failures"),
1617                                 1,
1618                                 GNUNET_NO);
1619     }
1620 }
1621
1622
1623 /**
1624  * Mingle hash with the mingle_number to produce different bits.
1625  */
1626 static void
1627 mingle_hash (const GNUNET_HashCode * in,
1628              int32_t mingle_number, 
1629              GNUNET_HashCode * hc)
1630 {
1631   GNUNET_HashCode m;
1632
1633   GNUNET_CRYPTO_hash (&mingle_number, 
1634                       sizeof (int32_t), 
1635                       &m);
1636   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1637 }
1638
1639
1640 /**
1641  * Test if the load on this peer is too high
1642  * to even consider processing the query at
1643  * all.
1644  * 
1645  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1646  */
1647 static int
1648 test_load_too_high ()
1649 {
1650   return GNUNET_NO; // FIXME
1651 }
1652
1653
1654 /* ******************* Pending Request Refresh Task ******************** */
1655
1656
1657
1658 /**
1659  * We use a random delay to make the timing of requests less
1660  * predictable.  This function returns such a random delay.  We add a base
1661  * delay of MAX_CORK_DELAY (1s).
1662  *
1663  * FIXME: make schedule dependent on the specifics of the request?
1664  * Or bandwidth and number of connected peers and load?
1665  *
1666  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1667  */
1668 static struct GNUNET_TIME_Relative
1669 get_processing_delay ()
1670 {
1671   return 
1672     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1673                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1674                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1675                                                                                        TTL_DECREMENT)));
1676 }
1677
1678
1679 /**
1680  * We're processing a GET request from another peer and have decided
1681  * to forward it to other peers.  This function is called periodically
1682  * and should forward the request to other peers until we have all
1683  * possible replies.  If we have transmitted the *only* reply to
1684  * the initiator we should destroy the pending request.  If we have
1685  * many replies in the queue to the initiator, we should delay sending
1686  * out more queries until the reply queue has shrunk some.
1687  *
1688  * @param cls our "struct ProcessGetContext *"
1689  * @param tc unused
1690  */
1691 static void
1692 forward_request_task (void *cls,
1693                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1694
1695
1696 /**
1697  * Function called after we either failed or succeeded
1698  * at transmitting a query to a peer.  
1699  *
1700  * @param cls the requests "struct PendingRequest*"
1701  * @param tpid ID of receiving peer, 0 on transmission error
1702  */
1703 static void
1704 transmit_query_continuation (void *cls,
1705                              GNUNET_PEER_Id tpid)
1706 {
1707   struct PendingRequest *pr = cls;
1708
1709   GNUNET_STATISTICS_update (stats,
1710                             gettext_noop ("# queries scheduled for forwarding"),
1711                             -1,
1712                             GNUNET_NO);
1713   if (tpid == 0)   
1714     {
1715 #if DEBUG_FS
1716       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1717                   "Transmission of request failed, will try again later.\n");
1718 #endif
1719       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1720         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1721                                                  get_processing_delay (),
1722                                                  &forward_request_task,
1723                                                  pr); 
1724       return;    
1725     }
1726   GNUNET_STATISTICS_update (stats,
1727                             gettext_noop ("# queries forwarded"),
1728                             1,
1729                             GNUNET_NO);
1730   GNUNET_PEER_change_rc (tpid, 1);
1731   if (pr->used_pids_off == pr->used_pids_size)
1732     GNUNET_array_grow (pr->used_pids,
1733                        pr->used_pids_size,
1734                        pr->used_pids_size * 2 + 2);
1735   pr->used_pids[pr->used_pids_off++] = tpid;
1736   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1737     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1738                                              get_processing_delay (),
1739                                              &forward_request_task,
1740                                              pr);
1741 }
1742
1743
1744 /**
1745  * How many bytes should a bloomfilter be if we have already seen
1746  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1747  * of bits set per entry.  Furthermore, we should not re-size the
1748  * filter too often (to keep it cheap).
1749  *
1750  * Since other peers will also add entries but not resize the filter,
1751  * we should generally pick a slightly larger size than what the
1752  * strict math would suggest.
1753  *
1754  * @return must be a power of two and smaller or equal to 2^15.
1755  */
1756 static size_t
1757 compute_bloomfilter_size (unsigned int entry_count)
1758 {
1759   size_t size;
1760   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1761   uint16_t max = 1 << 15;
1762
1763   if (entry_count > max)
1764     return max;
1765   size = 8;
1766   while ((size < max) && (size < ideal))
1767     size *= 2;
1768   if (size > max)
1769     return max;
1770   return size;
1771 }
1772
1773
1774 /**
1775  * Recalculate our bloom filter for filtering replies.  This function
1776  * will create a new bloom filter from scratch, so it should only be
1777  * called if we have no bloomfilter at all (and hence can create a
1778  * fresh one of minimal size without problems) OR if our peer is the
1779  * initiator (in which case we may resize to larger than mimimum size).
1780  *
1781  * @param pr request for which the BF is to be recomputed
1782  */
1783 static void
1784 refresh_bloomfilter (struct PendingRequest *pr)
1785 {
1786   unsigned int i;
1787   size_t nsize;
1788   GNUNET_HashCode mhash;
1789
1790   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1791   if (nsize == pr->bf_size)
1792     return; /* size not changed */
1793   if (pr->bf != NULL)
1794     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1795   pr->bf_size = nsize;
1796   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1797   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1798                                               pr->bf_size,
1799                                               BLOOMFILTER_K);
1800   for (i=0;i<pr->replies_seen_off;i++)
1801     {
1802       mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
1803       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
1804     }
1805 }
1806
1807
1808 /**
1809  * Function called after we've tried to reserve a certain amount of
1810  * bandwidth for a reply.  Check if we succeeded and if so send our
1811  * query.
1812  *
1813  * @param cls the requests "struct PendingRequest*"
1814  * @param peer identifies the peer
1815  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1816  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1817  * @param amount set to the amount that was actually reserved or unreserved
1818  * @param preference current traffic preference for the given peer
1819  */
1820 static void
1821 target_reservation_cb (void *cls,
1822                        const struct
1823                        GNUNET_PeerIdentity * peer,
1824                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1825                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1826                        int amount,
1827                        uint64_t preference)
1828 {
1829   struct PendingRequest *pr = cls;
1830   struct ConnectedPeer *cp;
1831   struct PendingMessage *pm;
1832   struct GetMessage *gm;
1833   GNUNET_HashCode *ext;
1834   char *bfdata;
1835   size_t msize;
1836   unsigned int k;
1837   int no_route;
1838   uint32_t bm;
1839
1840   pr->irc = NULL;
1841   if (peer == NULL)
1842     {
1843       /* error in communication with core, try again later */
1844       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1845         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1846                                                  get_processing_delay (),
1847                                                  &forward_request_task,
1848                                                  pr);
1849       return;
1850     }
1851   // (3) transmit, update ttl/priority
1852   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1853                                           &peer->hashPubKey);
1854   if (cp == NULL)
1855     {
1856       /* Peer must have just left */
1857 #if DEBUG_FS
1858       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1859                   "Selected peer disconnected!\n");
1860 #endif
1861       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1862         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1863                                                  get_processing_delay (),
1864                                                  &forward_request_task,
1865                                                  pr);
1866       return;
1867     }
1868   no_route = GNUNET_NO;
1869   if (amount == 0)
1870     {
1871       if (pr->cp == NULL)
1872         {
1873 #if DEBUG_FS > 1
1874           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1875                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
1876                       amount,
1877                       DBLOCK_SIZE);
1878 #endif
1879           GNUNET_STATISTICS_update (stats,
1880                                     gettext_noop ("# reply bandwidth reservation requests failed"),
1881                                     1,
1882                                     GNUNET_NO);
1883           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1884             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1885                                                      get_processing_delay (),
1886                                                      &forward_request_task,
1887                                                      pr);
1888           return;  /* this target round failed */
1889         }
1890       /* FIXME: if we are "quite" busy, we may still want to skip
1891          this round; need more load detection code! */
1892       no_route = GNUNET_YES;
1893     }
1894   
1895   GNUNET_STATISTICS_update (stats,
1896                             gettext_noop ("# queries scheduled for forwarding"),
1897                             1,
1898                             GNUNET_NO);
1899   /* build message and insert message into priority queue */
1900 #if DEBUG_FS
1901   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1902               "Forwarding request `%s' to `%4s'!\n",
1903               GNUNET_h2s (&pr->query),
1904               GNUNET_i2s (peer));
1905 #endif
1906   k = 0;
1907   bm = 0;
1908   if (GNUNET_YES == no_route)
1909     {
1910       bm |= GET_MESSAGE_BIT_RETURN_TO;
1911       k++;      
1912     }
1913   if (pr->namespace != NULL)
1914     {
1915       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
1916       k++;
1917     }
1918   if (pr->target_pid != 0)
1919     {
1920       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
1921       k++;
1922     }
1923   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1924   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1925   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1926   pm->msize = msize;
1927   gm = (struct GetMessage*) &pm[1];
1928   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1929   gm->header.size = htons (msize);
1930   gm->type = htonl (pr->type);
1931   pr->remaining_priority /= 2;
1932   gm->priority = htonl (pr->remaining_priority);
1933   gm->ttl = htonl (pr->ttl);
1934   gm->filter_mutator = htonl(pr->mingle); 
1935   gm->hash_bitmap = htonl (bm);
1936   gm->query = pr->query;
1937   ext = (GNUNET_HashCode*) &gm[1];
1938   k = 0;
1939   if (GNUNET_YES == no_route)
1940     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1941   if (pr->namespace != NULL)
1942     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
1943   if (pr->target_pid != 0)
1944     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1945   bfdata = (char *) &ext[k];
1946   if (pr->bf != NULL)
1947     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1948                                                bfdata,
1949                                                pr->bf_size);
1950   pm->cont = &transmit_query_continuation;
1951   pm->cont_cls = pr;
1952   add_to_pending_messages_for_peer (cp, pm, pr);
1953 }
1954
1955
1956 /**
1957  * Closure used for "target_peer_select_cb".
1958  */
1959 struct PeerSelectionContext 
1960 {
1961   /**
1962    * The request for which we are selecting
1963    * peers.
1964    */
1965   struct PendingRequest *pr;
1966
1967   /**
1968    * Current "prime" target.
1969    */
1970   struct GNUNET_PeerIdentity target;
1971
1972   /**
1973    * How much do we like this target?
1974    */
1975   double target_score;
1976
1977 };
1978
1979
1980 /**
1981  * Function called for each connected peer to determine
1982  * which one(s) would make good targets for forwarding.
1983  *
1984  * @param cls closure (struct PeerSelectionContext)
1985  * @param key current key code (peer identity)
1986  * @param value value in the hash map (struct ConnectedPeer)
1987  * @return GNUNET_YES if we should continue to
1988  *         iterate,
1989  *         GNUNET_NO if not.
1990  */
1991 static int
1992 target_peer_select_cb (void *cls,
1993                        const GNUNET_HashCode * key,
1994                        void *value)
1995 {
1996   struct PeerSelectionContext *psc = cls;
1997   struct ConnectedPeer *cp = value;
1998   struct PendingRequest *pr = psc->pr;
1999   double score;
2000   unsigned int i;
2001   unsigned int pc;
2002
2003   /* 1) check that this peer is not the initiator */
2004   if (cp == pr->cp)
2005     {
2006 #if DEBUG_FS
2007       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2008                   "Skipping initiator in forwarding selection\n");
2009 #endif
2010       return GNUNET_YES; /* skip */        
2011     }
2012
2013   /* 2) check if we have already (recently) forwarded to this peer */
2014   pc = 0;
2015   for (i=0;i<pr->used_pids_off;i++)
2016     if (pr->used_pids[i] == cp->pid) 
2017       {
2018         pc++;
2019         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2020                                            RETRY_PROBABILITY_INV))
2021           {
2022 #if DEBUG_FS
2023             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2024                         "NOT re-trying query that was previously transmitted %u times\n",
2025                         (unsigned int) pr->used_pids_off);
2026 #endif
2027             return GNUNET_YES; /* skip */
2028           }
2029       }
2030 #if DEBUG_FS
2031   if (0 < pc)
2032     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
2033                 "Re-trying query that was previously transmitted %u times to this peer\n",
2034                 (unsigned int) pc);
2035 #endif
2036   /* 3) calculate how much we'd like to forward to this peer,
2037      starting with a random value that is strong enough
2038      to at least give any peer a chance sometimes 
2039      (compared to the other factors that come later) */
2040   /* 3a) count successful (recent) routes from cp for same source */
2041   if (pr->cp != NULL)
2042     {
2043       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2044                                         P2P_SUCCESS_LIST_SIZE);
2045       for (i=0;i<P2P_SUCCESS_LIST_SIZE;i++)
2046         if (cp->last_p2p_replies[i] == pr->cp->pid)
2047           score += 1; /* likely successful based on hot path */
2048     }
2049   else
2050     {
2051       score = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2052                                         CS2P_SUCCESS_LIST_SIZE);
2053       for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
2054         if (cp->last_client_replies[i] == pr->client_request_list->client_list->client)
2055           score += 1; /* likely successful based on hot path */
2056     }
2057   /* 3b) include latency */
2058   if (cp->avg_delay.value < 4 * TTL_DECREMENT)
2059     score += 1; /* likely fast based on latency */
2060   /* 3c) include priorities */
2061   if (cp->avg_priority <= pr->remaining_priority / 2.0)
2062     score += 1; /* likely successful based on priorities */
2063   /* 3d) penalize for queue size */  
2064   score -= (2.0 * cp->pending_requests / (double) MAX_QUEUE_PER_PEER); 
2065   /* 3e) include peer proximity */
2066   score -= (2.0 * (GNUNET_CRYPTO_hash_distance_u32 (key,
2067                                                     &pr->query)) / (double) UINT32_MAX);
2068   /* store best-fit in closure */
2069 #if DEBUG_FS
2070   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2071               "Peer `%s' gets score %f for forwarding query, max is %f\n",
2072               GNUNET_h2s (key),
2073               score,
2074               psc->target_score);
2075 #endif  
2076   score++; /* avoid zero */
2077   if (score > psc->target_score)
2078     {
2079       psc->target_score = score;
2080       psc->target.hashPubKey = *key; 
2081     }
2082   return GNUNET_YES;
2083 }
2084   
2085
2086 /**
2087  * The priority level imposes a bound on the maximum
2088  * value for the ttl that can be requested.
2089  *
2090  * @param ttl_in requested ttl
2091  * @param prio given priority
2092  * @return ttl_in if ttl_in is below the limit,
2093  *         otherwise the ttl-limit for the given priority
2094  */
2095 static int32_t
2096 bound_ttl (int32_t ttl_in, uint32_t prio)
2097 {
2098   unsigned long long allowed;
2099
2100   if (ttl_in <= 0)
2101     return ttl_in;
2102   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
2103   if (ttl_in > allowed)      
2104     {
2105       if (allowed >= (1 << 30))
2106         return 1 << 30;
2107       return allowed;
2108     }
2109   return ttl_in;
2110 }
2111
2112
2113 /**
2114  * We're processing a GET request and have decided
2115  * to forward it to other peers.  This function is called periodically
2116  * and should forward the request to other peers until we have all
2117  * possible replies.  If we have transmitted the *only* reply to
2118  * the initiator we should destroy the pending request.  If we have
2119  * many replies in the queue to the initiator, we should delay sending
2120  * out more queries until the reply queue has shrunk some.
2121  *
2122  * @param cls our "struct ProcessGetContext *"
2123  * @param tc unused
2124  */
2125 static void
2126 forward_request_task (void *cls,
2127                      const struct GNUNET_SCHEDULER_TaskContext *tc)
2128 {
2129   struct PendingRequest *pr = cls;
2130   struct PeerSelectionContext psc;
2131   struct ConnectedPeer *cp; 
2132   struct GNUNET_TIME_Relative delay;
2133
2134   pr->task = GNUNET_SCHEDULER_NO_TASK;
2135   if (pr->irc != NULL)
2136     {
2137 #if DEBUG_FS
2138       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2139                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
2140                   GNUNET_h2s (&pr->query));
2141 #endif
2142       return; /* already pending */
2143     }
2144   if (GNUNET_YES == pr->local_only)
2145     return; /* configured to not do P2P search */
2146   /* (1) select target */
2147   psc.pr = pr;
2148   psc.target_score = -DBL_MAX;
2149   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
2150                                          &target_peer_select_cb,
2151                                          &psc);  
2152   if (psc.target_score == -DBL_MAX)
2153     {
2154       delay = get_processing_delay ();
2155 #if DEBUG_FS 
2156       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2157                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
2158                   GNUNET_h2s (&pr->query),
2159                   delay.value);
2160 #endif
2161       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
2162                                                delay,
2163                                                &forward_request_task,
2164                                                pr);
2165       return; /* nobody selected */
2166     }
2167   /* (3) update TTL/priority */
2168   if (pr->client_request_list != NULL)
2169     {
2170       /* FIXME: use better algorithm!? */
2171       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2172                                          4))
2173         pr->priority++;
2174       /* FIXME: bound priority by "customary" priority used by other peers
2175          at this time! */
2176       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
2177                            pr->priority);
2178 #if DEBUG_FS
2179       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2180                   "Trying query `%s' with priority %u and TTL %d.\n",
2181                   GNUNET_h2s (&pr->query),
2182                   pr->priority,
2183                   pr->ttl);
2184 #endif
2185     }
2186
2187   /* (3) reserve reply bandwidth */
2188   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2189                                           &psc.target.hashPubKey);
2190   GNUNET_assert (NULL != cp);
2191   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
2192                                                 &psc.target,
2193                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
2194                                                 GNUNET_BANDWIDTH_value_init (UINT32_MAX),
2195                                                 DBLOCK_SIZE * 2, 
2196                                                 cp->inc_preference,
2197                                                 &target_reservation_cb,
2198                                                 pr);
2199   cp->inc_preference = 0;
2200 }
2201
2202
2203 /* **************************** P2P PUT Handling ************************ */
2204
2205
2206 /**
2207  * Function called after we either failed or succeeded
2208  * at transmitting a reply to a peer.  
2209  *
2210  * @param cls the requests "struct PendingRequest*"
2211  * @param tpid ID of receiving peer, 0 on transmission error
2212  */
2213 static void
2214 transmit_reply_continuation (void *cls,
2215                              GNUNET_PEER_Id tpid)
2216 {
2217   struct PendingRequest *pr = cls;
2218   
2219   switch (pr->type)
2220     {
2221     case GNUNET_BLOCK_TYPE_DBLOCK:
2222     case GNUNET_BLOCK_TYPE_IBLOCK:
2223       /* only one reply expected, done with the request! */
2224       destroy_pending_request (pr);
2225       break;
2226     case GNUNET_BLOCK_TYPE_ANY:
2227     case GNUNET_BLOCK_TYPE_KBLOCK:
2228     case GNUNET_BLOCK_TYPE_SBLOCK:
2229       break;
2230     default:
2231       GNUNET_break (0);
2232       break;
2233     }
2234 }
2235
2236
2237 /**
2238  * Transmit the given message by copying it to the target buffer
2239  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
2240  * for writing in the meantime.  In that case, do nothing
2241  * (the disconnect or shutdown handler will take care of the rest).
2242  * If we were able to transmit messages and there are still more
2243  * pending, ask core again for further calls to this function.
2244  *
2245  * @param cls closure, pointer to the 'struct ClientList*'
2246  * @param size number of bytes available in buf
2247  * @param buf where the callee should write the message
2248  * @return number of bytes written to buf
2249  */
2250 static size_t
2251 transmit_to_client (void *cls,
2252                   size_t size, void *buf)
2253 {
2254   struct ClientList *cl = cls;
2255   char *cbuf = buf;
2256   struct ClientResponseMessage *creply;
2257   size_t msize;
2258   
2259   cl->th = NULL;
2260   if (NULL == buf)
2261     {
2262 #if DEBUG_FS
2263       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2264                   "Not sending reply, client communication problem.\n");
2265 #endif
2266       return 0;
2267     }
2268   msize = 0;
2269   while ( (NULL != (creply = cl->res_head) ) &&
2270           (creply->msize <= size) )
2271     {
2272       memcpy (&cbuf[msize], &creply[1], creply->msize);
2273       msize += creply->msize;
2274       size -= creply->msize;
2275       GNUNET_CONTAINER_DLL_remove (cl->res_head,
2276                                    cl->res_tail,
2277                                    creply);
2278       GNUNET_free (creply);
2279     }
2280   if (NULL != creply)
2281     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2282                                                   creply->msize,
2283                                                   GNUNET_TIME_UNIT_FOREVER_REL,
2284                                                   &transmit_to_client,
2285                                                   cl);
2286 #if DEBUG_FS
2287   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2288               "Transmitted %u bytes to client\n",
2289               (unsigned int) msize);
2290 #endif
2291   return msize;
2292 }
2293
2294
2295 /**
2296  * Closure for "process_reply" function.
2297  */
2298 struct ProcessReplyClosure
2299 {
2300   /**
2301    * The data for the reply.
2302    */
2303   const void *data;
2304
2305   /**
2306    * Who gave us this reply? NULL for local host.
2307    */
2308   struct ConnectedPeer *sender;
2309
2310   /**
2311    * When the reply expires.
2312    */
2313   struct GNUNET_TIME_Absolute expiration;
2314
2315   /**
2316    * Size of data.
2317    */
2318   size_t size;
2319
2320   /**
2321    * Namespace that this reply belongs to
2322    * (if it is of type SBLOCK).
2323    */
2324   GNUNET_HashCode namespace;
2325
2326   /**
2327    * Type of the block.
2328    */
2329   enum GNUNET_BLOCK_Type type;
2330
2331   /**
2332    * How much was this reply worth to us?
2333    */
2334   uint32_t priority;
2335
2336   /**
2337    * Did we finish processing the associated request?
2338    */ 
2339   int finished;
2340 };
2341
2342
2343 /**
2344  * We have received a reply; handle it!
2345  *
2346  * @param cls response (struct ProcessReplyClosure)
2347  * @param key our query
2348  * @param value value in the hash map (info about the query)
2349  * @return GNUNET_YES (we should continue to iterate)
2350  */
2351 static int
2352 process_reply (void *cls,
2353                const GNUNET_HashCode * key,
2354                void *value)
2355 {
2356   struct ProcessReplyClosure *prq = cls;
2357   struct PendingRequest *pr = value;
2358   struct PendingMessage *reply;
2359   struct ClientResponseMessage *creply;
2360   struct ClientList *cl;
2361   struct PutMessage *pm;
2362   struct ConnectedPeer *cp;
2363   struct GNUNET_TIME_Relative cur_delay;
2364   GNUNET_HashCode chash;
2365   GNUNET_HashCode mhash;
2366   size_t msize;
2367
2368 #if DEBUG_FS
2369   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2370               "Matched result (type %u) for query `%s' with pending request\n",
2371               (unsigned int) prq->type,
2372               GNUNET_h2s (key));
2373 #endif  
2374   GNUNET_STATISTICS_update (stats,
2375                             gettext_noop ("# replies received and matched"),
2376                             1,
2377                             GNUNET_NO);
2378   if (prq->sender != NULL)
2379     {
2380       /* FIXME: should we be more precise here and not use
2381          "start_time" but a peer-specific time stamp? */
2382       cur_delay = GNUNET_TIME_absolute_get_duration (pr->start_time);
2383       prq->sender->avg_delay.value
2384         = (prq->sender->avg_delay.value * 
2385            (RUNAVG_DELAY_N - 1) + cur_delay.value) / RUNAVG_DELAY_N; 
2386       prq->sender->avg_priority
2387         = (prq->sender->avg_priority * 
2388            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
2389       if (pr->cp != NULL)
2390         {
2391           GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
2392                                  [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
2393                                  -1);
2394           GNUNET_PEER_change_rc (pr->cp->pid, 1);
2395           prq->sender->last_p2p_replies
2396             [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
2397             = pr->cp->pid;
2398         }
2399       else
2400         {
2401           if (NULL != prq->sender->last_client_replies
2402               [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
2403             GNUNET_SERVER_client_drop (prq->sender->last_client_replies
2404                                        [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
2405           prq->sender->last_client_replies
2406             [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
2407             = pr->client_request_list->client_list->client;
2408           GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
2409         }
2410     }
2411   GNUNET_CRYPTO_hash (prq->data,
2412                       prq->size,
2413                       &chash);
2414   switch (prq->type)
2415     {
2416     case GNUNET_BLOCK_TYPE_DBLOCK:
2417     case GNUNET_BLOCK_TYPE_IBLOCK:
2418       /* only possible reply, stop requesting! */
2419       while (NULL != pr->pending_head)
2420         destroy_pending_message_list_entry (pr->pending_head);
2421       if (pr->qe != NULL)
2422         {
2423           if (pr->client_request_list != NULL)
2424             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2425                                         GNUNET_YES);
2426           GNUNET_DATASTORE_cancel (pr->qe);
2427           pr->qe = NULL;
2428         }
2429       pr->do_remove = GNUNET_YES;
2430       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
2431         {
2432           GNUNET_SCHEDULER_cancel (sched,
2433                                    pr->task);
2434           pr->task = GNUNET_SCHEDULER_NO_TASK;
2435         }
2436       GNUNET_break (GNUNET_YES ==
2437                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
2438                                                           key,
2439                                                           pr));
2440       break;
2441     case GNUNET_BLOCK_TYPE_SBLOCK:
2442       if (pr->namespace == NULL)
2443         {
2444           GNUNET_break (0);
2445           return GNUNET_YES;
2446         }
2447       if (0 != memcmp (pr->namespace,
2448                        &prq->namespace,
2449                        sizeof (GNUNET_HashCode)))
2450         {
2451           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
2452                       _("Reply mismatched in terms of namespace.  Discarded.\n"));
2453           return GNUNET_YES; /* wrong namespace */      
2454         }
2455       /* then: fall-through! */
2456     case GNUNET_BLOCK_TYPE_KBLOCK:
2457     case GNUNET_BLOCK_TYPE_NBLOCK:
2458       if (pr->bf != NULL) 
2459         {
2460           mingle_hash (&chash, pr->mingle, &mhash);
2461           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2462                                                                &mhash))
2463             {
2464               GNUNET_STATISTICS_update (stats,
2465                                         gettext_noop ("# duplicate replies discarded (bloomfilter)"),
2466                                         1,
2467                                         GNUNET_NO);
2468 #if DEBUG_FS
2469               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2470                           "Duplicate response `%s', discarding.\n",
2471                           GNUNET_h2s (&mhash));
2472 #endif
2473               return GNUNET_YES; /* duplicate */
2474             }
2475 #if DEBUG_FS
2476           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2477                       "New response `%s', adding to filter.\n",
2478                       GNUNET_h2s (&mhash));
2479 #endif
2480         }
2481       if (pr->client_request_list != NULL)
2482         {
2483           if (pr->replies_seen_size == pr->replies_seen_off)
2484             GNUNET_array_grow (pr->replies_seen,
2485                                pr->replies_seen_size,
2486                                pr->replies_seen_size * 2 + 4);  
2487             pr->replies_seen[pr->replies_seen_off++] = chash;         
2488         }
2489       if ( (pr->bf == NULL) ||
2490            (pr->client_request_list != NULL) )
2491         refresh_bloomfilter (pr);
2492       GNUNET_CONTAINER_bloomfilter_add (pr->bf,
2493                                         &mhash);
2494       break;
2495     default:
2496       GNUNET_break (0);
2497       return GNUNET_YES;
2498     }
2499   prq->priority += pr->remaining_priority;
2500   pr->remaining_priority = 0;
2501   if (NULL != pr->client_request_list)
2502     {
2503       GNUNET_STATISTICS_update (stats,
2504                                 gettext_noop ("# replies received for local clients"),
2505                                 1,
2506                                 GNUNET_NO);
2507       cl = pr->client_request_list->client_list;
2508       msize = sizeof (struct PutMessage) + prq->size;
2509       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
2510       creply->msize = msize;
2511       creply->client_list = cl;
2512       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
2513                                          cl->res_tail,
2514                                          cl->res_tail,
2515                                          creply);      
2516       pm = (struct PutMessage*) &creply[1];
2517       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2518       pm->header.size = htons (msize);
2519       pm->type = htonl (prq->type);
2520       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2521       memcpy (&pm[1], prq->data, prq->size);      
2522       if (NULL == cl->th)
2523         {
2524 #if DEBUG_FS
2525           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2526                       "Transmitting result for query `%s' to client\n",
2527                       GNUNET_h2s (key));
2528 #endif  
2529           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
2530                                                         msize,
2531                                                         GNUNET_TIME_UNIT_FOREVER_REL,
2532                                                         &transmit_to_client,
2533                                                         cl);
2534         }
2535       GNUNET_break (cl->th != NULL);
2536       if (pr->do_remove)                
2537         {
2538           prq->finished = GNUNET_YES;
2539           destroy_pending_request (pr);         
2540         }
2541     }
2542   else
2543     {
2544       cp = pr->cp;
2545 #if DEBUG_FS
2546       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2547                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
2548                   GNUNET_h2s (key),
2549                   (unsigned int) cp->pid);
2550 #endif  
2551       GNUNET_STATISTICS_update (stats,
2552                                 gettext_noop ("# replies received for other peers"),
2553                                 1,
2554                                 GNUNET_NO);
2555       msize = sizeof (struct PutMessage) + prq->size;
2556       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
2557       reply->cont = &transmit_reply_continuation;
2558       reply->cont_cls = pr;
2559       reply->msize = msize;
2560       reply->priority = UINT32_MAX; /* send replies first! */
2561       pm = (struct PutMessage*) &reply[1];
2562       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2563       pm->header.size = htons (msize);
2564       pm->type = htonl (prq->type);
2565       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2566       memcpy (&pm[1], prq->data, prq->size);
2567       add_to_pending_messages_for_peer (cp, reply, pr);
2568     }
2569   return GNUNET_YES;
2570 }
2571
2572
2573 /**
2574  * Continuation called to notify client about result of the
2575  * operation.
2576  *
2577  * @param cls closure
2578  * @param success GNUNET_SYSERR on failure
2579  * @param msg NULL on success, otherwise an error message
2580  */
2581 static void 
2582 put_migration_continuation (void *cls,
2583                             int success,
2584                             const char *msg)
2585 {
2586   /* FIXME */
2587 }
2588
2589
2590 /**
2591  * Handle P2P "PUT" message.
2592  *
2593  * @param cls closure, always NULL
2594  * @param other the other peer involved (sender or receiver, NULL
2595  *        for loopback messages where we are both sender and receiver)
2596  * @param message the actual message
2597  * @param latency reported latency of the connection with 'other'
2598  * @param distance reported distance (DV) to 'other' 
2599  * @return GNUNET_OK to keep the connection open,
2600  *         GNUNET_SYSERR to close it (signal serious error)
2601  */
2602 static int
2603 handle_p2p_put (void *cls,
2604                 const struct GNUNET_PeerIdentity *other,
2605                 const struct GNUNET_MessageHeader *message,
2606                 struct GNUNET_TIME_Relative latency,
2607                 uint32_t distance)
2608 {
2609   const struct PutMessage *put;
2610   uint16_t msize;
2611   size_t dsize;
2612   enum GNUNET_BLOCK_Type type;
2613   struct GNUNET_TIME_Absolute expiration;
2614   GNUNET_HashCode query;
2615   struct ProcessReplyClosure prq;
2616   const struct SBlock *sb;
2617   struct ConnectedPeer *cps;
2618
2619   msize = ntohs (message->size);
2620   if (msize < sizeof (struct PutMessage))
2621     {
2622       GNUNET_break_op(0);
2623       return GNUNET_SYSERR;
2624     }
2625   put = (const struct PutMessage*) message;
2626   dsize = msize - sizeof (struct PutMessage);
2627   type = ntohl (put->type);
2628   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2629
2630   if (GNUNET_OK !=
2631       GNUNET_BLOCK_check_block (type,
2632                                 &put[1],
2633                                 dsize,
2634                                 &query))
2635     {
2636       GNUNET_break_op (0);
2637       return GNUNET_SYSERR;
2638     }
2639   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2640     return GNUNET_SYSERR;
2641   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2642     { 
2643       sb = (const struct SBlock*) &put[1];
2644       GNUNET_CRYPTO_hash (&sb->subspace,
2645                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2646                           &prq.namespace);
2647     }
2648
2649 #if DEBUG_FS
2650   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2651               "Received result for query `%s' from peer `%4s'\n",
2652               GNUNET_h2s (&query),
2653               GNUNET_i2s (other));
2654 #endif
2655   GNUNET_STATISTICS_update (stats,
2656                             gettext_noop ("# replies received (overall)"),
2657                             1,
2658                             GNUNET_NO);
2659   /* now, lookup 'query' */
2660   prq.data = (const void*) &put[1];
2661   if (other != NULL)
2662     prq.sender = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2663                                                     &other->hashPubKey);
2664   prq.size = dsize;
2665   prq.type = type;
2666   prq.expiration = expiration;
2667   prq.priority = 0;
2668   prq.finished = GNUNET_NO;
2669   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2670                                               &query,
2671                                               &process_reply,
2672                                               &prq);
2673   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2674                                            &other->hashPubKey);
2675   if (cps != NULL)
2676     {
2677       cps->inc_preference += CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority;
2678       cps->trust_delta += prq.priority;
2679     }
2680   else
2681     {
2682       GNUNET_break (0);
2683     }
2684   if (GNUNET_YES == active_migration)
2685     {
2686 #if DEBUG_FS
2687       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2688                   "Replicating result for query `%s' with priority %u\n",
2689                   GNUNET_h2s (&query),
2690                   prq.priority);
2691 #endif
2692       GNUNET_DATASTORE_put (dsh,
2693                             0, &query, dsize, &put[1],
2694                             type, prq.priority, 1 /* anonymity */, 
2695                             expiration, 
2696                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2697                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2698                             &put_migration_continuation, 
2699                             NULL);
2700     }
2701   return GNUNET_OK;
2702 }
2703
2704
2705 /* **************************** P2P GET Handling ************************ */
2706
2707
2708 /**
2709  * Closure for 'check_duplicate_request_{peer,client}'.
2710  */
2711 struct CheckDuplicateRequestClosure
2712 {
2713   /**
2714    * The new request we should check if it already exists.
2715    */
2716   const struct PendingRequest *pr;
2717
2718   /**
2719    * Existing request found by the checker, NULL if none.
2720    */
2721   struct PendingRequest *have;
2722 };
2723
2724
2725 /**
2726  * Iterator over entries in the 'query_request_map' that
2727  * tries to see if we have the same request pending from
2728  * the same client already.
2729  *
2730  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2731  * @param key current key code (query, ignored, must match)
2732  * @param value value in the hash map (a 'struct PendingRequest' 
2733  *              that already exists)
2734  * @return GNUNET_YES if we should continue to
2735  *         iterate (no match yet)
2736  *         GNUNET_NO if not (match found).
2737  */
2738 static int
2739 check_duplicate_request_client (void *cls,
2740                                 const GNUNET_HashCode * key,
2741                                 void *value)
2742 {
2743   struct CheckDuplicateRequestClosure *cdc = cls;
2744   struct PendingRequest *have = value;
2745
2746   if (have->client_request_list == NULL)
2747     return GNUNET_YES;
2748   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2749        (cdc->pr != have) )
2750     {
2751       cdc->have = have;
2752       return GNUNET_NO;
2753     }
2754   return GNUNET_YES;
2755 }
2756
2757
2758 /**
2759  * We're processing (local) results for a search request
2760  * from another peer.  Pass applicable results to the
2761  * peer and if we are done either clean up (operation
2762  * complete) or forward to other peers (more results possible).
2763  *
2764  * @param cls our closure (struct LocalGetContext)
2765  * @param key key for the content
2766  * @param size number of bytes in data
2767  * @param data content stored
2768  * @param type type of the content
2769  * @param priority priority of the content
2770  * @param anonymity anonymity-level for the content
2771  * @param expiration expiration time for the content
2772  * @param uid unique identifier for the datum;
2773  *        maybe 0 if no unique identifier is available
2774  */
2775 static void
2776 process_local_reply (void *cls,
2777                      const GNUNET_HashCode * key,
2778                      uint32_t size,
2779                      const void *data,
2780                      enum GNUNET_BLOCK_Type type,
2781                      uint32_t priority,
2782                      uint32_t anonymity,
2783                      struct GNUNET_TIME_Absolute
2784                      expiration, 
2785                      uint64_t uid)
2786 {
2787   struct PendingRequest *pr = cls;
2788   struct ProcessReplyClosure prq;
2789   struct CheckDuplicateRequestClosure cdrc;
2790   const struct SBlock *sb;
2791   GNUNET_HashCode dhash;
2792   GNUNET_HashCode mhash;
2793   GNUNET_HashCode query;
2794   
2795   if (NULL == key)
2796     {
2797 #if DEBUG_FS > 1
2798       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2799                   "Done processing local replies, forwarding request to other peers.\n");
2800 #endif
2801       pr->qe = NULL;
2802       if (pr->client_request_list != NULL)
2803         {
2804           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2805                                       GNUNET_YES);
2806           /* Figure out if this is a duplicate request and possibly
2807              merge 'struct PendingRequest' entries */
2808           cdrc.have = NULL;
2809           cdrc.pr = pr;
2810           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2811                                                       &pr->query,
2812                                                       &check_duplicate_request_client,
2813                                                       &cdrc);
2814           if (cdrc.have != NULL)
2815             {
2816 #if DEBUG_FS
2817               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2818                           "Received request for block `%s' twice from client, will only request once.\n",
2819                           GNUNET_h2s (&pr->query));
2820 #endif
2821               
2822               destroy_pending_request (pr);
2823               return;
2824             }
2825         }
2826
2827       /* no more results */
2828       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2829         pr->task = GNUNET_SCHEDULER_add_now (sched,
2830                                              &forward_request_task,
2831                                              pr);      
2832       return;
2833     }
2834 #if DEBUG_FS
2835   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2836               "New local response to `%s' of type %u.\n",
2837               GNUNET_h2s (key),
2838               type);
2839 #endif
2840   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2841     {
2842 #if DEBUG_FS
2843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2844                   "Found ONDEMAND block, performing on-demand encoding\n");
2845 #endif
2846       GNUNET_STATISTICS_update (stats,
2847                                 gettext_noop ("# on-demand blocks matched requests"),
2848                                 1,
2849                                 GNUNET_NO);
2850       if (GNUNET_OK != 
2851           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2852                                             anonymity, expiration, uid, 
2853                                             &process_local_reply,
2854                                             pr))
2855       if (pr->qe != NULL)
2856         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2857       return;
2858     }
2859   /* check for duplicates */
2860   GNUNET_CRYPTO_hash (data, size, &dhash);
2861   mingle_hash (&dhash, 
2862                pr->mingle,
2863                &mhash);
2864   if ( (pr->bf != NULL) &&
2865        (GNUNET_YES ==
2866         GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2867                                            &mhash)) )
2868     {      
2869 #if DEBUG_FS
2870       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2871                   "Result from datastore filtered by bloomfilter (duplicate).\n");
2872 #endif
2873       GNUNET_STATISTICS_update (stats,
2874                                 gettext_noop ("# results filtered by query bloomfilter"),
2875                                 1,
2876                                 GNUNET_NO);
2877       if (pr->qe != NULL)
2878         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2879       return;
2880     }
2881 #if DEBUG_FS
2882   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2883               "Found result for query `%s' in local datastore\n",
2884               GNUNET_h2s (key));
2885 #endif
2886   GNUNET_STATISTICS_update (stats,
2887                             gettext_noop ("# results found locally"),
2888                             1,
2889                             GNUNET_NO);
2890   pr->results_found++;
2891   memset (&prq, 0, sizeof (prq));
2892   prq.data = data;
2893   prq.expiration = expiration;
2894   prq.size = size;  
2895   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2896     { 
2897       sb = (const struct SBlock*) data;
2898       GNUNET_CRYPTO_hash (&sb->subspace,
2899                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2900                           &prq.namespace);
2901     }
2902   if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
2903                                              data,
2904                                              size,
2905                                              &query))
2906     {
2907       GNUNET_break (0);
2908       GNUNET_DATASTORE_remove (dsh,
2909                                key,
2910                                size, data,
2911                                -1, -1, 
2912                                GNUNET_TIME_UNIT_FOREVER_REL,
2913                                NULL, NULL);
2914       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2915       return;
2916     }
2917   prq.type = type;
2918   prq.priority = priority;  
2919   prq.finished = GNUNET_NO;
2920   process_reply (&prq, key, pr);
2921   if (prq.finished == GNUNET_YES)
2922     return;
2923   if (pr->qe == NULL)
2924     return; /* done here */
2925   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
2926        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
2927     {
2928       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2929       return;
2930     }
2931   if ( (pr->client_request_list == NULL) &&
2932        ( (GNUNET_YES == test_load_too_high()) ||
2933          (pr->results_found > 5 + 2 * pr->priority) ) )
2934     {
2935 #if DEBUG_FS > 2
2936       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2937                   "Load too high, done with request\n");
2938 #endif
2939       GNUNET_STATISTICS_update (stats,
2940                                 gettext_noop ("# processing result set cut short due to load"),
2941                                 1,
2942                                 GNUNET_NO);
2943       GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2944       return;
2945     }
2946   GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2947 }
2948
2949
2950 /**
2951  * We've received a request with the specified priority.  Bound it
2952  * according to how much we trust the given peer.
2953  * 
2954  * @param prio_in requested priority
2955  * @param cp the peer making the request
2956  * @return effective priority
2957  */
2958 static uint32_t
2959 bound_priority (uint32_t prio_in,
2960                 struct ConnectedPeer *cp)
2961 {
2962   if (cp->trust_delta > prio_in)
2963     {
2964       cp->trust_delta -= prio_in;
2965       return prio_in;
2966     }
2967   // FIXME: get out trust in the target peer from peerinfo!
2968   return 0; 
2969 }
2970
2971
2972 /**
2973  * Iterator over entries in the 'query_request_map' that
2974  * tries to see if we have the same request pending from
2975  * the same peer already.
2976  *
2977  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2978  * @param key current key code (query, ignored, must match)
2979  * @param value value in the hash map (a 'struct PendingRequest' 
2980  *              that already exists)
2981  * @return GNUNET_YES if we should continue to
2982  *         iterate (no match yet)
2983  *         GNUNET_NO if not (match found).
2984  */
2985 static int
2986 check_duplicate_request_peer (void *cls,
2987                               const GNUNET_HashCode * key,
2988                               void *value)
2989 {
2990   struct CheckDuplicateRequestClosure *cdc = cls;
2991   struct PendingRequest *have = value;
2992
2993   if (cdc->pr->target_pid == have->target_pid)
2994     {
2995       cdc->have = have;
2996       return GNUNET_NO;
2997     }
2998   return GNUNET_YES;
2999 }
3000
3001
3002 /**
3003  * Handle P2P "GET" request.
3004  *
3005  * @param cls closure, always NULL
3006  * @param other the other peer involved (sender or receiver, NULL
3007  *        for loopback messages where we are both sender and receiver)
3008  * @param message the actual message
3009  * @param latency reported latency of the connection with 'other'
3010  * @param distance reported distance (DV) to 'other' 
3011  * @return GNUNET_OK to keep the connection open,
3012  *         GNUNET_SYSERR to close it (signal serious error)
3013  */
3014 static int
3015 handle_p2p_get (void *cls,
3016                 const struct GNUNET_PeerIdentity *other,
3017                 const struct GNUNET_MessageHeader *message,
3018                 struct GNUNET_TIME_Relative latency,
3019                 uint32_t distance)
3020 {
3021   struct PendingRequest *pr;
3022   struct ConnectedPeer *cp;
3023   struct ConnectedPeer *cps;
3024   struct CheckDuplicateRequestClosure cdc;
3025   struct GNUNET_TIME_Relative timeout;
3026   uint16_t msize;
3027   const struct GetMessage *gm;
3028   unsigned int bits;
3029   const GNUNET_HashCode *opt;
3030   uint32_t bm;
3031   size_t bfsize;
3032   uint32_t ttl_decrement;
3033   enum GNUNET_BLOCK_Type type;
3034   int have_ns;
3035
3036   msize = ntohs(message->size);
3037   if (msize < sizeof (struct GetMessage))
3038     {
3039       GNUNET_break_op (0);
3040       return GNUNET_SYSERR;
3041     }
3042   gm = (const struct GetMessage*) message;
3043   type = ntohl (gm->type);
3044   switch (type)
3045     {
3046     case GNUNET_BLOCK_TYPE_ANY:
3047     case GNUNET_BLOCK_TYPE_DBLOCK:
3048     case GNUNET_BLOCK_TYPE_IBLOCK:
3049     case GNUNET_BLOCK_TYPE_KBLOCK:
3050     case GNUNET_BLOCK_TYPE_SBLOCK:
3051       break;
3052     default:
3053       GNUNET_break_op (0);
3054       return GNUNET_SYSERR;
3055     }
3056   bm = ntohl (gm->hash_bitmap);
3057   bits = 0;
3058   while (bm > 0)
3059     {
3060       if (1 == (bm & 1))
3061         bits++;
3062       bm >>= 1;
3063     }
3064   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
3065     {
3066       GNUNET_break_op (0);
3067       return GNUNET_SYSERR;
3068     }  
3069   opt = (const GNUNET_HashCode*) &gm[1];
3070   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
3071   bm = ntohl (gm->hash_bitmap);
3072   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
3073        (type != GNUNET_BLOCK_TYPE_SBLOCK) )
3074     {
3075       GNUNET_break_op (0);
3076       return GNUNET_SYSERR;      
3077     }
3078   bits = 0;
3079   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3080                                            &other->hashPubKey);
3081   if (NULL == cps)
3082     {
3083       /* peer must have just disconnected */
3084       GNUNET_STATISTICS_update (stats,
3085                                 gettext_noop ("# requests dropped due to initiator not being connected"),
3086                                 1,
3087                                 GNUNET_NO);
3088       return GNUNET_SYSERR;
3089     }
3090   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3091     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
3092                                             &opt[bits++]);
3093   else
3094     cp = cps;
3095   if (cp == NULL)
3096     {
3097 #if DEBUG_FS
3098       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
3099         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3100                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
3101                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
3102       
3103       else
3104         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3105                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
3106                     GNUNET_i2s (other));
3107 #endif
3108       GNUNET_STATISTICS_update (stats,
3109                                 gettext_noop ("# requests dropped due to missing reverse route"),
3110                                 1,
3111                                 GNUNET_NO);
3112      /* FIXME: try connect? */
3113       return GNUNET_OK;
3114     }
3115   /* note that we can really only check load here since otherwise
3116      peers could find out that we are overloaded by not being
3117      disconnected after sending us a malformed query... */
3118   if (GNUNET_YES == test_load_too_high ())
3119     {
3120 #if DEBUG_FS
3121       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3122                   "Dropping query from `%s', this peer is too busy.\n",
3123                   GNUNET_i2s (other));
3124 #endif
3125       GNUNET_STATISTICS_update (stats,
3126                                 gettext_noop ("# requests dropped due to high load"),
3127                                 1,
3128                                 GNUNET_NO);
3129       return GNUNET_OK;
3130     }
3131
3132 #if DEBUG_FS 
3133   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3134               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
3135               GNUNET_h2s (&gm->query),
3136               (unsigned int) type,
3137               GNUNET_i2s (other),
3138               (unsigned int) bm);
3139 #endif
3140   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
3141   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3142                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
3143   if (have_ns)
3144     {
3145       pr->namespace = (GNUNET_HashCode*) &pr[1];
3146       memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
3147     }
3148   pr->type = type;
3149   pr->mingle = ntohl (gm->filter_mutator);
3150   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
3151     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
3152
3153   pr->anonymity_level = 1;
3154   pr->priority = bound_priority (ntohl (gm->priority), cps);
3155   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
3156   pr->query = gm->query;
3157   /* decrement ttl (always) */
3158   ttl_decrement = 2 * TTL_DECREMENT +
3159     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
3160                               TTL_DECREMENT);
3161   if ( (pr->ttl < 0) &&
3162        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
3163     {
3164 #if DEBUG_FS
3165       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3166                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
3167                   GNUNET_i2s (other),
3168                   pr->ttl,
3169                   ttl_decrement);
3170 #endif
3171       GNUNET_STATISTICS_update (stats,
3172                                 gettext_noop ("# requests dropped due TTL underflow"),
3173                                 1,
3174                                 GNUNET_NO);
3175       /* integer underflow => drop (should be very rare)! */      
3176       GNUNET_free (pr);
3177       return GNUNET_OK;
3178     } 
3179   pr->ttl -= ttl_decrement;
3180   pr->start_time = GNUNET_TIME_absolute_get ();
3181
3182   /* get bloom filter */
3183   if (bfsize > 0)
3184     {
3185       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
3186                                                   bfsize,
3187                                                   BLOOMFILTER_K);
3188       pr->bf_size = bfsize;
3189     }
3190
3191   cdc.have = NULL;
3192   cdc.pr = pr;
3193   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
3194                                               &gm->query,
3195                                               &check_duplicate_request_peer,
3196                                               &cdc);
3197   if (cdc.have != NULL)
3198     {
3199       if (cdc.have->start_time.value + cdc.have->ttl >=
3200           pr->start_time.value + pr->ttl)
3201         {
3202           /* existing request has higher TTL, drop new one! */
3203           cdc.have->priority += pr->priority;
3204           destroy_pending_request (pr);
3205 #if DEBUG_FS
3206           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3207                       "Have existing request with higher TTL, dropping new request.\n",
3208                       GNUNET_i2s (other));
3209 #endif
3210           GNUNET_STATISTICS_update (stats,
3211                                     gettext_noop ("# requests dropped due to higher-TTL request"),
3212                                     1,
3213                                     GNUNET_NO);
3214           return GNUNET_OK;
3215         }
3216       else
3217         {
3218           /* existing request has lower TTL, drop old one! */
3219           pr->priority += cdc.have->priority;
3220           /* Possible optimization: if we have applicable pending
3221              replies in 'cdc.have', we might want to move those over
3222              (this is a really rare special-case, so it is not clear
3223              that this would be worth it) */
3224           destroy_pending_request (cdc.have);
3225           /* keep processing 'pr'! */
3226         }
3227     }
3228
3229   pr->cp = cp;
3230   GNUNET_break (GNUNET_OK ==
3231                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3232                                                    &gm->query,
3233                                                    pr,
3234                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3235   GNUNET_break (GNUNET_OK ==
3236                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
3237                                                    &other->hashPubKey,
3238                                                    pr,
3239                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3240   
3241   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
3242                                             pr,
3243                                             pr->start_time.value + pr->ttl);
3244
3245   GNUNET_STATISTICS_update (stats,
3246                             gettext_noop ("# P2P searches received"),
3247                             1,
3248                             GNUNET_NO);
3249   GNUNET_STATISTICS_update (stats,
3250                             gettext_noop ("# P2P searches active"),
3251                             1,
3252                             GNUNET_NO);
3253
3254   /* calculate change in traffic preference */
3255   cps->inc_preference += pr->priority * 1000 + QUERY_BANDWIDTH_VALUE;
3256   /* process locally */
3257   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3258     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
3259   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
3260                                            (pr->priority + 1)); 
3261   pr->qe = GNUNET_DATASTORE_get (dsh,
3262                                  &gm->query,
3263                                  type,                         
3264                                  pr->priority + 1,
3265                                  MAX_DATASTORE_QUEUE,                            
3266                                  timeout,
3267                                  &process_local_reply,
3268                                  pr);
3269
3270   /* Are multiple results possible?  If so, start processing remotely now! */
3271   switch (pr->type)
3272     {
3273     case GNUNET_BLOCK_TYPE_DBLOCK:
3274     case GNUNET_BLOCK_TYPE_IBLOCK:
3275       /* only one result, wait for datastore */
3276       break;
3277     default:
3278       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
3279         pr->task = GNUNET_SCHEDULER_add_now (sched,
3280                                              &forward_request_task,
3281                                              pr);
3282     }
3283
3284   /* make sure we don't track too many requests */
3285   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
3286     {
3287       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
3288       destroy_pending_request (pr);
3289     }
3290   return GNUNET_OK;
3291 }
3292
3293
3294 /* **************************** CS GET Handling ************************ */
3295
3296
3297 /**
3298  * Handle START_SEARCH-message (search request from client).
3299  *
3300  * @param cls closure
3301  * @param client identification of the client
3302  * @param message the actual message
3303  */
3304 static void
3305 handle_start_search (void *cls,
3306                      struct GNUNET_SERVER_Client *client,
3307                      const struct GNUNET_MessageHeader *message)
3308 {
3309   static GNUNET_HashCode all_zeros;
3310   const struct SearchMessage *sm;
3311   struct ClientList *cl;
3312   struct ClientRequestList *crl;
3313   struct PendingRequest *pr;
3314   uint16_t msize;
3315   unsigned int sc;
3316   enum GNUNET_BLOCK_Type type;
3317
3318   msize = ntohs (message->size);
3319   if ( (msize < sizeof (struct SearchMessage)) ||
3320        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
3321     {
3322       GNUNET_break (0);
3323       GNUNET_SERVER_receive_done (client,
3324                                   GNUNET_SYSERR);
3325       return;
3326     }
3327   GNUNET_STATISTICS_update (stats,
3328                             gettext_noop ("# client searches received"),
3329                             1,
3330                             GNUNET_NO);
3331   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
3332   sm = (const struct SearchMessage*) message;
3333   type = ntohl (sm->type);
3334 #if DEBUG_FS
3335   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3336               "Received request for `%s' of type %u from local client\n",
3337               GNUNET_h2s (&sm->query),
3338               (unsigned int) type);
3339 #endif
3340   switch (type)
3341     {
3342     case GNUNET_BLOCK_TYPE_ANY:
3343     case GNUNET_BLOCK_TYPE_DBLOCK:
3344     case GNUNET_BLOCK_TYPE_IBLOCK:
3345     case GNUNET_BLOCK_TYPE_KBLOCK:
3346     case GNUNET_BLOCK_TYPE_SBLOCK:
3347     case GNUNET_BLOCK_TYPE_NBLOCK:
3348       break;
3349     default:
3350       GNUNET_break (0);
3351       GNUNET_SERVER_receive_done (client,
3352                                   GNUNET_SYSERR);
3353       return;
3354     }  
3355
3356   cl = client_list;
3357   while ( (cl != NULL) &&
3358           (cl->client != client) )
3359     cl = cl->next;
3360   if (cl == NULL)
3361     {
3362       cl = GNUNET_malloc (sizeof (struct ClientList));
3363       cl->client = client;
3364       GNUNET_SERVER_client_keep (client);
3365       cl->next = client_list;
3366       client_list = cl;
3367     }
3368   /* detect duplicate KBLOCK requests */
3369   if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
3370        (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
3371        (type == GNUNET_BLOCK_TYPE_ANY) )
3372     {
3373       crl = cl->rl_head;
3374       while ( (crl != NULL) &&
3375               ( (0 != memcmp (&crl->req->query,
3376                               &sm->query,
3377                               sizeof (GNUNET_HashCode))) ||
3378                 (crl->req->type != type) ) )
3379         crl = crl->next;
3380       if (crl != NULL)  
3381         { 
3382 #if DEBUG_FS
3383           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
3384                       "Have existing request, merging content-seen lists.\n");
3385 #endif
3386           pr = crl->req;
3387           /* Duplicate request (used to send long list of
3388              known/blocked results); merge 'pr->replies_seen'
3389              and update bloom filter */
3390           GNUNET_array_grow (pr->replies_seen,
3391                              pr->replies_seen_size,
3392                              pr->replies_seen_off + sc);
3393           memcpy (&pr->replies_seen[pr->replies_seen_off],
3394                   &sm[1],
3395                   sc * sizeof (GNUNET_HashCode));
3396           pr->replies_seen_off += sc;
3397           refresh_bloomfilter (pr);
3398           GNUNET_STATISTICS_update (stats,
3399                                     gettext_noop ("# client searches updated (merged content seen list)"),
3400                                     1,
3401                                     GNUNET_NO);
3402           GNUNET_SERVER_receive_done (client,
3403                                       GNUNET_OK);
3404           return;
3405         }
3406     }
3407   GNUNET_STATISTICS_update (stats,
3408                             gettext_noop ("# client searches active"),
3409                             1,
3410                             GNUNET_NO);
3411   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
3412                       ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
3413   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
3414   memset (crl, 0, sizeof (struct ClientRequestList));
3415   crl->client_list = cl;
3416   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
3417                                cl->rl_tail,
3418                                crl);  
3419   crl->req = pr;
3420   pr->type = type;
3421   pr->client_request_list = crl;
3422   GNUNET_array_grow (pr->replies_seen,
3423                      pr->replies_seen_size,
3424                      sc);
3425   memcpy (pr->replies_seen,
3426           &sm[1],
3427           sc * sizeof (GNUNET_HashCode));
3428   pr->replies_seen_off = sc;
3429   pr->anonymity_level = ntohl (sm->anonymity_level); 
3430   refresh_bloomfilter (pr);
3431   pr->query = sm->query;
3432   if (0 == (1 & ntohl (sm->options)))
3433     pr->local_only = GNUNET_NO;
3434   else
3435     pr->local_only = GNUNET_YES;
3436   switch (type)
3437     {
3438     case GNUNET_BLOCK_TYPE_DBLOCK:
3439     case GNUNET_BLOCK_TYPE_IBLOCK:
3440       if (0 != memcmp (&sm->target,
3441                        &all_zeros,
3442                        sizeof (GNUNET_HashCode)))
3443         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
3444       break;
3445     case GNUNET_BLOCK_TYPE_SBLOCK:
3446       pr->namespace = (GNUNET_HashCode*) &pr[1];
3447       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
3448       break;
3449     default:
3450       break;
3451     }
3452   GNUNET_break (GNUNET_OK ==
3453                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
3454                                                    &sm->query,
3455                                                    pr,
3456                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
3457   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
3458     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
3459   pr->qe = GNUNET_DATASTORE_get (dsh,
3460                                  &sm->query,
3461                                  type,
3462                                  -3, -1,
3463                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
3464                                  &process_local_reply,
3465                                  pr);
3466 }
3467
3468
3469 /* **************************** Startup ************************ */
3470
3471 /**
3472  * Process fs requests.
3473  *
3474  * @param s scheduler to use
3475  * @param server the initialized server
3476  * @param c configuration to use
3477  */
3478 static int
3479 main_init (struct GNUNET_SCHEDULER_Handle *s,
3480            struct GNUNET_SERVER_Handle *server,
3481            const struct GNUNET_CONFIGURATION_Handle *c)
3482 {
3483   static const struct GNUNET_CORE_MessageHandler p2p_handlers[] =
3484     {
3485       { &handle_p2p_get, 
3486         GNUNET_MESSAGE_TYPE_FS_GET, 0 },
3487       { &handle_p2p_put, 
3488         GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
3489       { NULL, 0, 0 }
3490     };
3491   static const struct GNUNET_SERVER_MessageHandler handlers[] = {
3492     {&GNUNET_FS_handle_index_start, NULL, 
3493      GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
3494     {&GNUNET_FS_handle_index_list_get, NULL, 
3495      GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
3496     {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
3497      sizeof (struct UnindexMessage) },
3498     {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
3499      0 },
3500     {NULL, NULL, 0, 0}
3501   };
3502
3503   sched = s;
3504   cfg = c;
3505   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
3506   min_migration_delay = GNUNET_TIME_UNIT_SECONDS; // FIXME: get from config
3507   connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3508   query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3509   peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
3510   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
3511   core = GNUNET_CORE_connect (sched,
3512                               cfg,
3513                               GNUNET_TIME_UNIT_FOREVER_REL,
3514                               NULL,
3515                               NULL,
3516                               &peer_connect_handler,
3517                               &peer_disconnect_handler,
3518                               NULL, GNUNET_NO,
3519                               NULL, GNUNET_NO,
3520                               p2p_handlers);
3521   if (NULL == core)
3522     {
3523       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
3524                   _("Failed to connect to `%s' service.\n"),
3525                   "core");
3526       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
3527       connected_peers = NULL;
3528       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
3529       query_request_map = NULL;
3530       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
3531       requests_by_expiration_heap = NULL;
3532       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
3533       peer_request_map = NULL;
3534       if (dsh != NULL)
3535         {
3536           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3537           dsh = NULL;
3538         }
3539       return GNUNET_SYSERR;
3540     }
3541   /* FIXME: distinguish between sending and storing in options? */
3542   if (active_migration) 
3543     {
3544       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
3545                   _("Content migration is enabled, will start to gather data\n"));
3546       consider_migration_gathering ();
3547     }
3548   GNUNET_SERVER_disconnect_notify (server, 
3549                                    &handle_client_disconnect,
3550                                    NULL);
3551   GNUNET_SERVER_add_handlers (server, handlers);
3552   GNUNET_SCHEDULER_add_delayed (sched,
3553                                 GNUNET_TIME_UNIT_FOREVER_REL,
3554                                 &shutdown_task,
3555                                 NULL);
3556   return GNUNET_OK;
3557 }
3558
3559
3560 /**
3561  * Process fs requests.
3562  *
3563  * @param cls closure
3564  * @param sched scheduler to use
3565  * @param server the initialized server
3566  * @param cfg configuration to use
3567  */
3568 static void
3569 run (void *cls,
3570      struct GNUNET_SCHEDULER_Handle *sched,
3571      struct GNUNET_SERVER_Handle *server,
3572      const struct GNUNET_CONFIGURATION_Handle *cfg)
3573 {
3574   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
3575                                                            "FS",
3576                                                            "ACTIVEMIGRATION");
3577   dsh = GNUNET_DATASTORE_connect (cfg,
3578                                   sched);
3579   if (dsh == NULL)
3580     {
3581       GNUNET_SCHEDULER_shutdown (sched);
3582       return;
3583     }
3584   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3585        (GNUNET_OK != main_init (sched, server, cfg)) )
3586     {    
3587       GNUNET_SCHEDULER_shutdown (sched);
3588       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3589       dsh = NULL;
3590       return;   
3591     }
3592 }
3593
3594
3595 /**
3596  * The main function for the fs service.
3597  *
3598  * @param argc number of arguments from the command line
3599  * @param argv command line arguments
3600  * @return 0 ok, 1 on error
3601  */
3602 int
3603 main (int argc, char *const *argv)
3604 {
3605   return (GNUNET_OK ==
3606           GNUNET_SERVICE_run (argc,
3607                               argv,
3608                               "fs",
3609                               GNUNET_SERVICE_OPTION_NONE,
3610                               &run, NULL)) ? 0 : 1;
3611 }
3612
3613 /* end of gnunet-service-fs.c */