hxing
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pr.c
23  * @brief API to handle pending requests
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs_cp.h"
29 #include "gnunet-service-fs_pr.h"
30
31
32 /**
33  * An active request.
34  */
35 struct GSF_PendingRequest
36 {
37   /**
38    * Public data for the request.
39    */ 
40   struct GSF_PendingRequestData public_data;
41
42   /**
43    * Function to call if we encounter a reply.
44    */
45   GSF_PendingRequestReplyHandler rh;
46
47   /**
48    * Closure for 'rh'
49    */
50   void *rh_cls;
51
52   /**
53    * Array of hash codes of replies we've already seen.
54    */
55   GNUNET_HashCode *replies_seen;
56
57   /**
58    * Bloomfilter masking replies we've already seen.
59    */
60   struct GNUNET_CONTAINER_BloomFilter *bf;
61
62   /**
63    * Entry for this pending request in the expiration heap, or NULL.
64    */
65   struct GNUNET_CONTAINER_HeapNode *hnode;
66
67   /**
68    * Identity of the peer that we should use for the 'sender'
69    * (recipient of the response) when forwarding (0 for none).
70    */
71   GNUNET_PEER_Id sender_pid;
72
73   /**
74    * Number of valid entries in the 'replies_seen' array.
75    */
76   unsigned int replies_seen_count;
77
78   /**
79    * Length of the 'replies_seen' array.
80    */
81   unsigned int replies_seen_size;
82
83   /**
84    * Mingle value we currently use for the bf.
85    */
86   uint32_t mingle;
87                             
88 };
89
90
91 /**
92  * All pending requests, ordered by the query.  Entries
93  * are of type 'struct GSF_PendingRequest*'.
94  */
95 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
96
97
98 /**
99  * Datastore 'PUT' load tracking.
100  */
101 static struct GNUNET_LOAD_Value *datastore_put_load;
102
103
104 /**
105  * Are we allowed to migrate content to this peer.
106  */
107 static int active_to_migration;
108
109
110 /**
111  * Heap with the request that will expire next at the top.  Contains
112  * pointers of type "struct PendingRequest*"; these will *also* be
113  * aliased from the "requests_by_peer" data structures and the
114  * "requests_by_query" table.  Note that requests from our clients
115  * don't expire and are thus NOT in the "requests_by_expiration"
116  * (or the "requests_by_peer" tables).
117  */
118 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
119
120
121 /**
122  * Maximum number of requests (from other peers, overall) that we're
123  * willing to have pending at any given point in time.  Can be changed
124  * via the configuration file (32k is just the default).
125  */
126 static unsigned long long max_pending_requests = (32 * 1024);
127
128
129 /**
130  * How many bytes should a bloomfilter be if we have already seen
131  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
132  * of bits set per entry.  Furthermore, we should not re-size the
133  * filter too often (to keep it cheap).
134  *
135  * Since other peers will also add entries but not resize the filter,
136  * we should generally pick a slightly larger size than what the
137  * strict math would suggest.
138  *
139  * @return must be a power of two and smaller or equal to 2^15.
140  */
141 static size_t
142 compute_bloomfilter_size (unsigned int entry_count)
143 {
144   size_t size;
145   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
146   uint16_t max = 1 << 15;
147
148   if (entry_count > max)
149     return max;
150   size = 8;
151   while ((size < max) && (size < ideal))
152     size *= 2;
153   if (size > max)
154     return max;
155   return size;
156 }
157
158
159 /**
160  * Recalculate our bloom filter for filtering replies.  This function
161  * will create a new bloom filter from scratch, so it should only be
162  * called if we have no bloomfilter at all (and hence can create a
163  * fresh one of minimal size without problems) OR if our peer is the
164  * initiator (in which case we may resize to larger than mimimum size).
165  *
166  * @param pr request for which the BF is to be recomputed
167  * @return GNUNET_YES if a refresh actually happened
168  */
169 static int
170 refresh_bloomfilter (struct GSF_PendingRequest *pr)
171 {
172   unsigned int i;
173   size_t nsize;
174   GNUNET_HashCode mhash;
175
176   nsize = compute_bloomfilter_size (pr->replies_seen_count);
177   if ( (pr->bf != NULL) &&
178        (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
179     return GNUNET_NO; /* size not changed */
180   if (pr->bf != NULL)
181     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
182   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
183                                          UINT32_MAX);
184   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
185                                               nsize,
186                                               BLOOMFILTER_K);
187   for (i=0;i<pr->replies_seen_count;i++)
188     {
189       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
190                                 pr->mingle,
191                                 &mhash);
192       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
193     }
194   return GNUNET_YES;
195 }
196
197
198 /**
199  * Create a new pending request.  
200  *
201  * @param options request options
202  * @param type type of the block that is being requested
203  * @param query key for the lookup
204  * @param namespace namespace to lookup, NULL for no namespace
205  * @param target preferred target for the request, NULL for none
206  * @param bf_data raw data for bloom filter for known replies, can be NULL
207  * @param bf_size number of bytes in bf_data
208  * @param mingle mingle value for bf
209  * @param anonymity_level desired anonymity level
210  * @param priority maximum outgoing cummulative request priority to use
211  * @param ttl current time-to-live for the request
212  * @param sender_pid peer ID to use for the sender when forwarding, 0 for none
213  * @param replies_seen hash codes of known local replies
214  * @param replies_seen_count size of the 'replies_seen' array
215  * @param rh handle to call when we get a reply
216  * @param rh_cls closure for rh
217  * @return handle for the new pending request
218  */
219 struct GSF_PendingRequest *
220 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
221                              enum GNUNET_BLOCK_Type type,
222                              const GNUNET_HashCode *query,
223                              const GNUNET_HashCode *namespace,
224                              const struct GNUNET_PeerIdentity *target,
225                              const char *bf_data,
226                              size_t bf_size,
227                              uint32_t mingle,
228                              uint32_t anonymity_level,
229                              uint32_t priority,
230                              int32_t ttl,
231                              GNUNET_PEER_Id sender_pid,
232                              const GNUNET_HashCode *replies_seen,
233                              unsigned int replies_seen_count,
234                              GSF_PendingRequestReplyHandler rh,
235                              void *rh_cls)
236 {
237   struct GSF_PendingRequest *pr;
238   struct GSF_PendingRequest *dpr;
239   
240   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
241   pr->public_data.query = *query;
242   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
243     {
244       GNUNET_assert (NULL != namespace);
245       pr->public_data.namespace = *namespace;
246     }
247   if (NULL != target)
248     {
249       pr->public_data.target = *target;
250       pr->public_data.has_target = GNUNET_YES;
251     }
252   pr->public_data.anonymity_level = anonymity_level;
253   pr->public_data.priority = priority;
254   pr->public_data.original_priority = priority;
255   pr->public_data.options = options;
256   pr->public_data.type = type;  
257   pr->public_data.start_time = GNUNET_TIME_absolute_get ();
258   pr->sender_pid = sender_pid;
259   pr->rh = rh;
260   pr->rh_cls = rh_cls;
261   if (ttl >= 0)
262     pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
263                                                                                            (uint32_t) ttl));
264   else
265     pr->public_data.ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
266                                                          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
267                                                                                         (uint32_t) (- ttl)));
268   if (replies_seen_count > 0)
269     {
270       pr->replies_seen_size = replies_seen_count;
271       pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
272       memcpy (pr->replies_seen,
273               replies_seen,
274               replies_seen_count * sizeof (GNUNET_HashCode));
275       pr->replies_seen_count = replies_seen_count;
276     }
277   if (NULL != bf_data)    
278     {
279       pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
280                                                   bf_size,
281                                                   BLOOMFILTER_K);
282       pr->mingle = mingle;
283     }
284   else if ( (replies_seen_count > 0) &&
285             (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
286     {
287       GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
288     }
289   GNUNET_CONTAINER_multihashmap_put (pr_map,
290                                      query,
291                                      pr,
292                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
293   if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
294     {
295       pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
296                                                 pr,
297                                                 pr->public_data.ttl.abs_value);
298       /* make sure we don't track too many requests */
299       while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
300         {
301           dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
302           GNUNET_assert (dpr != NULL);
303           if (pr == dpr)
304             break; /* let the request live briefly... */
305           dpr->rh (dpr->rh_cls,
306                    dpr,
307                    GNUNET_TIME_UNIT_FOREVER_ABS,
308                    NULL, 0,
309                    GNUNET_SYSERR);
310           GSF_pending_request_cancel_ (dpr);
311         }
312     }
313   return pr;
314 }
315
316
317 /**
318  * Obtain the public data associated with a pending request
319  * 
320  * @param pr pending request
321  * @return associated public data
322  */
323 struct GSF_PendingRequestData *
324 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
325 {
326   return &pr->public_data;
327 }
328
329
330 /**
331  * Update a given pending request with additional replies
332  * that have been seen.
333  *
334  * @param pr request to update
335  * @param replies_seen hash codes of replies that we've seen
336  * @param replies_seen_count size of the replies_seen array
337  */
338 void
339 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
340                              const GNUNET_HashCode *replies_seen,
341                              unsigned int replies_seen_count)
342 {
343   unsigned int i;
344   GNUNET_HashCode mhash;
345
346   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
347     return; /* integer overflow */
348   if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
349     {
350       /* we're responsible for the BF, full refresh */
351       if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
352         GNUNET_array_grow (pr->replies_seen,
353                            pr->replies_seen_size,
354                            replies_seen_count + pr->replies_seen_count);
355       memcpy (&pr->replies_seen[pr->replies_seen_count],
356               replies_seen,
357               sizeof (GNUNET_HashCode) * replies_seen_count);
358       pr->replies_seen_count += replies_seen_count;
359       if (GNUNET_NO == refresh_bloomfilter (pr))
360         {
361           /* bf not recalculated, simply extend it with new bits */
362           for (i=0;i<pr->replies_seen_count;i++)
363             {
364               GNUNET_BLOCK_mingle_hash (&replies_seen[i],
365                                         pr->mingle,
366                                         &mhash);
367               GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
368             }
369         }
370     }
371   else
372     {
373       if (NULL == pr->bf)
374         {
375           /* we're not the initiator, but the initiator did not give us
376              any bloom-filter, so we need to create one on-the-fly */
377           pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
378                                                  UINT32_MAX);
379           pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
380                                                       compute_bloomfilter_size (replies_seen_count),
381                                                       BLOOMFILTER_K);
382         }
383       for (i=0;i<pr->replies_seen_count;i++)
384         {
385           GNUNET_BLOCK_mingle_hash (&replies_seen[i],
386                                     pr->mingle,
387                                     &mhash);
388           GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
389         }
390     }
391 }
392
393
394 /**
395  * Generate the message corresponding to the given pending request for
396  * transmission to other peers (or at least determine its size).
397  *
398  * @param pr request to generate the message for
399  * @param do_route are we routing the reply
400  * @param buf_size number of bytes available in buf
401  * @param buf where to copy the message (can be NULL)
402  * @return number of bytes needed (if > buf_size) or used
403  */
404 size_t
405 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
406                                   int do_route,
407                                   size_t buf_size,
408                                   void *buf)
409 {
410   char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
411   struct GetMessage *gm;
412   GNUNET_HashCode *ext;
413   size_t msize;
414   unsigned int k;
415   uint32_t bm;
416   uint32_t prio;
417   size_t bf_size;
418   struct GNUNET_TIME_Absolute now;
419   int64_t ttl;
420
421   k = 0;
422   bm = 0;
423   if (GNUNET_YES != do_route)
424     {
425       bm |= GET_MESSAGE_BIT_RETURN_TO;
426       k++;      
427     }
428   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
429     {
430       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
431       k++;
432     }
433   if (GNUNET_YES == pr->public_data.has_target)
434     {
435       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
436       k++;
437     }
438   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
439   msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
440   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
441   if (buf_size < msize)
442     return msize;  
443   gm = (struct GetMessage*) lbuf;
444   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
445   gm->header.size = htons (msize);
446   gm->type = htonl (pr->public_data.type);
447   if (GNUNET_YES == do_route)
448     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
449                                      pr->public_data.priority + 1);
450   else
451     prio = 0;
452   pr->public_data.priority -= prio;
453   gm->priority = htonl (prio);
454   now = GNUNET_TIME_absolute_get ();
455   ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
456   gm->ttl = htonl (ttl / 1000);
457   gm->filter_mutator = htonl(pr->mingle); 
458   gm->hash_bitmap = htonl (bm);
459   gm->query = pr->public_data.query;
460   ext = (GNUNET_HashCode*) &gm[1];
461   k = 0;  
462   if (GNUNET_YES != do_route)
463     GNUNET_PEER_resolve (pr->sender_pid, 
464                          (struct GNUNET_PeerIdentity*) &ext[k++]);
465   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
466     memcpy (&ext[k++], 
467             &pr->public_data.namespace, 
468             sizeof (GNUNET_HashCode));
469   if (GNUNET_YES == pr->public_data.has_target)
470     GNUNET_PEER_resolve (pr->sender_pid, 
471                          (struct GNUNET_PeerIdentity*) &ext[k++]);
472   if (pr->bf != NULL)
473     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
474                                                (char*) &ext[k],
475                                                bf_size);
476   memcpy (buf, gm, msize);
477   return msize;
478 }
479
480
481 /**
482  * Iterator to free pending requests.
483  *
484  * @param cls closure, unused
485  * @param key current key code
486  * @param value value in the hash map (pending request)
487  * @return GNUNET_YES (we should continue to iterate)
488  */
489 static int 
490 clean_request (void *cls,
491                const GNUNET_HashCode * key,
492                void *value)
493 {
494   struct GSF_PendingRequest *pr = value;
495   
496   GNUNET_free_non_null (pr->replies_seen);
497   if (NULL != pr->bf)
498     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
499   GNUNET_PEER_change_rc (pr->sender_pid, -1);
500   if (NULL != pr->hnode)
501     GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
502                                        pr->hnode);
503   GNUNET_free (pr);
504   return GNUNET_YES;
505 }
506
507
508 /**
509  * Explicitly cancel a pending request.
510  *
511  * @param pr request to cancel
512  */
513 void
514 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
515 {
516   GNUNET_assert (GNUNET_OK ==
517                  GNUNET_CONTAINER_multihashmap_remove (pr_map,
518                                                        &pr->public_data.query,
519                                                        pr));
520   GNUNET_assert (GNUNET_YES ==
521                  clean_request (NULL, &pr->public_data.query, pr));  
522 }
523
524
525 /**
526  * Iterate over all pending requests.
527  *
528  * @param it function to call for each request
529  * @param cls closure for it
530  */
531 void
532 GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
533                                void *cls)
534 {
535   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
536                                          (GNUNET_CONTAINER_HashMapIterator) it,
537                                          cls);
538 }
539
540
541
542
543 /**
544  * Closure for "process_reply" function.
545  */
546 struct ProcessReplyClosure
547 {
548   /**
549    * The data for the reply.
550    */
551   const void *data;
552
553   /**
554    * Who gave us this reply? NULL for local host (or DHT)
555    */
556   struct GSF_ConnectedPeer *sender;
557
558   /**
559    * When the reply expires.
560    */
561   struct GNUNET_TIME_Absolute expiration;
562
563   /**
564    * Size of data.
565    */
566   size_t size;
567
568   /**
569    * Type of the block.
570    */
571   enum GNUNET_BLOCK_Type type;
572
573   /**
574    * How much was this reply worth to us?
575    */
576   uint32_t priority;
577
578   /**
579    * Anonymity requirements for this reply.
580    */
581   uint32_t anonymity_level;
582
583   /**
584    * Evaluation result (returned).
585    */
586   enum GNUNET_BLOCK_EvaluationResult eval;
587
588   /**
589    * Did we finish processing the associated request?
590    */ 
591   int finished;
592
593   /**
594    * Did we find a matching request?
595    */
596   int request_found;
597 };
598
599
600 /**
601  * Update the performance data for the sender (if any) since
602  * the sender successfully answered one of our queries.
603  *
604  * @param prq information about the sender
605  * @param pr request that was satisfied
606  */
607 static void
608 update_request_performance_data (struct ProcessReplyClosure *prq,
609                                  struct GSF_PendingRequest *pr)
610 {
611   if (prq->sender == NULL)
612     return;      
613   GSF_peer_update_performance_ (prq->sender,
614                                 pr->public_data.start_time,
615                                 prq->priority);
616 }
617                                 
618
619 /**
620  * We have received a reply; handle it!
621  *
622  * @param cls response (struct ProcessReplyClosure)
623  * @param key our query
624  * @param value value in the hash map (info about the query)
625  * @return GNUNET_YES (we should continue to iterate)
626  */
627 static int
628 process_reply (void *cls,
629                const GNUNET_HashCode * key,
630                void *value)
631 {
632   struct ProcessReplyClosure *prq = cls;
633   struct GSF_PendingRequest *pr = value;
634   GNUNET_HashCode chash;
635
636 #if DEBUG_FS
637   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
638               "Matched result (type %u) for query `%s' with pending request\n",
639               (unsigned int) prq->type,
640               GNUNET_h2s (key));
641 #endif  
642   GNUNET_STATISTICS_update (GSF_stats,
643                             gettext_noop ("# replies received and matched"),
644                             1,
645                             GNUNET_NO);
646   prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
647                                      prq->type,
648                                      key,
649                                      &pr->bf,
650                                      pr->mingle,
651                                      &pr->public_data.namespace, 
652                                      (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof (GNUNET_HashCode) : 0,
653                                      prq->data,
654                                      prq->size);
655   switch (prq->eval)
656     {
657     case GNUNET_BLOCK_EVALUATION_OK_MORE:
658       update_request_performance_data (prq, pr);
659       break;
660     case GNUNET_BLOCK_EVALUATION_OK_LAST:
661       /* short cut: stop processing early, no BF-update, etc. */
662       update_request_performance_data (prq, pr);
663       GNUNET_LOAD_update (GSF_rt_entry_lifetime,
664                           GNUNET_TIME_absolute_get_duration (pr->public_data.start_time).rel_value);
665       /* pass on to other peers / local clients */
666       pr->rh (pr->rh_cls,             
667               pr,
668               prq->expiration,
669               prq->data, prq->size, 
670               GNUNET_NO);
671       /* destroy request, we're done */
672       GSF_pending_request_cancel_ (pr);
673       return GNUNET_YES;
674     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
675       GNUNET_STATISTICS_update (GSF_stats,
676                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
677                                 1,
678                                 GNUNET_NO);
679 #if DEBUG_FS && 0
680       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
681                   "Duplicate response `%s', discarding.\n",
682                   GNUNET_h2s (&mhash));
683 #endif
684       return GNUNET_YES; /* duplicate */
685     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
686       return GNUNET_YES; /* wrong namespace */  
687     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
688       GNUNET_break (0);
689       return GNUNET_YES;
690     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
691       GNUNET_break (0);
692       return GNUNET_YES;
693     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
694       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
695                   _("Unsupported block type %u\n"),
696                   prq->type);
697       return GNUNET_NO;
698     }
699   /* update bloomfilter */
700   GNUNET_CRYPTO_hash (prq->data,
701                       prq->size,
702                       &chash);
703   GSF_pending_request_update_ (pr, &chash, 1);
704   if (NULL == prq->sender)
705     {
706 #if DEBUG_FS
707       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
708                   "Found result for query `%s' in local datastore\n",
709                   GNUNET_h2s (key));
710 #endif
711       GNUNET_STATISTICS_update (GSF_stats,
712                                 gettext_noop ("# results found locally"),
713                                 1,
714                                 GNUNET_NO);      
715     }
716   prq->priority += pr->public_data.original_priority;
717   pr->public_data.priority = 0;
718   pr->public_data.original_priority = 0;
719   pr->public_data.results_found++;
720   prq->request_found = GNUNET_YES;
721   /* finally, pass on to other peer / local client */
722   pr->rh (pr->rh_cls,
723           pr, 
724           prq->expiration,
725           prq->data, prq->size, 
726           GNUNET_YES);
727   return GNUNET_YES;
728 }
729
730
731 /**
732  * Continuation called to notify client about result of the
733  * operation.
734  *
735  * @param cls closure
736  * @param success GNUNET_SYSERR on failure
737  * @param msg NULL on success, otherwise an error message
738  */
739 static void 
740 put_migration_continuation (void *cls,
741                             int success,
742                             const char *msg)
743 {
744   struct GNUNET_TIME_Absolute *start = cls;
745   struct GNUNET_TIME_Relative delay;
746   
747   delay = GNUNET_TIME_absolute_get_duration (*start);
748   GNUNET_free (start);
749   /* FIXME: should we really update the load value on failure? */
750   GNUNET_LOAD_update (datastore_put_load,
751                       delay.rel_value);
752   if (GNUNET_OK == success)
753     return;
754   GNUNET_STATISTICS_update (GSF_stats,
755                             gettext_noop ("# datastore 'put' failures"),
756                             1,
757                             GNUNET_NO);
758 }
759
760
761 /**
762  * Test if the DATABASE (PUT) load on this peer is too high
763  * to even consider processing the query at
764  * all.  
765  * 
766  * @return GNUNET_YES if the load is too high to do anything (load high)
767  *         GNUNET_NO to process normally (load normal or low)
768  */
769 static int
770 test_put_load_too_high (uint32_t priority)
771 {
772   double ld;
773
774   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
775     return GNUNET_NO; /* very fast */
776   ld = GNUNET_LOAD_get_load (datastore_put_load);
777   if (ld < 2.0 * (1 + priority))
778     return GNUNET_NO;
779   GNUNET_STATISTICS_update (GSF_stats,
780                             gettext_noop ("# storage requests dropped due to high load"),
781                             1,
782                             GNUNET_NO);
783   return GNUNET_YES;
784 }
785
786
787 /**
788  * Iterator called on each result obtained for a DHT
789  * operation that expects a reply
790  *
791  * @param cls closure
792  * @param exp when will this value expire
793  * @param key key of the result
794  * @param get_path NULL-terminated array of pointers
795  *                 to the peers on reverse GET path (or NULL if not recorded)
796  * @param put_path NULL-terminated array of pointers
797  *                 to the peers on the PUT path (or NULL if not recorded)
798  * @param type type of the result
799  * @param size number of bytes in data
800  * @param data pointer to the result data
801  */
802 void
803 GSF_handle_dht_reply_ (void *cls,
804                        struct GNUNET_TIME_Absolute exp,
805                        const GNUNET_HashCode *key,
806                        const struct GNUNET_PeerIdentity * const *get_path,
807                        const struct GNUNET_PeerIdentity * const *put_path,
808                        enum GNUNET_BLOCK_Type type,
809                        size_t size,
810                        const void *data)
811 {
812   struct GSF_PendingRequest *pr = cls;
813   struct ProcessReplyClosure prq;
814   struct GNUNET_TIME_Absolute *start;
815
816   memset (&prq, 0, sizeof (prq));
817   prq.data = data;
818   prq.expiration = exp;
819   prq.size = size;  
820   prq.type = type;
821   process_reply (&prq, key, pr);
822   if ( (GNUNET_YES == active_to_migration) &&
823        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
824     {      
825 #if DEBUG_FS
826       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
827                   "Replicating result for query `%s' with priority %u\n",
828                   GNUNET_h2s (&query),
829                   prq.priority);
830 #endif
831       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
832       *start = GNUNET_TIME_absolute_get ();
833       GNUNET_DATASTORE_put (GSF_dsh,
834                             0, key, size, data,
835                             type, prq.priority, 1 /* anonymity */, 
836                             exp, 
837                             1 + prq.priority, MAX_DATASTORE_QUEUE,
838                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
839                             &put_migration_continuation, 
840                             start);
841     }
842 }
843
844
845 /**
846  * Handle P2P "CONTENT" message.  Checks that the message is
847  * well-formed and then checks if there are any pending requests for
848  * this content and possibly passes it on (to local clients or other
849  * peers).  Does NOT perform migration (content caching at this peer).
850  *
851  * @param cp the other peer involved (sender or receiver, NULL
852  *        for loopback messages where we are both sender and receiver)
853  * @param message the actual message
854  * @return GNUNET_OK if the message was well-formed,
855  *         GNUNET_SYSERR if the message was malformed (close connection,
856  *         do not cache under any circumstances)
857  */
858 int
859 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
860                          const struct GNUNET_MessageHeader *message)
861 {
862   const struct PutMessage *put;
863   uint16_t msize;
864   size_t dsize;
865   enum GNUNET_BLOCK_Type type;
866   struct GNUNET_TIME_Absolute expiration;
867   GNUNET_HashCode query;
868   struct ProcessReplyClosure prq;
869   struct GNUNET_TIME_Relative block_time;  
870   double putl;
871   struct GNUNET_TIME_Absolute *start;
872
873   msize = ntohs (message->size);
874   if (msize < sizeof (struct PutMessage))
875     {
876       GNUNET_break_op(0);
877       return GNUNET_SYSERR;
878     }
879   put = (const struct PutMessage*) message;
880   dsize = msize - sizeof (struct PutMessage);
881   type = ntohl (put->type);
882   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
883   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
884     return GNUNET_SYSERR;
885   if (GNUNET_OK !=
886       GNUNET_BLOCK_get_key (GSF_block_ctx,
887                             type,
888                             &put[1],
889                             dsize,
890                             &query))
891     {
892       GNUNET_break_op (0);
893       return GNUNET_SYSERR;
894     }
895   /* now, lookup 'query' */
896   prq.data = (const void*) &put[1];
897   if (NULL != cp)
898     prq.sender = cp;
899   else
900     prq.sender = NULL;
901   prq.size = dsize;
902   prq.type = type;
903   prq.expiration = expiration;
904   prq.priority = 0;
905   prq.anonymity_level = 1;
906   prq.finished = GNUNET_NO;
907   prq.request_found = GNUNET_NO;
908   GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
909                                               &query,
910                                               &process_reply,
911                                               &prq);
912   if (NULL != cp)
913     {
914       GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
915       GSF_get_peer_performance_data_ (cp)->trust += prq.priority;
916     }
917   if ( (GNUNET_YES == active_to_migration) &&
918        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
919     {      
920 #if DEBUG_FS
921       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
922                   "Replicating result for query `%s' with priority %u\n",
923                   GNUNET_h2s (&query),
924                   prq.priority);
925 #endif
926       start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
927       *start = GNUNET_TIME_absolute_get ();
928       GNUNET_DATASTORE_put (GSF_dsh,
929                             0, &query, dsize, &put[1],
930                             type, prq.priority, 1 /* anonymity */, 
931                             expiration, 
932                             1 + prq.priority, MAX_DATASTORE_QUEUE,
933                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
934                             &put_migration_continuation, 
935                             start);
936     }
937   putl = GNUNET_LOAD_get_load (datastore_put_load);
938   if ( (NULL != (cp = prq.sender)) &&
939        (GNUNET_NO == prq.request_found) &&
940        ( (GNUNET_YES != active_to_migration) ||
941          (putl > 2.5 * (1 + prq.priority)) ) ) 
942     {
943       if (GNUNET_YES != active_to_migration) 
944         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
945       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
946                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
947                                                                                    (unsigned int) (60000 * putl * putl)));
948       GSF_block_peer_migration_ (cp, block_time);
949     }
950   return GNUNET_OK;
951 }
952
953
954 /**
955  * Setup the subsystem.
956  */
957 void
958 GSF_pending_request_init_ ()
959 {
960   if (GNUNET_OK !=
961       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
962                                              "fs",
963                                              "MAX_PENDING_REQUESTS",
964                                              &max_pending_requests))
965     {
966       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
967                   _("Configuration fails to specify `%s', assuming default value."),
968                   "MAX_PENDING_REQUESTS");
969     }
970   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
971   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
972 }
973
974
975 /**
976  * Shutdown the subsystem.
977  */
978 void
979 GSF_pending_request_done_ ()
980 {
981   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
982                                          &clean_request,
983                                          NULL);
984   GNUNET_CONTAINER_multihashmap_destroy (pr_map);
985   pr_map = NULL;
986   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
987   requests_by_expiration_heap = NULL;
988 }
989
990
991 /* end of gnunet-service-fs_pr.c */