stuff
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pr.c
23  * @brief API to handle pending requests
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet-service-fs_pr.h"
28
29
30 /**
31  * An active request.
32  */
33 struct GSF_PendingRequest
34 {
35   /**
36    * Public data for the request.
37    */ 
38   struct GSF_PendingRequestData public_data;
39
40   /**
41    * Function to call if we encounter a reply.
42    */
43   GSF_PendingRequestReplyHandler rh;
44
45   /**
46    * Closure for 'rh'
47    */
48   void *rh_cls;
49
50   /**
51    * Array of hash codes of replies we've already seen.
52    */
53   GNUNET_HashCode *replies_seen;
54
55   /**
56    * Bloomfilter masking replies we've already seen.
57    */
58   struct GNUNET_CONTAINER_BloomFilter *bf;
59
60   /**
61    * Number of valid entries in the 'replies_seen' array.
62    */
63   unsigned int replies_seen_count;
64
65   /**
66    * Length of the 'replies_seen' array.
67    */
68   unsigned int replies_seen_size;
69
70   /**
71    * Mingle value we currently use for the bf.
72    */
73   int32_t mingle;
74                             
75 };
76
77
78 /**
79  * All pending requests, ordered by the query.  Entries
80  * are of type 'struct GSF_PendingRequest*'.
81  */
82 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
83
84
85 /**
86  * Datastore 'PUT' load tracking.
87  */
88 static struct GNUNET_LOAD_Value *datastore_put_load;
89
90
91 /**
92  * Are we allowed to migrate content to this peer.
93  */
94 static int active_to_migration;
95
96
97 /**
98  * Heap with the request that will expire next at the top.  Contains
99  * pointers of type "struct PendingRequest*"; these will *also* be
100  * aliased from the "requests_by_peer" data structures and the
101  * "requests_by_query" table.  Note that requests from our clients
102  * don't expire and are thus NOT in the "requests_by_expiration"
103  * (or the "requests_by_peer" tables).
104  */
105 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
106
107
108 /**
109  * How many bytes should a bloomfilter be if we have already seen
110  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
111  * of bits set per entry.  Furthermore, we should not re-size the
112  * filter too often (to keep it cheap).
113  *
114  * Since other peers will also add entries but not resize the filter,
115  * we should generally pick a slightly larger size than what the
116  * strict math would suggest.
117  *
118  * @return must be a power of two and smaller or equal to 2^15.
119  */
120 static size_t
121 compute_bloomfilter_size (unsigned int entry_count)
122 {
123   size_t size;
124   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
125   uint16_t max = 1 << 15;
126
127   if (entry_count > max)
128     return max;
129   size = 8;
130   while ((size < max) && (size < ideal))
131     size *= 2;
132   if (size > max)
133     return max;
134   return size;
135 }
136
137
138 /**
139  * Recalculate our bloom filter for filtering replies.  This function
140  * will create a new bloom filter from scratch, so it should only be
141  * called if we have no bloomfilter at all (and hence can create a
142  * fresh one of minimal size without problems) OR if our peer is the
143  * initiator (in which case we may resize to larger than mimimum size).
144  *
145  * @param pr request for which the BF is to be recomputed
146  * @return GNUNET_YES if a refresh actually happened
147  */
148 static int
149 refresh_bloomfilter (struct GSF_PendingRequest *pr)
150 {
151   unsigned int i;
152   size_t nsize;
153   GNUNET_HashCode mhash;
154
155   nsize = compute_bloomfilter_size (pr->replies_seen_off);
156   if ( (bf != NULL) &&
157        (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
158     return GNUNET_NO; /* size not changed */
159   if (pr->bf != NULL)
160     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
161   pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
162                                                    UINT32_MAX);
163   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
164                                               nsize,
165                                               BLOOMFILTER_K);
166   for (i=0;i<pr->replies_seen_count;i++)
167     {
168       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
169                                 pr->mingle,
170                                 &mhash);
171       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
172     }
173   return GNUNET_YES;
174 }
175
176
177 /**
178  * Create a new pending request.  
179  *
180  * @param options request options
181  * @param type type of the block that is being requested
182  * @param query key for the lookup
183  * @param namespace namespace to lookup, NULL for no namespace
184  * @param target preferred target for the request, NULL for none
185  * @param bf_data raw data for bloom filter for known replies, can be NULL
186  * @param bf_size number of bytes in bf_data
187  * @param mingle mingle value for bf
188  * @param anonymity_level desired anonymity level
189  * @param priority maximum outgoing cummulative request priority to use
190  * @param ttl current time-to-live for the request
191  * @param replies_seen hash codes of known local replies
192  * @param replies_seen_count size of the 'replies_seen' array
193  * @param rh handle to call when we get a reply
194  * @param rh_cls closure for rh
195  * @return handle for the new pending request
196  */
197 struct GSF_PendingRequest *
198 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
199                              enum GNUNET_BLOCK_Type type,
200                              const GNUNET_HashCode *query,
201                              const GNUNET_HashCode *namespace,
202                              const struct GNUNET_PeerIdentity *target,
203                              const char *bf_data,
204                              size_t bf_size,
205                              int32_t mingle,
206                              uint32_t anonymity_level,
207                              uint32_t priority,
208                              int32_t ttl,
209                              const GNUNET_HashCode *replies_seen,
210                              unsigned int replies_seen_count,
211                              GSF_PendingRequestReplyHandler rh,
212                              void *rh_cls)
213 {
214   struct GSF_PendingRequest *pr;
215
216   
217   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
218   pr->public_data.query = *query;
219   if (GNUNET_BLOCK_TYPE_SBLOCK == type)
220     {
221       GNUNET_assert (NULL != namespace);
222       pr->public_data.namespace = *namespace;
223     }
224   if (NULL != target)
225     {
226       pr->public_data.target = *target;
227       pr->has_target = GNUNET_YES;
228     }
229   pr->public_data.anonymity_level = anonymity_data;
230   pr->public_data.priority = priority;
231   pr->public_data.options = options;
232   pr->public_data.type = type;  
233   pr->public_data.start_time = GNUNET_TIME_absolute_get ();
234   pr->rh = rh;
235   pr->rh_cls = rh_cls;
236   if (ttl >= 0)
237     pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
238                                                                                (uint32_t) ttl));
239   else
240     pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
241                                              GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
242                                                                             (uint32_t) (- ttl)));
243   if (replies_seen_count > 0)
244     {
245       pr->replies_seen_size = replies_seen_count;
246       pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
247       memcpy (pr->replies_seen,
248               replies_seen,
249               replies_seen_count * sizeof (struct GNUNET_HashCode));
250       pr->replies_seen_count = replies_seen_count;
251     }
252   if (NULL != bf_data)    
253     {
254       pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
255                                                   bf_size,
256                                                   BLOOMFILTER_K);
257       pr->mingle = mingle;
258     }
259   else if ( (replies_seen_count > 0) &&
260             (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
261     {
262       GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
263     }
264   GNUNET_CONTAINER_multihashmap_put (pr_map,
265                                      query,
266                                      pr,
267                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
268   // FIXME: if not a local query, we also need to track the
269   // total number of external queries we currently have and
270   // bound it => need an additional heap!
271
272   pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
273                                             pr,
274                                             pr->start_time.abs_value + pr->ttl);
275
276
277
278   /* make sure we don't track too many requests */
279   if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
280     {
281       pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
282       GNUNET_assert (pr != NULL);
283       destroy_pending_request (pr);
284     }
285
286
287   return pr;
288 }
289
290
291 /**
292  * Obtain the public data associated with a pending request
293  * 
294  * @param pr pending request
295  * @return associated public data
296  */
297 struct GSF_PendingRequestData *
298 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
299 {
300   return &pr->public_data;
301 }
302
303
304 /**
305  * Update a given pending request with additional replies
306  * that have been seen.
307  *
308  * @param pr request to update
309  * @param replies_seen hash codes of replies that we've seen
310  * @param replies_seen_count size of the replies_seen array
311  */
312 void
313 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
314                              const GNUNET_HashCode *replies_seen,
315                              unsigned int replies_seen_count)
316 {
317   unsigned int i;
318   GNUNET_HashCode mhash;
319
320   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
321     return; /* integer overflow */
322   if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
323     {
324       /* we're responsible for the BF, full refresh */
325       if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
326         GNUNET_array_grow (pr->replies_seen,
327                            pr->replies_seen_size,
328                            replies_seen_count + pr->replies_seen_count);
329       memcpy (&pr->replies_seen[pr->replies_seen_count],
330               replies_seen,
331               sizeof (GNUNET_HashCode) * replies_seen_count);
332       pr->replies_seen_count += replies_seen;
333       if (GNUNET_NO == refresh_bloomfilter (pr))
334         {
335           /* bf not recalculated, simply extend it with new bits */
336           for (i=0;i<pr->replies_seen_count;i++)
337             {
338               GNUNET_BLOCK_mingle_hash (&replies_seen[i],
339                                         pr->mingle,
340                                         &mhash);
341               GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
342             }
343         }
344     }
345   else
346     {
347       if (NULL == pr->bf)
348         {
349           /* we're not the initiator, but the initiator did not give us
350              any bloom-filter, so we need to create one on-the-fly */
351           pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
352                                                            UINT32_MAX);
353           pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
354                                                       pr->mingle,
355                                                       BLOOMFILTER_K);
356         }
357       for (i=0;i<pr->replies_seen_count;i++)
358         {
359           GNUNET_BLOCK_mingle_hash (&replies_seen[i],
360                                     pr->mingle,
361                                     &mhash);
362           GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
363         }
364     }
365 }
366
367
368 /**
369  * Generate the message corresponding to the given pending request for
370  * transmission to other peers (or at least determine its size).
371  *
372  * @param pr request to generate the message for
373  * @param do_route are we routing the reply
374  * @param buf_size number of bytes available in buf
375  * @param buf where to copy the message (can be NULL)
376  * @return number of bytes needed (if > buf_size) or used
377  */
378 size_t
379 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
380                                   int do_route,
381                                   size_t buf_size,
382                                   void *buf)
383 {
384   struct PendingMessage *pm;
385   char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
386   struct GetMessage *gm;
387   GNUNET_HashCode *ext;
388   size_t msize;
389   unsigned int k;
390   int no_route;
391   uint32_t bm;
392   uint32_t prio;
393   size_t bf_size;
394
395   k = 0;
396   bm = 0;
397   if (GNUNET_YES != do_route)
398     {
399       bm |= GET_MESSAGE_BIT_RETURN_TO;
400       k++;      
401     }
402   if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
403     {
404       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
405       k++;
406     }
407   if (GNUNET_YES == pr->has_target)
408     {
409       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
410       k++;
411     }
412   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
413   msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
414   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
415   if (buf_size < msize)
416     return msize;  
417   gm = (struct GetMessage*) lbuf;
418   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
419   gm->header.size = htons (msize);
420   gm->type = htonl (pr->type);
421   if (GNUNET_YES == do_route)
422     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
423                                      pr->public_data.priority + 1);
424   else
425     prio = 0;
426   pr->public_data.priority -= prio;
427   gm->priority = htonl (prio);
428   gm->ttl = htonl (pr->ttl);
429   gm->filter_mutator = htonl(pr->mingle); 
430   gm->hash_bitmap = htonl (bm);
431   gm->query = pr->query;
432   ext = (GNUNET_HashCode*) &gm[1];
433   k = 0;
434   if (GNUNET_YES != do_route)
435     GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
436   if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
437     memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
438   if (GNUNET_YES == pr->has_target)
439     GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
440   if (pr->bf != NULL)
441     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
442                                                (char*) &ext[k],
443                                                bf_size);
444   memcpy (buf, gm, msize);
445   return msize;
446 }
447
448
449 /**
450  * Iterator to free pending requests.
451  *
452  * @param cls closure, unused
453  * @param key current key code
454  * @param value value in the hash map (pending request)
455  * @return GNUNET_YES (we should continue to iterate)
456  */
457 static int 
458 clean_request (void *cls,
459                const GNUNET_HashCode * key,
460                void *value)
461 {
462   struct GSF_PendingRequest *pr = value;
463   
464   GNUNET_free_non_null (pr->replies_seen);
465   if (NULL != pr->bf)
466     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
467   GNUNET_free (pr);
468   return GNUNET_YES;
469 }
470
471
472 /**
473  * Explicitly cancel a pending request.
474  *
475  * @param pr request to cancel
476  */
477 void
478 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
479 {
480   GNUNET_assert (GNUNET_OK ==
481                  GNUNET_CONTAINER_multihashmap_remove (pr_map,
482                                                        &pr->public_data.query,
483                                                        pr));
484   GNUNET_assert (GNUNET_YES ==
485                  clean_request (NULL, &pr->public_data.query, pr));  
486 }
487
488
489 /**
490  * Iterate over all pending requests.
491  *
492  * @param it function to call for each request
493  * @param cls closure for it
494  */
495 void
496 GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
497                              void *cls)
498 {
499   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
500                                          (GNUNET_CONTAINER_HashMapIterator) it,
501                                          cls);
502 }
503
504
505
506
507 /**
508  * Closure for "process_reply" function.
509  */
510 struct ProcessReplyClosure
511 {
512   /**
513    * The data for the reply.
514    */
515   const void *data;
516
517   /**
518    * Who gave us this reply? NULL for local host (or DHT)
519    */
520   struct ConnectedPeer *sender;
521
522   /**
523    * When the reply expires.
524    */
525   struct GNUNET_TIME_Absolute expiration;
526
527   /**
528    * Size of data.
529    */
530   size_t size;
531
532   /**
533    * Type of the block.
534    */
535   enum GNUNET_BLOCK_Type type;
536
537   /**
538    * How much was this reply worth to us?
539    */
540   uint32_t priority;
541
542   /**
543    * Anonymity requirements for this reply.
544    */
545   uint32_t anonymity_level;
546
547   /**
548    * Evaluation result (returned).
549    */
550   enum GNUNET_BLOCK_EvaluationResult eval;
551
552   /**
553    * Did we finish processing the associated request?
554    */ 
555   int finished;
556
557   /**
558    * Did we find a matching request?
559    */
560   int request_found;
561 };
562
563
564 /**
565  * Update the performance data for the sender (if any) since
566  * the sender successfully answered one of our queries.
567  *
568  * @param prq information about the sender
569  * @param pr request that was satisfied
570  */
571 static void
572 update_request_performance_data (struct ProcessReplyClosure *prq,
573                                  struct GSF_PendingRequest *pr)
574 {
575   unsigned int i;
576   struct GNUNET_TIME_Relative cur_delay;
577
578   if (prq->sender == NULL)
579     return;      
580   /* FIXME: adapt code to new API... */
581   for (i=0;i<pr->used_targets_off;i++)
582     if (pr->used_targets[i].pid == prq->sender->pid)
583       break;
584   if (i < pr->used_targets_off)
585     {
586       cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);      
587       prq->sender->avg_delay.rel_value
588         = (prq->sender->avg_delay.rel_value * 
589            (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N; 
590       prq->sender->avg_priority
591         = (prq->sender->avg_priority * 
592            (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
593     }
594   if (pr->cp != NULL)
595     {
596       GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
597                              [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE], 
598                              -1);
599       GNUNET_PEER_change_rc (pr->cp->pid, 1);
600       prq->sender->last_p2p_replies
601         [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
602         = pr->cp->pid;
603     }
604   else
605     {
606       if (NULL != prq->sender->last_client_replies
607           [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
608         GNUNET_SERVER_client_drop (prq->sender->last_client_replies
609                                    [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
610       prq->sender->last_client_replies
611         [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
612         = pr->client_request_list->client_list->client;
613       GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
614     }
615 }
616                                 
617
618
619 /**
620  * We have received a reply; handle it!
621  *
622  * @param cls response (struct ProcessReplyClosure)
623  * @param key our query
624  * @param value value in the hash map (info about the query)
625  * @return GNUNET_YES (we should continue to iterate)
626  */
627 static int
628 process_reply (void *cls,
629                const GNUNET_HashCode * key,
630                void *value)
631 {
632   struct ProcessReplyClosure *prq = cls;
633   struct GSF_PendingRequest *pr = value;
634   struct PendingMessage *reply;
635   struct ClientResponseMessage *creply;
636   struct ClientList *cl;
637   struct PutMessage *pm;
638   struct ConnectedPeer *cp;
639   size_t msize;
640
641 #if DEBUG_FS
642   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
643               "Matched result (type %u) for query `%s' with pending request\n",
644               (unsigned int) prq->type,
645               GNUNET_h2s (key));
646 #endif  
647   GNUNET_STATISTICS_update (stats,
648                             gettext_noop ("# replies received and matched"),
649                             1,
650                             GNUNET_NO);
651   prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
652                                      prq->type,
653                                      key,
654                                      &pr->bf,
655                                      pr->mingle,
656                                      pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
657                                      prq->data,
658                                      prq->size);
659   switch (prq->eval)
660     {
661     case GNUNET_BLOCK_EVALUATION_OK_MORE:
662       update_request_performance_data (prq, pr);
663       break;
664     case GNUNET_BLOCK_EVALUATION_OK_LAST:
665       update_request_performance_data (prq, pr);
666       /* FIXME: adapt code to new API! */
667       while (NULL != pr->pending_head)
668         destroy_pending_message_list_entry (pr->pending_head);
669       if (pr->qe != NULL)
670         {
671           if (pr->client_request_list != NULL)
672             GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client, 
673                                         GNUNET_YES);
674           GNUNET_DATASTORE_cancel (pr->qe);
675           pr->qe = NULL;
676         }
677       pr->do_remove = GNUNET_YES;
678       if (pr->task != GNUNET_SCHEDULER_NO_TASK)
679         {
680           GNUNET_SCHEDULER_cancel (pr->task);
681           pr->task = GNUNET_SCHEDULER_NO_TASK;
682         }
683       GNUNET_break (GNUNET_YES ==
684                     GNUNET_CONTAINER_multihashmap_remove (query_request_map,
685                                                           key,
686                                                           pr));
687       GNUNET_LOAD_update (rt_entry_lifetime,
688                           GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
689       break;
690     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
691       GNUNET_STATISTICS_update (stats,
692                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
693                                 1,
694                                 GNUNET_NO);
695 #if DEBUG_FS && 0
696       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
697                   "Duplicate response `%s', discarding.\n",
698                   GNUNET_h2s (&mhash));
699 #endif
700       return GNUNET_YES; /* duplicate */
701     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
702       return GNUNET_YES; /* wrong namespace */  
703     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
704       GNUNET_break (0);
705       return GNUNET_YES;
706     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
707       GNUNET_break (0);
708       return GNUNET_YES;
709     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
710       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
711                   _("Unsupported block type %u\n"),
712                   prq->type);
713       return GNUNET_NO;
714     }
715   /* FIXME: adapt code to new API! */
716   if (pr->client_request_list != NULL)
717     {
718       if (pr->replies_seen_size == pr->replies_seen_off)
719         GNUNET_array_grow (pr->replies_seen,
720                            pr->replies_seen_size,
721                            pr->replies_seen_size * 2 + 4);      
722       GNUNET_CRYPTO_hash (prq->data,
723                           prq->size,
724                           &pr->replies_seen[pr->replies_seen_off++]);         
725       refresh_bloomfilter (pr);
726     }
727   if (NULL == prq->sender)
728     {
729 #if DEBUG_FS
730       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
731                   "Found result for query `%s' in local datastore\n",
732                   GNUNET_h2s (key));
733 #endif
734       GNUNET_STATISTICS_update (stats,
735                                 gettext_noop ("# results found locally"),
736                                 1,
737                                 GNUNET_NO);      
738     }
739   prq->priority += pr->remaining_priority;
740   pr->remaining_priority = 0;
741   pr->results_found++;
742   prq->request_found = GNUNET_YES;
743   /* finally, pass on to other peers / local clients */
744   pr->rh (pr->rh_cls, pr, prq->data, prq->size);
745   return GNUNET_YES;
746 }
747
748
749 /**
750  * Continuation called to notify client about result of the
751  * operation.
752  *
753  * @param cls closure
754  * @param success GNUNET_SYSERR on failure
755  * @param msg NULL on success, otherwise an error message
756  */
757 static void 
758 put_migration_continuation (void *cls,
759                             int success,
760                             const char *msg)
761 {
762   struct GNUNET_TIME_Absolute *start = cls;
763   struct GNUNET_TIME_Relative delay;
764   
765   delay = GNUNET_TIME_absolute_get_duration (*start);
766   GNUNET_free (start);
767   /* FIXME: should we really update the load value on failure? */
768   GNUNET_LOAD_update (datastore_put_load,
769                       delay.rel_value);
770   if (GNUNET_OK == success)
771     return;
772   GNUNET_STATISTICS_update (stats,
773                             gettext_noop ("# datastore 'put' failures"),
774                             1,
775                             GNUNET_NO);
776 }
777
778
779 /**
780  * Test if the DATABASE (PUT) load on this peer is too high
781  * to even consider processing the query at
782  * all.  
783  * 
784  * @return GNUNET_YES if the load is too high to do anything (load high)
785  *         GNUNET_NO to process normally (load normal or low)
786  */
787 static int
788 test_put_load_too_high (uint32_t priority)
789 {
790   double ld;
791
792   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
793     return GNUNET_NO; /* very fast */
794   ld = GNUNET_LOAD_get_load (datastore_put_load);
795   if (ld < 2.0 * (1 + priority))
796     return GNUNET_NO;
797   GNUNET_STATISTICS_update (stats,
798                             gettext_noop ("# storage requests dropped due to high load"),
799                             1,
800                             GNUNET_NO);
801   return GNUNET_YES;
802 }
803
804
805 /**
806  * Iterator called on each result obtained for a DHT
807  * operation that expects a reply
808  *
809  * @param cls closure
810  * @param exp when will this value expire
811  * @param key key of the result
812  * @param get_path NULL-terminated array of pointers
813  *                 to the peers on reverse GET path (or NULL if not recorded)
814  * @param put_path NULL-terminated array of pointers
815  *                 to the peers on the PUT path (or NULL if not recorded)
816  * @param type type of the result
817  * @param size number of bytes in data
818  * @param data pointer to the result data
819  */
820 void
821 GSF_handle_dht_reply_ (void *cls,
822                        struct GNUNET_TIME_Absolute exp,
823                        const GNUNET_HashCode * key,
824                        const struct GNUNET_PeerIdentity * const *get_path,
825                        const struct GNUNET_PeerIdentity * const *put_path,
826                        enum GNUNET_BLOCK_Type type,
827                        size_t size,
828                        const void *data)
829 {
830   struct GSF_PendingRequest *pr = cls;
831   struct ProcessReplyClosure prq;
832
833   memset (&prq, 0, sizeof (prq));
834   prq.data = data;
835   prq.expiration = exp;
836   prq.size = size;  
837   prq.type = type;
838   process_reply (&prq, key, pr);
839   if ( (GNUNET_YES == active_to_migration) &&
840        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
841     {      
842 #if DEBUG_FS
843       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844                   "Replicating result for query `%s' with priority %u\n",
845                   GNUNET_h2s (&query),
846                   prq.priority);
847 #endif
848       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
849       *start = GNUNET_TIME_absolute_get ();
850       GNUNET_DATASTORE_put (dsh,
851                             0, &query, dsize, &put[1],
852                             type, prq.priority, 1 /* anonymity */, 
853                             expiration, 
854                             1 + prq.priority, MAX_DATASTORE_QUEUE,
855                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
856                             &put_migration_continuation, 
857                             start);
858     }
859 }
860
861
862 /**
863  * Handle P2P "CONTENT" message.  Checks that the message is
864  * well-formed and then checks if there are any pending requests for
865  * this content and possibly passes it on (to local clients or other
866  * peers).  Does NOT perform migration (content caching at this peer).
867  *
868  * @param cp the other peer involved (sender or receiver, NULL
869  *        for loopback messages where we are both sender and receiver)
870  * @param message the actual message
871  * @return GNUNET_OK if the message was well-formed,
872  *         GNUNET_SYSERR if the message was malformed (close connection,
873  *         do not cache under any circumstances)
874  */
875 int
876 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
877                          const struct GNUNET_MessageHeader *message)
878 {
879   const struct PutMessage *put;
880   uint16_t msize;
881   size_t dsize;
882   enum GNUNET_BLOCK_Type type;
883   struct GNUNET_TIME_Absolute expiration;
884   GNUNET_HashCode query;
885   struct ProcessReplyClosure prq;
886   struct GNUNET_TIME_Relative block_time;  
887   double putl;
888   struct GNUNET_TIME_Absolute *start;
889
890   msize = ntohs (message->size);
891   if (msize < sizeof (struct PutMessage))
892     {
893       GNUNET_break_op(0);
894       return GNUNET_SYSERR;
895     }
896   put = (const struct PutMessage*) message;
897   dsize = msize - sizeof (struct PutMessage);
898   type = ntohl (put->type);
899   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
900   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
901     return GNUNET_SYSERR;
902   if (GNUNET_OK !=
903       GNUNET_BLOCK_get_key (block_ctx,
904                             type,
905                             &put[1],
906                             dsize,
907                             &query))
908     {
909       GNUNET_break_op (0);
910       return GNUNET_SYSERR;
911     }
912   /* now, lookup 'query' */
913   prq.data = (const void*) &put[1];
914   if (NULL != cp)
915     prq.sender = cp;
916   else
917     prq.sender = NULL;
918   prq.size = dsize;
919   prq.type = type;
920   prq.expiration = expiration;
921   prq.priority = 0;
922   prq.anonymity_level = 1;
923   prq.finished = GNUNET_NO;
924   prq.request_found = GNUNET_NO;
925   GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
926                                               &query,
927                                               &process_reply,
928                                               &prq);
929   if (NULL != cp)
930     {
931       GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
932       GSF_get_peer_performance_data (cp)->trust += prq.priority;
933     }
934   if ( (GNUNET_YES == active_to_migration) &&
935        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
936     {      
937 #if DEBUG_FS
938       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939                   "Replicating result for query `%s' with priority %u\n",
940                   GNUNET_h2s (&query),
941                   prq.priority);
942 #endif
943       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
944       *start = GNUNET_TIME_absolute_get ();
945       GNUNET_DATASTORE_put (dsh,
946                             0, &query, dsize, &put[1],
947                             type, prq.priority, 1 /* anonymity */, 
948                             expiration, 
949                             1 + prq.priority, MAX_DATASTORE_QUEUE,
950                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
951                             &put_migration_continuation, 
952                             start);
953     }
954   putl = GNUNET_LOAD_get_load (datastore_put_load);
955   if ( (NULL != (cp = prq.sender)) &&
956        (GNUNET_NO == prq.request_found) &&
957        ( (GNUNET_YES != active_to_migration) ||
958          (putl > 2.5 * (1 + prq.priority)) ) ) 
959     {
960       if (GNUNET_YES != active_to_migration) 
961         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
962       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
963                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
964                                                                                    (unsigned int) (60000 * putl * putl)));
965       GSF_block_peer_migration (cp, block_time);
966     }
967   return GNUNET_OK;
968 }
969
970
971 /**
972  * Setup the subsystem.
973  */
974 void
975 GSF_pending_request_init_ ()
976 {
977   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
978 }
979
980
981 /**
982  * Shutdown the subsystem.
983  */
984 void
985 GSF_pending_request_done_ ()
986 {
987   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
988                                          &clean_request,
989                                          NULL);
990   GNUNET_CONTAINER_multihashmap_destroy (pr_map);
991   pr_map = NULL;
992 }
993
994
995 /* end of gnunet-service-fs_pr.c */