fixes
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pr.c
23  * @brief API to handle pending requests
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pr.h"
30
31
32 /**
33  * An active request.
34  */
35 struct GSF_PendingRequest
36 {
37   /**
38    * Public data for the request.
39    */ 
40   struct GSF_PendingRequestData public_data;
41
42   /**
43    * Function to call if we encounter a reply.
44    */
45   GSF_PendingRequestReplyHandler rh;
46
47   /**
48    * Closure for 'rh'
49    */
50   void *rh_cls;
51
52   /**
53    * Array of hash codes of replies we've already seen.
54    */
55   GNUNET_HashCode *replies_seen;
56
57   /**
58    * Bloomfilter masking replies we've already seen.
59    */
60   struct GNUNET_CONTAINER_BloomFilter *bf;
61
62   /**
63    * Entry for this pending request in the expiration heap, or NULL.
64    */
65   struct GNUNET_CONTAINER_HeapNode *hnode;
66
67   /**
68    * Number of valid entries in the 'replies_seen' array.
69    */
70   unsigned int replies_seen_count;
71
72   /**
73    * Length of the 'replies_seen' array.
74    */
75   unsigned int replies_seen_size;
76
77   /**
78    * Mingle value we currently use for the bf.
79    */
80   uint32_t mingle;
81                             
82 };
83
84
85 /**
86  * All pending requests, ordered by the query.  Entries
87  * are of type 'struct GSF_PendingRequest*'.
88  */
89 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
90
91
92 /**
93  * Datastore 'PUT' load tracking.
94  */
95 static struct GNUNET_LOAD_Value *datastore_put_load;
96
97
98 /**
99  * Are we allowed to migrate content to this peer.
100  */
101 static int active_to_migration;
102
103
104 /**
105  * Heap with the request that will expire next at the top.  Contains
106  * pointers of type "struct PendingRequest*"; these will *also* be
107  * aliased from the "requests_by_peer" data structures and the
108  * "requests_by_query" table.  Note that requests from our clients
109  * don't expire and are thus NOT in the "requests_by_expiration"
110  * (or the "requests_by_peer" tables).
111  */
112 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
113
114
115 /**
116  * Maximum number of requests (from other peers, overall) that we're
117  * willing to have pending at any given point in time.  Can be changed
118  * via the configuration file (32k is just the default).
119  */
120 static unsigned long long max_pending_requests = (32 * 1024);
121
122
123 /**
124  * How many bytes should a bloomfilter be if we have already seen
125  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
126  * of bits set per entry.  Furthermore, we should not re-size the
127  * filter too often (to keep it cheap).
128  *
129  * Since other peers will also add entries but not resize the filter,
130  * we should generally pick a slightly larger size than what the
131  * strict math would suggest.
132  *
133  * @return must be a power of two and smaller or equal to 2^15.
134  */
135 static size_t
136 compute_bloomfilter_size (unsigned int entry_count)
137 {
138   size_t size;
139   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
140   uint16_t max = 1 << 15;
141
142   if (entry_count > max)
143     return max;
144   size = 8;
145   while ((size < max) && (size < ideal))
146     size *= 2;
147   if (size > max)
148     return max;
149   return size;
150 }
151
152
153 /**
154  * Recalculate our bloom filter for filtering replies.  This function
155  * will create a new bloom filter from scratch, so it should only be
156  * called if we have no bloomfilter at all (and hence can create a
157  * fresh one of minimal size without problems) OR if our peer is the
158  * initiator (in which case we may resize to larger than mimimum size).
159  *
160  * @param pr request for which the BF is to be recomputed
161  * @return GNUNET_YES if a refresh actually happened
162  */
163 static int
164 refresh_bloomfilter (struct GSF_PendingRequest *pr)
165 {
166   unsigned int i;
167   size_t nsize;
168   GNUNET_HashCode mhash;
169
170   nsize = compute_bloomfilter_size (pr->replies_seen_count);
171   if ( (pr->bf != NULL) &&
172        (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
173     return GNUNET_NO; /* size not changed */
174   if (pr->bf != NULL)
175     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
176   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
177                                          UINT32_MAX);
178   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
179                                               nsize,
180                                               BLOOMFILTER_K);
181   for (i=0;i<pr->replies_seen_count;i++)
182     {
183       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
184                                 pr->mingle,
185                                 &mhash);
186       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
187     }
188   return GNUNET_YES;
189 }
190
191
192 /**
193  * Create a new pending request.  
194  *
195  * @param options request options
196  * @param type type of the block that is being requested
197  * @param query key for the lookup
198  * @param namespace namespace to lookup, NULL for no namespace
199  * @param target preferred target for the request, NULL for none
200  * @param bf_data raw data for bloom filter for known replies, can be NULL
201  * @param bf_size number of bytes in bf_data
202  * @param mingle mingle value for bf
203  * @param anonymity_level desired anonymity level
204  * @param priority maximum outgoing cummulative request priority to use
205  * @param ttl current time-to-live for the request
206  * @param replies_seen hash codes of known local replies
207  * @param replies_seen_count size of the 'replies_seen' array
208  * @param rh handle to call when we get a reply
209  * @param rh_cls closure for rh
210  * @return handle for the new pending request
211  */
212 struct GSF_PendingRequest *
213 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
214                              enum GNUNET_BLOCK_Type type,
215                              const GNUNET_HashCode *query,
216                              const GNUNET_HashCode *namespace,
217                              const struct GNUNET_PeerIdentity *target,
218                              const char *bf_data,
219                              size_t bf_size,
220                              uint32_t mingle,
221                              uint32_t anonymity_level,
222                              uint32_t priority,
223                              int32_t ttl,
224                              const GNUNET_HashCode *replies_seen,
225                              unsigned int replies_seen_count,
226                              GSF_PendingRequestReplyHandler rh,
227                              void *rh_cls)
228 {
229   struct GSF_PendingRequest *pr;
230   struct GSF_PendingRequest *dpr;
231   
232   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
233   pr->public_data.query = *query;
234   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
235     {
236       GNUNET_assert (NULL != namespace);
237       pr->public_data.namespace = *namespace;
238     }
239   if (NULL != target)
240     {
241       pr->public_data.target = *target;
242       pr->public_data.has_target = GNUNET_YES;
243     }
244   pr->public_data.anonymity_level = anonymity_level;
245   pr->public_data.priority = priority;
246   pr->public_data.original_priority = priority;
247   pr->public_data.options = options;
248   pr->public_data.type = type;  
249   pr->public_data.start_time = GNUNET_TIME_absolute_get ();
250   pr->rh = rh;
251   pr->rh_cls = rh_cls;
252   if (ttl >= 0)
253     pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
254                                                                                            (uint32_t) ttl));
255   else
256     pr->public_data.ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
257                                                          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
258                                                                                         (uint32_t) (- ttl)));
259   if (replies_seen_count > 0)
260     {
261       pr->replies_seen_size = replies_seen_count;
262       pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
263       memcpy (pr->replies_seen,
264               replies_seen,
265               replies_seen_count * sizeof (GNUNET_HashCode));
266       pr->replies_seen_count = replies_seen_count;
267     }
268   if (NULL != bf_data)    
269     {
270       pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
271                                                   bf_size,
272                                                   BLOOMFILTER_K);
273       pr->mingle = mingle;
274     }
275   else if ( (replies_seen_count > 0) &&
276             (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
277     {
278       GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
279     }
280   GNUNET_CONTAINER_multihashmap_put (pr_map,
281                                      query,
282                                      pr,
283                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
284   if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
285     {
286       pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
287                                                 pr,
288                                                 pr->public_data.ttl.abs_value);
289       /* make sure we don't track too many requests */
290       while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
291         {
292           dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
293           GNUNET_assert (dpr != NULL);
294           if (pr == dpr)
295             break; /* let the request live briefly... */
296           dpr->rh (dpr->rh_cls,
297                    dpr,
298                    GNUNET_TIME_UNIT_FOREVER_ABS,
299                    NULL, 0,
300                    GNUNET_SYSERR);
301           GSF_pending_request_cancel_ (dpr);
302         }
303     }
304   return pr;
305 }
306
307
308 /**
309  * Obtain the public data associated with a pending request
310  * 
311  * @param pr pending request
312  * @return associated public data
313  */
314 struct GSF_PendingRequestData *
315 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
316 {
317   return &pr->public_data;
318 }
319
320
321 /**
322  * Update a given pending request with additional replies
323  * that have been seen.
324  *
325  * @param pr request to update
326  * @param replies_seen hash codes of replies that we've seen
327  * @param replies_seen_count size of the replies_seen array
328  */
329 void
330 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
331                              const GNUNET_HashCode *replies_seen,
332                              unsigned int replies_seen_count)
333 {
334   unsigned int i;
335   GNUNET_HashCode mhash;
336
337   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
338     return; /* integer overflow */
339   if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
340     {
341       /* we're responsible for the BF, full refresh */
342       if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
343         GNUNET_array_grow (pr->replies_seen,
344                            pr->replies_seen_size,
345                            replies_seen_count + pr->replies_seen_count);
346       memcpy (&pr->replies_seen[pr->replies_seen_count],
347               replies_seen,
348               sizeof (GNUNET_HashCode) * replies_seen_count);
349       pr->replies_seen_count += replies_seen_count;
350       if (GNUNET_NO == refresh_bloomfilter (pr))
351         {
352           /* bf not recalculated, simply extend it with new bits */
353           for (i=0;i<pr->replies_seen_count;i++)
354             {
355               GNUNET_BLOCK_mingle_hash (&replies_seen[i],
356                                         pr->mingle,
357                                         &mhash);
358               GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
359             }
360         }
361     }
362   else
363     {
364       if (NULL == pr->bf)
365         {
366           /* we're not the initiator, but the initiator did not give us
367              any bloom-filter, so we need to create one on-the-fly */
368           pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
369                                                  UINT32_MAX);
370           pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
371                                                       compute_bloomfilter_size (replies_seen_count),
372                                                       BLOOMFILTER_K);
373         }
374       for (i=0;i<pr->replies_seen_count;i++)
375         {
376           GNUNET_BLOCK_mingle_hash (&replies_seen[i],
377                                     pr->mingle,
378                                     &mhash);
379           GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
380         }
381     }
382 }
383
384
385 /**
386  * Generate the message corresponding to the given pending request for
387  * transmission to other peers (or at least determine its size).
388  *
389  * @param pr request to generate the message for
390  * @param do_route are we routing the reply
391  * @param buf_size number of bytes available in buf
392  * @param buf where to copy the message (can be NULL)
393  * @return number of bytes needed (if > buf_size) or used
394  */
395 size_t
396 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
397                                   int do_route,
398                                   size_t buf_size,
399                                   void *buf)
400 {
401   char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
402   struct GetMessage *gm;
403   GNUNET_HashCode *ext;
404   size_t msize;
405   unsigned int k;
406   uint32_t bm;
407   uint32_t prio;
408   size_t bf_size;
409   struct GNUNET_TIME_Absolute now;
410   int64_t ttl;
411
412   k = 0;
413   bm = 0;
414   if (GNUNET_YES != do_route)
415     {
416       bm |= GET_MESSAGE_BIT_RETURN_TO;
417       k++;      
418     }
419   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
420     {
421       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
422       k++;
423     }
424   if (GNUNET_YES == pr->public_data.has_target)
425     {
426       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
427       k++;
428     }
429   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
430   msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
431   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
432   if (buf_size < msize)
433     return msize;  
434   gm = (struct GetMessage*) lbuf;
435   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
436   gm->header.size = htons (msize);
437   gm->type = htonl (pr->public_data.type);
438   if (GNUNET_YES == do_route)
439     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
440                                      pr->public_data.priority + 1);
441   else
442     prio = 0;
443   pr->public_data.priority -= prio;
444   gm->priority = htonl (prio);
445   now = GNUNET_TIME_absolute_get ();
446   ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
447   gm->ttl = htonl (ttl / 1000);
448   gm->filter_mutator = htonl(pr->mingle); 
449   gm->hash_bitmap = htonl (bm);
450   gm->query = pr->public_data.query;
451   ext = (GNUNET_HashCode*) &gm[1];
452   k = 0;  
453   if (GNUNET_YES != do_route)
454     GNUNET_PEER_resolve (pr->cp->pid, 
455                          (struct GNUNET_PeerIdentity*) &ext[k++]);
456   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
457     memcpy (&ext[k++], 
458             &pr->public_data.namespace, 
459             sizeof (GNUNET_HashCode));
460   if (GNUNET_YES == pr->public_data.has_target)
461     GNUNET_PEER_resolve (pr->public_data.target_pid, 
462                          (struct GNUNET_PeerIdentity*) &ext[k++]);
463   if (pr->bf != NULL)
464     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
465                                                (char*) &ext[k],
466                                                bf_size);
467   memcpy (buf, gm, msize);
468   return msize;
469 }
470
471
472 /**
473  * Iterator to free pending requests.
474  *
475  * @param cls closure, unused
476  * @param key current key code
477  * @param value value in the hash map (pending request)
478  * @return GNUNET_YES (we should continue to iterate)
479  */
480 static int 
481 clean_request (void *cls,
482                const GNUNET_HashCode * key,
483                void *value)
484 {
485   struct GSF_PendingRequest *pr = value;
486   
487   GNUNET_free_non_null (pr->replies_seen);
488   if (NULL != pr->bf)
489     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
490   if (NULL != pr->hnode)
491     GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
492                                        pr->hnode);
493   GNUNET_free (pr);
494   return GNUNET_YES;
495 }
496
497
498 /**
499  * Explicitly cancel a pending request.
500  *
501  * @param pr request to cancel
502  */
503 void
504 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
505 {
506   GNUNET_assert (GNUNET_OK ==
507                  GNUNET_CONTAINER_multihashmap_remove (pr_map,
508                                                        &pr->public_data.query,
509                                                        pr));
510   GNUNET_assert (GNUNET_YES ==
511                  clean_request (NULL, &pr->public_data.query, pr));  
512 }
513
514
515 /**
516  * Iterate over all pending requests.
517  *
518  * @param it function to call for each request
519  * @param cls closure for it
520  */
521 void
522 GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
523                              void *cls)
524 {
525   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
526                                          (GNUNET_CONTAINER_HashMapIterator) it,
527                                          cls);
528 }
529
530
531
532
533 /**
534  * Closure for "process_reply" function.
535  */
536 struct ProcessReplyClosure
537 {
538   /**
539    * The data for the reply.
540    */
541   const void *data;
542
543   /**
544    * Who gave us this reply? NULL for local host (or DHT)
545    */
546   struct GSF_ConnectedPeer *sender;
547
548   /**
549    * When the reply expires.
550    */
551   struct GNUNET_TIME_Absolute expiration;
552
553   /**
554    * Size of data.
555    */
556   size_t size;
557
558   /**
559    * Type of the block.
560    */
561   enum GNUNET_BLOCK_Type type;
562
563   /**
564    * How much was this reply worth to us?
565    */
566   uint32_t priority;
567
568   /**
569    * Anonymity requirements for this reply.
570    */
571   uint32_t anonymity_level;
572
573   /**
574    * Evaluation result (returned).
575    */
576   enum GNUNET_BLOCK_EvaluationResult eval;
577
578   /**
579    * Did we finish processing the associated request?
580    */ 
581   int finished;
582
583   /**
584    * Did we find a matching request?
585    */
586   int request_found;
587 };
588
589
590 /**
591  * Update the performance data for the sender (if any) since
592  * the sender successfully answered one of our queries.
593  *
594  * @param prq information about the sender
595  * @param pr request that was satisfied
596  */
597 static void
598 update_request_performance_data (struct ProcessReplyClosure *prq,
599                                  struct GSF_PendingRequest *pr)
600 {
601   if (prq->sender == NULL)
602     return;      
603   GSF_peer_update_performance_ (prq->sender,
604                                 pr->public_data.start_time,
605                                 prq->priority);
606 }
607                                 
608
609 /**
610  * We have received a reply; handle it!
611  *
612  * @param cls response (struct ProcessReplyClosure)
613  * @param key our query
614  * @param value value in the hash map (info about the query)
615  * @return GNUNET_YES (we should continue to iterate)
616  */
617 static int
618 process_reply (void *cls,
619                const GNUNET_HashCode * key,
620                void *value)
621 {
622   struct ProcessReplyClosure *prq = cls;
623   struct GSF_PendingRequest *pr = value;
624   GNUNET_HashCode chash;
625
626 #if DEBUG_FS
627   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
628               "Matched result (type %u) for query `%s' with pending request\n",
629               (unsigned int) prq->type,
630               GNUNET_h2s (key));
631 #endif  
632   GNUNET_STATISTICS_update (GSF_stats,
633                             gettext_noop ("# replies received and matched"),
634                             1,
635                             GNUNET_NO);
636   prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
637                                      prq->type,
638                                      key,
639                                      &pr->bf,
640                                      pr->mingle,
641                                      &pr->public_data.namespace, 
642                                      (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof (GNUNET_HashCode) : 0,
643                                      prq->data,
644                                      prq->size);
645   switch (prq->eval)
646     {
647     case GNUNET_BLOCK_EVALUATION_OK_MORE:
648       update_request_performance_data (prq, pr);
649       break;
650     case GNUNET_BLOCK_EVALUATION_OK_LAST:
651       /* short cut: stop processing early, no BF-update, etc. */
652       update_request_performance_data (prq, pr);
653       GNUNET_LOAD_update (GSF_rt_entry_lifetime,
654                           GNUNET_TIME_absolute_get_duration (pr->public_data.start_time).rel_value);
655       /* pass on to other peers / local clients */
656       pr->rh (pr->rh_cls,             
657               pr,
658               prq->expiration,
659               prq->data, prq->size, 
660               GNUNET_NO);
661       /* destroy request, we're done */
662       GSF_pending_request_cancel_ (pr);
663       return GNUNET_YES;
664     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
665       GNUNET_STATISTICS_update (GSF_stats,
666                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
667                                 1,
668                                 GNUNET_NO);
669 #if DEBUG_FS && 0
670       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
671                   "Duplicate response `%s', discarding.\n",
672                   GNUNET_h2s (&mhash));
673 #endif
674       return GNUNET_YES; /* duplicate */
675     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
676       return GNUNET_YES; /* wrong namespace */  
677     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
678       GNUNET_break (0);
679       return GNUNET_YES;
680     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
681       GNUNET_break (0);
682       return GNUNET_YES;
683     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
684       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
685                   _("Unsupported block type %u\n"),
686                   prq->type);
687       return GNUNET_NO;
688     }
689   /* update bloomfilter */
690   GNUNET_CRYPTO_hash (prq->data,
691                       prq->size,
692                       &chash);
693   GSF_pending_request_update_ (pr, &chash, 1);
694   if (NULL == prq->sender)
695     {
696 #if DEBUG_FS
697       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
698                   "Found result for query `%s' in local datastore\n",
699                   GNUNET_h2s (key));
700 #endif
701       GNUNET_STATISTICS_update (GSF_stats,
702                                 gettext_noop ("# results found locally"),
703                                 1,
704                                 GNUNET_NO);      
705     }
706   prq->priority += pr->public_data.original_priority;
707   pr->public_data.priority = 0;
708   pr->public_data.original_priority = 0;
709   pr->public_data.results_found++;
710   prq->request_found = GNUNET_YES;
711   /* finally, pass on to other peer / local client */
712   pr->rh (pr->rh_cls,
713           pr, 
714           prq->expiration,
715           prq->data, prq->size, 
716           GNUNET_YES);
717   return GNUNET_YES;
718 }
719
720
721 /**
722  * Continuation called to notify client about result of the
723  * operation.
724  *
725  * @param cls closure
726  * @param success GNUNET_SYSERR on failure
727  * @param msg NULL on success, otherwise an error message
728  */
729 static void 
730 put_migration_continuation (void *cls,
731                             int success,
732                             const char *msg)
733 {
734   struct GNUNET_TIME_Absolute *start = cls;
735   struct GNUNET_TIME_Relative delay;
736   
737   delay = GNUNET_TIME_absolute_get_duration (*start);
738   GNUNET_free (start);
739   /* FIXME: should we really update the load value on failure? */
740   GNUNET_LOAD_update (datastore_put_load,
741                       delay.rel_value);
742   if (GNUNET_OK == success)
743     return;
744   GNUNET_STATISTICS_update (GSF_stats,
745                             gettext_noop ("# datastore 'put' failures"),
746                             1,
747                             GNUNET_NO);
748 }
749
750
751 /**
752  * Test if the DATABASE (PUT) load on this peer is too high
753  * to even consider processing the query at
754  * all.  
755  * 
756  * @return GNUNET_YES if the load is too high to do anything (load high)
757  *         GNUNET_NO to process normally (load normal or low)
758  */
759 static int
760 test_put_load_too_high (uint32_t priority)
761 {
762   double ld;
763
764   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
765     return GNUNET_NO; /* very fast */
766   ld = GNUNET_LOAD_get_load (datastore_put_load);
767   if (ld < 2.0 * (1 + priority))
768     return GNUNET_NO;
769   GNUNET_STATISTICS_update (GSF_stats,
770                             gettext_noop ("# storage requests dropped due to high load"),
771                             1,
772                             GNUNET_NO);
773   return GNUNET_YES;
774 }
775
776
777 /**
778  * Iterator called on each result obtained for a DHT
779  * operation that expects a reply
780  *
781  * @param cls closure
782  * @param exp when will this value expire
783  * @param key key of the result
784  * @param get_path NULL-terminated array of pointers
785  *                 to the peers on reverse GET path (or NULL if not recorded)
786  * @param put_path NULL-terminated array of pointers
787  *                 to the peers on the PUT path (or NULL if not recorded)
788  * @param type type of the result
789  * @param size number of bytes in data
790  * @param data pointer to the result data
791  */
792 void
793 GSF_handle_dht_reply_ (void *cls,
794                        struct GNUNET_TIME_Absolute exp,
795                        const GNUNET_HashCode *key,
796                        const struct GNUNET_PeerIdentity * const *get_path,
797                        const struct GNUNET_PeerIdentity * const *put_path,
798                        enum GNUNET_BLOCK_Type type,
799                        size_t size,
800                        const void *data)
801 {
802   struct GSF_PendingRequest *pr = cls;
803   struct ProcessReplyClosure prq;
804   struct GNUNET_TIME_Absolute *start;
805
806   memset (&prq, 0, sizeof (prq));
807   prq.data = data;
808   prq.expiration = exp;
809   prq.size = size;  
810   prq.type = type;
811   process_reply (&prq, key, pr);
812   if ( (GNUNET_YES == active_to_migration) &&
813        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
814     {      
815 #if DEBUG_FS
816       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
817                   "Replicating result for query `%s' with priority %u\n",
818                   GNUNET_h2s (&query),
819                   prq.priority);
820 #endif
821       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
822       *start = GNUNET_TIME_absolute_get ();
823       GNUNET_DATASTORE_put (GSF_dsh,
824                             0, key, size, data,
825                             type, prq.priority, 1 /* anonymity */, 
826                             exp, 
827                             1 + prq.priority, MAX_DATASTORE_QUEUE,
828                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
829                             &put_migration_continuation, 
830                             start);
831     }
832 }
833
834
835 /**
836  * Handle P2P "CONTENT" message.  Checks that the message is
837  * well-formed and then checks if there are any pending requests for
838  * this content and possibly passes it on (to local clients or other
839  * peers).  Does NOT perform migration (content caching at this peer).
840  *
841  * @param cp the other peer involved (sender or receiver, NULL
842  *        for loopback messages where we are both sender and receiver)
843  * @param message the actual message
844  * @return GNUNET_OK if the message was well-formed,
845  *         GNUNET_SYSERR if the message was malformed (close connection,
846  *         do not cache under any circumstances)
847  */
848 int
849 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
850                          const struct GNUNET_MessageHeader *message)
851 {
852   const struct PutMessage *put;
853   uint16_t msize;
854   size_t dsize;
855   enum GNUNET_BLOCK_Type type;
856   struct GNUNET_TIME_Absolute expiration;
857   GNUNET_HashCode query;
858   struct ProcessReplyClosure prq;
859   struct GNUNET_TIME_Relative block_time;  
860   double putl;
861   struct GNUNET_TIME_Absolute *start;
862
863   msize = ntohs (message->size);
864   if (msize < sizeof (struct PutMessage))
865     {
866       GNUNET_break_op(0);
867       return GNUNET_SYSERR;
868     }
869   put = (const struct PutMessage*) message;
870   dsize = msize - sizeof (struct PutMessage);
871   type = ntohl (put->type);
872   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
873   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
874     return GNUNET_SYSERR;
875   if (GNUNET_OK !=
876       GNUNET_BLOCK_get_key (GSF_block_ctx,
877                             type,
878                             &put[1],
879                             dsize,
880                             &query))
881     {
882       GNUNET_break_op (0);
883       return GNUNET_SYSERR;
884     }
885   /* now, lookup 'query' */
886   prq.data = (const void*) &put[1];
887   if (NULL != cp)
888     prq.sender = cp;
889   else
890     prq.sender = NULL;
891   prq.size = dsize;
892   prq.type = type;
893   prq.expiration = expiration;
894   prq.priority = 0;
895   prq.anonymity_level = 1;
896   prq.finished = GNUNET_NO;
897   prq.request_found = GNUNET_NO;
898   GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
899                                               &query,
900                                               &process_reply,
901                                               &prq);
902   if (NULL != cp)
903     {
904       GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
905       GSF_get_peer_performance_data_ (cp)->trust += prq.priority;
906     }
907   if ( (GNUNET_YES == active_to_migration) &&
908        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
909     {      
910 #if DEBUG_FS
911       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
912                   "Replicating result for query `%s' with priority %u\n",
913                   GNUNET_h2s (&query),
914                   prq.priority);
915 #endif
916       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
917       *start = GNUNET_TIME_absolute_get ();
918       GNUNET_DATASTORE_put (GSF_dsh,
919                             0, &query, dsize, &put[1],
920                             type, prq.priority, 1 /* anonymity */, 
921                             expiration, 
922                             1 + prq.priority, MAX_DATASTORE_QUEUE,
923                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
924                             &put_migration_continuation, 
925                             start);
926     }
927   putl = GNUNET_LOAD_get_load (datastore_put_load);
928   if ( (NULL != (cp = prq.sender)) &&
929        (GNUNET_NO == prq.request_found) &&
930        ( (GNUNET_YES != active_to_migration) ||
931          (putl > 2.5 * (1 + prq.priority)) ) ) 
932     {
933       if (GNUNET_YES != active_to_migration) 
934         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
935       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
936                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
937                                                                                    (unsigned int) (60000 * putl * putl)));
938       GSF_block_peer_migration_ (cp, block_time);
939     }
940   return GNUNET_OK;
941 }
942
943
944 /**
945  * Setup the subsystem.
946  *
947  * @param cfg configuration to use
948  */
949 void
950 GSF_pending_request_init_ (struct GNUNET_CONFIGURATION_Handle *cfg)
951 {
952   if (GNUNET_OK !=
953       GNUNET_CONFIGURATION_get_value_number (cfg,
954                                              "fs",
955                                              "MAX_PENDING_REQUESTS",
956                                              &max_pending_requests))
957     {
958       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
959                   _("Configuration fails to specify `%s', assuming default value."),
960                   "MAX_PENDING_REQUESTS");
961     }
962   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
963   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
964 }
965
966
967 /**
968  * Shutdown the subsystem.
969  */
970 void
971 GSF_pending_request_done_ ()
972 {
973   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
974                                          &clean_request,
975                                          NULL);
976   GNUNET_CONTAINER_multihashmap_destroy (pr_map);
977   pr_map = NULL;
978   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
979   requests_by_expiration_heap = NULL;
980 }
981
982
983 /* end of gnunet-service-fs_pr.c */