stuff
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * FIXME:
27  * - TTL/priority calculations are absent!
28  * TODO:
29  * - have non-zero preference / priority for requests we initiate!
30  * - track stats for hot-path routing
31  * - implement hot-path routing decision procedure
32  * - implement: bound_priority, test_load_too_high, validate_nblock
33  * - add content migration support (store locally) [or create new service]
34  * - statistics
35  */
36 #include "platform.h"
37 #include <float.h>
38 #include "gnunet_constants.h"
39 #include "gnunet_core_service.h"
40 #include "gnunet_datastore_service.h"
41 #include "gnunet_peer_lib.h"
42 #include "gnunet_protocols.h"
43 #include "gnunet_signatures.h"
44 #include "gnunet_statistics_service.h"
45 #include "gnunet_util_lib.h"
46 #include "gnunet-service-fs_indexing.h"
47 #include "fs.h"
48
49 #define DEBUG_FS GNUNET_NO
50
51 /**
52  * Maximum number of outgoing messages we queue per peer.
53  * FIXME: make configurable?
54  */
55 #define MAX_QUEUE_PER_PEER 16
56
57 /**
58  * Inverse of the probability that we will submit the same query
59  * to the same peer again.  If the same peer already got the query
60  * repeatedly recently, the probability is multiplied by the inverse
61  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
62  * plus MAX_CORK_DELAY (so roughly every 3.5s).
63  */
64 #define RETRY_PROBABILITY_INV 3
65
66 /**
67  * What is the maximum delay for a P2P FS message (in our interaction
68  * with core)?  FS-internal delays are another story.  The value is
69  * chosen based on the 32k block size.  Given that peers typcially
70  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
71  * transmit one message even to the lowest-bandwidth peers.
72  */
73 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
74
75
76
77 /**
78  * Maximum number of requests (from other peers) that we're
79  * willing to have pending at any given point in time.
80  * FIXME: set from configuration.
81  */
82 static uint64_t max_pending_requests = (32 * 1024);
83
84
85 /**
86  * Information we keep for each pending reply.  The
87  * actual message follows at the end of this struct.
88  */
89 struct PendingMessage;
90
91 /**
92  * Our connection to the datastore.
93  */
94 static struct GNUNET_DATASTORE_Handle *dsh;
95
96
97 /**
98  * Function called upon completion of a transmission.
99  *
100  * @param cls closure
101  * @param pid ID of receiving peer, 0 on transmission error
102  */
103 typedef void (*TransmissionContinuation)(void * cls, 
104                                          GNUNET_PEER_Id tpid);
105
106
107 /**
108  * Information we keep for each pending message (GET/PUT).  The
109  * actual message follows at the end of this struct.
110  */
111 struct PendingMessage
112 {
113   /**
114    * This is a doubly-linked list of messages to the same peer.
115    */
116   struct PendingMessage *next;
117
118   /**
119    * This is a doubly-linked list of messages to the same peer.
120    */
121   struct PendingMessage *prev;
122
123   /**
124    * Entry in pending message list for this pending message.
125    */ 
126   struct PendingMessageList *pml;  
127
128   /**
129    * Function to call immediately once we have transmitted this
130    * message.
131    */
132   TransmissionContinuation cont;
133
134   /**
135    * Closure for cont.
136    */
137   void *cont_cls;
138
139   /**
140    * Size of the reply; actual reply message follows
141    * at the end of this struct.
142    */
143   size_t msize;
144   
145   /**
146    * How important is this message for us?
147    */
148   uint32_t priority;
149  
150 };
151
152
153 /**
154  * Information about a peer that we are connected to.
155  * We track data that is useful for determining which
156  * peers should receive our requests.  We also keep
157  * a list of messages to transmit to this peer.
158  */
159 struct ConnectedPeer
160 {
161
162   /**
163    * List of the last clients for which this peer successfully
164    * answered a query.
165    */
166   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
167
168   /**
169    * List of the last PIDs for which
170    * this peer successfully answered a query;
171    * We use 0 to indicate no successful reply.
172    */
173   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
174
175   /**
176    * Average delay between sending the peer a request and
177    * getting a reply (only calculated over the requests for
178    * which we actually got a reply).   Calculated
179    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
180    */ 
181   struct GNUNET_TIME_Relative avg_delay;
182
183   /**
184    * Handle for an active request for transmission to this
185    * peer, or NULL.
186    */
187   struct GNUNET_CORE_TransmitHandle *cth;
188
189   /**
190    * Messages (replies, queries, content migration) we would like to
191    * send to this peer in the near future.  Sorted by priority, head.
192    */
193   struct PendingMessage *pending_messages_head;
194
195   /**
196    * Messages (replies, queries, content migration) we would like to
197    * send to this peer in the near future.  Sorted by priority, tail.
198    */
199   struct PendingMessage *pending_messages_tail;
200
201   /**
202    * Average priority of successful replies.  Calculated
203    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
204    */
205   double avg_priority;
206
207   /**
208    * Increase in traffic preference still to be submitted
209    * to the core service for this peer. FIXME: double or 'uint64_t'?
210    */
211   double inc_preference;
212
213   /**
214    * The peer's identity.
215    */
216   GNUNET_PEER_Id pid;  
217
218   /**
219    * Size of the linked list of 'pending_messages'.
220    */
221   unsigned int pending_requests;
222
223   /**
224    * Which offset in "last_p2p_replies" will be updated next?
225    * (we go round-robin).
226    */
227   unsigned int last_p2p_replies_woff;
228
229   /**
230    * Which offset in "last_client_replies" will be updated next?
231    * (we go round-robin).
232    */
233   unsigned int last_client_replies_woff;
234
235 };
236
237
238 /**
239  * Information we keep for each pending request.  We should try to
240  * keep this struct as small as possible since its memory consumption
241  * is key to how many requests we can have pending at once.
242  */
243 struct PendingRequest;
244
245
246 /**
247  * Doubly-linked list of requests we are performing
248  * on behalf of the same client.
249  */
250 struct ClientRequestList
251 {
252
253   /**
254    * This is a doubly-linked list.
255    */
256   struct ClientRequestList *next;
257
258   /**
259    * This is a doubly-linked list.
260    */
261   struct ClientRequestList *prev;
262
263   /**
264    * Request this entry represents.
265    */
266   struct PendingRequest *req;
267
268   /**
269    * Client list this request belongs to.
270    */
271   struct ClientList *client_list;
272
273 };
274
275
276 /**
277  * Replies to be transmitted to the client.  The actual
278  * response message is allocated after this struct.
279  */
280 struct ClientResponseMessage
281 {
282   /**
283    * This is a doubly-linked list.
284    */
285   struct ClientResponseMessage *next;
286
287   /**
288    * This is a doubly-linked list.
289    */
290   struct ClientResponseMessage *prev;
291
292   /**
293    * Client list entry this response belongs to.
294    */
295   struct ClientList *client_list;
296
297   /**
298    * Number of bytes in the response.
299    */
300   size_t msize;
301 };
302
303
304 /**
305  * Linked list of clients we are performing requests
306  * for right now.
307  */
308 struct ClientList
309 {
310   /**
311    * This is a linked list.
312    */
313   struct ClientList *next;
314
315   /**
316    * ID of a client making a request, NULL if this entry is for a
317    * peer.
318    */
319   struct GNUNET_SERVER_Client *client;
320
321   /**
322    * Head of list of requests performed on behalf
323    * of this client right now.
324    */
325   struct ClientRequestList *rl_head;
326
327   /**
328    * Tail of list of requests performed on behalf
329    * of this client right now.
330    */
331   struct ClientRequestList *rl_tail;
332
333   /**
334    * Head of linked list of responses.
335    */
336   struct ClientResponseMessage *res_head;
337
338   /**
339    * Tail of linked list of responses.
340    */
341   struct ClientResponseMessage *res_tail;
342
343   /**
344    * Context for sending replies.
345    */
346   struct GNUNET_CONNECTION_TransmitHandle *th;
347
348 };
349
350
351 /**
352  * Doubly-linked list of messages we are performing
353  * due to a pending request.
354  */
355 struct PendingMessageList
356 {
357
358   /**
359    * This is a doubly-linked list of messages on behalf of the same request.
360    */
361   struct PendingMessageList *next;
362
363   /**
364    * This is a doubly-linked list of messages on behalf of the same request.
365    */
366   struct PendingMessageList *prev;
367
368   /**
369    * Message this entry represents.
370    */
371   struct PendingMessage *pm;
372
373   /**
374    * Request this entry belongs to.
375    */
376   struct PendingRequest *req;
377
378   /**
379    * Peer this message is targeted for.
380    */
381   struct ConnectedPeer *target;
382
383 };
384
385
386 /**
387  * Information we keep for each pending request.  We should try to
388  * keep this struct as small as possible since its memory consumption
389  * is key to how many requests we can have pending at once.
390  */
391 struct PendingRequest
392 {
393
394   /**
395    * If this request was made by a client, this is our entry in the
396    * client request list; otherwise NULL.
397    */
398   struct ClientRequestList *client_request_list;
399
400   /**
401    * Entry of peer responsible for this entry (if this request
402    * was made by a peer).
403    */
404   struct ConnectedPeer *cp;
405
406   /**
407    * If this is a namespace query, pointer to the hash of the public
408    * key of the namespace; otherwise NULL.  Pointer will be to the 
409    * end of this struct (so no need to free it).
410    */
411   const GNUNET_HashCode *namespace;
412
413   /**
414    * Bloomfilter we use to filter out replies that we don't care about
415    * (anymore).  NULL as long as we are interested in all replies.
416    */
417   struct GNUNET_CONTAINER_BloomFilter *bf;
418
419   /**
420    * Context of our GNUNET_CORE_peer_change_preference call.
421    */
422   struct GNUNET_CORE_InformationRequestContext *irc;
423
424   /**
425    * Hash code of all replies that we have seen so far (only valid
426    * if client is not NULL since we only track replies like this for
427    * our own clients).
428    */
429   GNUNET_HashCode *replies_seen;
430
431   /**
432    * Node in the heap representing this entry; NULL
433    * if we have no heap node.
434    */
435   struct GNUNET_CONTAINER_HeapNode *hnode;
436
437   /**
438    * Head of list of messages being performed on behalf of this
439    * request.
440    */
441   struct PendingMessageList *pending_head;
442
443   /**
444    * Tail of list of messages being performed on behalf of this
445    * request.
446    */
447   struct PendingMessageList *pending_tail;
448
449   /**
450    * When did we first see this request (form this peer), or, if our
451    * client is initiating, when did we last initiate a search?
452    */
453   struct GNUNET_TIME_Absolute start_time;
454
455   /**
456    * The query that this request is for.
457    */
458   GNUNET_HashCode query;
459
460   /**
461    * The task responsible for transmitting queries
462    * for this request.
463    */
464   GNUNET_SCHEDULER_TaskIdentifier task;
465
466   /**
467    * (Interned) Peer identifier that identifies a preferred target
468    * for requests.
469    */
470   GNUNET_PEER_Id target_pid;
471
472   /**
473    * (Interned) Peer identifiers of peers that have already
474    * received our query for this content.
475    */
476   GNUNET_PEER_Id *used_pids;
477   
478   /**
479    * Our entry in the queue (non-NULL while we wait for our
480    * turn to interact with the local database).
481    */
482   struct GNUNET_DATASTORE_QueueEntry *qe;
483
484   /**
485    * Size of the 'bf' (in bytes).
486    */
487   size_t bf_size;
488
489   /**
490    * Desired anonymity level; only valid for requests from a local client.
491    */
492   uint32_t anonymity_level;
493
494   /**
495    * How many entries in "used_pids" are actually valid?
496    */
497   unsigned int used_pids_off;
498
499   /**
500    * How long is the "used_pids" array?
501    */
502   unsigned int used_pids_size;
503
504   /**
505    * Number of results found for this request.
506    */
507   unsigned int results_found;
508
509   /**
510    * How many entries in "replies_seen" are actually valid?
511    */
512   unsigned int replies_seen_off;
513
514   /**
515    * How long is the "replies_seen" array?
516    */
517   unsigned int replies_seen_size;
518   
519   /**
520    * Priority with which this request was made.  If one of our clients
521    * made the request, then this is the current priority that we are
522    * using when initiating the request.  This value is used when
523    * we decide to reward other peers with trust for providing a reply.
524    */
525   uint32_t priority;
526
527   /**
528    * Priority points left for us to spend when forwarding this request
529    * to other peers.
530    */
531   uint32_t remaining_priority;
532
533   /**
534    * Number to mingle hashes for bloom-filter tests with.
535    */
536   int32_t mingle;
537
538   /**
539    * TTL with which we saw this request (or, if we initiated, TTL that
540    * we used for the request).
541    */
542   int32_t ttl;
543   
544   /**
545    * Type of the content that this request is for.
546    */
547   enum GNUNET_BLOCK_Type type;
548
549   /**
550    * Remove this request after transmission of the current response.
551    */
552   int16_t do_remove;
553
554   /**
555    * GNUNET_YES if we should not forward this request to other peers.
556    */
557   int16_t local_only;
558
559 };
560
561
562 /**
563  * Our scheduler.
564  */
565 static struct GNUNET_SCHEDULER_Handle *sched;
566
567 /**
568  * Our configuration.
569  */
570 static const struct GNUNET_CONFIGURATION_Handle *cfg;
571
572 /**
573  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
574  */
575 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
576
577 /**
578  * Map of peer identifiers to "struct PendingRequest" (for that peer).
579  */
580 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
581
582 /**
583  * Map of query identifiers to "struct PendingRequest" (for that query).
584  */
585 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
586
587 /**
588  * Heap with the request that will expire next at the top.  Contains
589  * pointers of type "struct PendingRequest*"; these will *also* be
590  * aliased from the "requests_by_peer" data structures and the
591  * "requests_by_query" table.  Note that requests from our clients
592  * don't expire and are thus NOT in the "requests_by_expiration"
593  * (or the "requests_by_peer" tables).
594  */
595 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
596
597 /**
598  * Handle for reporting statistics.
599  */
600 static struct GNUNET_STATISTICS_Handle *stats;
601
602 /**
603  * Linked list of clients we are currently processing requests for.
604  */
605 static struct ClientList *client_list;
606
607 /**
608  * Pointer to handle to the core service (points to NULL until we've
609  * connected to it).
610  */
611 static struct GNUNET_CORE_Handle *core;
612
613 /**
614  * Are we allowed to migrate content to this peer.
615  */
616 static int active_migration;
617
618 /* ******************* clean up functions ************************ */
619
620
621 /**
622  * We're done with a particular message list entry.
623  * Free all associated resources.
624  * 
625  * @param pml entry to destroy
626  */
627 static void
628 destroy_pending_message_list_entry (struct PendingMessageList *pml)
629 {
630   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
631                                pml->req->pending_tail,
632                                pml);
633   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
634                                pml->target->pending_messages_tail,
635                                pml->pm);
636   pml->target->pending_requests--;
637   GNUNET_free (pml->pm);
638   GNUNET_free (pml);
639 }
640
641
642 /**
643  * Destroy the given pending message (and call the respective
644  * continuation).
645  *
646  * @param pm message to destroy
647  * @param tpid id of peer that the message was delivered to, or 0 for none
648  */
649 static void
650 destroy_pending_message (struct PendingMessage *pm,
651                          GNUNET_PEER_Id tpid)
652 {
653   struct PendingMessageList *pml = pm->pml;
654   TransmissionContinuation cont;
655   void *cont_cls;
656
657   GNUNET_assert (pml->pm == pm);
658   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
659   cont = pm->cont;
660   cont_cls = pm->cont_cls;
661   destroy_pending_message_list_entry (pml);
662   cont (cont_cls, tpid);  
663 }
664
665
666 /**
667  * We're done processing a particular request.
668  * Free all associated resources.
669  *
670  * @param pr request to destroy
671  */
672 static void
673 destroy_pending_request (struct PendingRequest *pr)
674 {
675   struct GNUNET_PeerIdentity pid;
676
677   if (pr->hnode != NULL)
678     {
679       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
680                                          pr->hnode);
681       pr->hnode = NULL;
682     }
683   if (NULL == pr->client_request_list)
684     {
685       GNUNET_STATISTICS_update (stats,
686                                 gettext_noop ("# P2P searches active"),
687                                 -1,
688                                 GNUNET_NO);
689     }
690   else
691     {
692       GNUNET_STATISTICS_update (stats,
693                                 gettext_noop ("# client searches active"),
694                                 -1,
695                                 GNUNET_NO);
696     }
697   /* might have already been removed from map in 'process_reply' (if
698      there was a unique reply) or never inserted if it was a
699      duplicate; hence ignore the return value here */
700   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
701                                                &pr->query,
702                                                pr);
703   if (pr->qe != NULL)
704      {
705       GNUNET_DATASTORE_cancel (pr->qe);
706       pr->qe = NULL;
707     }
708   if (pr->client_request_list != NULL)
709     {
710       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
711                                    pr->client_request_list->client_list->rl_tail,
712                                    pr->client_request_list);
713       GNUNET_free (pr->client_request_list);
714       pr->client_request_list = NULL;
715     }
716   if (pr->cp != NULL)
717     {
718       GNUNET_PEER_resolve (pr->cp->pid,
719                            &pid);
720       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
721                                                    &pid.hashPubKey,
722                                                    pr);
723       pr->cp = NULL;
724     }
725   if (pr->bf != NULL)
726     {
727       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
728       pr->bf = NULL;
729     }
730   if (pr->irc != NULL)
731     {
732       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
733       pr->irc = NULL;
734     }
735   if (pr->replies_seen != NULL)
736     {
737       GNUNET_free (pr->replies_seen);
738       pr->replies_seen = NULL;
739     }
740   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
741     {
742       GNUNET_SCHEDULER_cancel (sched,
743                                pr->task);
744       pr->task = GNUNET_SCHEDULER_NO_TASK;
745     }
746   while (NULL != pr->pending_head)    
747     destroy_pending_message_list_entry (pr->pending_head);
748   GNUNET_PEER_change_rc (pr->target_pid, -1);
749   if (pr->used_pids != NULL)
750     {
751       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
752       GNUNET_free (pr->used_pids);
753       pr->used_pids_off = 0;
754       pr->used_pids_size = 0;
755       pr->used_pids = NULL;
756     }
757   GNUNET_free (pr);
758 }
759
760
761 /**
762  * Method called whenever a given peer connects.
763  *
764  * @param cls closure, not used
765  * @param peer peer identity this notification is about
766  * @param latency reported latency of the connection with 'other'
767  * @param distance reported distance (DV) to 'other' 
768  */
769 static void 
770 peer_connect_handler (void *cls,
771                       const struct
772                       GNUNET_PeerIdentity * peer,
773                       struct GNUNET_TIME_Relative latency,
774                       uint32_t distance)
775 {
776   struct ConnectedPeer *cp;
777
778   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
779   cp->pid = GNUNET_PEER_intern (peer);
780   GNUNET_break (GNUNET_OK ==
781                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
782                                                    &peer->hashPubKey,
783                                                    cp,
784                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
785 }
786
787
788 /**
789  * Free (each) request made by the peer.
790  *
791  * @param cls closure, points to peer that the request belongs to
792  * @param key current key code
793  * @param value value in the hash map
794  * @return GNUNET_YES (we should continue to iterate)
795  */
796 static int
797 destroy_request (void *cls,
798                  const GNUNET_HashCode * key,
799                  void *value)
800 {
801   const struct GNUNET_PeerIdentity * peer = cls;
802   struct PendingRequest *pr = value;
803   
804   GNUNET_break (GNUNET_YES ==
805                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
806                                                       &peer->hashPubKey,
807                                                       pr));
808   destroy_pending_request (pr);
809   return GNUNET_YES;
810 }
811
812
813 /**
814  * Method called whenever a peer disconnects.
815  *
816  * @param cls closure, not used
817  * @param peer peer identity this notification is about
818  */
819 static void
820 peer_disconnect_handler (void *cls,
821                          const struct
822                          GNUNET_PeerIdentity * peer)
823 {
824   struct ConnectedPeer *cp;
825   struct PendingMessage *pm;
826   unsigned int i;
827
828   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
829                                               &peer->hashPubKey,
830                                               &destroy_request,
831                                               (void*) peer);
832   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
833                                           &peer->hashPubKey);
834   if (cp == NULL)
835     return;
836   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
837     {
838       if (NULL != cp->last_client_replies[i])
839         {
840           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
841           cp->last_client_replies[i] = NULL;
842         }
843     }
844   GNUNET_break (GNUNET_YES ==
845                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
846                                                       &peer->hashPubKey,
847                                                       cp));
848   GNUNET_PEER_change_rc (cp->pid, -1);
849   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
850   if (NULL != cp->cth)
851     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
852   while (NULL != (pm = cp->pending_messages_head))
853     destroy_pending_message (pm, 0 /* delivery failed */);
854   GNUNET_break (0 == cp->pending_requests);
855   GNUNET_free (cp);
856 }
857
858
859 /**
860  * Iterator over hash map entries that removes all occurences
861  * of the given 'client' from the 'last_client_replies' of the
862  * given connected peer.
863  *
864  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
865  * @param key current key code (unused)
866  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
867  * @return GNUNET_YES (we should continue to iterate)
868  */
869 static int
870 remove_client_from_last_client_replies (void *cls,
871                                         const GNUNET_HashCode * key,
872                                         void *value)
873 {
874   struct GNUNET_SERVER_Client *client = cls;
875   struct ConnectedPeer *cp = value;
876   unsigned int i;
877
878   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
879     {
880       if (cp->last_client_replies[i] == client)
881         {
882           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
883           cp->last_client_replies[i] = NULL;
884         }
885     }  
886   return GNUNET_YES;
887 }
888
889
890 /**
891  * A client disconnected.  Remove all of its pending queries.
892  *
893  * @param cls closure, NULL
894  * @param client identification of the client
895  */
896 static void
897 handle_client_disconnect (void *cls,
898                           struct GNUNET_SERVER_Client
899                           * client)
900 {
901   struct ClientList *pos;
902   struct ClientList *prev;
903   struct ClientRequestList *rcl;
904   struct ClientResponseMessage *creply;
905
906   if (client == NULL)
907     return;
908   prev = NULL;
909   pos = client_list;
910   while ( (NULL != pos) &&
911           (pos->client != client) )
912     {
913       prev = pos;
914       pos = pos->next;
915     }
916   if (pos == NULL)
917     return; /* no requests pending for this client */
918   while (NULL != (rcl = pos->rl_head))
919     {
920       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
921                   "Destroying pending request `%s' on disconnect\n",
922                   GNUNET_h2s (&rcl->req->query));
923       destroy_pending_request (rcl->req);
924     }
925   if (prev == NULL)
926     client_list = pos->next;
927   else
928     prev->next = pos->next;
929   if (pos->th != NULL)
930     {
931       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
932       pos->th = NULL;
933     }
934   while (NULL != (creply = pos->res_head))
935     {
936       GNUNET_CONTAINER_DLL_remove (pos->res_head,
937                                    pos->res_tail,
938                                    creply);
939       GNUNET_free (creply);
940     }    
941   GNUNET_SERVER_client_drop (pos->client);
942   GNUNET_free (pos);
943   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
944                                          &remove_client_from_last_client_replies,
945                                          client);
946 }
947
948
949 /**
950  * Iterator to free peer entries.
951  *
952  * @param cls closure, unused
953  * @param key current key code
954  * @param value value in the hash map (peer entry)
955  * @return GNUNET_YES (we should continue to iterate)
956  */
957 static int 
958 clean_peer (void *cls,
959             const GNUNET_HashCode * key,
960             void *value)
961 {
962   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
963   return GNUNET_YES;
964 }
965
966
967 /**
968  * Task run during shutdown.
969  *
970  * @param cls unused
971  * @param tc unused
972  */
973 static void
974 shutdown_task (void *cls,
975                const struct GNUNET_SCHEDULER_TaskContext *tc)
976 {
977   while (client_list != NULL)
978     handle_client_disconnect (NULL,
979                               client_list->client);
980   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
981                                          &clean_peer,
982                                          NULL);
983   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
984   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
985   requests_by_expiration_heap = 0;
986   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
987   connected_peers = NULL;
988   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
989   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
990   query_request_map = NULL;
991   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
992   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
993   peer_request_map = NULL;
994   GNUNET_assert (NULL != core);
995   GNUNET_CORE_disconnect (core);
996   core = NULL;
997   if (stats != NULL)
998     {
999       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1000       stats = NULL;
1001     }
1002   GNUNET_DATASTORE_disconnect (dsh,
1003                                GNUNET_NO);
1004   dsh = NULL;
1005   sched = NULL;
1006   cfg = NULL;  
1007 }
1008
1009
1010 /* ******************* Utility functions  ******************** */
1011
1012
1013 /**
1014  * Transmit the given message by copying it to the target buffer
1015  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1016  * for writing in the meantime.  In that case, do nothing
1017  * (the disconnect or shutdown handler will take care of the rest).
1018  * If we were able to transmit messages and there are still more
1019  * pending, ask core again for further calls to this function.
1020  *
1021  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1022  * @param size number of bytes available in buf
1023  * @param buf where the callee should write the message
1024  * @return number of bytes written to buf
1025  */
1026 static size_t
1027 transmit_to_peer (void *cls,
1028                   size_t size, void *buf)
1029 {
1030   struct ConnectedPeer *cp = cls;
1031   char *cbuf = buf;
1032   struct GNUNET_PeerIdentity pid;
1033   struct PendingMessage *pm;
1034   size_t msize;
1035  
1036   cp->cth = NULL;
1037   if (NULL == buf)
1038     {
1039 #if DEBUG_FS
1040       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041                   "Dropping message, core too busy.\n");
1042 #endif
1043       return 0;
1044     }
1045   msize = 0;
1046   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1047           (pm->msize <= size) )
1048     {
1049       memcpy (&cbuf[msize], &pm[1], pm->msize);
1050       msize += pm->msize;
1051       size -= pm->msize;
1052       destroy_pending_message (pm, cp->pid);
1053     }
1054   if (NULL != pm)
1055     {
1056       GNUNET_PEER_resolve (cp->pid,
1057                            &pid);
1058       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1059                                                    pm->priority,
1060                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1061                                                    &pid,
1062                                                    pm->msize,
1063                                                    &transmit_to_peer,
1064                                                    cp);
1065     }
1066 #if DEBUG_FS
1067   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1068               "Transmitting %u bytes to peer %u.\n",
1069               msize,
1070               cp->pid);
1071 #endif
1072   return msize;
1073 }
1074
1075
1076 /**
1077  * Add a message to the set of pending messages for the given peer.
1078  *
1079  * @param cp peer to send message to
1080  * @param pm message to queue
1081  * @param pr request on which behalf this message is being queued
1082  */
1083 static void
1084 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1085                                   struct PendingMessage *pm,
1086                                   struct PendingRequest *pr)
1087 {
1088   struct PendingMessage *pos;
1089   struct PendingMessageList *pml;
1090   struct GNUNET_PeerIdentity pid;
1091
1092   GNUNET_assert (pm->next == NULL);
1093   GNUNET_assert (pm->pml == NULL);    
1094   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1095   pml->req = pr;
1096   pml->target = cp;
1097   pml->pm = pm;
1098   pm->pml = pml;  
1099   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1100                                pr->pending_tail,
1101                                pml);
1102   pos = cp->pending_messages_head;
1103   while ( (pos != NULL) &&
1104           (pm->priority < pos->priority) )
1105     pos = pos->next;    
1106   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1107                                      cp->pending_messages_tail,
1108                                      pos,
1109                                      pm);
1110   cp->pending_requests++;
1111   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1112     destroy_pending_message (cp->pending_messages_tail, 0);  
1113   if (cp->cth == NULL)
1114     {
1115       /* need to schedule transmission */
1116       GNUNET_PEER_resolve (cp->pid, &pid);
1117       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1118                                                    cp->pending_messages_head->priority,
1119                                                    MAX_TRANSMIT_DELAY,
1120                                                    &pid,
1121                                                    cp->pending_messages_head->msize,
1122                                                    &transmit_to_peer,
1123                                                    cp);
1124     }
1125   if (cp->cth == NULL)
1126     {
1127 #if DEBUG_FS
1128       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129                   "Failed to schedule transmission with core!\n");
1130 #endif
1131       /* FIXME: call stats (rare, bad case) */
1132     }
1133 }
1134
1135
1136 /**
1137  * Mingle hash with the mingle_number to produce different bits.
1138  */
1139 static void
1140 mingle_hash (const GNUNET_HashCode * in,
1141              int32_t mingle_number, 
1142              GNUNET_HashCode * hc)
1143 {
1144   GNUNET_HashCode m;
1145
1146   GNUNET_CRYPTO_hash (&mingle_number, 
1147                       sizeof (int32_t), 
1148                       &m);
1149   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1150 }
1151
1152
1153 /**
1154  * Test if the load on this peer is too high
1155  * to even consider processing the query at
1156  * all.
1157  * 
1158  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1159  */
1160 static int
1161 test_load_too_high ()
1162 {
1163   return GNUNET_NO; // FIXME
1164 }
1165
1166
1167 /* ******************* Pending Request Refresh Task ******************** */
1168
1169
1170
1171 /**
1172  * We use a random delay to make the timing of requests less
1173  * predictable.  This function returns such a random delay.  We add a base
1174  * delay of MAX_CORK_DELAY (1s).
1175  *
1176  * FIXME: make schedule dependent on the specifics of the request?
1177  * Or bandwidth and number of connected peers and load?
1178  *
1179  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1180  */
1181 static struct GNUNET_TIME_Relative
1182 get_processing_delay ()
1183 {
1184   return 
1185     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1186                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1187                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1188                                                                                        TTL_DECREMENT)));
1189 }
1190
1191
1192 /**
1193  * We're processing a GET request from another peer and have decided
1194  * to forward it to other peers.  This function is called periodically
1195  * and should forward the request to other peers until we have all
1196  * possible replies.  If we have transmitted the *only* reply to
1197  * the initiator we should destroy the pending request.  If we have
1198  * many replies in the queue to the initiator, we should delay sending
1199  * out more queries until the reply queue has shrunk some.
1200  *
1201  * @param cls our "struct ProcessGetContext *"
1202  * @param tc unused
1203  */
1204 static void
1205 forward_request_task (void *cls,
1206                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1207
1208
1209 /**
1210  * Function called after we either failed or succeeded
1211  * at transmitting a query to a peer.  
1212  *
1213  * @param cls the requests "struct PendingRequest*"
1214  * @param tpid ID of receiving peer, 0 on transmission error
1215  */
1216 static void
1217 transmit_query_continuation (void *cls,
1218                              GNUNET_PEER_Id tpid)
1219 {
1220   struct PendingRequest *pr = cls;
1221
1222   GNUNET_STATISTICS_update (stats,
1223                             gettext_noop ("# queries scheduled for forwarding"),
1224                             -1,
1225                             GNUNET_NO);
1226   if (tpid == 0)   
1227     {
1228 #if DEBUG_FS
1229       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230                   "Transmission of request failed, will try again later.\n");
1231 #endif
1232       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1233         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1234                                                  get_processing_delay (),
1235                                                  &forward_request_task,
1236                                                  pr); 
1237       return;    
1238     }
1239   GNUNET_STATISTICS_update (stats,
1240                             gettext_noop ("# queries forwarded"),
1241                             1,
1242                             GNUNET_NO);
1243   GNUNET_PEER_change_rc (tpid, 1);
1244   if (pr->used_pids_off == pr->used_pids_size)
1245     GNUNET_array_grow (pr->used_pids,
1246                        pr->used_pids_size,
1247                        pr->used_pids_size * 2 + 2);
1248   pr->used_pids[pr->used_pids_off++] = tpid;
1249   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1250     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1251                                              get_processing_delay (),
1252                                              &forward_request_task,
1253                                              pr);
1254 }
1255
1256
1257 /**
1258  * How many bytes should a bloomfilter be if we have already seen
1259  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1260  * of bits set per entry.  Furthermore, we should not re-size the
1261  * filter too often (to keep it cheap).
1262  *
1263  * Since other peers will also add entries but not resize the filter,
1264  * we should generally pick a slightly larger size than what the
1265  * strict math would suggest.
1266  *
1267  * @return must be a power of two and smaller or equal to 2^15.
1268  */
1269 static size_t
1270 compute_bloomfilter_size (unsigned int entry_count)
1271 {
1272   size_t size;
1273   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1274   uint16_t max = 1 << 15;
1275
1276   if (entry_count > max)
1277     return max;
1278   size = 8;
1279   while ((size < max) && (size < ideal))
1280     size *= 2;
1281   if (size > max)
1282     return max;
1283   return size;
1284 }
1285
1286
1287 /**
1288  * Recalculate our bloom filter for filtering replies.  This function
1289  * will create a new bloom filter from scratch, so it should only be
1290  * called if we have no bloomfilter at all (and hence can create a
1291  * fresh one of minimal size without problems) OR if our peer is the
1292  * initiator (in which case we may resize to larger than mimimum size).
1293  *
1294  * @param pr request for which the BF is to be recomputed
1295  */
1296 static void
1297 refresh_bloomfilter (struct PendingRequest *pr)
1298 {
1299   unsigned int i;
1300   size_t nsize;
1301   GNUNET_HashCode mhash;
1302
1303   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1304   if (nsize == pr->bf_size)
1305     return; /* size not changed */
1306   if (pr->bf != NULL)
1307     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1308   pr->bf_size = nsize;
1309   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1310   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1311                                               pr->bf_size,
1312                                               BLOOMFILTER_K);
1313   for (i=0;i<pr->replies_seen_off;i++)
1314     {
1315       mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
1316       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
1317     }
1318 }
1319
1320
1321 /**
1322  * Function called after we've tried to reserve a certain amount of
1323  * bandwidth for a reply.  Check if we succeeded and if so send our
1324  * query.
1325  *
1326  * @param cls the requests "struct PendingRequest*"
1327  * @param peer identifies the peer
1328  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1329  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1330  * @param amount set to the amount that was actually reserved or unreserved
1331  * @param preference current traffic preference for the given peer
1332  */
1333 static void
1334 target_reservation_cb (void *cls,
1335                        const struct
1336                        GNUNET_PeerIdentity * peer,
1337                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1338                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1339                        int amount,
1340                        uint64_t preference)
1341 {
1342   struct PendingRequest *pr = cls;
1343   struct ConnectedPeer *cp;
1344   struct PendingMessage *pm;
1345   struct GetMessage *gm;
1346   GNUNET_HashCode *ext;
1347   char *bfdata;
1348   size_t msize;
1349   unsigned int k;
1350   int no_route;
1351   uint32_t bm;
1352
1353   pr->irc = NULL;
1354   if (peer == NULL)
1355     {
1356       /* error in communication with core, try again later */
1357       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1358         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1359                                                  get_processing_delay (),
1360                                                  &forward_request_task,
1361                                                  pr);
1362       return;
1363     }
1364   // (3) transmit, update ttl/priority
1365   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1366                                           &peer->hashPubKey);
1367   if (cp == NULL)
1368     {
1369       /* Peer must have just left */
1370 #if DEBUG_FS
1371       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1372                   "Selected peer disconnected!\n");
1373 #endif
1374       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1375         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1376                                                  get_processing_delay (),
1377                                                  &forward_request_task,
1378                                                  pr);
1379       return;
1380     }
1381   no_route = GNUNET_NO;
1382   /* FIXME: check against DBLOCK_SIZE and possibly return
1383      amount to reserve; however, this also needs to work
1384      with testcases which currently start out with a far
1385      too low per-peer bw limit, so they would never send
1386      anything.  Big issue. */
1387   if (amount == 0)
1388     {
1389       if (pr->cp == NULL)
1390         {
1391 #if DEBUG_FS > 1
1392           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
1394                       amount,
1395                       DBLOCK_SIZE);
1396 #endif
1397           GNUNET_STATISTICS_update (stats,
1398                                     gettext_noop ("# reply bandwidth reservation requests failed"),
1399                                     1,
1400                                     GNUNET_NO);
1401           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1402             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1403                                                      get_processing_delay (),
1404                                                      &forward_request_task,
1405                                                      pr);
1406           return;  /* this target round failed */
1407         }
1408       /* FIXME: if we are "quite" busy, we may still want to skip
1409          this round; need more load detection code! */
1410       no_route = GNUNET_YES;
1411     }
1412   
1413   GNUNET_STATISTICS_update (stats,
1414                             gettext_noop ("# queries scheduled for forwarding"),
1415                             1,
1416                             GNUNET_NO);
1417   /* build message and insert message into priority queue */
1418 #if DEBUG_FS
1419   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1420               "Forwarding request `%s' to `%4s'!\n",
1421               GNUNET_h2s (&pr->query),
1422               GNUNET_i2s (peer));
1423 #endif
1424   k = 0;
1425   bm = 0;
1426   if (GNUNET_YES == no_route)
1427     {
1428       bm |= GET_MESSAGE_BIT_RETURN_TO;
1429       k++;      
1430     }
1431   if (pr->namespace != NULL)
1432     {
1433       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
1434       k++;
1435     }
1436   if (pr->target_pid != 0)
1437     {
1438       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
1439       k++;
1440     }
1441   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1442   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1443   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1444   pm->msize = msize;
1445   gm = (struct GetMessage*) &pm[1];
1446   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1447   gm->header.size = htons (msize);
1448   gm->type = htonl (pr->type);
1449   pr->remaining_priority /= 2;
1450   gm->priority = htonl (pr->remaining_priority);
1451   gm->ttl = htonl (pr->ttl);
1452   gm->filter_mutator = htonl(pr->mingle); 
1453   gm->hash_bitmap = htonl (bm);
1454   gm->query = pr->query;
1455   ext = (GNUNET_HashCode*) &gm[1];
1456   k = 0;
1457   if (GNUNET_YES == no_route)
1458     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1459   if (pr->namespace != NULL)
1460     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
1461   if (pr->target_pid != 0)
1462     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1463   bfdata = (char *) &ext[k];
1464   if (pr->bf != NULL)
1465     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1466                                                bfdata,
1467                                                pr->bf_size);
1468   pm->cont = &transmit_query_continuation;
1469   pm->cont_cls = pr;
1470   add_to_pending_messages_for_peer (cp, pm, pr);
1471 }
1472
1473
1474 /**
1475  * Closure used for "target_peer_select_cb".
1476  */
1477 struct PeerSelectionContext 
1478 {
1479   /**
1480    * The request for which we are selecting
1481    * peers.
1482    */
1483   struct PendingRequest *pr;
1484
1485   /**
1486    * Current "prime" target.
1487    */
1488   struct GNUNET_PeerIdentity target;
1489
1490   /**
1491    * How much do we like this target?
1492    */
1493   double target_score;
1494
1495 };
1496
1497
1498 /**
1499  * Function called for each connected peer to determine
1500  * which one(s) would make good targets for forwarding.
1501  *
1502  * @param cls closure (struct PeerSelectionContext)
1503  * @param key current key code (peer identity)
1504  * @param value value in the hash map (struct ConnectedPeer)
1505  * @return GNUNET_YES if we should continue to
1506  *         iterate,
1507  *         GNUNET_NO if not.
1508  */
1509 static int
1510 target_peer_select_cb (void *cls,
1511                        const GNUNET_HashCode * key,
1512                        void *value)
1513 {
1514   struct PeerSelectionContext *psc = cls;
1515   struct ConnectedPeer *cp = value;
1516   struct PendingRequest *pr = psc->pr;
1517   double score;
1518   unsigned int i;
1519   unsigned int pc;
1520
1521   /* 1) check that this peer is not the initiator */
1522   if (cp == pr->cp)
1523     return GNUNET_YES; /* skip */          
1524
1525   /* 2) check if we have already (recently) forwarded to this peer */
1526   pc = 0;
1527   for (i=0;i<pr->used_pids_off;i++)
1528     if (pr->used_pids[i] == cp->pid) 
1529       {
1530         pc++;
1531         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1532                                            RETRY_PROBABILITY_INV))
1533           {
1534 #if DEBUG_FS
1535             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536                         "NOT re-trying query that was previously transmitted %u times\n",
1537                         (unsigned int) pr->used_pids_off);
1538 #endif
1539             return GNUNET_YES; /* skip */
1540           }
1541       }
1542 #if DEBUG_FS
1543   if (0 < pc)
1544     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1545                 "Re-trying query that was previously transmitted %u times to this peer\n",
1546                 (unsigned int) pc);
1547 #endif
1548   // 3) calculate how much we'd like to forward to this peer
1549   score = 42; // FIXME!
1550   // FIXME: also need API to gather data on responsiveness
1551   // of this peer (we have fields for that in 'cp', but
1552   // they are never set!)
1553   
1554   /* store best-fit in closure */
1555   if (score > psc->target_score)
1556     {
1557       psc->target_score = score;
1558       psc->target.hashPubKey = *key; 
1559     }
1560   return GNUNET_YES;
1561 }
1562   
1563
1564 /**
1565  * The priority level imposes a bound on the maximum
1566  * value for the ttl that can be requested.
1567  *
1568  * @param ttl_in requested ttl
1569  * @param prio given priority
1570  * @return ttl_in if ttl_in is below the limit,
1571  *         otherwise the ttl-limit for the given priority
1572  */
1573 static int32_t
1574 bound_ttl (int32_t ttl_in, uint32_t prio)
1575 {
1576   unsigned long long allowed;
1577
1578   if (ttl_in <= 0)
1579     return ttl_in;
1580   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
1581   if (ttl_in > allowed)      
1582     {
1583       if (allowed >= (1 << 30))
1584         return 1 << 30;
1585       return allowed;
1586     }
1587   return ttl_in;
1588 }
1589
1590
1591 /**
1592  * We're processing a GET request from another peer and have decided
1593  * to forward it to other peers.  This function is called periodically
1594  * and should forward the request to other peers until we have all
1595  * possible replies.  If we have transmitted the *only* reply to
1596  * the initiator we should destroy the pending request.  If we have
1597  * many replies in the queue to the initiator, we should delay sending
1598  * out more queries until the reply queue has shrunk some.
1599  *
1600  * @param cls our "struct ProcessGetContext *"
1601  * @param tc unused
1602  */
1603 static void
1604 forward_request_task (void *cls,
1605                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1606 {
1607   struct PendingRequest *pr = cls;
1608   struct PeerSelectionContext psc;
1609   struct ConnectedPeer *cp; 
1610   struct GNUNET_TIME_Relative delay;
1611
1612   pr->task = GNUNET_SCHEDULER_NO_TASK;
1613   if (pr->irc != NULL)
1614     {
1615 #if DEBUG_FS
1616       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
1618                   GNUNET_h2s (&pr->query));
1619 #endif
1620       return; /* already pending */
1621     }
1622   if (GNUNET_YES == pr->local_only)
1623     return; /* configured to not do P2P search */
1624   /* (1) select target */
1625   psc.pr = pr;
1626   psc.target_score = DBL_MIN;
1627   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1628                                          &target_peer_select_cb,
1629                                          &psc);  
1630   if (psc.target_score == DBL_MIN)
1631     {
1632       delay = get_processing_delay ();
1633 #if DEBUG_FS 
1634       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
1636                   GNUNET_h2s (&pr->query),
1637                   delay.value);
1638 #endif
1639       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1640                                                delay,
1641                                                &forward_request_task,
1642                                                pr);
1643       return; /* nobody selected */
1644     }
1645   /* (3) update TTL/priority */
1646   
1647   if (pr->client_request_list != NULL)
1648     {
1649       /* FIXME: use better algorithm!? */
1650       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1651                                          4))
1652         pr->priority++;
1653       /* FIXME: bound priority by "customary" priority used by other peers
1654          at this time! */
1655       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
1656                            pr->priority);
1657 #if DEBUG_FS
1658       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659                   "Trying query `%s' with priority %u and TTL %d.\n",
1660                   GNUNET_h2s (&pr->query),
1661                   pr->priority,
1662                   pr->ttl);
1663 #endif
1664     }
1665   else
1666     {
1667       /* FIXME: should we do something here as well!? */
1668     }
1669
1670   /* (3) reserve reply bandwidth */
1671   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1672                                           &psc.target.hashPubKey);
1673   GNUNET_assert (NULL != cp);
1674   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1675                                                 &psc.target,
1676                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1677                                                 GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
1678                                                 DBLOCK_SIZE * 2, 
1679                                                 (uint64_t) cp->inc_preference,
1680                                                 &target_reservation_cb,
1681                                                 pr);
1682   cp->inc_preference = 0.0;
1683 }
1684
1685
1686 /* **************************** P2P PUT Handling ************************ */
1687
1688
1689 /**
1690  * Function called after we either failed or succeeded
1691  * at transmitting a reply to a peer.  
1692  *
1693  * @param cls the requests "struct PendingRequest*"
1694  * @param tpid ID of receiving peer, 0 on transmission error
1695  */
1696 static void
1697 transmit_reply_continuation (void *cls,
1698                              GNUNET_PEER_Id tpid)
1699 {
1700   struct PendingRequest *pr = cls;
1701   
1702   switch (pr->type)
1703     {
1704     case GNUNET_BLOCK_TYPE_DBLOCK:
1705     case GNUNET_BLOCK_TYPE_IBLOCK:
1706       /* only one reply expected, done with the request! */
1707       destroy_pending_request (pr);
1708       break;
1709     case GNUNET_BLOCK_TYPE_ANY:
1710     case GNUNET_BLOCK_TYPE_KBLOCK:
1711     case GNUNET_BLOCK_TYPE_SBLOCK:
1712       break;
1713     default:
1714       GNUNET_break (0);
1715       break;
1716     }
1717 }
1718
1719
1720 /**
1721  * Transmit the given message by copying it to the target buffer
1722  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1723  * for writing in the meantime.  In that case, do nothing
1724  * (the disconnect or shutdown handler will take care of the rest).
1725  * If we were able to transmit messages and there are still more
1726  * pending, ask core again for further calls to this function.
1727  *
1728  * @param cls closure, pointer to the 'struct ClientList*'
1729  * @param size number of bytes available in buf
1730  * @param buf where the callee should write the message
1731  * @return number of bytes written to buf
1732  */
1733 static size_t
1734 transmit_to_client (void *cls,
1735                   size_t size, void *buf)
1736 {
1737   struct ClientList *cl = cls;
1738   char *cbuf = buf;
1739   struct ClientResponseMessage *creply;
1740   size_t msize;
1741   
1742   cl->th = NULL;
1743   if (NULL == buf)
1744     {
1745 #if DEBUG_FS
1746       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1747                   "Not sending reply, client communication problem.\n");
1748 #endif
1749       return 0;
1750     }
1751   msize = 0;
1752   while ( (NULL != (creply = cl->res_head) ) &&
1753           (creply->msize <= size) )
1754     {
1755       memcpy (&cbuf[msize], &creply[1], creply->msize);
1756       msize += creply->msize;
1757       size -= creply->msize;
1758       GNUNET_CONTAINER_DLL_remove (cl->res_head,
1759                                    cl->res_tail,
1760                                    creply);
1761       GNUNET_free (creply);
1762     }
1763   if (NULL != creply)
1764     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
1765                                                   creply->msize,
1766                                                   GNUNET_TIME_UNIT_FOREVER_REL,
1767                                                   &transmit_to_client,
1768                                                   cl);
1769 #if DEBUG_FS
1770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771               "Transmitted %u bytes to client\n",
1772               (unsigned int) msize);
1773 #endif
1774   return msize;
1775 }
1776
1777
1778 /**
1779  * Closure for "process_reply" function.
1780  */
1781 struct ProcessReplyClosure
1782 {
1783   /**
1784    * The data for the reply.
1785    */
1786   const void *data;
1787
1788   // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
1789
1790   /**
1791    * When the reply expires.
1792    */
1793   struct GNUNET_TIME_Absolute expiration;
1794
1795   /**
1796    * Size of data.
1797    */
1798   size_t size;
1799
1800   /**
1801    * Namespace that this reply belongs to
1802    * (if it is of type SBLOCK).
1803    */
1804   GNUNET_HashCode namespace;
1805
1806   /**
1807    * Type of the block.
1808    */
1809   enum GNUNET_BLOCK_Type type;
1810
1811   /**
1812    * How much was this reply worth to us?
1813    */
1814   uint32_t priority;
1815 };
1816
1817
1818 /**
1819  * We have received a reply; handle it!
1820  *
1821  * @param cls response (struct ProcessReplyClosure)
1822  * @param key our query
1823  * @param value value in the hash map (info about the query)
1824  * @return GNUNET_YES (we should continue to iterate)
1825  */
1826 static int
1827 process_reply (void *cls,
1828                const GNUNET_HashCode * key,
1829                void *value)
1830 {
1831   struct ProcessReplyClosure *prq = cls;
1832   struct PendingRequest *pr = value;
1833   struct PendingMessage *reply;
1834   struct ClientResponseMessage *creply;
1835   struct ClientList *cl;
1836   struct PutMessage *pm;
1837   struct ConnectedPeer *cp;
1838   GNUNET_HashCode chash;
1839   GNUNET_HashCode mhash;
1840   size_t msize;
1841
1842 #if DEBUG_FS
1843   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1844               "Matched result (type %u) for query `%s' with pending request\n",
1845               (unsigned int) prq->type,
1846               GNUNET_h2s (key));
1847 #endif  
1848   GNUNET_STATISTICS_update (stats,
1849                             gettext_noop ("# replies received and matched"),
1850                             1,
1851                             GNUNET_NO);
1852   GNUNET_CRYPTO_hash (prq->data,
1853                       prq->size,
1854                       &chash);
1855   switch (prq->type)
1856     {
1857     case GNUNET_BLOCK_TYPE_DBLOCK:
1858     case GNUNET_BLOCK_TYPE_IBLOCK:
1859       /* only possible reply, stop requesting! */
1860       while (NULL != pr->pending_head)
1861         destroy_pending_message_list_entry (pr->pending_head);
1862       if (pr->qe != NULL)
1863         {
1864           if (pr->client_request_list != NULL)
1865             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
1866                                         GNUNET_YES);
1867           GNUNET_DATASTORE_cancel (pr->qe);
1868           pr->qe = NULL;
1869         }
1870       pr->do_remove = GNUNET_YES;
1871       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1872         {
1873           GNUNET_SCHEDULER_cancel (sched,
1874                                    pr->task);
1875           pr->task = GNUNET_SCHEDULER_NO_TASK;
1876         }
1877       GNUNET_break (GNUNET_YES ==
1878                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1879                                                           key,
1880                                                           pr));
1881       break;
1882     case GNUNET_BLOCK_TYPE_SBLOCK:
1883       if (pr->namespace == NULL)
1884         {
1885           GNUNET_break (0);
1886           return GNUNET_YES;
1887         }
1888       if (0 != memcmp (pr->namespace,
1889                        &prq->namespace,
1890                        sizeof (GNUNET_HashCode)))
1891         {
1892           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1893                       _("Reply mismatched in terms of namespace.  Discarded.\n"));
1894           return GNUNET_YES; /* wrong namespace */      
1895         }
1896       /* then: fall-through! */
1897     case GNUNET_BLOCK_TYPE_KBLOCK:
1898     case GNUNET_BLOCK_TYPE_NBLOCK:
1899       if (pr->bf != NULL) 
1900         {
1901           mingle_hash (&chash, pr->mingle, &mhash);
1902           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
1903                                                                &mhash))
1904             {
1905               GNUNET_STATISTICS_update (stats,
1906                                         gettext_noop ("# duplicate replies discarded (bloomfilter)"),
1907                                         1,
1908                                         GNUNET_NO);
1909 #if DEBUG_FS
1910               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1911                           "Duplicate response `%s', discarding.\n",
1912                           GNUNET_h2s (&mhash));
1913 #endif
1914               return GNUNET_YES; /* duplicate */
1915             }
1916 #if DEBUG_FS
1917           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1918                       "New response `%s', adding to filter.\n",
1919                       GNUNET_h2s (&mhash));
1920 #endif
1921         }
1922       if (pr->client_request_list != NULL)
1923         {
1924           if (pr->replies_seen_size == pr->replies_seen_off)
1925             GNUNET_array_grow (pr->replies_seen,
1926                                pr->replies_seen_size,
1927                                pr->replies_seen_size * 2 + 4);  
1928             pr->replies_seen[pr->replies_seen_off++] = chash;         
1929         }
1930       if ( (pr->bf == NULL) ||
1931            (pr->client_request_list != NULL) )
1932         refresh_bloomfilter (pr);
1933       GNUNET_CONTAINER_bloomfilter_add (pr->bf,
1934                                         &mhash);
1935       break;
1936     default:
1937       GNUNET_break (0);
1938       return GNUNET_YES;
1939     }
1940   prq->priority += pr->remaining_priority;
1941   pr->remaining_priority = 0;
1942   if (pr->client_request_list != NULL)
1943     {
1944       GNUNET_STATISTICS_update (stats,
1945                                 gettext_noop ("# replies received for local clients"),
1946                                 1,
1947                                 GNUNET_NO);
1948       cl = pr->client_request_list->client_list;
1949       msize = sizeof (struct PutMessage) + prq->size;
1950       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
1951       creply->msize = msize;
1952       creply->client_list = cl;
1953       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
1954                                          cl->res_tail,
1955                                          cl->res_tail,
1956                                          creply);      
1957       pm = (struct PutMessage*) &creply[1];
1958       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1959       pm->header.size = htons (msize);
1960       pm->type = htonl (prq->type);
1961       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
1962       memcpy (&pm[1], prq->data, prq->size);      
1963       if (NULL == cl->th)
1964         {
1965 #if DEBUG_FS
1966           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967                       "Transmitting result for query `%s' to client\n",
1968                       GNUNET_h2s (key));
1969 #endif  
1970           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
1971                                                         msize,
1972                                                         GNUNET_TIME_UNIT_FOREVER_REL,
1973                                                         &transmit_to_client,
1974                                                         cl);
1975         }
1976       GNUNET_break (cl->th != NULL);
1977       if (pr->do_remove)                
1978         destroy_pending_request (pr);           
1979     }
1980   else
1981     {
1982       cp = pr->cp;
1983 #if DEBUG_FS
1984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1985                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
1986                   GNUNET_h2s (key),
1987                   (unsigned int) cp->pid);
1988 #endif  
1989       GNUNET_STATISTICS_update (stats,
1990                                 gettext_noop ("# replies received for other peers"),
1991                                 1,
1992                                 GNUNET_NO);
1993       msize = sizeof (struct PutMessage) + prq->size;
1994       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
1995       reply->cont = &transmit_reply_continuation;
1996       reply->cont_cls = pr;
1997       reply->msize = msize;
1998       reply->priority = (uint32_t) -1; /* send replies first! */
1999       pm = (struct PutMessage*) &reply[1];
2000       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2001       pm->header.size = htons (msize);
2002       pm->type = htonl (prq->type);
2003       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2004       memcpy (&pm[1], prq->data, prq->size);
2005       add_to_pending_messages_for_peer (cp, reply, pr);
2006     }
2007   // FIXME: implement hot-path routing statistics keeping!
2008   return GNUNET_YES;
2009 }
2010
2011
2012
2013 /**
2014  * Continuation called to notify client about result of the
2015  * operation.
2016  *
2017  * @param cls closure
2018  * @param success GNUNET_SYSERR on failure
2019  * @param msg NULL on success, otherwise an error message
2020  */
2021 static void 
2022 put_migration_continuation (void *cls,
2023                             int success,
2024                             const char *msg)
2025 {
2026   /* FIXME */
2027 }
2028
2029
2030 /**
2031  * Handle P2P "PUT" message.
2032  *
2033  * @param cls closure, always NULL
2034  * @param other the other peer involved (sender or receiver, NULL
2035  *        for loopback messages where we are both sender and receiver)
2036  * @param message the actual message
2037  * @param latency reported latency of the connection with 'other'
2038  * @param distance reported distance (DV) to 'other' 
2039  * @return GNUNET_OK to keep the connection open,
2040  *         GNUNET_SYSERR to close it (signal serious error)
2041  */
2042 static int
2043 handle_p2p_put (void *cls,
2044                 const struct GNUNET_PeerIdentity *other,
2045                 const struct GNUNET_MessageHeader *message,
2046                 struct GNUNET_TIME_Relative latency,
2047                 uint32_t distance)
2048 {
2049   const struct PutMessage *put;
2050   uint16_t msize;
2051   size_t dsize;
2052   enum GNUNET_BLOCK_Type type;
2053   struct GNUNET_TIME_Absolute expiration;
2054   GNUNET_HashCode query;
2055   struct ProcessReplyClosure prq;
2056   const struct SBlock *sb;
2057
2058   msize = ntohs (message->size);
2059   if (msize < sizeof (struct PutMessage))
2060     {
2061       GNUNET_break_op(0);
2062       return GNUNET_SYSERR;
2063     }
2064   put = (const struct PutMessage*) message;
2065   dsize = msize - sizeof (struct PutMessage);
2066   type = ntohl (put->type);
2067   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2068
2069   if (GNUNET_OK !=
2070       GNUNET_BLOCK_check_block (type,
2071                                 &put[1],
2072                                 dsize,
2073                                 &query))
2074     {
2075       GNUNET_break_op (0);
2076       return GNUNET_SYSERR;
2077     }
2078   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2079     return GNUNET_SYSERR;
2080   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2081     { 
2082       sb = (const struct SBlock*) &put[1];
2083       GNUNET_CRYPTO_hash (&sb->subspace,
2084                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2085                           &prq.namespace);
2086     }
2087
2088 #if DEBUG_FS
2089   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2090               "Received result for query `%s' from peer `%4s'\n",
2091               GNUNET_h2s (&query),
2092               GNUNET_i2s (other));
2093 #endif
2094   GNUNET_STATISTICS_update (stats,
2095                             gettext_noop ("# replies received (overall)"),
2096                             1,
2097                             GNUNET_NO);
2098   /* now, lookup 'query' */
2099   prq.data = (const void*) &put[1];
2100   prq.size = dsize;
2101   prq.type = type;
2102   prq.expiration = expiration;
2103   prq.priority = 0;
2104   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2105                                               &query,
2106                                               &process_reply,
2107                                               &prq);
2108   if (GNUNET_YES == active_migration)
2109     {
2110       GNUNET_DATASTORE_put (dsh,
2111                             0, &query, dsize, &put[1],
2112                             type, prq.priority, 1 /* anonymity */, 
2113                             expiration, 
2114                             1 + prq.priority, MAX_DATASTORE_QUEUE,
2115                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2116                             &put_migration_continuation, 
2117                             NULL);
2118     }
2119   return GNUNET_OK;
2120 }
2121
2122
2123 /* **************************** P2P GET Handling ************************ */
2124
2125
2126 /**
2127  * Closure for 'check_duplicate_request_{peer,client}'.
2128  */
2129 struct CheckDuplicateRequestClosure
2130 {
2131   /**
2132    * The new request we should check if it already exists.
2133    */
2134   const struct PendingRequest *pr;
2135
2136   /**
2137    * Existing request found by the checker, NULL if none.
2138    */
2139   struct PendingRequest *have;
2140 };
2141
2142
2143 /**
2144  * Iterator over entries in the 'query_request_map' that
2145  * tries to see if we have the same request pending from
2146  * the same client already.
2147  *
2148  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2149  * @param key current key code (query, ignored, must match)
2150  * @param value value in the hash map (a 'struct PendingRequest' 
2151  *              that already exists)
2152  * @return GNUNET_YES if we should continue to
2153  *         iterate (no match yet)
2154  *         GNUNET_NO if not (match found).
2155  */
2156 static int
2157 check_duplicate_request_client (void *cls,
2158                                 const GNUNET_HashCode * key,
2159                                 void *value)
2160 {
2161   struct CheckDuplicateRequestClosure *cdc = cls;
2162   struct PendingRequest *have = value;
2163
2164   if (have->client_request_list == NULL)
2165     return GNUNET_YES;
2166   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2167        (cdc->pr != have) )
2168     {
2169       cdc->have = have;
2170       return GNUNET_NO;
2171     }
2172   return GNUNET_YES;
2173 }
2174
2175
2176 /**
2177  * We're processing (local) results for a search request
2178  * from another peer.  Pass applicable results to the
2179  * peer and if we are done either clean up (operation
2180  * complete) or forward to other peers (more results possible).
2181  *
2182  * @param cls our closure (struct LocalGetContext)
2183  * @param key key for the content
2184  * @param size number of bytes in data
2185  * @param data content stored
2186  * @param type type of the content
2187  * @param priority priority of the content
2188  * @param anonymity anonymity-level for the content
2189  * @param expiration expiration time for the content
2190  * @param uid unique identifier for the datum;
2191  *        maybe 0 if no unique identifier is available
2192  */
2193 static void
2194 process_local_reply (void *cls,
2195                      const GNUNET_HashCode * key,
2196                      uint32_t size,
2197                      const void *data,
2198                      enum GNUNET_BLOCK_Type type,
2199                      uint32_t priority,
2200                      uint32_t anonymity,
2201                      struct GNUNET_TIME_Absolute
2202                      expiration, 
2203                      uint64_t uid)
2204 {
2205   struct PendingRequest *pr = cls;
2206   struct ProcessReplyClosure prq;
2207   struct CheckDuplicateRequestClosure cdrc;
2208   const struct SBlock *sb;
2209   GNUNET_HashCode dhash;
2210   GNUNET_HashCode mhash;
2211   GNUNET_HashCode query;
2212   
2213   if (NULL == key)
2214     {
2215 #if DEBUG_FS > 1
2216       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2217                   "Done processing local replies, forwarding request to other peers.\n");
2218 #endif
2219       pr->qe = NULL;
2220       if (pr->client_request_list != NULL)
2221         {
2222           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2223                                       GNUNET_YES);
2224           /* Figure out if this is a duplicate request and possibly
2225              merge 'struct PendingRequest' entries */
2226           cdrc.have = NULL;
2227           cdrc.pr = pr;
2228           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2229                                                       &pr->query,
2230                                                       &check_duplicate_request_client,
2231                                                       &cdrc);
2232           if (cdrc.have != NULL)
2233             {
2234 #if DEBUG_FS
2235               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2236                           "Received request for block `%s' twice from client, will only request once.\n",
2237                           GNUNET_h2s (&pr->query));
2238 #endif
2239               
2240               destroy_pending_request (pr);
2241               return;
2242             }
2243         }
2244
2245       /* no more results */
2246       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2247         pr->task = GNUNET_SCHEDULER_add_now (sched,
2248                                              &forward_request_task,
2249                                              pr);      
2250       return;
2251     }
2252 #if DEBUG_FS
2253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254               "New local response to `%s' of type %u.\n",
2255               GNUNET_h2s (key),
2256               type);
2257 #endif
2258   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2259     {
2260 #if DEBUG_FS
2261       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2262                   "Found ONDEMAND block, performing on-demand encoding\n");
2263 #endif
2264       GNUNET_STATISTICS_update (stats,
2265                                 gettext_noop ("# on-demand blocks matched requests"),
2266                                 1,
2267                                 GNUNET_NO);
2268       if (GNUNET_OK != 
2269           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2270                                             anonymity, expiration, uid, 
2271                                             &process_local_reply,
2272                                             pr))
2273       if (pr->qe != NULL)
2274         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2275       return;
2276     }
2277   /* check for duplicates */
2278   GNUNET_CRYPTO_hash (data, size, &dhash);
2279   mingle_hash (&dhash, 
2280                pr->mingle,
2281                &mhash);
2282   if ( (pr->bf != NULL) &&
2283        (GNUNET_YES ==
2284         GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2285                                            &mhash)) )
2286     {      
2287 #if DEBUG_FS
2288       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2289                   "Result from datastore filtered by bloomfilter (duplicate).\n");
2290 #endif
2291       GNUNET_STATISTICS_update (stats,
2292                                 gettext_noop ("# results filtered by query bloomfilter"),
2293                                 1,
2294                                 GNUNET_NO);
2295       if (pr->qe != NULL)
2296         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2297       return;
2298     }
2299 #if DEBUG_FS
2300   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2301               "Found result for query `%s' in local datastore\n",
2302               GNUNET_h2s (key));
2303 #endif
2304   GNUNET_STATISTICS_update (stats,
2305                             gettext_noop ("# results found locally"),
2306                             1,
2307                             GNUNET_NO);
2308   pr->results_found++;
2309   memset (&prq, 0, sizeof (prq));
2310   prq.data = data;
2311   prq.expiration = expiration;
2312   prq.size = size;  
2313   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2314     { 
2315       sb = (const struct SBlock*) data;
2316       GNUNET_CRYPTO_hash (&sb->subspace,
2317                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2318                           &prq.namespace);
2319     }
2320   if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
2321                                              data,
2322                                              size,
2323                                              &query))
2324     {
2325       GNUNET_break (0);
2326       GNUNET_DATASTORE_remove (dsh,
2327                                key,
2328                                size, data,
2329                                -1, -1, 
2330                                GNUNET_TIME_UNIT_FOREVER_REL,
2331                                NULL, NULL);
2332       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2333       return;
2334     }
2335   prq.type = type;
2336   prq.priority = priority;  
2337   process_reply (&prq, key, pr);
2338
2339   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
2340        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
2341     {
2342       if (pr->qe != NULL)
2343         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2344       return;
2345     }
2346   if ( (pr->client_request_list == NULL) &&
2347        ( (GNUNET_YES == test_load_too_high()) ||
2348          (pr->results_found > 5 + 2 * pr->priority) ) )
2349     {
2350 #if DEBUG_FS > 2
2351       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2352                   "Load too high, done with request\n");
2353 #endif
2354       GNUNET_STATISTICS_update (stats,
2355                                 gettext_noop ("# processing result set cut short due to load"),
2356                                 1,
2357                                 GNUNET_NO);
2358       if (pr->qe != NULL)
2359         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2360       return;
2361     }
2362   if (pr->qe != NULL)
2363     GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2364 }
2365
2366
2367 /**
2368  * We've received a request with the specified priority.  Bound it
2369  * according to how much we trust the given peer.
2370  * 
2371  * @param prio_in requested priority
2372  * @param cp the peer making the request
2373  * @return effective priority
2374  */
2375 static uint32_t
2376 bound_priority (uint32_t prio_in,
2377                 struct ConnectedPeer *cp)
2378 {
2379   return 0; // FIXME!
2380 }
2381
2382
2383 /**
2384  * Iterator over entries in the 'query_request_map' that
2385  * tries to see if we have the same request pending from
2386  * the same peer already.
2387  *
2388  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2389  * @param key current key code (query, ignored, must match)
2390  * @param value value in the hash map (a 'struct PendingRequest' 
2391  *              that already exists)
2392  * @return GNUNET_YES if we should continue to
2393  *         iterate (no match yet)
2394  *         GNUNET_NO if not (match found).
2395  */
2396 static int
2397 check_duplicate_request_peer (void *cls,
2398                               const GNUNET_HashCode * key,
2399                               void *value)
2400 {
2401   struct CheckDuplicateRequestClosure *cdc = cls;
2402   struct PendingRequest *have = value;
2403
2404   if (cdc->pr->target_pid == have->target_pid)
2405     {
2406       cdc->have = have;
2407       return GNUNET_NO;
2408     }
2409   return GNUNET_YES;
2410 }
2411
2412
2413 /**
2414  * Handle P2P "GET" request.
2415  *
2416  * @param cls closure, always NULL
2417  * @param other the other peer involved (sender or receiver, NULL
2418  *        for loopback messages where we are both sender and receiver)
2419  * @param message the actual message
2420  * @param latency reported latency of the connection with 'other'
2421  * @param distance reported distance (DV) to 'other' 
2422  * @return GNUNET_OK to keep the connection open,
2423  *         GNUNET_SYSERR to close it (signal serious error)
2424  */
2425 static int
2426 handle_p2p_get (void *cls,
2427                 const struct GNUNET_PeerIdentity *other,
2428                 const struct GNUNET_MessageHeader *message,
2429                 struct GNUNET_TIME_Relative latency,
2430                 uint32_t distance)
2431 {
2432   struct PendingRequest *pr;
2433   struct ConnectedPeer *cp;
2434   struct ConnectedPeer *cps;
2435   struct CheckDuplicateRequestClosure cdc;
2436   struct GNUNET_TIME_Relative timeout;
2437   uint16_t msize;
2438   const struct GetMessage *gm;
2439   unsigned int bits;
2440   const GNUNET_HashCode *opt;
2441   uint32_t bm;
2442   size_t bfsize;
2443   uint32_t ttl_decrement;
2444   enum GNUNET_BLOCK_Type type;
2445   double preference;
2446   int have_ns;
2447
2448   msize = ntohs(message->size);
2449   if (msize < sizeof (struct GetMessage))
2450     {
2451       GNUNET_break_op (0);
2452       return GNUNET_SYSERR;
2453     }
2454   gm = (const struct GetMessage*) message;
2455   type = ntohl (gm->type);
2456   switch (type)
2457     {
2458     case GNUNET_BLOCK_TYPE_ANY:
2459     case GNUNET_BLOCK_TYPE_DBLOCK:
2460     case GNUNET_BLOCK_TYPE_IBLOCK:
2461     case GNUNET_BLOCK_TYPE_KBLOCK:
2462     case GNUNET_BLOCK_TYPE_SBLOCK:
2463       break;
2464     default:
2465       GNUNET_break_op (0);
2466       return GNUNET_SYSERR;
2467     }
2468   bm = ntohl (gm->hash_bitmap);
2469   bits = 0;
2470   while (bm > 0)
2471     {
2472       if (1 == (bm & 1))
2473         bits++;
2474       bm >>= 1;
2475     }
2476   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2477     {
2478       GNUNET_break_op (0);
2479       return GNUNET_SYSERR;
2480     }  
2481   opt = (const GNUNET_HashCode*) &gm[1];
2482   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2483   bm = ntohl (gm->hash_bitmap);
2484   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
2485        (type != GNUNET_BLOCK_TYPE_SBLOCK) )
2486     {
2487       GNUNET_break_op (0);
2488       return GNUNET_SYSERR;      
2489     }
2490   bits = 0;
2491   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2492                                            &other->hashPubKey);
2493   if (NULL == cps)
2494     {
2495       /* peer must have just disconnected */
2496       GNUNET_STATISTICS_update (stats,
2497                                 gettext_noop ("# requests dropped due to initiator not being connected"),
2498                                 1,
2499                                 GNUNET_NO);
2500       return GNUNET_SYSERR;
2501     }
2502   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
2503     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2504                                             &opt[bits++]);
2505   else
2506     cp = cps;
2507   if (cp == NULL)
2508     {
2509 #if DEBUG_FS
2510       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
2511         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2512                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
2513                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
2514       
2515       else
2516         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2517                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
2518                     GNUNET_i2s (other));
2519 #endif
2520       GNUNET_STATISTICS_update (stats,
2521                                 gettext_noop ("# requests dropped due to missing reverse route"),
2522                                 1,
2523                                 GNUNET_NO);
2524      /* FIXME: try connect? */
2525       return GNUNET_OK;
2526     }
2527   /* note that we can really only check load here since otherwise
2528      peers could find out that we are overloaded by not being
2529      disconnected after sending us a malformed query... */
2530   if (GNUNET_YES == test_load_too_high ())
2531     {
2532 #if DEBUG_FS
2533       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2534                   "Dropping query from `%s', this peer is too busy.\n",
2535                   GNUNET_i2s (other));
2536 #endif
2537       GNUNET_STATISTICS_update (stats,
2538                                 gettext_noop ("# requests dropped due to high load"),
2539                                 1,
2540                                 GNUNET_NO);
2541       return GNUNET_OK;
2542     }
2543
2544 #if DEBUG_FS 
2545   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2546               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
2547               GNUNET_h2s (&gm->query),
2548               (unsigned int) type,
2549               GNUNET_i2s (other),
2550               (unsigned int) bm);
2551 #endif
2552   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
2553   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
2554                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
2555   if (have_ns)
2556     pr->namespace = (GNUNET_HashCode*) &pr[1];
2557   pr->type = type;
2558   pr->mingle = ntohl (gm->filter_mutator);
2559   if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))    
2560     memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
2561   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2562     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
2563
2564   pr->anonymity_level = 1;
2565   pr->priority = bound_priority (ntohl (gm->priority), cps);
2566   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
2567   pr->query = gm->query;
2568   /* decrement ttl (always) */
2569   ttl_decrement = 2 * TTL_DECREMENT +
2570     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2571                               TTL_DECREMENT);
2572   if ( (pr->ttl < 0) &&
2573        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
2574     {
2575 #if DEBUG_FS
2576       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2577                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
2578                   GNUNET_i2s (other),
2579                   pr->ttl,
2580                   ttl_decrement);
2581 #endif
2582       GNUNET_STATISTICS_update (stats,
2583                                 gettext_noop ("# requests dropped due TTL underflow"),
2584                                 1,
2585                                 GNUNET_NO);
2586       /* integer underflow => drop (should be very rare)! */
2587       GNUNET_free (pr);
2588       return GNUNET_OK;
2589     } 
2590   pr->ttl -= ttl_decrement;
2591   pr->start_time = GNUNET_TIME_absolute_get ();
2592
2593   /* get bloom filter */
2594   if (bfsize > 0)
2595     {
2596       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
2597                                                   bfsize,
2598                                                   BLOOMFILTER_K);
2599       pr->bf_size = bfsize;
2600     }
2601
2602   cdc.have = NULL;
2603   cdc.pr = pr;
2604   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2605                                               &gm->query,
2606                                               &check_duplicate_request_peer,
2607                                               &cdc);
2608   if (cdc.have != NULL)
2609     {
2610       if (cdc.have->start_time.value + cdc.have->ttl >=
2611           pr->start_time.value + pr->ttl)
2612         {
2613           /* existing request has higher TTL, drop new one! */
2614           cdc.have->priority += pr->priority;
2615           destroy_pending_request (pr);
2616 #if DEBUG_FS
2617           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2618                       "Have existing request with higher TTL, dropping new request.\n",
2619                       GNUNET_i2s (other));
2620 #endif
2621           GNUNET_STATISTICS_update (stats,
2622                                     gettext_noop ("# requests dropped due to higher-TTL request"),
2623                                     1,
2624                                     GNUNET_NO);
2625           return GNUNET_OK;
2626         }
2627       else
2628         {
2629           /* existing request has lower TTL, drop old one! */
2630           pr->priority += cdc.have->priority;
2631           /* Possible optimization: if we have applicable pending
2632              replies in 'cdc.have', we might want to move those over
2633              (this is a really rare special-case, so it is not clear
2634              that this would be worth it) */
2635           destroy_pending_request (cdc.have);
2636           /* keep processing 'pr'! */
2637         }
2638     }
2639
2640   pr->cp = cp;
2641   GNUNET_break (GNUNET_OK ==
2642                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
2643                                                    &gm->query,
2644                                                    pr,
2645                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2646   GNUNET_break (GNUNET_OK ==
2647                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
2648                                                    &other->hashPubKey,
2649                                                    pr,
2650                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2651   
2652   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
2653                                             pr,
2654                                             pr->start_time.value + pr->ttl);
2655
2656   GNUNET_STATISTICS_update (stats,
2657                             gettext_noop ("# P2P searches received"),
2658                             1,
2659                             GNUNET_NO);
2660   GNUNET_STATISTICS_update (stats,
2661                             gettext_noop ("# P2P searches active"),
2662                             1,
2663                             GNUNET_NO);
2664
2665   /* calculate change in traffic preference */
2666   preference = (double) pr->priority;
2667   if (preference < QUERY_BANDWIDTH_VALUE)
2668     preference = QUERY_BANDWIDTH_VALUE;
2669   cps->inc_preference += preference;
2670
2671   /* process locally */
2672   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
2673     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
2674   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2675                                            (pr->priority + 1)); 
2676   pr->qe = GNUNET_DATASTORE_get (dsh,
2677                                  &gm->query,
2678                                  type,                         
2679                                  pr->priority + 1,
2680                                  MAX_DATASTORE_QUEUE,                            
2681                                  timeout,
2682                                  &process_local_reply,
2683                                  pr);
2684
2685   /* Are multiple results possible?  If so, start processing remotely now! */
2686   switch (pr->type)
2687     {
2688     case GNUNET_BLOCK_TYPE_DBLOCK:
2689     case GNUNET_BLOCK_TYPE_IBLOCK:
2690       /* only one result, wait for datastore */
2691       break;
2692     default:
2693       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2694         pr->task = GNUNET_SCHEDULER_add_now (sched,
2695                                              &forward_request_task,
2696                                              pr);
2697     }
2698
2699   /* make sure we don't track too many requests */
2700   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
2701     {
2702       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
2703       destroy_pending_request (pr);
2704     }
2705   return GNUNET_OK;
2706 }
2707
2708
2709 /* **************************** CS GET Handling ************************ */
2710
2711
2712 /**
2713  * Handle START_SEARCH-message (search request from client).
2714  *
2715  * @param cls closure
2716  * @param client identification of the client
2717  * @param message the actual message
2718  */
2719 static void
2720 handle_start_search (void *cls,
2721                      struct GNUNET_SERVER_Client *client,
2722                      const struct GNUNET_MessageHeader *message)
2723 {
2724   static GNUNET_HashCode all_zeros;
2725   const struct SearchMessage *sm;
2726   struct ClientList *cl;
2727   struct ClientRequestList *crl;
2728   struct PendingRequest *pr;
2729   uint16_t msize;
2730   unsigned int sc;
2731   enum GNUNET_BLOCK_Type type;
2732
2733   msize = ntohs (message->size);
2734   if ( (msize < sizeof (struct SearchMessage)) ||
2735        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2736     {
2737       GNUNET_break (0);
2738       GNUNET_SERVER_receive_done (client,
2739                                   GNUNET_SYSERR);
2740       return;
2741     }
2742   GNUNET_STATISTICS_update (stats,
2743                             gettext_noop ("# client searches received"),
2744                             1,
2745                             GNUNET_NO);
2746   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2747   sm = (const struct SearchMessage*) message;
2748   type = ntohl (sm->type);
2749 #if DEBUG_FS
2750   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2751               "Received request for `%s' of type %u from local client\n",
2752               GNUNET_h2s (&sm->query),
2753               (unsigned int) type);
2754 #endif
2755   switch (type)
2756     {
2757     case GNUNET_BLOCK_TYPE_ANY:
2758     case GNUNET_BLOCK_TYPE_DBLOCK:
2759     case GNUNET_BLOCK_TYPE_IBLOCK:
2760     case GNUNET_BLOCK_TYPE_KBLOCK:
2761     case GNUNET_BLOCK_TYPE_SBLOCK:
2762     case GNUNET_BLOCK_TYPE_NBLOCK:
2763       break;
2764     default:
2765       GNUNET_break (0);
2766       GNUNET_SERVER_receive_done (client,
2767                                   GNUNET_SYSERR);
2768       return;
2769     }  
2770
2771   cl = client_list;
2772   while ( (cl != NULL) &&
2773           (cl->client != client) )
2774     cl = cl->next;
2775   if (cl == NULL)
2776     {
2777       cl = GNUNET_malloc (sizeof (struct ClientList));
2778       cl->client = client;
2779       GNUNET_SERVER_client_keep (client);
2780       cl->next = client_list;
2781       client_list = cl;
2782     }
2783   /* detect duplicate KBLOCK requests */
2784   if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
2785        (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
2786        (type == GNUNET_BLOCK_TYPE_ANY) )
2787     {
2788       crl = cl->rl_head;
2789       while ( (crl != NULL) &&
2790               ( (0 != memcmp (&crl->req->query,
2791                               &sm->query,
2792                               sizeof (GNUNET_HashCode))) ||
2793                 (crl->req->type != type) ) )
2794         crl = crl->next;
2795       if (crl != NULL)  
2796         { 
2797 #if DEBUG_FS
2798           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2799                       "Have existing request, merging content-seen lists.\n");
2800 #endif
2801           pr = crl->req;
2802           /* Duplicate request (used to send long list of
2803              known/blocked results); merge 'pr->replies_seen'
2804              and update bloom filter */
2805           GNUNET_array_grow (pr->replies_seen,
2806                              pr->replies_seen_size,
2807                              pr->replies_seen_off + sc);
2808           memcpy (&pr->replies_seen[pr->replies_seen_off],
2809                   &sm[1],
2810                   sc * sizeof (GNUNET_HashCode));
2811           pr->replies_seen_off += sc;
2812           refresh_bloomfilter (pr);
2813           GNUNET_STATISTICS_update (stats,
2814                                     gettext_noop ("# client searches updated (merged content seen list)"),
2815                                     1,
2816                                     GNUNET_NO);
2817           GNUNET_SERVER_receive_done (client,
2818                                       GNUNET_OK);
2819           return;
2820         }
2821     }
2822   GNUNET_STATISTICS_update (stats,
2823                             gettext_noop ("# client searches active"),
2824                             1,
2825                             GNUNET_NO);
2826   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
2827                       ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
2828   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
2829   memset (crl, 0, sizeof (struct ClientRequestList));
2830   crl->client_list = cl;
2831   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
2832                                cl->rl_tail,
2833                                crl);  
2834   crl->req = pr;
2835   pr->type = type;
2836   pr->client_request_list = crl;
2837   GNUNET_array_grow (pr->replies_seen,
2838                      pr->replies_seen_size,
2839                      sc);
2840   memcpy (pr->replies_seen,
2841           &sm[1],
2842           sc * sizeof (GNUNET_HashCode));
2843   pr->replies_seen_off = sc;
2844   pr->anonymity_level = ntohl (sm->anonymity_level); 
2845   refresh_bloomfilter (pr);
2846   pr->query = sm->query;
2847   if (0 == (1 & ntohl (sm->options)))
2848     pr->local_only = GNUNET_NO;
2849   else
2850     pr->local_only = GNUNET_YES;
2851   switch (type)
2852     {
2853     case GNUNET_BLOCK_TYPE_DBLOCK:
2854     case GNUNET_BLOCK_TYPE_IBLOCK:
2855       if (0 != memcmp (&sm->target,
2856                        &all_zeros,
2857                        sizeof (GNUNET_HashCode)))
2858         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
2859       break;
2860     case GNUNET_BLOCK_TYPE_SBLOCK:
2861       pr->namespace = (GNUNET_HashCode*) &pr[1];
2862       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
2863       break;
2864     default:
2865       break;
2866     }
2867   GNUNET_break (GNUNET_OK ==
2868                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
2869                                                    &sm->query,
2870                                                    pr,
2871                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2872   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
2873     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
2874   pr->qe = GNUNET_DATASTORE_get (dsh,
2875                                  &sm->query,
2876                                  type,
2877                                  -3, -1,
2878                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
2879                                  &process_local_reply,
2880                                  pr);
2881 }
2882
2883
2884 /* **************************** Startup ************************ */
2885
2886
2887 /**
2888  * List of handlers for P2P messages
2889  * that we care about.
2890  */
2891 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2892   {
2893     { &handle_p2p_get, 
2894       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2895     { &handle_p2p_put, 
2896       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2897     { NULL, 0, 0 }
2898   };
2899
2900
2901 /**
2902  * List of handlers for the messages understood by this
2903  * service.
2904  */
2905 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2906   {&GNUNET_FS_handle_index_start, NULL, 
2907    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2908   {&GNUNET_FS_handle_index_list_get, NULL, 
2909    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2910   {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2911    sizeof (struct UnindexMessage) },
2912   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2913    0 },
2914   {NULL, NULL, 0, 0}
2915 };
2916
2917
2918 /**
2919  * Process fs requests.
2920  *
2921  * @param s scheduler to use
2922  * @param server the initialized server
2923  * @param c configuration to use
2924  */
2925 static int
2926 main_init (struct GNUNET_SCHEDULER_Handle *s,
2927            struct GNUNET_SERVER_Handle *server,
2928            const struct GNUNET_CONFIGURATION_Handle *c)
2929 {
2930   sched = s;
2931   cfg = c;
2932   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
2933   connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2934   query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2935   peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2936   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
2937   core = GNUNET_CORE_connect (sched,
2938                               cfg,
2939                               GNUNET_TIME_UNIT_FOREVER_REL,
2940                               NULL,
2941                               NULL,
2942                               &peer_connect_handler,
2943                               &peer_disconnect_handler,
2944                               NULL, GNUNET_NO,
2945                               NULL, GNUNET_NO,
2946                               p2p_handlers);
2947   if (NULL == core)
2948     {
2949       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2950                   _("Failed to connect to `%s' service.\n"),
2951                   "core");
2952       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2953       connected_peers = NULL;
2954       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2955       query_request_map = NULL;
2956       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2957       requests_by_expiration_heap = NULL;
2958       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2959       peer_request_map = NULL;
2960       if (dsh != NULL)
2961         {
2962           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
2963           dsh = NULL;
2964         }
2965       return GNUNET_SYSERR;
2966     }
2967   GNUNET_SERVER_disconnect_notify (server, 
2968                                    &handle_client_disconnect,
2969                                    NULL);
2970   GNUNET_SERVER_add_handlers (server, handlers);
2971   GNUNET_SCHEDULER_add_delayed (sched,
2972                                 GNUNET_TIME_UNIT_FOREVER_REL,
2973                                 &shutdown_task,
2974                                 NULL);
2975   return GNUNET_OK;
2976 }
2977
2978
2979 /**
2980  * Process fs requests.
2981  *
2982  * @param cls closure
2983  * @param sched scheduler to use
2984  * @param server the initialized server
2985  * @param cfg configuration to use
2986  */
2987 static void
2988 run (void *cls,
2989      struct GNUNET_SCHEDULER_Handle *sched,
2990      struct GNUNET_SERVER_Handle *server,
2991      const struct GNUNET_CONFIGURATION_Handle *cfg)
2992 {
2993   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2994                                                            "FS",
2995                                                            "ACTIVEMIGRATION");
2996   dsh = GNUNET_DATASTORE_connect (cfg,
2997                                   sched);
2998   if (dsh == NULL)
2999     {
3000       GNUNET_SCHEDULER_shutdown (sched);
3001       return;
3002     }
3003   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
3004        (GNUNET_OK != main_init (sched, server, cfg)) )
3005     {    
3006       GNUNET_SCHEDULER_shutdown (sched);
3007       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3008       dsh = NULL;
3009       return;   
3010     }
3011 }
3012
3013
3014 /**
3015  * The main function for the fs service.
3016  *
3017  * @param argc number of arguments from the command line
3018  * @param argv command line arguments
3019  * @return 0 ok, 1 on error
3020  */
3021 int
3022 main (int argc, char *const *argv)
3023 {
3024   return (GNUNET_OK ==
3025           GNUNET_SERVICE_run (argc,
3026                               argv,
3027                               "fs",
3028                               GNUNET_SERVICE_OPTION_NONE,
3029                               &run, NULL)) ? 0 : 1;
3030 }
3031
3032 /* end of gnunet-service-fs.c */