fixes
[oweals/gnunet.git] / src / fs / gnunet-service-fs.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs.c
23  * @brief gnunet anonymity protocol implementation
24  * @author Christian Grothoff
25  *
26  * FIXME:
27  * - TTL/priority calculations are absent!
28  * TODO:
29  * - have non-zero preference / priority for requests we initiate!
30  * - track stats for hot-path routing
31  * - implement hot-path routing decision procedure
32  * - implement: bound_priority, test_load_too_high, validate_nblock
33  * - add content migration support (store locally) [or create new service]
34  * - statistics
35  */
36 #include "platform.h"
37 #include <float.h>
38 #include "gnunet_constants.h"
39 #include "gnunet_core_service.h"
40 #include "gnunet_datastore_service.h"
41 #include "gnunet_peer_lib.h"
42 #include "gnunet_protocols.h"
43 #include "gnunet_signatures.h"
44 #include "gnunet_statistics_service.h"
45 #include "gnunet_util_lib.h"
46 #include "gnunet-service-fs_indexing.h"
47 #include "fs.h"
48
49 #define DEBUG_FS GNUNET_YES
50
51 /**
52  * Maximum number of outgoing messages we queue per peer.
53  * FIXME: make configurable?
54  */
55 #define MAX_QUEUE_PER_PEER 16
56
57 /**
58  * Inverse of the probability that we will submit the same query
59  * to the same peer again.  If the same peer already got the query
60  * repeatedly recently, the probability is multiplied by the inverse
61  * of this number each time.  Note that we only try about every TTL_DECREMENT/2
62  * plus MAX_CORK_DELAY (so roughly every 3.5s).
63  */
64 #define RETRY_PROBABILITY_INV 3
65
66 /**
67  * What is the maximum delay for a P2P FS message (in our interaction
68  * with core)?  FS-internal delays are another story.  The value is
69  * chosen based on the 32k block size.  Given that peers typcially
70  * have at least 1 kb/s bandwidth, 45s waits give us a chance to
71  * transmit one message even to the lowest-bandwidth peers.
72  */
73 #define MAX_TRANSMIT_DELAY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 45)
74
75
76
77 /**
78  * Maximum number of requests (from other peers) that we're
79  * willing to have pending at any given point in time.
80  * FIXME: set from configuration.
81  */
82 static uint64_t max_pending_requests = (32 * 1024);
83
84
85 /**
86  * Information we keep for each pending reply.  The
87  * actual message follows at the end of this struct.
88  */
89 struct PendingMessage;
90
91 /**
92  * Our connection to the datastore.
93  */
94 static struct GNUNET_DATASTORE_Handle *dsh;
95
96
97 /**
98  * Function called upon completion of a transmission.
99  *
100  * @param cls closure
101  * @param pid ID of receiving peer, 0 on transmission error
102  */
103 typedef void (*TransmissionContinuation)(void * cls, 
104                                          GNUNET_PEER_Id tpid);
105
106
107 /**
108  * Information we keep for each pending message (GET/PUT).  The
109  * actual message follows at the end of this struct.
110  */
111 struct PendingMessage
112 {
113   /**
114    * This is a doubly-linked list of messages to the same peer.
115    */
116   struct PendingMessage *next;
117
118   /**
119    * This is a doubly-linked list of messages to the same peer.
120    */
121   struct PendingMessage *prev;
122
123   /**
124    * Entry in pending message list for this pending message.
125    */ 
126   struct PendingMessageList *pml;  
127
128   /**
129    * Function to call immediately once we have transmitted this
130    * message.
131    */
132   TransmissionContinuation cont;
133
134   /**
135    * Closure for cont.
136    */
137   void *cont_cls;
138
139   /**
140    * Size of the reply; actual reply message follows
141    * at the end of this struct.
142    */
143   size_t msize;
144   
145   /**
146    * How important is this message for us?
147    */
148   uint32_t priority;
149  
150 };
151
152
153 /**
154  * Information about a peer that we are connected to.
155  * We track data that is useful for determining which
156  * peers should receive our requests.  We also keep
157  * a list of messages to transmit to this peer.
158  */
159 struct ConnectedPeer
160 {
161
162   /**
163    * List of the last clients for which this peer successfully
164    * answered a query.
165    */
166   struct GNUNET_SERVER_Client *last_client_replies[CS2P_SUCCESS_LIST_SIZE];
167
168   /**
169    * List of the last PIDs for which
170    * this peer successfully answered a query;
171    * We use 0 to indicate no successful reply.
172    */
173   GNUNET_PEER_Id last_p2p_replies[P2P_SUCCESS_LIST_SIZE];
174
175   /**
176    * Average delay between sending the peer a request and
177    * getting a reply (only calculated over the requests for
178    * which we actually got a reply).   Calculated
179    * as a moving average: new_delay = ((n-1)*last_delay+curr_delay) / n
180    */ 
181   struct GNUNET_TIME_Relative avg_delay;
182
183   /**
184    * Handle for an active request for transmission to this
185    * peer, or NULL.
186    */
187   struct GNUNET_CORE_TransmitHandle *cth;
188
189   /**
190    * Messages (replies, queries, content migration) we would like to
191    * send to this peer in the near future.  Sorted by priority, head.
192    */
193   struct PendingMessage *pending_messages_head;
194
195   /**
196    * Messages (replies, queries, content migration) we would like to
197    * send to this peer in the near future.  Sorted by priority, tail.
198    */
199   struct PendingMessage *pending_messages_tail;
200
201   /**
202    * Average priority of successful replies.  Calculated
203    * as a moving average: new_avg = ((n-1)*last_avg+curr_prio) / n
204    */
205   double avg_priority;
206
207   /**
208    * Increase in traffic preference still to be submitted
209    * to the core service for this peer. FIXME: double or 'uint64_t'?
210    */
211   double inc_preference;
212
213   /**
214    * The peer's identity.
215    */
216   GNUNET_PEER_Id pid;  
217
218   /**
219    * Size of the linked list of 'pending_messages'.
220    */
221   unsigned int pending_requests;
222
223   /**
224    * Which offset in "last_p2p_replies" will be updated next?
225    * (we go round-robin).
226    */
227   unsigned int last_p2p_replies_woff;
228
229   /**
230    * Which offset in "last_client_replies" will be updated next?
231    * (we go round-robin).
232    */
233   unsigned int last_client_replies_woff;
234
235 };
236
237
238 /**
239  * Information we keep for each pending request.  We should try to
240  * keep this struct as small as possible since its memory consumption
241  * is key to how many requests we can have pending at once.
242  */
243 struct PendingRequest;
244
245
246 /**
247  * Doubly-linked list of requests we are performing
248  * on behalf of the same client.
249  */
250 struct ClientRequestList
251 {
252
253   /**
254    * This is a doubly-linked list.
255    */
256   struct ClientRequestList *next;
257
258   /**
259    * This is a doubly-linked list.
260    */
261   struct ClientRequestList *prev;
262
263   /**
264    * Request this entry represents.
265    */
266   struct PendingRequest *req;
267
268   /**
269    * Client list this request belongs to.
270    */
271   struct ClientList *client_list;
272
273 };
274
275
276 /**
277  * Replies to be transmitted to the client.  The actual
278  * response message is allocated after this struct.
279  */
280 struct ClientResponseMessage
281 {
282   /**
283    * This is a doubly-linked list.
284    */
285   struct ClientResponseMessage *next;
286
287   /**
288    * This is a doubly-linked list.
289    */
290   struct ClientResponseMessage *prev;
291
292   /**
293    * Client list entry this response belongs to.
294    */
295   struct ClientList *client_list;
296
297   /**
298    * Number of bytes in the response.
299    */
300   size_t msize;
301 };
302
303
304 /**
305  * Linked list of clients we are performing requests
306  * for right now.
307  */
308 struct ClientList
309 {
310   /**
311    * This is a linked list.
312    */
313   struct ClientList *next;
314
315   /**
316    * ID of a client making a request, NULL if this entry is for a
317    * peer.
318    */
319   struct GNUNET_SERVER_Client *client;
320
321   /**
322    * Head of list of requests performed on behalf
323    * of this client right now.
324    */
325   struct ClientRequestList *rl_head;
326
327   /**
328    * Tail of list of requests performed on behalf
329    * of this client right now.
330    */
331   struct ClientRequestList *rl_tail;
332
333   /**
334    * Head of linked list of responses.
335    */
336   struct ClientResponseMessage *res_head;
337
338   /**
339    * Tail of linked list of responses.
340    */
341   struct ClientResponseMessage *res_tail;
342
343   /**
344    * Context for sending replies.
345    */
346   struct GNUNET_CONNECTION_TransmitHandle *th;
347
348 };
349
350
351 /**
352  * Doubly-linked list of messages we are performing
353  * due to a pending request.
354  */
355 struct PendingMessageList
356 {
357
358   /**
359    * This is a doubly-linked list of messages on behalf of the same request.
360    */
361   struct PendingMessageList *next;
362
363   /**
364    * This is a doubly-linked list of messages on behalf of the same request.
365    */
366   struct PendingMessageList *prev;
367
368   /**
369    * Message this entry represents.
370    */
371   struct PendingMessage *pm;
372
373   /**
374    * Request this entry belongs to.
375    */
376   struct PendingRequest *req;
377
378   /**
379    * Peer this message is targeted for.
380    */
381   struct ConnectedPeer *target;
382
383 };
384
385
386 /**
387  * Information we keep for each pending request.  We should try to
388  * keep this struct as small as possible since its memory consumption
389  * is key to how many requests we can have pending at once.
390  */
391 struct PendingRequest
392 {
393
394   /**
395    * If this request was made by a client, this is our entry in the
396    * client request list; otherwise NULL.
397    */
398   struct ClientRequestList *client_request_list;
399
400   /**
401    * Entry of peer responsible for this entry (if this request
402    * was made by a peer).
403    */
404   struct ConnectedPeer *cp;
405
406   /**
407    * If this is a namespace query, pointer to the hash of the public
408    * key of the namespace; otherwise NULL.  Pointer will be to the 
409    * end of this struct (so no need to free it).
410    */
411   const GNUNET_HashCode *namespace;
412
413   /**
414    * Bloomfilter we use to filter out replies that we don't care about
415    * (anymore).  NULL as long as we are interested in all replies.
416    */
417   struct GNUNET_CONTAINER_BloomFilter *bf;
418
419   /**
420    * Context of our GNUNET_CORE_peer_change_preference call.
421    */
422   struct GNUNET_CORE_InformationRequestContext *irc;
423
424   /**
425    * Hash code of all replies that we have seen so far (only valid
426    * if client is not NULL since we only track replies like this for
427    * our own clients).
428    */
429   GNUNET_HashCode *replies_seen;
430
431   /**
432    * Node in the heap representing this entry; NULL
433    * if we have no heap node.
434    */
435   struct GNUNET_CONTAINER_HeapNode *hnode;
436
437   /**
438    * Head of list of messages being performed on behalf of this
439    * request.
440    */
441   struct PendingMessageList *pending_head;
442
443   /**
444    * Tail of list of messages being performed on behalf of this
445    * request.
446    */
447   struct PendingMessageList *pending_tail;
448
449   /**
450    * When did we first see this request (form this peer), or, if our
451    * client is initiating, when did we last initiate a search?
452    */
453   struct GNUNET_TIME_Absolute start_time;
454
455   /**
456    * The query that this request is for.
457    */
458   GNUNET_HashCode query;
459
460   /**
461    * The task responsible for transmitting queries
462    * for this request.
463    */
464   GNUNET_SCHEDULER_TaskIdentifier task;
465
466   /**
467    * (Interned) Peer identifier that identifies a preferred target
468    * for requests.
469    */
470   GNUNET_PEER_Id target_pid;
471
472   /**
473    * (Interned) Peer identifiers of peers that have already
474    * received our query for this content.
475    */
476   GNUNET_PEER_Id *used_pids;
477   
478   /**
479    * Our entry in the queue (non-NULL while we wait for our
480    * turn to interact with the local database).
481    */
482   struct GNUNET_DATASTORE_QueueEntry *qe;
483
484   /**
485    * Size of the 'bf' (in bytes).
486    */
487   size_t bf_size;
488
489   /**
490    * Desired anonymity level; only valid for requests from a local client.
491    */
492   uint32_t anonymity_level;
493
494   /**
495    * How many entries in "used_pids" are actually valid?
496    */
497   unsigned int used_pids_off;
498
499   /**
500    * How long is the "used_pids" array?
501    */
502   unsigned int used_pids_size;
503
504   /**
505    * Number of results found for this request.
506    */
507   unsigned int results_found;
508
509   /**
510    * How many entries in "replies_seen" are actually valid?
511    */
512   unsigned int replies_seen_off;
513
514   /**
515    * How long is the "replies_seen" array?
516    */
517   unsigned int replies_seen_size;
518   
519   /**
520    * Priority with which this request was made.  If one of our clients
521    * made the request, then this is the current priority that we are
522    * using when initiating the request.  This value is used when
523    * we decide to reward other peers with trust for providing a reply.
524    */
525   uint32_t priority;
526
527   /**
528    * Priority points left for us to spend when forwarding this request
529    * to other peers.
530    */
531   uint32_t remaining_priority;
532
533   /**
534    * Number to mingle hashes for bloom-filter tests with.
535    */
536   int32_t mingle;
537
538   /**
539    * TTL with which we saw this request (or, if we initiated, TTL that
540    * we used for the request).
541    */
542   int32_t ttl;
543   
544   /**
545    * Type of the content that this request is for.
546    */
547   enum GNUNET_BLOCK_Type type;
548
549   /**
550    * Remove this request after transmission of the current response.
551    */
552   int16_t do_remove;
553
554   /**
555    * GNUNET_YES if we should not forward this request to other peers.
556    */
557   int16_t local_only;
558
559 };
560
561
562 /**
563  * Our scheduler.
564  */
565 static struct GNUNET_SCHEDULER_Handle *sched;
566
567 /**
568  * Our configuration.
569  */
570 static const struct GNUNET_CONFIGURATION_Handle *cfg;
571
572 /**
573  * Map of peer identifiers to "struct ConnectedPeer" (for that peer).
574  */
575 static struct GNUNET_CONTAINER_MultiHashMap *connected_peers;
576
577 /**
578  * Map of peer identifiers to "struct PendingRequest" (for that peer).
579  */
580 static struct GNUNET_CONTAINER_MultiHashMap *peer_request_map;
581
582 /**
583  * Map of query identifiers to "struct PendingRequest" (for that query).
584  */
585 static struct GNUNET_CONTAINER_MultiHashMap *query_request_map;
586
587 /**
588  * Heap with the request that will expire next at the top.  Contains
589  * pointers of type "struct PendingRequest*"; these will *also* be
590  * aliased from the "requests_by_peer" data structures and the
591  * "requests_by_query" table.  Note that requests from our clients
592  * don't expire and are thus NOT in the "requests_by_expiration"
593  * (or the "requests_by_peer" tables).
594  */
595 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
596
597 /**
598  * Handle for reporting statistics.
599  */
600 static struct GNUNET_STATISTICS_Handle *stats;
601
602 /**
603  * Linked list of clients we are currently processing requests for.
604  */
605 static struct ClientList *client_list;
606
607 /**
608  * Pointer to handle to the core service (points to NULL until we've
609  * connected to it).
610  */
611 static struct GNUNET_CORE_Handle *core;
612
613 /**
614  * Are we allowed to migrate content to this peer.
615  */
616 static int active_migration;
617
618 /* ******************* clean up functions ************************ */
619
620
621 /**
622  * We're done with a particular message list entry.
623  * Free all associated resources.
624  * 
625  * @param pml entry to destroy
626  */
627 static void
628 destroy_pending_message_list_entry (struct PendingMessageList *pml)
629 {
630   GNUNET_CONTAINER_DLL_remove (pml->req->pending_head,
631                                pml->req->pending_tail,
632                                pml);
633   GNUNET_CONTAINER_DLL_remove (pml->target->pending_messages_head,
634                                pml->target->pending_messages_tail,
635                                pml->pm);
636   pml->target->pending_requests--;
637   GNUNET_free (pml->pm);
638   GNUNET_free (pml);
639 }
640
641
642 /**
643  * Destroy the given pending message (and call the respective
644  * continuation).
645  *
646  * @param pm message to destroy
647  * @param tpid id of peer that the message was delivered to, or 0 for none
648  */
649 static void
650 destroy_pending_message (struct PendingMessage *pm,
651                          GNUNET_PEER_Id tpid)
652 {
653   struct PendingMessageList *pml = pm->pml;
654   TransmissionContinuation cont;
655   void *cont_cls;
656
657   GNUNET_assert (pml->pm == pm);
658   GNUNET_assert ( (tpid == 0) || (tpid == pml->target->pid) );
659   cont = pm->cont;
660   cont_cls = pm->cont_cls;
661   destroy_pending_message_list_entry (pml);
662   cont (cont_cls, tpid);  
663 }
664
665
666 /**
667  * We're done processing a particular request.
668  * Free all associated resources.
669  *
670  * @param pr request to destroy
671  */
672 static void
673 destroy_pending_request (struct PendingRequest *pr)
674 {
675   struct GNUNET_PeerIdentity pid;
676
677   if (pr->hnode != NULL)
678     {
679       GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
680                                          pr->hnode);
681       pr->hnode = NULL;
682     }
683   if (NULL == pr->client_request_list)
684     {
685       GNUNET_STATISTICS_update (stats,
686                                 gettext_noop ("# P2P searches active"),
687                                 -1,
688                                 GNUNET_NO);
689     }
690   else
691     {
692       GNUNET_STATISTICS_update (stats,
693                                 gettext_noop ("# client searches active"),
694                                 -1,
695                                 GNUNET_NO);
696     }
697   /* might have already been removed from map in 'process_reply' (if
698      there was a unique reply) or never inserted if it was a
699      duplicate; hence ignore the return value here */
700   (void) GNUNET_CONTAINER_multihashmap_remove (query_request_map,
701                                                &pr->query,
702                                                pr);
703   if (pr->qe != NULL)
704      {
705       GNUNET_DATASTORE_cancel (pr->qe);
706       pr->qe = NULL;
707     }
708   if (pr->client_request_list != NULL)
709     {
710       GNUNET_CONTAINER_DLL_remove (pr->client_request_list->client_list->rl_head,
711                                    pr->client_request_list->client_list->rl_tail,
712                                    pr->client_request_list);
713       GNUNET_free (pr->client_request_list);
714       pr->client_request_list = NULL;
715     }
716   if (pr->cp != NULL)
717     {
718       GNUNET_PEER_resolve (pr->cp->pid,
719                            &pid);
720       (void) GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
721                                                    &pid.hashPubKey,
722                                                    pr);
723       pr->cp = NULL;
724     }
725   if (pr->bf != NULL)
726     {
727       GNUNET_CONTAINER_bloomfilter_free (pr->bf);                                        
728       pr->bf = NULL;
729     }
730   if (pr->irc != NULL)
731     {
732       GNUNET_CORE_peer_change_preference_cancel (pr->irc);
733       pr->irc = NULL;
734     }
735   if (pr->replies_seen != NULL)
736     {
737       GNUNET_free (pr->replies_seen);
738       pr->replies_seen = NULL;
739     }
740   if (pr->task != GNUNET_SCHEDULER_NO_TASK)
741     {
742       GNUNET_SCHEDULER_cancel (sched,
743                                pr->task);
744       pr->task = GNUNET_SCHEDULER_NO_TASK;
745     }
746   while (NULL != pr->pending_head)    
747     destroy_pending_message_list_entry (pr->pending_head);
748   GNUNET_PEER_change_rc (pr->target_pid, -1);
749   if (pr->used_pids != NULL)
750     {
751       GNUNET_PEER_decrement_rcs (pr->used_pids, pr->used_pids_off);
752       GNUNET_free (pr->used_pids);
753       pr->used_pids_off = 0;
754       pr->used_pids_size = 0;
755       pr->used_pids = NULL;
756     }
757   GNUNET_free (pr);
758 }
759
760
761 /**
762  * Method called whenever a given peer connects.
763  *
764  * @param cls closure, not used
765  * @param peer peer identity this notification is about
766  * @param latency reported latency of the connection with 'other'
767  * @param distance reported distance (DV) to 'other' 
768  */
769 static void 
770 peer_connect_handler (void *cls,
771                       const struct
772                       GNUNET_PeerIdentity * peer,
773                       struct GNUNET_TIME_Relative latency,
774                       uint32_t distance)
775 {
776   struct ConnectedPeer *cp;
777
778   cp = GNUNET_malloc (sizeof (struct ConnectedPeer));
779   cp->pid = GNUNET_PEER_intern (peer);
780   GNUNET_break (GNUNET_OK ==
781                 GNUNET_CONTAINER_multihashmap_put (connected_peers,
782                                                    &peer->hashPubKey,
783                                                    cp,
784                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
785 }
786
787
788 /**
789  * Free (each) request made by the peer.
790  *
791  * @param cls closure, points to peer that the request belongs to
792  * @param key current key code
793  * @param value value in the hash map
794  * @return GNUNET_YES (we should continue to iterate)
795  */
796 static int
797 destroy_request (void *cls,
798                  const GNUNET_HashCode * key,
799                  void *value)
800 {
801   const struct GNUNET_PeerIdentity * peer = cls;
802   struct PendingRequest *pr = value;
803   
804   GNUNET_break (GNUNET_YES ==
805                 GNUNET_CONTAINER_multihashmap_remove (peer_request_map,
806                                                       &peer->hashPubKey,
807                                                       pr));
808   destroy_pending_request (pr);
809   return GNUNET_YES;
810 }
811
812
813 /**
814  * Method called whenever a peer disconnects.
815  *
816  * @param cls closure, not used
817  * @param peer peer identity this notification is about
818  */
819 static void
820 peer_disconnect_handler (void *cls,
821                          const struct
822                          GNUNET_PeerIdentity * peer)
823 {
824   struct ConnectedPeer *cp;
825   struct PendingMessage *pm;
826   unsigned int i;
827
828   GNUNET_CONTAINER_multihashmap_get_multiple (peer_request_map,
829                                               &peer->hashPubKey,
830                                               &destroy_request,
831                                               (void*) peer);
832   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
833                                           &peer->hashPubKey);
834   if (cp == NULL)
835     return;
836   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
837     {
838       if (NULL != cp->last_client_replies[i])
839         {
840           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
841           cp->last_client_replies[i] = NULL;
842         }
843     }
844   GNUNET_break (GNUNET_YES ==
845                 GNUNET_CONTAINER_multihashmap_remove (connected_peers,
846                                                       &peer->hashPubKey,
847                                                       cp));
848   GNUNET_PEER_change_rc (cp->pid, -1);
849   GNUNET_PEER_decrement_rcs (cp->last_p2p_replies, P2P_SUCCESS_LIST_SIZE);
850   if (NULL != cp->cth)
851     GNUNET_CORE_notify_transmit_ready_cancel (cp->cth);
852   while (NULL != (pm = cp->pending_messages_head))
853     destroy_pending_message (pm, 0 /* delivery failed */);
854   GNUNET_break (0 == cp->pending_requests);
855   GNUNET_free (cp);
856 }
857
858
859 /**
860  * Iterator over hash map entries that removes all occurences
861  * of the given 'client' from the 'last_client_replies' of the
862  * given connected peer.
863  *
864  * @param cls closure, the 'struct GNUNET_SERVER_Client*' to remove
865  * @param key current key code (unused)
866  * @param value value in the hash map (the 'struct ConnectedPeer*' to change)
867  * @return GNUNET_YES (we should continue to iterate)
868  */
869 static int
870 remove_client_from_last_client_replies (void *cls,
871                                         const GNUNET_HashCode * key,
872                                         void *value)
873 {
874   struct GNUNET_SERVER_Client *client = cls;
875   struct ConnectedPeer *cp = value;
876   unsigned int i;
877
878   for (i=0;i<CS2P_SUCCESS_LIST_SIZE;i++)
879     {
880       if (cp->last_client_replies[i] == client)
881         {
882           GNUNET_SERVER_client_drop (cp->last_client_replies[i]);
883           cp->last_client_replies[i] = NULL;
884         }
885     }  
886   return GNUNET_YES;
887 }
888
889
890 /**
891  * A client disconnected.  Remove all of its pending queries.
892  *
893  * @param cls closure, NULL
894  * @param client identification of the client
895  */
896 static void
897 handle_client_disconnect (void *cls,
898                           struct GNUNET_SERVER_Client
899                           * client)
900 {
901   struct ClientList *pos;
902   struct ClientList *prev;
903   struct ClientRequestList *rcl;
904   struct ClientResponseMessage *creply;
905
906   if (client == NULL)
907     return;
908   prev = NULL;
909   pos = client_list;
910   while ( (NULL != pos) &&
911           (pos->client != client) )
912     {
913       prev = pos;
914       pos = pos->next;
915     }
916   if (pos == NULL)
917     return; /* no requests pending for this client */
918   while (NULL != (rcl = pos->rl_head))
919     {
920       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
921                   "Destroying pending request `%s' on disconnect\n",
922                   GNUNET_h2s (&rcl->req->query));
923       destroy_pending_request (rcl->req);
924     }
925   if (prev == NULL)
926     client_list = pos->next;
927   else
928     prev->next = pos->next;
929   if (pos->th != NULL)
930     {
931       GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->th);
932       pos->th = NULL;
933     }
934   while (NULL != (creply = pos->res_head))
935     {
936       GNUNET_CONTAINER_DLL_remove (pos->res_head,
937                                    pos->res_tail,
938                                    creply);
939       GNUNET_free (creply);
940     }    
941   GNUNET_SERVER_client_drop (pos->client);
942   GNUNET_free (pos);
943   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
944                                          &remove_client_from_last_client_replies,
945                                          client);
946 }
947
948
949 /**
950  * Iterator to free peer entries.
951  *
952  * @param cls closure, unused
953  * @param key current key code
954  * @param value value in the hash map (peer entry)
955  * @return GNUNET_YES (we should continue to iterate)
956  */
957 static int 
958 clean_peer (void *cls,
959             const GNUNET_HashCode * key,
960             void *value)
961 {
962   peer_disconnect_handler (NULL, (const struct GNUNET_PeerIdentity*) key);
963   return GNUNET_YES;
964 }
965
966
967 /**
968  * Task run during shutdown.
969  *
970  * @param cls unused
971  * @param tc unused
972  */
973 static void
974 shutdown_task (void *cls,
975                const struct GNUNET_SCHEDULER_TaskContext *tc)
976 {
977   while (client_list != NULL)
978     handle_client_disconnect (NULL,
979                               client_list->client);
980   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
981                                          &clean_peer,
982                                          NULL);
983   GNUNET_break (0 == GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap));
984   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
985   requests_by_expiration_heap = 0;
986   GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
987   connected_peers = NULL;
988   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (query_request_map));
989   GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
990   query_request_map = NULL;
991   GNUNET_break (0 == GNUNET_CONTAINER_multihashmap_size (peer_request_map));
992   GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
993   peer_request_map = NULL;
994   GNUNET_assert (NULL != core);
995   GNUNET_CORE_disconnect (core);
996   core = NULL;
997   if (stats != NULL)
998     {
999       GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
1000       stats = NULL;
1001     }
1002   GNUNET_DATASTORE_disconnect (dsh,
1003                                GNUNET_NO);
1004   dsh = NULL;
1005   sched = NULL;
1006   cfg = NULL;  
1007 }
1008
1009
1010 /* ******************* Utility functions  ******************** */
1011
1012
1013 /**
1014  * Transmit the given message by copying it to the target buffer
1015  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1016  * for writing in the meantime.  In that case, do nothing
1017  * (the disconnect or shutdown handler will take care of the rest).
1018  * If we were able to transmit messages and there are still more
1019  * pending, ask core again for further calls to this function.
1020  *
1021  * @param cls closure, pointer to the 'struct ConnectedPeer*'
1022  * @param size number of bytes available in buf
1023  * @param buf where the callee should write the message
1024  * @return number of bytes written to buf
1025  */
1026 static size_t
1027 transmit_to_peer (void *cls,
1028                   size_t size, void *buf)
1029 {
1030   struct ConnectedPeer *cp = cls;
1031   char *cbuf = buf;
1032   struct GNUNET_PeerIdentity pid;
1033   struct PendingMessage *pm;
1034   size_t msize;
1035  
1036   cp->cth = NULL;
1037   if (NULL == buf)
1038     {
1039 #if DEBUG_FS
1040       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041                   "Dropping message, core too busy.\n");
1042 #endif
1043       return 0;
1044     }
1045   msize = 0;
1046   while ( (NULL != (pm = cp->pending_messages_head) ) &&
1047           (pm->msize <= size) )
1048     {
1049       memcpy (&cbuf[msize], &pm[1], pm->msize);
1050       msize += pm->msize;
1051       size -= pm->msize;
1052       destroy_pending_message (pm, cp->pid);
1053     }
1054   if (NULL != pm)
1055     {
1056       GNUNET_PEER_resolve (cp->pid,
1057                            &pid);
1058       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1059                                                    pm->priority,
1060                                                    GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1061                                                    &pid,
1062                                                    pm->msize,
1063                                                    &transmit_to_peer,
1064                                                    cp);
1065     }
1066 #if DEBUG_FS
1067   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1068               "Transmitting %u bytes to peer %u.\n",
1069               msize,
1070               cp->pid);
1071 #endif
1072   return msize;
1073 }
1074
1075
1076 /**
1077  * Add a message to the set of pending messages for the given peer.
1078  *
1079  * @param cp peer to send message to
1080  * @param pm message to queue
1081  * @param pr request on which behalf this message is being queued
1082  */
1083 static void
1084 add_to_pending_messages_for_peer (struct ConnectedPeer *cp,
1085                                   struct PendingMessage *pm,
1086                                   struct PendingRequest *pr)
1087 {
1088   struct PendingMessage *pos;
1089   struct PendingMessageList *pml;
1090   struct GNUNET_PeerIdentity pid;
1091
1092   GNUNET_assert (pm->next == NULL);
1093   GNUNET_assert (pm->pml == NULL);    
1094   pml = GNUNET_malloc (sizeof (struct PendingMessageList));
1095   pml->req = pr;
1096   pml->target = cp;
1097   pml->pm = pm;
1098   pm->pml = pml;  
1099   GNUNET_CONTAINER_DLL_insert (pr->pending_head,
1100                                pr->pending_tail,
1101                                pml);
1102   pos = cp->pending_messages_head;
1103   while ( (pos != NULL) &&
1104           (pm->priority < pos->priority) )
1105     pos = pos->next;    
1106   GNUNET_CONTAINER_DLL_insert_after (cp->pending_messages_head,
1107                                      cp->pending_messages_tail,
1108                                      pos,
1109                                      pm);
1110   cp->pending_requests++;
1111   if (cp->pending_requests > MAX_QUEUE_PER_PEER)
1112     destroy_pending_message (cp->pending_messages_tail, 0);  
1113   if (cp->cth == NULL)
1114     {
1115       /* need to schedule transmission */
1116       GNUNET_PEER_resolve (cp->pid, &pid);
1117       cp->cth = GNUNET_CORE_notify_transmit_ready (core,
1118                                                    cp->pending_messages_head->priority,
1119                                                    MAX_TRANSMIT_DELAY,
1120                                                    &pid,
1121                                                    cp->pending_messages_head->msize,
1122                                                    &transmit_to_peer,
1123                                                    cp);
1124     }
1125   if (cp->cth == NULL)
1126     {
1127 #if DEBUG_FS
1128       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1129                   "Failed to schedule transmission with core!\n");
1130 #endif
1131       /* FIXME: call stats (rare, bad case) */
1132     }
1133 }
1134
1135
1136 /**
1137  * Mingle hash with the mingle_number to produce different bits.
1138  */
1139 static void
1140 mingle_hash (const GNUNET_HashCode * in,
1141              int32_t mingle_number, 
1142              GNUNET_HashCode * hc)
1143 {
1144   GNUNET_HashCode m;
1145
1146   GNUNET_CRYPTO_hash (&mingle_number, 
1147                       sizeof (int32_t), 
1148                       &m);
1149   GNUNET_CRYPTO_hash_xor (&m, in, hc);
1150 }
1151
1152
1153 /**
1154  * Test if the load on this peer is too high
1155  * to even consider processing the query at
1156  * all.
1157  * 
1158  * @return GNUNET_YES if the load is too high, GNUNET_NO otherwise
1159  */
1160 static int
1161 test_load_too_high ()
1162 {
1163   return GNUNET_NO; // FIXME
1164 }
1165
1166
1167 /* ******************* Pending Request Refresh Task ******************** */
1168
1169
1170
1171 /**
1172  * We use a random delay to make the timing of requests less
1173  * predictable.  This function returns such a random delay.  We add a base
1174  * delay of MAX_CORK_DELAY (1s).
1175  *
1176  * FIXME: make schedule dependent on the specifics of the request?
1177  * Or bandwidth and number of connected peers and load?
1178  *
1179  * @return random delay to use for some request, between 1s and 1000+TTL_DECREMENT ms
1180  */
1181 static struct GNUNET_TIME_Relative
1182 get_processing_delay ()
1183 {
1184   return 
1185     GNUNET_TIME_relative_add (GNUNET_CONSTANTS_MAX_CORK_DELAY,
1186                               GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1187                                                              GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1188                                                                                        TTL_DECREMENT)));
1189 }
1190
1191
1192 /**
1193  * We're processing a GET request from another peer and have decided
1194  * to forward it to other peers.  This function is called periodically
1195  * and should forward the request to other peers until we have all
1196  * possible replies.  If we have transmitted the *only* reply to
1197  * the initiator we should destroy the pending request.  If we have
1198  * many replies in the queue to the initiator, we should delay sending
1199  * out more queries until the reply queue has shrunk some.
1200  *
1201  * @param cls our "struct ProcessGetContext *"
1202  * @param tc unused
1203  */
1204 static void
1205 forward_request_task (void *cls,
1206                       const struct GNUNET_SCHEDULER_TaskContext *tc);
1207
1208
1209 /**
1210  * Function called after we either failed or succeeded
1211  * at transmitting a query to a peer.  
1212  *
1213  * @param cls the requests "struct PendingRequest*"
1214  * @param tpid ID of receiving peer, 0 on transmission error
1215  */
1216 static void
1217 transmit_query_continuation (void *cls,
1218                              GNUNET_PEER_Id tpid)
1219 {
1220   struct PendingRequest *pr = cls;
1221
1222   GNUNET_STATISTICS_update (stats,
1223                             gettext_noop ("# queries scheduled for forwarding"),
1224                             -1,
1225                             GNUNET_NO);
1226   if (tpid == 0)   
1227     {
1228 #if DEBUG_FS
1229       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1230                   "Transmission of request failed, will try again later.\n");
1231 #endif
1232       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1233         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1234                                                  get_processing_delay (),
1235                                                  &forward_request_task,
1236                                                  pr); 
1237       return;    
1238     }
1239   GNUNET_STATISTICS_update (stats,
1240                             gettext_noop ("# queries forwarded"),
1241                             1,
1242                             GNUNET_NO);
1243   GNUNET_PEER_change_rc (tpid, 1);
1244   if (pr->used_pids_off == pr->used_pids_size)
1245     GNUNET_array_grow (pr->used_pids,
1246                        pr->used_pids_size,
1247                        pr->used_pids_size * 2 + 2);
1248   pr->used_pids[pr->used_pids_off++] = tpid;
1249   if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1250     pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1251                                              get_processing_delay (),
1252                                              &forward_request_task,
1253                                              pr);
1254 }
1255
1256
1257 /**
1258  * How many bytes should a bloomfilter be if we have already seen
1259  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
1260  * of bits set per entry.  Furthermore, we should not re-size the
1261  * filter too often (to keep it cheap).
1262  *
1263  * Since other peers will also add entries but not resize the filter,
1264  * we should generally pick a slightly larger size than what the
1265  * strict math would suggest.
1266  *
1267  * @return must be a power of two and smaller or equal to 2^15.
1268  */
1269 static size_t
1270 compute_bloomfilter_size (unsigned int entry_count)
1271 {
1272   size_t size;
1273   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
1274   uint16_t max = 1 << 15;
1275
1276   if (entry_count > max)
1277     return max;
1278   size = 8;
1279   while ((size < max) && (size < ideal))
1280     size *= 2;
1281   if (size > max)
1282     return max;
1283   return size;
1284 }
1285
1286
1287 /**
1288  * Recalculate our bloom filter for filtering replies.  This function
1289  * will create a new bloom filter from scratch, so it should only be
1290  * called if we have no bloomfilter at all (and hence can create a
1291  * fresh one of minimal size without problems) OR if our peer is the
1292  * initiator (in which case we may resize to larger than mimimum size).
1293  *
1294  * @param pr request for which the BF is to be recomputed
1295  */
1296 static void
1297 refresh_bloomfilter (struct PendingRequest *pr)
1298 {
1299   unsigned int i;
1300   size_t nsize;
1301   GNUNET_HashCode mhash;
1302
1303   nsize = compute_bloomfilter_size (pr->replies_seen_off);
1304   if (nsize == pr->bf_size)
1305     return; /* size not changed */
1306   if (pr->bf != NULL)
1307     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
1308   pr->bf_size = nsize;
1309   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, -1);
1310   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
1311                                               pr->bf_size,
1312                                               BLOOMFILTER_K);
1313   for (i=0;i<pr->replies_seen_off;i++)
1314     {
1315       mingle_hash (&pr->replies_seen[i], pr->mingle, &mhash);
1316       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
1317     }
1318 }
1319
1320
1321 /**
1322  * Function called after we've tried to reserve a certain amount of
1323  * bandwidth for a reply.  Check if we succeeded and if so send our
1324  * query.
1325  *
1326  * @param cls the requests "struct PendingRequest*"
1327  * @param peer identifies the peer
1328  * @param bpm_in set to the current bandwidth limit (receiving) for this peer
1329  * @param bpm_out set to the current bandwidth limit (sending) for this peer
1330  * @param amount set to the amount that was actually reserved or unreserved
1331  * @param preference current traffic preference for the given peer
1332  */
1333 static void
1334 target_reservation_cb (void *cls,
1335                        const struct
1336                        GNUNET_PeerIdentity * peer,
1337                        struct GNUNET_BANDWIDTH_Value32NBO bpm_in,
1338                        struct GNUNET_BANDWIDTH_Value32NBO bpm_out,
1339                        int amount,
1340                        uint64_t preference)
1341 {
1342   struct PendingRequest *pr = cls;
1343   struct ConnectedPeer *cp;
1344   struct PendingMessage *pm;
1345   struct GetMessage *gm;
1346   GNUNET_HashCode *ext;
1347   char *bfdata;
1348   size_t msize;
1349   unsigned int k;
1350   int no_route;
1351   uint32_t bm;
1352
1353   pr->irc = NULL;
1354   if (peer == NULL)
1355     {
1356       /* error in communication with core, try again later */
1357       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1358         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1359                                                  get_processing_delay (),
1360                                                  &forward_request_task,
1361                                                  pr);
1362       return;
1363     }
1364   // (3) transmit, update ttl/priority
1365   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1366                                           &peer->hashPubKey);
1367   if (cp == NULL)
1368     {
1369       /* Peer must have just left */
1370 #if DEBUG_FS
1371       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1372                   "Selected peer disconnected!\n");
1373 #endif
1374       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1375         pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1376                                                  get_processing_delay (),
1377                                                  &forward_request_task,
1378                                                  pr);
1379       return;
1380     }
1381   no_route = GNUNET_NO;
1382   /* FIXME: check against DBLOCK_SIZE and possibly return
1383      amount to reserve; however, this also needs to work
1384      with testcases which currently start out with a far
1385      too low per-peer bw limit, so they would never send
1386      anything.  Big issue. */
1387   if (amount == 0)
1388     {
1389       if (pr->cp == NULL)
1390         {
1391 #if DEBUG_FS > 1
1392           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1393                       "Failed to reserve bandwidth for reply (got %d/%u bytes only)!\n",
1394                       amount,
1395                       DBLOCK_SIZE);
1396 #endif
1397           GNUNET_STATISTICS_update (stats,
1398                                     gettext_noop ("# reply bandwidth reservation requests failed"),
1399                                     1,
1400                                     GNUNET_NO);
1401           if (pr->task == GNUNET_SCHEDULER_NO_TASK)
1402             pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1403                                                      get_processing_delay (),
1404                                                      &forward_request_task,
1405                                                      pr);
1406           return;  /* this target round failed */
1407         }
1408       /* FIXME: if we are "quite" busy, we may still want to skip
1409          this round; need more load detection code! */
1410       no_route = GNUNET_YES;
1411     }
1412   
1413   GNUNET_STATISTICS_update (stats,
1414                             gettext_noop ("# queries scheduled for forwarding"),
1415                             1,
1416                             GNUNET_NO);
1417   /* build message and insert message into priority queue */
1418 #if DEBUG_FS
1419   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1420               "Forwarding request `%s' to `%4s'!\n",
1421               GNUNET_h2s (&pr->query),
1422               GNUNET_i2s (peer));
1423 #endif
1424   k = 0;
1425   bm = 0;
1426   if (GNUNET_YES == no_route)
1427     {
1428       bm |= GET_MESSAGE_BIT_RETURN_TO;
1429       k++;      
1430     }
1431   if (pr->namespace != NULL)
1432     {
1433       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
1434       k++;
1435     }
1436   if (pr->target_pid != 0)
1437     {
1438       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
1439       k++;
1440     }
1441   msize = sizeof (struct GetMessage) + pr->bf_size + k * sizeof(GNUNET_HashCode);
1442   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
1443   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
1444   pm->msize = msize;
1445   gm = (struct GetMessage*) &pm[1];
1446   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
1447   gm->header.size = htons (msize);
1448   gm->type = htonl (pr->type);
1449   pr->remaining_priority /= 2;
1450   gm->priority = htonl (pr->remaining_priority);
1451   gm->ttl = htonl (pr->ttl);
1452   gm->filter_mutator = htonl(pr->mingle); 
1453   gm->hash_bitmap = htonl (bm);
1454   gm->query = pr->query;
1455   ext = (GNUNET_HashCode*) &gm[1];
1456   k = 0;
1457   if (GNUNET_YES == no_route)
1458     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1459   if (pr->namespace != NULL)
1460     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
1461   if (pr->target_pid != 0)
1462     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
1463   bfdata = (char *) &ext[k];
1464   if (pr->bf != NULL)
1465     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
1466                                                bfdata,
1467                                                pr->bf_size);
1468   pm->cont = &transmit_query_continuation;
1469   pm->cont_cls = pr;
1470   add_to_pending_messages_for_peer (cp, pm, pr);
1471 }
1472
1473
1474 /**
1475  * Closure used for "target_peer_select_cb".
1476  */
1477 struct PeerSelectionContext 
1478 {
1479   /**
1480    * The request for which we are selecting
1481    * peers.
1482    */
1483   struct PendingRequest *pr;
1484
1485   /**
1486    * Current "prime" target.
1487    */
1488   struct GNUNET_PeerIdentity target;
1489
1490   /**
1491    * How much do we like this target?
1492    */
1493   double target_score;
1494
1495 };
1496
1497
1498 /**
1499  * Function called for each connected peer to determine
1500  * which one(s) would make good targets for forwarding.
1501  *
1502  * @param cls closure (struct PeerSelectionContext)
1503  * @param key current key code (peer identity)
1504  * @param value value in the hash map (struct ConnectedPeer)
1505  * @return GNUNET_YES if we should continue to
1506  *         iterate,
1507  *         GNUNET_NO if not.
1508  */
1509 static int
1510 target_peer_select_cb (void *cls,
1511                        const GNUNET_HashCode * key,
1512                        void *value)
1513 {
1514   struct PeerSelectionContext *psc = cls;
1515   struct ConnectedPeer *cp = value;
1516   struct PendingRequest *pr = psc->pr;
1517   double score;
1518   unsigned int i;
1519   unsigned int pc;
1520
1521   /* 1) check that this peer is not the initiator */
1522   if (cp == pr->cp)
1523     return GNUNET_YES; /* skip */          
1524
1525   /* 2) check if we have already (recently) forwarded to this peer */
1526   pc = 0;
1527   for (i=0;i<pr->used_pids_off;i++)
1528     if (pr->used_pids[i] == cp->pid) 
1529       {
1530         pc++;
1531         if (0 != GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1532                                            RETRY_PROBABILITY_INV))
1533           {
1534 #if DEBUG_FS
1535             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1536                         "NOT re-trying query that was previously transmitted %u times\n",
1537                         (unsigned int) pr->used_pids_off);
1538 #endif
1539             return GNUNET_YES; /* skip */
1540           }
1541       }
1542 #if DEBUG_FS
1543   if (0 < pc)
1544     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1545                 "Re-trying query that was previously transmitted %u times to this peer\n",
1546                 (unsigned int) pc);
1547 #endif
1548   // 3) calculate how much we'd like to forward to this peer
1549   score = 42; // FIXME!
1550   // FIXME: also need API to gather data on responsiveness
1551   // of this peer (we have fields for that in 'cp', but
1552   // they are never set!)
1553   
1554   /* store best-fit in closure */
1555   if (score > psc->target_score)
1556     {
1557       psc->target_score = score;
1558       psc->target.hashPubKey = *key; 
1559     }
1560   return GNUNET_YES;
1561 }
1562   
1563
1564 /**
1565  * The priority level imposes a bound on the maximum
1566  * value for the ttl that can be requested.
1567  *
1568  * @param ttl_in requested ttl
1569  * @param prio given priority
1570  * @return ttl_in if ttl_in is below the limit,
1571  *         otherwise the ttl-limit for the given priority
1572  */
1573 static int32_t
1574 bound_ttl (int32_t ttl_in, uint32_t prio)
1575 {
1576   unsigned long long allowed;
1577
1578   if (ttl_in <= 0)
1579     return ttl_in;
1580   allowed = ((unsigned long long) prio) * TTL_DECREMENT / 1000; 
1581   if (ttl_in > allowed)      
1582     {
1583       if (allowed >= (1 << 30))
1584         return 1 << 30;
1585       return allowed;
1586     }
1587   return ttl_in;
1588 }
1589
1590
1591 /**
1592  * We're processing a GET request from another peer and have decided
1593  * to forward it to other peers.  This function is called periodically
1594  * and should forward the request to other peers until we have all
1595  * possible replies.  If we have transmitted the *only* reply to
1596  * the initiator we should destroy the pending request.  If we have
1597  * many replies in the queue to the initiator, we should delay sending
1598  * out more queries until the reply queue has shrunk some.
1599  *
1600  * @param cls our "struct ProcessGetContext *"
1601  * @param tc unused
1602  */
1603 static void
1604 forward_request_task (void *cls,
1605                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1606 {
1607   struct PendingRequest *pr = cls;
1608   struct PeerSelectionContext psc;
1609   struct ConnectedPeer *cp; 
1610   struct GNUNET_TIME_Relative delay;
1611
1612   pr->task = GNUNET_SCHEDULER_NO_TASK;
1613   if (pr->irc != NULL)
1614     {
1615 #if DEBUG_FS
1616       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617                   "Forwarding of query `%s' not attempted due to pending local lookup!\n",
1618                   GNUNET_h2s (&pr->query));
1619 #endif
1620       return; /* already pending */
1621     }
1622   if (GNUNET_YES == pr->local_only)
1623     return; /* configured to not do P2P search */
1624   /* (1) select target */
1625   psc.pr = pr;
1626   psc.target_score = DBL_MIN;
1627   GNUNET_CONTAINER_multihashmap_iterate (connected_peers,
1628                                          &target_peer_select_cb,
1629                                          &psc);  
1630   if (psc.target_score == DBL_MIN)
1631     {
1632       delay = get_processing_delay ();
1633 #if DEBUG_FS 
1634       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635                   "No peer selected for forwarding of query `%s', will try again in %llu ms!\n",
1636                   GNUNET_h2s (&pr->query),
1637                   delay.value);
1638 #endif
1639       pr->task = GNUNET_SCHEDULER_add_delayed (sched,
1640                                                delay,
1641                                                &forward_request_task,
1642                                                pr);
1643       return; /* nobody selected */
1644     }
1645   /* (3) update TTL/priority */
1646   
1647   if (pr->client_request_list != NULL)
1648     {
1649       /* FIXME: use better algorithm!? */
1650       if (0 == GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1651                                          4))
1652         pr->priority++;
1653       /* FIXME: bound priority by "customary" priority used by other peers
1654          at this time! */
1655       pr->ttl = bound_ttl (pr->ttl + TTL_DECREMENT * 2,
1656                            pr->priority);
1657 #if DEBUG_FS
1658       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659                   "Trying query `%s' with priority %u and TTL %d.\n",
1660                   GNUNET_h2s (&pr->query),
1661                   pr->priority,
1662                   pr->ttl);
1663 #endif
1664     }
1665   else
1666     {
1667       /* FIXME: should we do something here as well!? */
1668     }
1669
1670   /* (3) reserve reply bandwidth */
1671   cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
1672                                           &psc.target.hashPubKey);
1673   GNUNET_assert (NULL != cp);
1674   pr->irc = GNUNET_CORE_peer_change_preference (sched, cfg,
1675                                                 &psc.target,
1676                                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT, 
1677                                                 GNUNET_BANDWIDTH_value_init ((uint32_t) -1 /* no limit */), 
1678                                                 DBLOCK_SIZE * 2, 
1679                                                 (uint64_t) cp->inc_preference,
1680                                                 &target_reservation_cb,
1681                                                 pr);
1682   cp->inc_preference = 0.0;
1683 }
1684
1685
1686 /* **************************** P2P PUT Handling ************************ */
1687
1688
1689 /**
1690  * Function called after we either failed or succeeded
1691  * at transmitting a reply to a peer.  
1692  *
1693  * @param cls the requests "struct PendingRequest*"
1694  * @param tpid ID of receiving peer, 0 on transmission error
1695  */
1696 static void
1697 transmit_reply_continuation (void *cls,
1698                              GNUNET_PEER_Id tpid)
1699 {
1700   struct PendingRequest *pr = cls;
1701   
1702   switch (pr->type)
1703     {
1704     case GNUNET_BLOCK_TYPE_DBLOCK:
1705     case GNUNET_BLOCK_TYPE_IBLOCK:
1706       /* only one reply expected, done with the request! */
1707       destroy_pending_request (pr);
1708       break;
1709     case GNUNET_BLOCK_TYPE_ANY:
1710     case GNUNET_BLOCK_TYPE_KBLOCK:
1711     case GNUNET_BLOCK_TYPE_SBLOCK:
1712       break;
1713     default:
1714       GNUNET_break (0);
1715       break;
1716     }
1717 }
1718
1719
1720 /**
1721  * Transmit the given message by copying it to the target buffer
1722  * "buf".  "buf" will be NULL and "size" zero if the socket was closed
1723  * for writing in the meantime.  In that case, do nothing
1724  * (the disconnect or shutdown handler will take care of the rest).
1725  * If we were able to transmit messages and there are still more
1726  * pending, ask core again for further calls to this function.
1727  *
1728  * @param cls closure, pointer to the 'struct ClientList*'
1729  * @param size number of bytes available in buf
1730  * @param buf where the callee should write the message
1731  * @return number of bytes written to buf
1732  */
1733 static size_t
1734 transmit_to_client (void *cls,
1735                   size_t size, void *buf)
1736 {
1737   struct ClientList *cl = cls;
1738   char *cbuf = buf;
1739   struct ClientResponseMessage *creply;
1740   size_t msize;
1741   
1742   cl->th = NULL;
1743   if (NULL == buf)
1744     {
1745 #if DEBUG_FS
1746       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1747                   "Not sending reply, client communication problem.\n");
1748 #endif
1749       return 0;
1750     }
1751   msize = 0;
1752   while ( (NULL != (creply = cl->res_head) ) &&
1753           (creply->msize <= size) )
1754     {
1755       memcpy (&cbuf[msize], &creply[1], creply->msize);
1756       msize += creply->msize;
1757       size -= creply->msize;
1758       GNUNET_CONTAINER_DLL_remove (cl->res_head,
1759                                    cl->res_tail,
1760                                    creply);
1761       GNUNET_free (creply);
1762     }
1763   if (NULL != creply)
1764     cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
1765                                                   creply->msize,
1766                                                   GNUNET_TIME_UNIT_FOREVER_REL,
1767                                                   &transmit_to_client,
1768                                                   cl);
1769 #if DEBUG_FS
1770   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1771               "Transmitted %u bytes to client\n",
1772               (unsigned int) msize);
1773 #endif
1774   return msize;
1775 }
1776
1777
1778 /**
1779  * Closure for "process_reply" function.
1780  */
1781 struct ProcessReplyClosure
1782 {
1783   /**
1784    * The data for the reply.
1785    */
1786   const void *data;
1787
1788   // FIXME: add 'struct ConnectedPeer' to track 'last_xxx_replies' here!
1789
1790   /**
1791    * When the reply expires.
1792    */
1793   struct GNUNET_TIME_Absolute expiration;
1794
1795   /**
1796    * Size of data.
1797    */
1798   size_t size;
1799
1800   /**
1801    * Namespace that this reply belongs to
1802    * (if it is of type SBLOCK).
1803    */
1804   GNUNET_HashCode namespace;
1805
1806   /**
1807    * Type of the block.
1808    */
1809   enum GNUNET_BLOCK_Type type;
1810
1811   /**
1812    * How much was this reply worth to us?
1813    */
1814   uint32_t priority;
1815 };
1816
1817
1818 /**
1819  * We have received a reply; handle it!
1820  *
1821  * @param cls response (struct ProcessReplyClosure)
1822  * @param key our query
1823  * @param value value in the hash map (info about the query)
1824  * @return GNUNET_YES (we should continue to iterate)
1825  */
1826 static int
1827 process_reply (void *cls,
1828                const GNUNET_HashCode * key,
1829                void *value)
1830 {
1831   struct ProcessReplyClosure *prq = cls;
1832   struct PendingRequest *pr = value;
1833   struct PendingMessage *reply;
1834   struct ClientResponseMessage *creply;
1835   struct ClientList *cl;
1836   struct PutMessage *pm;
1837   struct ConnectedPeer *cp;
1838   GNUNET_HashCode chash;
1839   GNUNET_HashCode mhash;
1840   size_t msize;
1841
1842 #if DEBUG_FS
1843   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1844               "Matched result (type %u) for query `%s' with pending request\n",
1845               (unsigned int) prq->type,
1846               GNUNET_h2s (key));
1847 #endif  
1848   GNUNET_STATISTICS_update (stats,
1849                             gettext_noop ("# replies received and matched"),
1850                             1,
1851                             GNUNET_NO);
1852   GNUNET_CRYPTO_hash (prq->data,
1853                       prq->size,
1854                       &chash);
1855   switch (prq->type)
1856     {
1857     case GNUNET_BLOCK_TYPE_DBLOCK:
1858     case GNUNET_BLOCK_TYPE_IBLOCK:
1859       /* only possible reply, stop requesting! */
1860       while (NULL != pr->pending_head)
1861         destroy_pending_message_list_entry (pr->pending_head);
1862       if (pr->qe != NULL)
1863         {
1864           if (pr->client_request_list != NULL)
1865             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
1866                                         GNUNET_YES);
1867           GNUNET_DATASTORE_cancel (pr->qe);
1868           pr->qe = NULL;
1869         }
1870       pr->do_remove = GNUNET_YES;
1871       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
1872         {
1873           GNUNET_SCHEDULER_cancel (sched,
1874                                    pr->task);
1875           pr->task = GNUNET_SCHEDULER_NO_TASK;
1876         }
1877       GNUNET_break (GNUNET_YES ==
1878                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
1879                                                           key,
1880                                                           pr));
1881       break;
1882     case GNUNET_BLOCK_TYPE_SBLOCK:
1883       if (pr->namespace == NULL)
1884         {
1885           GNUNET_break (0);
1886           return GNUNET_YES;
1887         }
1888       if (0 != memcmp (pr->namespace,
1889                        &prq->namespace,
1890                        sizeof (GNUNET_HashCode)))
1891         {
1892           GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1893                       _("Reply mismatched in terms of namespace.  Discarded.\n"));
1894           return GNUNET_YES; /* wrong namespace */      
1895         }
1896       /* then: fall-through! */
1897     case GNUNET_BLOCK_TYPE_KBLOCK:
1898     case GNUNET_BLOCK_TYPE_NBLOCK:
1899       if (pr->bf != NULL) 
1900         {
1901           mingle_hash (&chash, pr->mingle, &mhash);
1902           if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (pr->bf,
1903                                                                &mhash))
1904             {
1905               GNUNET_STATISTICS_update (stats,
1906                                         gettext_noop ("# duplicate replies discarded (bloomfilter)"),
1907                                         1,
1908                                         GNUNET_NO);
1909 #if DEBUG_FS
1910               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1911                           "Duplicate response `%s', discarding.\n",
1912                           GNUNET_h2s (&mhash));
1913 #endif
1914               return GNUNET_YES; /* duplicate */
1915             }
1916 #if DEBUG_FS
1917           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1918                       "New response `%s', adding to filter.\n",
1919                       GNUNET_h2s (&mhash));
1920 #endif
1921         }
1922       if (pr->client_request_list != NULL)
1923         {
1924           if (pr->replies_seen_size == pr->replies_seen_off)
1925             GNUNET_array_grow (pr->replies_seen,
1926                                pr->replies_seen_size,
1927                                pr->replies_seen_size * 2 + 4);  
1928             pr->replies_seen[pr->replies_seen_off++] = chash;         
1929         }
1930       if ( (pr->bf == NULL) ||
1931            (pr->client_request_list != NULL) )
1932         refresh_bloomfilter (pr);
1933       GNUNET_CONTAINER_bloomfilter_add (pr->bf,
1934                                         &mhash);
1935       break;
1936     default:
1937       GNUNET_break (0);
1938       return GNUNET_YES;
1939     }
1940   prq->priority += pr->remaining_priority;
1941   pr->remaining_priority = 0;
1942   if (pr->client_request_list != NULL)
1943     {
1944       GNUNET_STATISTICS_update (stats,
1945                                 gettext_noop ("# replies received for local clients"),
1946                                 1,
1947                                 GNUNET_NO);
1948       cl = pr->client_request_list->client_list;
1949       msize = sizeof (struct PutMessage) + prq->size;
1950       creply = GNUNET_malloc (msize + sizeof (struct ClientResponseMessage));
1951       creply->msize = msize;
1952       creply->client_list = cl;
1953       GNUNET_CONTAINER_DLL_insert_after (cl->res_head,
1954                                          cl->res_tail,
1955                                          cl->res_tail,
1956                                          creply);      
1957       pm = (struct PutMessage*) &creply[1];
1958       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
1959       pm->header.size = htons (msize);
1960       pm->type = htonl (prq->type);
1961       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
1962       memcpy (&pm[1], prq->data, prq->size);      
1963       if (NULL == cl->th)
1964         {
1965 #if DEBUG_FS
1966           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1967                       "Transmitting result for query `%s' to client\n",
1968                       GNUNET_h2s (key));
1969 #endif  
1970           cl->th = GNUNET_SERVER_notify_transmit_ready (cl->client,
1971                                                         msize,
1972                                                         GNUNET_TIME_UNIT_FOREVER_REL,
1973                                                         &transmit_to_client,
1974                                                         cl);
1975         }
1976       GNUNET_break (cl->th != NULL);
1977       if (pr->do_remove)                
1978         destroy_pending_request (pr);           
1979     }
1980   else
1981     {
1982       cp = pr->cp;
1983 #if DEBUG_FS
1984       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1985                   "Transmitting result for query `%s' to other peer (PID=%u)\n",
1986                   GNUNET_h2s (key),
1987                   (unsigned int) cp->pid);
1988 #endif  
1989       GNUNET_STATISTICS_update (stats,
1990                                 gettext_noop ("# replies received for other peers"),
1991                                 1,
1992                                 GNUNET_NO);
1993       msize = sizeof (struct PutMessage) + prq->size;
1994       reply = GNUNET_malloc (msize + sizeof (struct PendingMessage));
1995       reply->cont = &transmit_reply_continuation;
1996       reply->cont_cls = pr;
1997       reply->msize = msize;
1998       reply->priority = (uint32_t) -1; /* send replies first! */
1999       pm = (struct PutMessage*) &reply[1];
2000       pm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_PUT);
2001       pm->header.size = htons (msize);
2002       pm->type = htonl (prq->type);
2003       pm->expiration = GNUNET_TIME_absolute_hton (prq->expiration);
2004       memcpy (&pm[1], prq->data, prq->size);
2005       add_to_pending_messages_for_peer (cp, reply, pr);
2006     }
2007   // FIXME: implement hot-path routing statistics keeping!
2008   return GNUNET_YES;
2009 }
2010
2011
2012
2013 /**
2014  * Continuation called to notify client about result of the
2015  * operation.
2016  *
2017  * @param cls closure
2018  * @param success GNUNET_SYSERR on failure
2019  * @param msg NULL on success, otherwise an error message
2020  */
2021 static void 
2022 put_migration_continuation (void *cls,
2023                             int success,
2024                             const char *msg)
2025 {
2026   /* FIXME */
2027 }
2028
2029
2030 /**
2031  * Handle P2P "PUT" message.
2032  *
2033  * @param cls closure, always NULL
2034  * @param other the other peer involved (sender or receiver, NULL
2035  *        for loopback messages where we are both sender and receiver)
2036  * @param message the actual message
2037  * @param latency reported latency of the connection with 'other'
2038  * @param distance reported distance (DV) to 'other' 
2039  * @return GNUNET_OK to keep the connection open,
2040  *         GNUNET_SYSERR to close it (signal serious error)
2041  */
2042 static int
2043 handle_p2p_put (void *cls,
2044                 const struct GNUNET_PeerIdentity *other,
2045                 const struct GNUNET_MessageHeader *message,
2046                 struct GNUNET_TIME_Relative latency,
2047                 uint32_t distance)
2048 {
2049   const struct PutMessage *put;
2050   uint16_t msize;
2051   size_t dsize;
2052   enum GNUNET_BLOCK_Type type;
2053   struct GNUNET_TIME_Absolute expiration;
2054   GNUNET_HashCode query;
2055   struct ProcessReplyClosure prq;
2056   const struct SBlock *sb;
2057
2058   msize = ntohs (message->size);
2059   if (msize < sizeof (struct PutMessage))
2060     {
2061       GNUNET_break_op(0);
2062       return GNUNET_SYSERR;
2063     }
2064   put = (const struct PutMessage*) message;
2065   dsize = msize - sizeof (struct PutMessage);
2066   type = ntohl (put->type);
2067   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
2068
2069   if (GNUNET_OK !=
2070       GNUNET_BLOCK_check_block (type,
2071                                 &put[1],
2072                                 dsize,
2073                                 &query))
2074     {
2075       GNUNET_break_op (0);
2076       return GNUNET_SYSERR;
2077     }
2078   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2079     return GNUNET_SYSERR;
2080   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2081     { 
2082       sb = (const struct SBlock*) &put[1];
2083       GNUNET_CRYPTO_hash (&sb->subspace,
2084                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2085                           &prq.namespace);
2086     }
2087
2088 #if DEBUG_FS
2089   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2090               "Received result for query `%s' from peer `%4s'\n",
2091               GNUNET_h2s (&query),
2092               GNUNET_i2s (other));
2093 #endif
2094   GNUNET_STATISTICS_update (stats,
2095                             gettext_noop ("# replies received (overall)"),
2096                             1,
2097                             GNUNET_NO);
2098   /* now, lookup 'query' */
2099   prq.data = (const void*) &put[1];
2100   prq.size = dsize;
2101   prq.type = type;
2102   prq.expiration = expiration;
2103   prq.priority = 0;
2104   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2105                                               &query,
2106                                               &process_reply,
2107                                               &prq);
2108   if (GNUNET_YES == active_migration)
2109     {
2110       GNUNET_DATASTORE_put (dsh,
2111                             0, &query, dsize, &put[1],
2112                             type, prq.priority, 1 /* anonymity */, 
2113                             expiration, 
2114                             0, 64 /* FIXME: use define */,
2115                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
2116                             &put_migration_continuation, 
2117                             NULL);
2118     }
2119   return GNUNET_OK;
2120 }
2121
2122
2123 /* **************************** P2P GET Handling ************************ */
2124
2125
2126 /**
2127  * Closure for 'check_duplicate_request_{peer,client}'.
2128  */
2129 struct CheckDuplicateRequestClosure
2130 {
2131   /**
2132    * The new request we should check if it already exists.
2133    */
2134   const struct PendingRequest *pr;
2135
2136   /**
2137    * Existing request found by the checker, NULL if none.
2138    */
2139   struct PendingRequest *have;
2140 };
2141
2142
2143 /**
2144  * Iterator over entries in the 'query_request_map' that
2145  * tries to see if we have the same request pending from
2146  * the same client already.
2147  *
2148  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2149  * @param key current key code (query, ignored, must match)
2150  * @param value value in the hash map (a 'struct PendingRequest' 
2151  *              that already exists)
2152  * @return GNUNET_YES if we should continue to
2153  *         iterate (no match yet)
2154  *         GNUNET_NO if not (match found).
2155  */
2156 static int
2157 check_duplicate_request_client (void *cls,
2158                                 const GNUNET_HashCode * key,
2159                                 void *value)
2160 {
2161   struct CheckDuplicateRequestClosure *cdc = cls;
2162   struct PendingRequest *have = value;
2163
2164   if (have->client_request_list == NULL)
2165     return GNUNET_YES;
2166   if ( (cdc->pr->client_request_list->client_list->client == have->client_request_list->client_list->client) &&
2167        (cdc->pr != have) )
2168     {
2169       cdc->have = have;
2170       return GNUNET_NO;
2171     }
2172   return GNUNET_YES;
2173 }
2174
2175
2176 /**
2177  * We're processing (local) results for a search request
2178  * from another peer.  Pass applicable results to the
2179  * peer and if we are done either clean up (operation
2180  * complete) or forward to other peers (more results possible).
2181  *
2182  * @param cls our closure (struct LocalGetContext)
2183  * @param key key for the content
2184  * @param size number of bytes in data
2185  * @param data content stored
2186  * @param type type of the content
2187  * @param priority priority of the content
2188  * @param anonymity anonymity-level for the content
2189  * @param expiration expiration time for the content
2190  * @param uid unique identifier for the datum;
2191  *        maybe 0 if no unique identifier is available
2192  */
2193 static void
2194 process_local_reply (void *cls,
2195                      const GNUNET_HashCode * key,
2196                      uint32_t size,
2197                      const void *data,
2198                      enum GNUNET_BLOCK_Type type,
2199                      uint32_t priority,
2200                      uint32_t anonymity,
2201                      struct GNUNET_TIME_Absolute
2202                      expiration, 
2203                      uint64_t uid)
2204 {
2205   struct PendingRequest *pr = cls;
2206   struct ProcessReplyClosure prq;
2207   struct CheckDuplicateRequestClosure cdrc;
2208   const struct SBlock *sb;
2209   GNUNET_HashCode dhash;
2210   GNUNET_HashCode mhash;
2211   GNUNET_HashCode query;
2212   
2213   if (NULL == key)
2214     {
2215 #if DEBUG_FS > 1
2216       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2217                   "Done processing local replies, forwarding request to other peers.\n");
2218 #endif
2219       pr->qe = NULL;
2220       if (pr->client_request_list != NULL)
2221         {
2222           GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
2223                                       GNUNET_YES);
2224           /* Figure out if this is a duplicate request and possibly
2225              merge 'struct PendingRequest' entries */
2226           cdrc.have = NULL;
2227           cdrc.pr = pr;
2228           GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2229                                                       &pr->query,
2230                                                       &check_duplicate_request_client,
2231                                                       &cdrc);
2232           if (cdrc.have != NULL)
2233             {
2234 #if DEBUG_FS
2235               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2236                           "Received request for block `%s' twice from client, will only request once.\n",
2237                           GNUNET_h2s (&pr->query));
2238 #endif
2239               
2240               destroy_pending_request (pr);
2241               return;
2242             }
2243         }
2244
2245       /* no more results */
2246       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2247         pr->task = GNUNET_SCHEDULER_add_now (sched,
2248                                              &forward_request_task,
2249                                              pr);      
2250       return;
2251     }
2252 #if DEBUG_FS
2253   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2254               "New local response to `%s' of type %u.\n",
2255               GNUNET_h2s (key),
2256               type);
2257 #endif
2258   if (type == GNUNET_BLOCK_TYPE_ONDEMAND)
2259     {
2260 #if DEBUG_FS
2261       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2262                   "Found ONDEMAND block, performing on-demand encoding\n");
2263 #endif
2264       GNUNET_STATISTICS_update (stats,
2265                                 gettext_noop ("# on-demand blocks matched requests"),
2266                                 1,
2267                                 GNUNET_NO);
2268       if (GNUNET_OK != 
2269           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
2270                                             anonymity, expiration, uid, 
2271                                             &process_local_reply,
2272                                             pr))
2273       if (pr->qe != NULL)
2274         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2275       return;
2276     }
2277   /* check for duplicates */
2278   GNUNET_CRYPTO_hash (data, size, &dhash);
2279   mingle_hash (&dhash, 
2280                pr->mingle,
2281                &mhash);
2282   if ( (pr->bf != NULL) &&
2283        (GNUNET_YES ==
2284         GNUNET_CONTAINER_bloomfilter_test (pr->bf,
2285                                            &mhash)) )
2286     {      
2287 #if DEBUG_FS
2288       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2289                   "Result from datastore filtered by bloomfilter (duplicate).\n");
2290 #endif
2291       GNUNET_STATISTICS_update (stats,
2292                                 gettext_noop ("# results filtered by query bloomfilter"),
2293                                 1,
2294                                 GNUNET_NO);
2295       if (pr->qe != NULL)
2296         GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2297       return;
2298     }
2299 #if DEBUG_FS
2300   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2301               "Found result for query `%s' in local datastore\n",
2302               GNUNET_h2s (key));
2303 #endif
2304   GNUNET_STATISTICS_update (stats,
2305                             gettext_noop ("# results found locally"),
2306                             1,
2307                             GNUNET_NO);
2308   pr->results_found++;
2309   memset (&prq, 0, sizeof (prq));
2310   prq.data = data;
2311   prq.expiration = expiration;
2312   prq.size = size;  
2313   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
2314     { 
2315       sb = (const struct SBlock*) data;
2316       GNUNET_CRYPTO_hash (&sb->subspace,
2317                           sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
2318                           &prq.namespace);
2319     }
2320   if (GNUNET_OK != GNUNET_BLOCK_check_block (type,
2321                                              data,
2322                                              size,
2323                                              &query))
2324     {
2325       GNUNET_break (0);
2326       /* FIXME: consider removing the block? */
2327       GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2328       return;
2329     }
2330   prq.type = type;
2331   prq.priority = priority;  
2332   process_reply (&prq, key, pr);
2333
2334   if ( (type == GNUNET_BLOCK_TYPE_DBLOCK) ||
2335        (type == GNUNET_BLOCK_TYPE_IBLOCK) ) 
2336     {
2337       if (pr->qe != NULL)
2338         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2339       return;
2340     }
2341   if ( (pr->client_request_list == NULL) &&
2342        ( (GNUNET_YES == test_load_too_high()) ||
2343          (pr->results_found > 5 + 2 * pr->priority) ) )
2344     {
2345 #if DEBUG_FS > 2
2346       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2347                   "Load too high, done with request\n");
2348 #endif
2349       GNUNET_STATISTICS_update (stats,
2350                                 gettext_noop ("# processing result set cut short due to load"),
2351                                 1,
2352                                 GNUNET_NO);
2353       if (pr->qe != NULL)
2354         GNUNET_DATASTORE_get_next (dsh, GNUNET_NO);
2355       return;
2356     }
2357   if (pr->qe != NULL)
2358     GNUNET_DATASTORE_get_next (dsh, GNUNET_YES);
2359 }
2360
2361
2362 /**
2363  * We've received a request with the specified priority.  Bound it
2364  * according to how much we trust the given peer.
2365  * 
2366  * @param prio_in requested priority
2367  * @param cp the peer making the request
2368  * @return effective priority
2369  */
2370 static uint32_t
2371 bound_priority (uint32_t prio_in,
2372                 struct ConnectedPeer *cp)
2373 {
2374   return 0; // FIXME!
2375 }
2376
2377
2378 /**
2379  * Iterator over entries in the 'query_request_map' that
2380  * tries to see if we have the same request pending from
2381  * the same peer already.
2382  *
2383  * @param cls closure (our 'struct CheckDuplicateRequestClosure')
2384  * @param key current key code (query, ignored, must match)
2385  * @param value value in the hash map (a 'struct PendingRequest' 
2386  *              that already exists)
2387  * @return GNUNET_YES if we should continue to
2388  *         iterate (no match yet)
2389  *         GNUNET_NO if not (match found).
2390  */
2391 static int
2392 check_duplicate_request_peer (void *cls,
2393                               const GNUNET_HashCode * key,
2394                               void *value)
2395 {
2396   struct CheckDuplicateRequestClosure *cdc = cls;
2397   struct PendingRequest *have = value;
2398
2399   if (cdc->pr->target_pid == have->target_pid)
2400     {
2401       cdc->have = have;
2402       return GNUNET_NO;
2403     }
2404   return GNUNET_YES;
2405 }
2406
2407
2408 /**
2409  * Handle P2P "GET" request.
2410  *
2411  * @param cls closure, always NULL
2412  * @param other the other peer involved (sender or receiver, NULL
2413  *        for loopback messages where we are both sender and receiver)
2414  * @param message the actual message
2415  * @param latency reported latency of the connection with 'other'
2416  * @param distance reported distance (DV) to 'other' 
2417  * @return GNUNET_OK to keep the connection open,
2418  *         GNUNET_SYSERR to close it (signal serious error)
2419  */
2420 static int
2421 handle_p2p_get (void *cls,
2422                 const struct GNUNET_PeerIdentity *other,
2423                 const struct GNUNET_MessageHeader *message,
2424                 struct GNUNET_TIME_Relative latency,
2425                 uint32_t distance)
2426 {
2427   struct PendingRequest *pr;
2428   struct ConnectedPeer *cp;
2429   struct ConnectedPeer *cps;
2430   struct CheckDuplicateRequestClosure cdc;
2431   struct GNUNET_TIME_Relative timeout;
2432   uint16_t msize;
2433   const struct GetMessage *gm;
2434   unsigned int bits;
2435   const GNUNET_HashCode *opt;
2436   uint32_t bm;
2437   size_t bfsize;
2438   uint32_t ttl_decrement;
2439   enum GNUNET_BLOCK_Type type;
2440   double preference;
2441   int have_ns;
2442
2443   msize = ntohs(message->size);
2444   if (msize < sizeof (struct GetMessage))
2445     {
2446       GNUNET_break_op (0);
2447       return GNUNET_SYSERR;
2448     }
2449   gm = (const struct GetMessage*) message;
2450   type = ntohl (gm->type);
2451   switch (type)
2452     {
2453     case GNUNET_BLOCK_TYPE_ANY:
2454     case GNUNET_BLOCK_TYPE_DBLOCK:
2455     case GNUNET_BLOCK_TYPE_IBLOCK:
2456     case GNUNET_BLOCK_TYPE_KBLOCK:
2457     case GNUNET_BLOCK_TYPE_SBLOCK:
2458       break;
2459     default:
2460       GNUNET_break_op (0);
2461       return GNUNET_SYSERR;
2462     }
2463   bm = ntohl (gm->hash_bitmap);
2464   bits = 0;
2465   while (bm > 0)
2466     {
2467       if (1 == (bm & 1))
2468         bits++;
2469       bm >>= 1;
2470     }
2471   if (msize < sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode))
2472     {
2473       GNUNET_break_op (0);
2474       return GNUNET_SYSERR;
2475     }  
2476   opt = (const GNUNET_HashCode*) &gm[1];
2477   bfsize = msize - sizeof (struct GetMessage) + bits * sizeof (GNUNET_HashCode);
2478   bm = ntohl (gm->hash_bitmap);
2479   if ( (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE)) &&
2480        (type != GNUNET_BLOCK_TYPE_SBLOCK) )
2481     {
2482       GNUNET_break_op (0);
2483       return GNUNET_SYSERR;      
2484     }
2485   bits = 0;
2486   cps = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2487                                            &other->hashPubKey);
2488   if (NULL == cps)
2489     {
2490       /* peer must have just disconnected */
2491       GNUNET_STATISTICS_update (stats,
2492                                 gettext_noop ("# requests dropped due to initiator not being connected"),
2493                                 1,
2494                                 GNUNET_NO);
2495       return GNUNET_SYSERR;
2496     }
2497   if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
2498     cp = GNUNET_CONTAINER_multihashmap_get (connected_peers,
2499                                             &opt[bits++]);
2500   else
2501     cp = cps;
2502   if (cp == NULL)
2503     {
2504 #if DEBUG_FS
2505       if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
2506         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2507                     "Failed to find RETURN-TO peer `%4s' in connection set. Dropping query.\n",
2508                     GNUNET_i2s ((const struct GNUNET_PeerIdentity*) &opt[bits-1]));
2509       
2510       else
2511         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2512                     "Failed to find peer `%4s' in connection set. Dropping query.\n",
2513                     GNUNET_i2s (other));
2514 #endif
2515       GNUNET_STATISTICS_update (stats,
2516                                 gettext_noop ("# requests dropped due to missing reverse route"),
2517                                 1,
2518                                 GNUNET_NO);
2519      /* FIXME: try connect? */
2520       return GNUNET_OK;
2521     }
2522   /* note that we can really only check load here since otherwise
2523      peers could find out that we are overloaded by not being
2524      disconnected after sending us a malformed query... */
2525   if (GNUNET_YES == test_load_too_high ())
2526     {
2527 #if DEBUG_FS
2528       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2529                   "Dropping query from `%s', this peer is too busy.\n",
2530                   GNUNET_i2s (other));
2531 #endif
2532       GNUNET_STATISTICS_update (stats,
2533                                 gettext_noop ("# requests dropped due to high load"),
2534                                 1,
2535                                 GNUNET_NO);
2536       return GNUNET_OK;
2537     }
2538
2539 #if DEBUG_FS 
2540   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2541               "Received request for `%s' of type %u from peer `%4s' with flags %u\n",
2542               GNUNET_h2s (&gm->query),
2543               (unsigned int) type,
2544               GNUNET_i2s (other),
2545               (unsigned int) bm);
2546 #endif
2547   have_ns = (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE));
2548   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
2549                       (have_ns ? sizeof(GNUNET_HashCode) : 0));
2550   if (have_ns)
2551     pr->namespace = (GNUNET_HashCode*) &pr[1];
2552   pr->type = type;
2553   pr->mingle = ntohl (gm->filter_mutator);
2554   if (0 != (bm & GET_MESSAGE_BIT_SKS_NAMESPACE))    
2555     memcpy (&pr[1], &opt[bits++], sizeof (GNUNET_HashCode));
2556   if (0 != (bm & GET_MESSAGE_BIT_TRANSMIT_TO))
2557     pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &opt[bits++]);
2558
2559   pr->anonymity_level = 1;
2560   pr->priority = bound_priority (ntohl (gm->priority), cps);
2561   pr->ttl = bound_ttl (ntohl (gm->ttl), pr->priority);
2562   pr->query = gm->query;
2563   /* decrement ttl (always) */
2564   ttl_decrement = 2 * TTL_DECREMENT +
2565     GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
2566                               TTL_DECREMENT);
2567   if ( (pr->ttl < 0) &&
2568        (((int32_t)(pr->ttl - ttl_decrement)) > 0) )
2569     {
2570 #if DEBUG_FS
2571       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2572                   "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
2573                   GNUNET_i2s (other),
2574                   pr->ttl,
2575                   ttl_decrement);
2576 #endif
2577       GNUNET_STATISTICS_update (stats,
2578                                 gettext_noop ("# requests dropped due TTL underflow"),
2579                                 1,
2580                                 GNUNET_NO);
2581       /* integer underflow => drop (should be very rare)! */
2582       GNUNET_free (pr);
2583       return GNUNET_OK;
2584     } 
2585   pr->ttl -= ttl_decrement;
2586   pr->start_time = GNUNET_TIME_absolute_get ();
2587
2588   /* get bloom filter */
2589   if (bfsize > 0)
2590     {
2591       pr->bf = GNUNET_CONTAINER_bloomfilter_init ((const char*) &opt[bits],
2592                                                   bfsize,
2593                                                   BLOOMFILTER_K);
2594       pr->bf_size = bfsize;
2595     }
2596
2597   cdc.have = NULL;
2598   cdc.pr = pr;
2599   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
2600                                               &gm->query,
2601                                               &check_duplicate_request_peer,
2602                                               &cdc);
2603   if (cdc.have != NULL)
2604     {
2605       if (cdc.have->start_time.value + cdc.have->ttl >=
2606           pr->start_time.value + pr->ttl)
2607         {
2608           /* existing request has higher TTL, drop new one! */
2609           cdc.have->priority += pr->priority;
2610           destroy_pending_request (pr);
2611 #if DEBUG_FS
2612           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2613                       "Have existing request with higher TTL, dropping new request.\n",
2614                       GNUNET_i2s (other));
2615 #endif
2616           GNUNET_STATISTICS_update (stats,
2617                                     gettext_noop ("# requests dropped due to higher-TTL request"),
2618                                     1,
2619                                     GNUNET_NO);
2620           return GNUNET_OK;
2621         }
2622       else
2623         {
2624           /* existing request has lower TTL, drop old one! */
2625           pr->priority += cdc.have->priority;
2626           /* Possible optimization: if we have applicable pending
2627              replies in 'cdc.have', we might want to move those over
2628              (this is a really rare special-case, so it is not clear
2629              that this would be worth it) */
2630           destroy_pending_request (cdc.have);
2631           /* keep processing 'pr'! */
2632         }
2633     }
2634
2635   pr->cp = cp;
2636   GNUNET_break (GNUNET_OK ==
2637                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
2638                                                    &gm->query,
2639                                                    pr,
2640                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2641   GNUNET_break (GNUNET_OK ==
2642                 GNUNET_CONTAINER_multihashmap_put (peer_request_map,
2643                                                    &other->hashPubKey,
2644                                                    pr,
2645                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2646   
2647   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
2648                                             pr,
2649                                             pr->start_time.value + pr->ttl);
2650
2651   GNUNET_STATISTICS_update (stats,
2652                             gettext_noop ("# P2P searches received"),
2653                             1,
2654                             GNUNET_NO);
2655   GNUNET_STATISTICS_update (stats,
2656                             gettext_noop ("# P2P searches active"),
2657                             1,
2658                             GNUNET_NO);
2659
2660   /* calculate change in traffic preference */
2661   preference = (double) pr->priority;
2662   if (preference < QUERY_BANDWIDTH_VALUE)
2663     preference = QUERY_BANDWIDTH_VALUE;
2664   cps->inc_preference += preference;
2665
2666   /* process locally */
2667   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
2668     type = GNUNET_BLOCK_TYPE_ANY; /* to get on-demand as well */
2669   timeout = GNUNET_TIME_relative_multiply (BASIC_DATASTORE_REQUEST_DELAY,
2670                                            (pr->priority + 1)); 
2671   pr->qe = GNUNET_DATASTORE_get (dsh,
2672                                  &gm->query,
2673                                  type,                         
2674                                  (unsigned int) preference, 64 /* FIXME */,
2675                                  
2676                                  timeout,
2677                                  &process_local_reply,
2678                                  pr);
2679
2680   /* Are multiple results possible?  If so, start processing remotely now! */
2681   switch (pr->type)
2682     {
2683     case GNUNET_BLOCK_TYPE_DBLOCK:
2684     case GNUNET_BLOCK_TYPE_IBLOCK:
2685       /* only one result, wait for datastore */
2686       break;
2687     default:
2688       if (pr->task == GNUNET_SCHEDULER_NO_TASK)
2689         pr->task = GNUNET_SCHEDULER_add_now (sched,
2690                                              &forward_request_task,
2691                                              pr);
2692     }
2693
2694   /* make sure we don't track too many requests */
2695   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
2696     {
2697       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
2698       destroy_pending_request (pr);
2699     }
2700   return GNUNET_OK;
2701 }
2702
2703
2704 /* **************************** CS GET Handling ************************ */
2705
2706
2707 /**
2708  * Handle START_SEARCH-message (search request from client).
2709  *
2710  * @param cls closure
2711  * @param client identification of the client
2712  * @param message the actual message
2713  */
2714 static void
2715 handle_start_search (void *cls,
2716                      struct GNUNET_SERVER_Client *client,
2717                      const struct GNUNET_MessageHeader *message)
2718 {
2719   static GNUNET_HashCode all_zeros;
2720   const struct SearchMessage *sm;
2721   struct ClientList *cl;
2722   struct ClientRequestList *crl;
2723   struct PendingRequest *pr;
2724   uint16_t msize;
2725   unsigned int sc;
2726   enum GNUNET_BLOCK_Type type;
2727
2728   msize = ntohs (message->size);
2729   if ( (msize < sizeof (struct SearchMessage)) ||
2730        (0 != (msize - sizeof (struct SearchMessage)) % sizeof (GNUNET_HashCode)) )
2731     {
2732       GNUNET_break (0);
2733       GNUNET_SERVER_receive_done (client,
2734                                   GNUNET_SYSERR);
2735       return;
2736     }
2737   GNUNET_STATISTICS_update (stats,
2738                             gettext_noop ("# client searches received"),
2739                             1,
2740                             GNUNET_NO);
2741   sc = (msize - sizeof (struct SearchMessage)) / sizeof (GNUNET_HashCode);
2742   sm = (const struct SearchMessage*) message;
2743   type = ntohl (sm->type);
2744 #if DEBUG_FS
2745   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2746               "Received request for `%s' of type %u from local client\n",
2747               GNUNET_h2s (&sm->query),
2748               (unsigned int) type);
2749 #endif
2750   switch (type)
2751     {
2752     case GNUNET_BLOCK_TYPE_ANY:
2753     case GNUNET_BLOCK_TYPE_DBLOCK:
2754     case GNUNET_BLOCK_TYPE_IBLOCK:
2755     case GNUNET_BLOCK_TYPE_KBLOCK:
2756     case GNUNET_BLOCK_TYPE_SBLOCK:
2757     case GNUNET_BLOCK_TYPE_NBLOCK:
2758       break;
2759     default:
2760       GNUNET_break (0);
2761       GNUNET_SERVER_receive_done (client,
2762                                   GNUNET_SYSERR);
2763       return;
2764     }  
2765
2766   cl = client_list;
2767   while ( (cl != NULL) &&
2768           (cl->client != client) )
2769     cl = cl->next;
2770   if (cl == NULL)
2771     {
2772       cl = GNUNET_malloc (sizeof (struct ClientList));
2773       cl->client = client;
2774       GNUNET_SERVER_client_keep (client);
2775       cl->next = client_list;
2776       client_list = cl;
2777     }
2778   /* detect duplicate KBLOCK requests */
2779   if ( (type == GNUNET_BLOCK_TYPE_KBLOCK) ||
2780        (type == GNUNET_BLOCK_TYPE_NBLOCK) ||
2781        (type == GNUNET_BLOCK_TYPE_ANY) )
2782     {
2783       crl = cl->rl_head;
2784       while ( (crl != NULL) &&
2785               ( (0 != memcmp (&crl->req->query,
2786                               &sm->query,
2787                               sizeof (GNUNET_HashCode))) ||
2788                 (crl->req->type != type) ) )
2789         crl = crl->next;
2790       if (crl != NULL)  
2791         { 
2792 #if DEBUG_FS
2793           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2794                       "Have existing request, merging content-seen lists.\n");
2795 #endif
2796           pr = crl->req;
2797           /* Duplicate request (used to send long list of
2798              known/blocked results); merge 'pr->replies_seen'
2799              and update bloom filter */
2800           GNUNET_array_grow (pr->replies_seen,
2801                              pr->replies_seen_size,
2802                              pr->replies_seen_off + sc);
2803           memcpy (&pr->replies_seen[pr->replies_seen_off],
2804                   &sm[1],
2805                   sc * sizeof (GNUNET_HashCode));
2806           pr->replies_seen_off += sc;
2807           refresh_bloomfilter (pr);
2808           GNUNET_STATISTICS_update (stats,
2809                                     gettext_noop ("# client searches updated (merged content seen list)"),
2810                                     1,
2811                                     GNUNET_NO);
2812           GNUNET_SERVER_receive_done (client,
2813                                       GNUNET_OK);
2814           return;
2815         }
2816     }
2817   GNUNET_STATISTICS_update (stats,
2818                             gettext_noop ("# client searches active"),
2819                             1,
2820                             GNUNET_NO);
2821   pr = GNUNET_malloc (sizeof (struct PendingRequest) + 
2822                       ((type == GNUNET_BLOCK_TYPE_SBLOCK) ? sizeof(GNUNET_HashCode) : 0));
2823   crl = GNUNET_malloc (sizeof (struct ClientRequestList));
2824   memset (crl, 0, sizeof (struct ClientRequestList));
2825   crl->client_list = cl;
2826   GNUNET_CONTAINER_DLL_insert (cl->rl_head,
2827                                cl->rl_tail,
2828                                crl);  
2829   crl->req = pr;
2830   pr->type = type;
2831   pr->client_request_list = crl;
2832   GNUNET_array_grow (pr->replies_seen,
2833                      pr->replies_seen_size,
2834                      sc);
2835   memcpy (pr->replies_seen,
2836           &sm[1],
2837           sc * sizeof (GNUNET_HashCode));
2838   pr->replies_seen_off = sc;
2839   pr->anonymity_level = ntohl (sm->anonymity_level); 
2840   refresh_bloomfilter (pr);
2841   pr->query = sm->query;
2842   if (0 == (1 & ntohl (sm->options)))
2843     pr->local_only = GNUNET_NO;
2844   else
2845     pr->local_only = GNUNET_YES;
2846   switch (type)
2847     {
2848     case GNUNET_BLOCK_TYPE_DBLOCK:
2849     case GNUNET_BLOCK_TYPE_IBLOCK:
2850       if (0 != memcmp (&sm->target,
2851                        &all_zeros,
2852                        sizeof (GNUNET_HashCode)))
2853         pr->target_pid = GNUNET_PEER_intern ((const struct GNUNET_PeerIdentity*) &sm->target);
2854       break;
2855     case GNUNET_BLOCK_TYPE_SBLOCK:
2856       pr->namespace = (GNUNET_HashCode*) &pr[1];
2857       memcpy (&pr[1], &sm->target, sizeof (GNUNET_HashCode));
2858       break;
2859     default:
2860       break;
2861     }
2862   GNUNET_break (GNUNET_OK ==
2863                 GNUNET_CONTAINER_multihashmap_put (query_request_map,
2864                                                    &sm->query,
2865                                                    pr,
2866                                                    GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
2867   if (type == GNUNET_BLOCK_TYPE_DBLOCK)
2868     type = GNUNET_BLOCK_TYPE_ANY; /* get on-demand blocks too! */
2869   pr->qe = GNUNET_DATASTORE_get (dsh,
2870                                  &sm->query,
2871                                  type,
2872                                  -3, -1,
2873                                  GNUNET_CONSTANTS_SERVICE_TIMEOUT,                             
2874                                  &process_local_reply,
2875                                  pr);
2876 }
2877
2878
2879 /* **************************** Startup ************************ */
2880
2881
2882 /**
2883  * List of handlers for P2P messages
2884  * that we care about.
2885  */
2886 static struct GNUNET_CORE_MessageHandler p2p_handlers[] =
2887   {
2888     { &handle_p2p_get, 
2889       GNUNET_MESSAGE_TYPE_FS_GET, 0 },
2890     { &handle_p2p_put, 
2891       GNUNET_MESSAGE_TYPE_FS_PUT, 0 },
2892     { NULL, 0, 0 }
2893   };
2894
2895
2896 /**
2897  * List of handlers for the messages understood by this
2898  * service.
2899  */
2900 static struct GNUNET_SERVER_MessageHandler handlers[] = {
2901   {&GNUNET_FS_handle_index_start, NULL, 
2902    GNUNET_MESSAGE_TYPE_FS_INDEX_START, 0},
2903   {&GNUNET_FS_handle_index_list_get, NULL, 
2904    GNUNET_MESSAGE_TYPE_FS_INDEX_LIST_GET, sizeof(struct GNUNET_MessageHeader) },
2905   {&GNUNET_FS_handle_unindex, NULL, GNUNET_MESSAGE_TYPE_FS_UNINDEX, 
2906    sizeof (struct UnindexMessage) },
2907   {&handle_start_search, NULL, GNUNET_MESSAGE_TYPE_FS_START_SEARCH, 
2908    0 },
2909   {NULL, NULL, 0, 0}
2910 };
2911
2912
2913 /**
2914  * Process fs requests.
2915  *
2916  * @param s scheduler to use
2917  * @param server the initialized server
2918  * @param c configuration to use
2919  */
2920 static int
2921 main_init (struct GNUNET_SCHEDULER_Handle *s,
2922            struct GNUNET_SERVER_Handle *server,
2923            const struct GNUNET_CONFIGURATION_Handle *c)
2924 {
2925   sched = s;
2926   cfg = c;
2927   stats = GNUNET_STATISTICS_create (sched, "fs", cfg);
2928   connected_peers = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2929   query_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2930   peer_request_map = GNUNET_CONTAINER_multihashmap_create (128); // FIXME: get size from config
2931   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
2932   core = GNUNET_CORE_connect (sched,
2933                               cfg,
2934                               GNUNET_TIME_UNIT_FOREVER_REL,
2935                               NULL,
2936                               NULL,
2937                               &peer_connect_handler,
2938                               &peer_disconnect_handler,
2939                               NULL, GNUNET_NO,
2940                               NULL, GNUNET_NO,
2941                               p2p_handlers);
2942   if (NULL == core)
2943     {
2944       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
2945                   _("Failed to connect to `%s' service.\n"),
2946                   "core");
2947       GNUNET_CONTAINER_multihashmap_destroy (connected_peers);
2948       connected_peers = NULL;
2949       GNUNET_CONTAINER_multihashmap_destroy (query_request_map);
2950       query_request_map = NULL;
2951       GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
2952       requests_by_expiration_heap = NULL;
2953       GNUNET_CONTAINER_multihashmap_destroy (peer_request_map);
2954       peer_request_map = NULL;
2955       if (dsh != NULL)
2956         {
2957           GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
2958           dsh = NULL;
2959         }
2960       return GNUNET_SYSERR;
2961     }
2962   GNUNET_SERVER_disconnect_notify (server, 
2963                                    &handle_client_disconnect,
2964                                    NULL);
2965   GNUNET_SERVER_add_handlers (server, handlers);
2966   GNUNET_SCHEDULER_add_delayed (sched,
2967                                 GNUNET_TIME_UNIT_FOREVER_REL,
2968                                 &shutdown_task,
2969                                 NULL);
2970   return GNUNET_OK;
2971 }
2972
2973
2974 /**
2975  * Process fs requests.
2976  *
2977  * @param cls closure
2978  * @param sched scheduler to use
2979  * @param server the initialized server
2980  * @param cfg configuration to use
2981  */
2982 static void
2983 run (void *cls,
2984      struct GNUNET_SCHEDULER_Handle *sched,
2985      struct GNUNET_SERVER_Handle *server,
2986      const struct GNUNET_CONFIGURATION_Handle *cfg)
2987 {
2988   active_migration = GNUNET_CONFIGURATION_get_value_yesno (cfg,
2989                                                            "FS",
2990                                                            "ACTIVEMIGRATION");
2991   dsh = GNUNET_DATASTORE_connect (cfg,
2992                                   sched);
2993   if (dsh == NULL)
2994     {
2995       GNUNET_SCHEDULER_shutdown (sched);
2996       return;
2997     }
2998   if ( (GNUNET_OK != GNUNET_FS_indexing_init (sched, cfg, dsh)) ||
2999        (GNUNET_OK != main_init (sched, server, cfg)) )
3000     {    
3001       GNUNET_SCHEDULER_shutdown (sched);
3002       GNUNET_DATASTORE_disconnect (dsh, GNUNET_NO);
3003       dsh = NULL;
3004       return;   
3005     }
3006 }
3007
3008
3009 /**
3010  * The main function for the fs service.
3011  *
3012  * @param argc number of arguments from the command line
3013  * @param argv command line arguments
3014  * @return 0 ok, 1 on error
3015  */
3016 int
3017 main (int argc, char *const *argv)
3018 {
3019   return (GNUNET_OK ==
3020           GNUNET_SERVICE_run (argc,
3021                               argv,
3022                               "fs",
3023                               GNUNET_SERVICE_OPTION_NONE,
3024                               &run, NULL)) ? 0 : 1;
3025 }
3026
3027 /* end of gnunet-service-fs.c */