nicer debugs
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pr.c
23  * @brief API to handle pending requests
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33
34
35 /**
36  * An active request.
37  */
38 struct GSF_PendingRequest
39 {
40   /**
41    * Public data for the request.
42    */ 
43   struct GSF_PendingRequestData public_data;
44
45   /**
46    * Function to call if we encounter a reply.
47    */
48   GSF_PendingRequestReplyHandler rh;
49
50   /**
51    * Closure for 'rh'
52    */
53   void *rh_cls;
54
55   /**
56    * Array of hash codes of replies we've already seen.
57    */
58   GNUNET_HashCode *replies_seen;
59
60   /**
61    * Bloomfilter masking replies we've already seen.
62    */
63   struct GNUNET_CONTAINER_BloomFilter *bf;
64
65   /**
66    * Entry for this pending request in the expiration heap, or NULL.
67    */
68   struct GNUNET_CONTAINER_HeapNode *hnode;
69
70   /**
71    * Datastore queue entry for this request (or NULL for none).
72    */
73   struct GNUNET_DATASTORE_QueueEntry *qe;
74
75   /**
76    * DHT request handle for this request (or NULL for none).
77    */
78   struct GNUNET_DHT_GetHandle *gh;
79
80   /**
81    * Function to call upon completion of the local get
82    * request, or NULL for none.
83    */
84   GSF_LocalLookupContinuation llc_cont;
85
86   /**
87    * Closure for llc_cont.
88    */
89   void *llc_cont_cls;
90
91   /**
92    * Last result from the local datastore lookup evaluation.
93    */
94   enum GNUNET_BLOCK_EvaluationResult local_result;
95
96   /**
97    * Identity of the peer that we should use for the 'sender'
98    * (recipient of the response) when forwarding (0 for none).
99    */
100   GNUNET_PEER_Id sender_pid;
101
102   /**
103    * Current offset for querying our local datastore for results.
104    * Starts at a random value, incremented until we get the same
105    * UID again (detected using 'first_uid'), which is then used
106    * to termiante the iteration.
107    */
108   uint64_t local_result_offset;
109
110   /**
111    * Unique ID of the first result from the local datastore;
112    * used to detect wrap-around of the offset.
113    */
114   uint64_t first_uid;
115
116   /**
117    * Number of valid entries in the 'replies_seen' array.
118    */
119   unsigned int replies_seen_count;
120
121   /**
122    * Length of the 'replies_seen' array.
123    */
124   unsigned int replies_seen_size;
125
126   /**
127    * Mingle value we currently use for the bf.
128    */
129   uint32_t mingle;
130
131 };
132
133
134 /**
135  * All pending requests, ordered by the query.  Entries
136  * are of type 'struct GSF_PendingRequest*'.
137  */
138 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
139
140
141 /**
142  * Datastore 'PUT' load tracking.
143  */
144 static struct GNUNET_LOAD_Value *datastore_put_load;
145
146
147 /**
148  * Are we allowed to migrate content to this peer.
149  */
150 static int active_to_migration;
151
152
153 /**
154  * Heap with the request that will expire next at the top.  Contains
155  * pointers of type "struct PendingRequest*"; these will *also* be
156  * aliased from the "requests_by_peer" data structures and the
157  * "requests_by_query" table.  Note that requests from our clients
158  * don't expire and are thus NOT in the "requests_by_expiration"
159  * (or the "requests_by_peer" tables).
160  */
161 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
162
163
164 /**
165  * Maximum number of requests (from other peers, overall) that we're
166  * willing to have pending at any given point in time.  Can be changed
167  * via the configuration file (32k is just the default).
168  */
169 static unsigned long long max_pending_requests = (32 * 1024);
170
171
172 /**
173  * How many bytes should a bloomfilter be if we have already seen
174  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
175  * of bits set per entry.  Furthermore, we should not re-size the
176  * filter too often (to keep it cheap).
177  *
178  * Since other peers will also add entries but not resize the filter,
179  * we should generally pick a slightly larger size than what the
180  * strict math would suggest.
181  *
182  * @return must be a power of two and smaller or equal to 2^15.
183  */
184 static size_t
185 compute_bloomfilter_size (unsigned int entry_count)
186 {
187   size_t size;
188   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
189   uint16_t max = 1 << 15;
190
191   if (entry_count > max)
192     return max;
193   size = 8;
194   while ((size < max) && (size < ideal))
195     size *= 2;
196   if (size > max)
197     return max;
198   return size;
199 }
200
201
202 /**
203  * Recalculate our bloom filter for filtering replies.  This function
204  * will create a new bloom filter from scratch, so it should only be
205  * called if we have no bloomfilter at all (and hence can create a
206  * fresh one of minimal size without problems) OR if our peer is the
207  * initiator (in which case we may resize to larger than mimimum size).
208  *
209  * @param pr request for which the BF is to be recomputed
210  * @return GNUNET_YES if a refresh actually happened
211  */
212 static int
213 refresh_bloomfilter (struct GSF_PendingRequest *pr)
214 {
215   unsigned int i;
216   size_t nsize;
217   GNUNET_HashCode mhash;
218
219   nsize = compute_bloomfilter_size (pr->replies_seen_count);
220   if ( (pr->bf != NULL) &&
221        (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
222     return GNUNET_NO; /* size not changed */
223   if (pr->bf != NULL)
224     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
225   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
226                                          UINT32_MAX);
227   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
228                                               nsize,
229                                               BLOOMFILTER_K);
230   for (i=0;i<pr->replies_seen_count;i++)
231     {
232       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
233                                 pr->mingle,
234                                 &mhash);
235       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
236     }
237   return GNUNET_YES;
238 }
239
240
241 /**
242  * Create a new pending request.  
243  *
244  * @param options request options
245  * @param type type of the block that is being requested
246  * @param query key for the lookup
247  * @param namespace namespace to lookup, NULL for no namespace
248  * @param target preferred target for the request, NULL for none
249  * @param bf_data raw data for bloom filter for known replies, can be NULL
250  * @param bf_size number of bytes in bf_data
251  * @param mingle mingle value for bf
252  * @param anonymity_level desired anonymity level
253  * @param priority maximum outgoing cummulative request priority to use
254  * @param ttl current time-to-live for the request
255  * @param sender_pid peer ID to use for the sender when forwarding, 0 for none
256  * @param replies_seen hash codes of known local replies
257  * @param replies_seen_count size of the 'replies_seen' array
258  * @param rh handle to call when we get a reply
259  * @param rh_cls closure for rh
260  * @return handle for the new pending request
261  */
262 struct GSF_PendingRequest *
263 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
264                              enum GNUNET_BLOCK_Type type,
265                              const GNUNET_HashCode *query,
266                              const GNUNET_HashCode *namespace,
267                              const struct GNUNET_PeerIdentity *target,
268                              const char *bf_data,
269                              size_t bf_size,
270                              uint32_t mingle,
271                              uint32_t anonymity_level,
272                              uint32_t priority,
273                              int32_t ttl,
274                              GNUNET_PEER_Id sender_pid,
275                              const GNUNET_HashCode *replies_seen,
276                              unsigned int replies_seen_count,
277                              GSF_PendingRequestReplyHandler rh,
278                              void *rh_cls)
279 {
280   struct GSF_PendingRequest *pr;
281   struct GSF_PendingRequest *dpr;
282   
283 #if DEBUG_FS > 1
284   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
285               "Creating request handle for `%s' of type %d\n",
286               GNUNET_h2s (query),
287               type);
288 #endif 
289   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
290   pr->local_result_offset = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
291                                                       UINT64_MAX);                                                       
292   pr->public_data.query = *query;
293   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
294     {
295       GNUNET_assert (NULL != namespace);
296       pr->public_data.namespace = *namespace;
297     }
298   if (NULL != target)
299     {
300       pr->public_data.target = *target;
301       pr->public_data.has_target = GNUNET_YES;
302     }
303   pr->public_data.anonymity_level = anonymity_level;
304   pr->public_data.priority = priority;
305   pr->public_data.original_priority = priority;
306   pr->public_data.options = options;
307   pr->public_data.type = type;  
308   pr->public_data.start_time = GNUNET_TIME_absolute_get ();
309   pr->sender_pid = sender_pid;
310   pr->rh = rh;
311   pr->rh_cls = rh_cls;
312   if (ttl >= 0)
313     pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
314                                                                                            (uint32_t) ttl));
315   else
316     pr->public_data.ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
317                                                          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
318                                                                                         (uint32_t) (- ttl)));
319   if (replies_seen_count > 0)
320     {
321       pr->replies_seen_size = replies_seen_count;
322       pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
323       memcpy (pr->replies_seen,
324               replies_seen,
325               replies_seen_count * sizeof (GNUNET_HashCode));
326       pr->replies_seen_count = replies_seen_count;
327     }
328   if (NULL != bf_data)    
329     {
330       pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
331                                                   bf_size,
332                                                   BLOOMFILTER_K);
333       pr->mingle = mingle;
334     }
335   else if ( (replies_seen_count > 0) &&
336             (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
337     {
338       GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
339     }
340   GNUNET_CONTAINER_multihashmap_put (pr_map,
341                                      query,
342                                      pr,
343                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
344   if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
345     {
346       pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
347                                                 pr,
348                                                 pr->public_data.ttl.abs_value);
349       /* make sure we don't track too many requests */
350       while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
351         {
352           dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
353           GNUNET_assert (dpr != NULL);
354           if (pr == dpr)
355             break; /* let the request live briefly... */
356           dpr->rh (dpr->rh_cls,
357                    GNUNET_BLOCK_EVALUATION_REQUEST_VALID,
358                    dpr,
359                    GNUNET_TIME_UNIT_FOREVER_ABS,
360                    GNUNET_BLOCK_TYPE_ANY,
361                    NULL, 0);
362           GSF_pending_request_cancel_ (dpr);
363         }
364     }
365   return pr;
366 }
367
368
369 /**
370  * Obtain the public data associated with a pending request
371  * 
372  * @param pr pending request
373  * @return associated public data
374  */
375 struct GSF_PendingRequestData *
376 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
377 {
378   return &pr->public_data;
379 }
380
381
382 /**
383  * Update a given pending request with additional replies
384  * that have been seen.
385  *
386  * @param pr request to update
387  * @param replies_seen hash codes of replies that we've seen
388  * @param replies_seen_count size of the replies_seen array
389  */
390 void
391 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
392                              const GNUNET_HashCode *replies_seen,
393                              unsigned int replies_seen_count)
394 {
395   unsigned int i;
396   GNUNET_HashCode mhash;
397
398   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
399     return; /* integer overflow */
400   if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
401     {
402       /* we're responsible for the BF, full refresh */
403       if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
404         GNUNET_array_grow (pr->replies_seen,
405                            pr->replies_seen_size,
406                            replies_seen_count + pr->replies_seen_count);
407       memcpy (&pr->replies_seen[pr->replies_seen_count],
408               replies_seen,
409               sizeof (GNUNET_HashCode) * replies_seen_count);
410       pr->replies_seen_count += replies_seen_count;
411       if (GNUNET_NO == refresh_bloomfilter (pr))
412         {
413           /* bf not recalculated, simply extend it with new bits */
414           for (i=0;i<pr->replies_seen_count;i++)
415             {
416               GNUNET_BLOCK_mingle_hash (&replies_seen[i],
417                                         pr->mingle,
418                                         &mhash);
419               GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
420             }
421         }
422     }
423   else
424     {
425       if (NULL == pr->bf)
426         {
427           /* we're not the initiator, but the initiator did not give us
428              any bloom-filter, so we need to create one on-the-fly */
429           pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
430                                                  UINT32_MAX);
431           pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
432                                                       compute_bloomfilter_size (replies_seen_count),
433                                                       BLOOMFILTER_K);
434         }
435       for (i=0;i<pr->replies_seen_count;i++)
436         {
437           GNUNET_BLOCK_mingle_hash (&replies_seen[i],
438                                     pr->mingle,
439                                     &mhash);
440           GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
441         }
442     }
443 }
444
445
446 /**
447  * Generate the message corresponding to the given pending request for
448  * transmission to other peers (or at least determine its size).
449  *
450  * @param pr request to generate the message for
451  * @param buf_size number of bytes available in buf
452  * @param buf where to copy the message (can be NULL)
453  * @return number of bytes needed (if > buf_size) or used
454  */
455 size_t
456 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
457                                   size_t buf_size,
458                                   void *buf)
459 {
460   char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
461   struct GetMessage *gm;
462   GNUNET_HashCode *ext;
463   size_t msize;
464   unsigned int k;
465   uint32_t bm;
466   uint32_t prio;
467   size_t bf_size;
468   struct GNUNET_TIME_Absolute now;
469   int64_t ttl;
470   int do_route;
471
472 #if DEBUG_FS
473   if (buf_size > 0)
474     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
475                 "Building request message for `%s' of type %d\n",
476                 GNUNET_h2s (&pr->public_data.query),
477                 pr->public_data.type);
478 #endif 
479   k = 0;
480   bm = 0;
481   do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
482   if (! do_route)
483     {
484       bm |= GET_MESSAGE_BIT_RETURN_TO;
485       k++;      
486     }
487   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
488     {
489       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
490       k++;
491     }
492   if (GNUNET_YES == pr->public_data.has_target)
493     {
494       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
495       k++;
496     }
497   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
498   msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
499   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
500   if (buf_size < msize)
501     return msize;  
502   gm = (struct GetMessage*) lbuf;
503   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
504   gm->header.size = htons (msize);
505   gm->type = htonl (pr->public_data.type);
506   if (do_route)
507     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
508                                      pr->public_data.priority + 1);
509   else
510     prio = 0;
511   pr->public_data.priority -= prio;
512   gm->priority = htonl (prio);
513   now = GNUNET_TIME_absolute_get ();
514   ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
515   gm->ttl = htonl (ttl / 1000);
516   gm->filter_mutator = htonl(pr->mingle); 
517   gm->hash_bitmap = htonl (bm);
518   gm->query = pr->public_data.query;
519   ext = (GNUNET_HashCode*) &gm[1];
520   k = 0;  
521   if (! do_route)
522     GNUNET_PEER_resolve (pr->sender_pid, 
523                          (struct GNUNET_PeerIdentity*) &ext[k++]);
524   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
525     memcpy (&ext[k++], 
526             &pr->public_data.namespace, 
527             sizeof (GNUNET_HashCode));
528   if (GNUNET_YES == pr->public_data.has_target)
529     GNUNET_PEER_resolve (pr->sender_pid, 
530                          (struct GNUNET_PeerIdentity*) &ext[k++]);
531   if (pr->bf != NULL)
532     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
533                                                (char*) &ext[k],
534                                                bf_size);
535   memcpy (buf, gm, msize);
536   return msize;
537 }
538
539
540 /**
541  * Iterator to free pending requests.
542  *
543  * @param cls closure, unused
544  * @param key current key code
545  * @param value value in the hash map (pending request)
546  * @return GNUNET_YES (we should continue to iterate)
547  */
548 static int 
549 clean_request (void *cls,
550                const GNUNET_HashCode * key,
551                void *value)
552 {
553   struct GSF_PendingRequest *pr = value;
554   GSF_LocalLookupContinuation cont;
555
556 #if DEBUG_FS
557   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
558               "Cleaning up pending request for `%s'.\n",
559               GNUNET_h2s (key));
560 #endif  
561   if (NULL != (cont = pr->llc_cont))
562     {
563       pr->llc_cont = NULL;
564       cont (pr->llc_cont_cls,
565             pr,
566             pr->local_result);
567     } 
568   GSF_plan_notify_request_done_ (pr);
569   GNUNET_free_non_null (pr->replies_seen);
570   if (NULL != pr->bf)
571     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
572   GNUNET_PEER_change_rc (pr->sender_pid, -1);
573   if (NULL != pr->hnode)
574     GNUNET_CONTAINER_heap_remove_node (pr->hnode);
575   if (NULL != pr->qe)
576     GNUNET_DATASTORE_cancel (pr->qe);
577   if (NULL != pr->gh)
578     GNUNET_DHT_get_stop (pr->gh);
579   GNUNET_free (pr);
580   return GNUNET_YES;
581 }
582
583
584 /**
585  * Explicitly cancel a pending request.
586  *
587  * @param pr request to cancel
588  */
589 void
590 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
591 {
592   if (NULL == pr_map) return; /* already cleaned up! */
593   GNUNET_assert (GNUNET_OK ==
594                  GNUNET_CONTAINER_multihashmap_remove (pr_map,
595                                                        &pr->public_data.query,
596                                                        pr));
597   GNUNET_assert (GNUNET_YES ==
598                  clean_request (NULL, &pr->public_data.query, pr));  
599 }
600
601
602 /**
603  * Iterate over all pending requests.
604  *
605  * @param it function to call for each request
606  * @param cls closure for it
607  */
608 void
609 GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
610                                void *cls)
611 {
612   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
613                                          (GNUNET_CONTAINER_HashMapIterator) it,
614                                          cls);
615 }
616
617
618
619
620 /**
621  * Closure for "process_reply" function.
622  */
623 struct ProcessReplyClosure
624 {
625   /**
626    * The data for the reply.
627    */
628   const void *data;
629
630   /**
631    * Who gave us this reply? NULL for local host (or DHT)
632    */
633   struct GSF_ConnectedPeer *sender;
634
635   /**
636    * When the reply expires.
637    */
638   struct GNUNET_TIME_Absolute expiration;
639
640   /**
641    * Size of data.
642    */
643   size_t size;
644
645   /**
646    * Type of the block.
647    */
648   enum GNUNET_BLOCK_Type type;
649
650   /**
651    * How much was this reply worth to us?
652    */
653   uint32_t priority;
654
655   /**
656    * Anonymity requirements for this reply.
657    */
658   uint32_t anonymity_level;
659
660   /**
661    * Evaluation result (returned).
662    */
663   enum GNUNET_BLOCK_EvaluationResult eval;
664
665   /**
666    * Did we find a matching request?
667    */
668   int request_found;
669 };
670
671
672 /**
673  * Update the performance data for the sender (if any) since
674  * the sender successfully answered one of our queries.
675  *
676  * @param prq information about the sender
677  * @param pr request that was satisfied
678  */
679 static void
680 update_request_performance_data (struct ProcessReplyClosure *prq,
681                                  struct GSF_PendingRequest *pr)
682 {
683   if (prq->sender == NULL)
684     return;      
685   GSF_peer_update_performance_ (prq->sender,
686                                 pr->public_data.start_time,
687                                 prq->priority);
688 }
689                                 
690
691 /**
692  * We have received a reply; handle it!
693  *
694  * @param cls response (struct ProcessReplyClosure)
695  * @param key our query
696  * @param value value in the hash map (info about the query)
697  * @return GNUNET_YES (we should continue to iterate)
698  */
699 static int
700 process_reply (void *cls,
701                const GNUNET_HashCode * key,
702                void *value)
703 {
704   struct ProcessReplyClosure *prq = cls;
705   struct GSF_PendingRequest *pr = value;
706   GNUNET_HashCode chash;
707
708 #if DEBUG_FS
709   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710               "Matched result (type %u) for query `%s' with pending request\n",
711               (unsigned int) prq->type,
712               GNUNET_h2s (key));
713 #endif  
714   GNUNET_STATISTICS_update (GSF_stats,
715                             gettext_noop ("# replies received and matched"),
716                             1,
717                             GNUNET_NO);
718   prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
719                                      prq->type,
720                                      key,
721                                      &pr->bf,
722                                      pr->mingle,
723                                      &pr->public_data.namespace, 
724                                      (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof (GNUNET_HashCode) : 0,
725                                      prq->data,
726                                      prq->size);
727   switch (prq->eval)
728     {
729     case GNUNET_BLOCK_EVALUATION_OK_MORE:
730       update_request_performance_data (prq, pr);
731       break;
732     case GNUNET_BLOCK_EVALUATION_OK_LAST:
733       /* short cut: stop processing early, no BF-update, etc. */
734       update_request_performance_data (prq, pr);
735       GNUNET_LOAD_update (GSF_rt_entry_lifetime,
736                           GNUNET_TIME_absolute_get_duration (pr->public_data.start_time).rel_value);
737       /* pass on to other peers / local clients */
738       pr->rh (pr->rh_cls,             
739               prq->eval,
740               pr,
741               prq->expiration,
742               prq->type,
743               prq->data, prq->size);
744       return GNUNET_YES;
745     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
746       GNUNET_STATISTICS_update (GSF_stats,
747                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
748                                 1,
749                                 GNUNET_NO);
750 #if DEBUG_FS && 0
751       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
752                   "Duplicate response `%s', discarding.\n",
753                   GNUNET_h2s (&mhash));
754 #endif
755       return GNUNET_YES; /* duplicate */
756     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
757       return GNUNET_YES; /* wrong namespace */  
758     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
759       GNUNET_break (0);
760       return GNUNET_YES;
761     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
762       GNUNET_break (0);
763       return GNUNET_YES;
764     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
765       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
766                   _("Unsupported block type %u\n"),
767                   prq->type);
768       return GNUNET_NO;
769     }
770   /* update bloomfilter */
771   GNUNET_CRYPTO_hash (prq->data,
772                       prq->size,
773                       &chash);
774   GSF_pending_request_update_ (pr, &chash, 1);
775   if (NULL == prq->sender)
776     {
777 #if DEBUG_FS
778       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
779                   "Found result for query `%s' in local datastore\n",
780                   GNUNET_h2s (key));
781 #endif
782       GNUNET_STATISTICS_update (GSF_stats,
783                                 gettext_noop ("# results found locally"),
784                                 1,
785                                 GNUNET_NO);      
786     }
787   else
788     {     
789       GSF_dht_lookup_ (pr);
790     }
791   prq->priority += pr->public_data.original_priority;
792   pr->public_data.priority = 0;
793   pr->public_data.original_priority = 0;
794   pr->public_data.results_found++;
795   prq->request_found = GNUNET_YES;
796   /* finally, pass on to other peer / local client */
797   pr->rh (pr->rh_cls,
798           prq->eval,
799           pr, 
800           prq->expiration,
801           prq->type,
802           prq->data, prq->size);
803   return GNUNET_YES;
804 }
805
806
807 /**
808  * Context for the 'put_migration_continuation'.
809  */
810 struct PutMigrationContext
811 {
812
813   /**
814    * Start time for the operation.
815    */
816   struct GNUNET_TIME_Absolute start;
817
818   /**
819    * Request origin.
820    */
821   struct GNUNET_PeerIdentity origin;
822
823   /**
824    * GNUNET_YES if we had a matching request for this block,
825    * GNUNET_NO if not.
826    */
827   int requested;
828 };
829
830
831 /**
832  * Continuation called to notify client about result of the
833  * operation.
834  *
835  * @param cls closure
836  * @param success GNUNET_SYSERR on failure
837  * @param msg NULL on success, otherwise an error message
838  */
839 static void 
840 put_migration_continuation (void *cls,
841                             int success,
842                             const char *msg)
843 {
844   struct PutMigrationContext *pmc = cls;
845   struct GNUNET_TIME_Relative delay;
846   struct GNUNET_TIME_Relative block_time;  
847   struct GSF_ConnectedPeer *cp;
848   struct GSF_PeerPerformanceData *ppd;
849                          
850   delay = GNUNET_TIME_absolute_get_duration (pmc->start);
851   cp = GSF_peer_get_ (&pmc->origin);
852   if ( (GNUNET_OK != success) &&
853        (GNUNET_NO == pmc->requested) )
854     {
855       /* block migration for a bit... */
856       if (NULL != cp)
857         {
858           ppd = GSF_get_peer_performance_data_ (cp);
859           ppd->migration_duplication++;
860           block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
861                                                       5 * ppd->migration_duplication + 
862                                                       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5));
863           GSF_block_peer_migration_ (cp, block_time);
864         }
865     }
866   else
867     {
868       if (NULL != cp)
869         {
870           ppd = GSF_get_peer_performance_data_ (cp);
871           ppd->migration_duplication = 0; /* reset counter */
872         }
873     }
874   GNUNET_free (pmc);
875   /* FIXME: should we really update the load value on failure? */
876   GNUNET_LOAD_update (datastore_put_load,
877                       delay.rel_value);
878   if (GNUNET_OK == success)
879     return;
880   GNUNET_STATISTICS_update (GSF_stats,
881                             gettext_noop ("# datastore 'put' failures"),
882                             1,
883                             GNUNET_NO);
884 }
885
886
887 /**
888  * Test if the DATABASE (PUT) load on this peer is too high
889  * to even consider processing the query at
890  * all.  
891  * 
892  * @return GNUNET_YES if the load is too high to do anything (load high)
893  *         GNUNET_NO to process normally (load normal or low)
894  */
895 static int
896 test_put_load_too_high (uint32_t priority)
897 {
898   double ld;
899
900   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
901     return GNUNET_NO; /* very fast */
902   ld = GNUNET_LOAD_get_load (datastore_put_load);
903   if (ld < 2.0 * (1 + priority))
904     return GNUNET_NO;
905   GNUNET_STATISTICS_update (GSF_stats,
906                             gettext_noop ("# storage requests dropped due to high load"),
907                             1,
908                             GNUNET_NO);
909   return GNUNET_YES;
910 }
911
912
913 /**
914  * Iterator called on each result obtained for a DHT
915  * operation that expects a reply
916  *
917  * @param cls closure
918  * @param exp when will this value expire
919  * @param key key of the result
920  * @param get_path NULL-terminated array of pointers
921  *                 to the peers on reverse GET path (or NULL if not recorded)
922  * @param put_path NULL-terminated array of pointers
923  *                 to the peers on the PUT path (or NULL if not recorded)
924  * @param type type of the result
925  * @param size number of bytes in data
926  * @param data pointer to the result data
927  */
928 static void
929 handle_dht_reply (void *cls,
930                   struct GNUNET_TIME_Absolute exp,
931                   const GNUNET_HashCode *key,
932                   const struct GNUNET_PeerIdentity * const *get_path,
933                   const struct GNUNET_PeerIdentity * const *put_path,
934                   enum GNUNET_BLOCK_Type type,
935                   size_t size,
936                   const void *data)
937 {
938   struct GSF_PendingRequest *pr = cls;
939   struct ProcessReplyClosure prq;
940   struct PutMigrationContext *pmc;
941
942   memset (&prq, 0, sizeof (prq));
943   prq.data = data;
944   prq.expiration = exp;
945   prq.size = size;  
946   prq.type = type;
947   process_reply (&prq, key, pr);
948   if ( (GNUNET_YES == active_to_migration) &&
949        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
950     {      
951 #if DEBUG_FS
952       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
953                   "Replicating result for query `%s' with priority %u\n",
954                   GNUNET_h2s (key),
955                   prq.priority);
956 #endif
957       pmc = GNUNET_malloc (sizeof (struct PutMigrationContext));
958       pmc->start = GNUNET_TIME_absolute_get ();
959       pmc->requested = GNUNET_YES;
960       GNUNET_DATASTORE_put (GSF_dsh,
961                             0, key, size, data,
962                             type, prq.priority, 1 /* anonymity */, 
963                             0 /* replication */,
964                             exp, 
965                             1 + prq.priority, MAX_DATASTORE_QUEUE,
966                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
967                             &put_migration_continuation, 
968                             pmc);
969     }
970 }
971
972
973 /**
974  * Consider looking up the data in the DHT (anonymity-level permitting).
975  *
976  * @param pr the pending request to process
977  */
978 void
979 GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
980 {
981   const void *xquery;
982   size_t xquery_size;
983   struct GNUNET_PeerIdentity pi;
984   char buf[sizeof (GNUNET_HashCode) * 2];
985
986   if (0 != pr->public_data.anonymity_level)
987     return;
988   if (NULL != pr->gh)
989     {
990       GNUNET_DHT_get_stop (pr->gh);
991       pr->gh = NULL;
992     }
993   xquery = NULL;
994   xquery_size = 0;
995   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
996     {
997       xquery = buf;
998       memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode));
999       xquery_size = sizeof (GNUNET_HashCode);
1000     }
1001   if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1002     {
1003       GNUNET_PEER_resolve (pr->sender_pid,
1004                            &pi);
1005       memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
1006       xquery_size += sizeof (struct GNUNET_PeerIdentity);
1007     }
1008   pr->gh = GNUNET_DHT_get_start (GSF_dht,
1009                                  GNUNET_TIME_UNIT_FOREVER_REL,
1010                                  pr->public_data.type,
1011                                  &pr->public_data.query,
1012                                  DEFAULT_GET_REPLICATION,
1013                                  GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1014                                  pr->bf,
1015                                  pr->mingle,
1016                                  xquery,
1017                                  xquery_size,
1018                                  &handle_dht_reply,
1019                                  pr);
1020 }
1021
1022 /**
1023  * We're processing (local) results for a search request
1024  * from another peer.  Pass applicable results to the
1025  * peer and if we are done either clean up (operation
1026  * complete) or forward to other peers (more results possible).
1027  *
1028  * @param cls our closure (struct PendingRequest)
1029  * @param key key for the content
1030  * @param size number of bytes in data
1031  * @param data content stored
1032  * @param type type of the content
1033  * @param priority priority of the content
1034  * @param anonymity anonymity-level for the content
1035  * @param expiration expiration time for the content
1036  * @param uid unique identifier for the datum;
1037  *        maybe 0 if no unique identifier is available
1038  */
1039 static void
1040 process_local_reply (void *cls,
1041                      const GNUNET_HashCode *key,
1042                      size_t size,
1043                      const void *data,
1044                      enum GNUNET_BLOCK_Type type,
1045                      uint32_t priority,
1046                      uint32_t anonymity,
1047                      struct GNUNET_TIME_Absolute expiration, 
1048                      uint64_t uid)
1049 {
1050   struct GSF_PendingRequest *pr = cls;
1051   GSF_LocalLookupContinuation cont;
1052   struct ProcessReplyClosure prq;
1053   GNUNET_HashCode query;
1054   unsigned int old_rf;
1055   
1056   pr->qe = NULL;
1057   if (0 == pr->replies_seen_count)
1058     {
1059       pr->first_uid = uid;
1060     }
1061   else
1062     {
1063       if (uid == pr->first_uid)
1064         key = NULL; /* all replies seen! */
1065     }
1066   if (NULL == key)
1067     {
1068 #if DEBUG_FS > 1
1069       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070                   "No further local responses available.\n");
1071 #endif
1072       if (NULL != (cont = pr->llc_cont))
1073         {
1074           pr->llc_cont = NULL;
1075           cont (pr->llc_cont_cls,
1076                 pr,
1077                 pr->local_result);
1078         }
1079       return;
1080     }
1081 #if DEBUG_FS
1082   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1083               "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1084               GNUNET_h2s (key),
1085               type,
1086               (unsigned long long) uid);
1087 #endif
1088   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1089     {
1090 #if DEBUG_FS > 1
1091       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1092                   "Found ONDEMAND block, performing on-demand encoding\n");
1093 #endif
1094       GNUNET_STATISTICS_update (GSF_stats,
1095                                 gettext_noop ("# on-demand blocks matched requests"),
1096                                 1,
1097                                 GNUNET_NO);
1098       if (GNUNET_OK != 
1099           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
1100                                             anonymity, expiration, uid, 
1101                                             &process_local_reply,
1102                                             pr))
1103         {
1104           pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1105                                              pr->local_result_offset - 1,
1106                                              &pr->public_data.query,
1107                                              pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 
1108                                              ? GNUNET_BLOCK_TYPE_ANY 
1109                                              : pr->public_data.type, 
1110                                              (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1111                                              ? UINT_MAX
1112                                              : 1 /* queue priority */,
1113                                              (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1114                                              ? UINT_MAX
1115                                              : 1 /* max queue size */,
1116                                              GNUNET_TIME_UNIT_FOREVER_REL,
1117                                              &process_local_reply,
1118                                              pr);
1119           GNUNET_assert (NULL != pr->qe);
1120         }
1121       return;
1122     }
1123   old_rf = pr->public_data.results_found;
1124   memset (&prq, 0, sizeof (prq));
1125   prq.data = data;
1126   prq.expiration = expiration;
1127   prq.size = size;  
1128   if (GNUNET_OK != 
1129       GNUNET_BLOCK_get_key (GSF_block_ctx,
1130                             type,
1131                             data,
1132                             size,
1133                             &query))
1134     {
1135       GNUNET_break (0);
1136       GNUNET_DATASTORE_remove (GSF_dsh,
1137                                key,
1138                                size, data,
1139                                -1, -1, 
1140                                GNUNET_TIME_UNIT_FOREVER_REL,
1141                                NULL, NULL);
1142       pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1143                                          pr->local_result_offset - 1,
1144                                          &pr->public_data.query,
1145                                          pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 
1146                                          ? GNUNET_BLOCK_TYPE_ANY 
1147                                          : pr->public_data.type, 
1148                                          (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1149                                          ? UINT_MAX
1150                                          : 1 /* queue priority */,
1151                                          (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1152                                          ? UINT_MAX
1153                                          : 1 /* max queue size */,
1154                                          GNUNET_TIME_UNIT_FOREVER_REL,
1155                                          &process_local_reply,
1156                                          pr);
1157       GNUNET_assert (NULL != pr->qe);
1158       return;
1159     }
1160   prq.type = type;
1161   prq.priority = priority;  
1162   prq.request_found = GNUNET_NO;
1163   prq.anonymity_level = anonymity;
1164   if ( (old_rf == 0) &&
1165        (pr->public_data.results_found == 0) )
1166     GSF_update_datastore_delay_ (pr->public_data.start_time);
1167   process_reply (&prq, key, pr);
1168   pr->local_result = prq.eval;
1169   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
1170     {
1171       if (NULL != (cont = pr->llc_cont))
1172         {
1173           pr->llc_cont = NULL;
1174           cont (pr->llc_cont_cls,
1175                 pr,
1176                 pr->local_result);
1177         }      
1178       return;
1179     }
1180   if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1181        ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
1182          (pr->public_data.results_found > 5 + 2 * pr->public_data.priority) ) )
1183     {
1184 #if DEBUG_FS > 2
1185       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                   "Load too high, done with request\n");
1187 #endif
1188       GNUNET_STATISTICS_update (GSF_stats,
1189                                 gettext_noop ("# processing result set cut short due to load"),
1190                                 1,
1191                                 GNUNET_NO);
1192       if (NULL != (cont = pr->llc_cont))
1193         {
1194           pr->llc_cont = NULL;
1195           cont (pr->llc_cont_cls,
1196                 pr,
1197                 pr->local_result);
1198         }
1199       return;
1200     }
1201   pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1202                                      pr->local_result_offset++,
1203                                      &pr->public_data.query,
1204                                      pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 
1205                                      ? GNUNET_BLOCK_TYPE_ANY 
1206                                      : pr->public_data.type, 
1207                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1208                                      ? UINT_MAX
1209                                      : 1 /* queue priority */,
1210                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1211                                      ? UINT_MAX
1212                                      : 1 /* max queue size */,
1213                                      GNUNET_TIME_UNIT_FOREVER_REL,
1214                                      &process_local_reply,
1215                                      pr);
1216   GNUNET_assert (NULL != pr->qe);
1217 }
1218
1219
1220 /**
1221  * Look up the request in the local datastore.
1222  *
1223  * @param pr the pending request to process
1224  * @param cont function to call at the end
1225  * @param cont_cls closure for cont
1226  */
1227 void
1228 GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1229                    GSF_LocalLookupContinuation cont,
1230                    void *cont_cls)
1231 {
1232   GNUNET_assert (NULL == pr->gh);
1233   GNUNET_assert (NULL == pr->llc_cont);
1234   pr->llc_cont = cont;
1235   pr->llc_cont_cls = cont_cls;
1236   pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1237                                      pr->local_result_offset++,
1238                                      &pr->public_data.query,
1239                                      pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 
1240                                      ? GNUNET_BLOCK_TYPE_ANY 
1241                                      : pr->public_data.type, 
1242                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1243                                      ? UINT_MAX
1244                                      : 1 /* queue priority */,
1245                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1246                                      ? UINT_MAX
1247                                      : 1 /* max queue size */,
1248                                      GNUNET_TIME_UNIT_FOREVER_REL,
1249                                      &process_local_reply,
1250                                      pr);
1251 }
1252
1253
1254
1255 /**
1256  * Handle P2P "CONTENT" message.  Checks that the message is
1257  * well-formed and then checks if there are any pending requests for
1258  * this content and possibly passes it on (to local clients or other
1259  * peers).  Does NOT perform migration (content caching at this peer).
1260  *
1261  * @param cp the other peer involved (sender or receiver, NULL
1262  *        for loopback messages where we are both sender and receiver)
1263  * @param message the actual message
1264  * @return GNUNET_OK if the message was well-formed,
1265  *         GNUNET_SYSERR if the message was malformed (close connection,
1266  *         do not cache under any circumstances)
1267  */
1268 int
1269 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1270                          const struct GNUNET_MessageHeader *message)
1271 {
1272   const struct PutMessage *put;
1273   uint16_t msize;
1274   size_t dsize;
1275   enum GNUNET_BLOCK_Type type;
1276   struct GNUNET_TIME_Absolute expiration;
1277   GNUNET_HashCode query;
1278   struct ProcessReplyClosure prq;
1279   struct GNUNET_TIME_Relative block_time;  
1280   double putl;
1281   struct PutMigrationContext *pmc;
1282
1283   msize = ntohs (message->size);
1284   if (msize < sizeof (struct PutMessage))
1285     {
1286       GNUNET_break_op(0);
1287       return GNUNET_SYSERR;
1288     }
1289   put = (const struct PutMessage*) message;
1290   dsize = msize - sizeof (struct PutMessage);
1291   type = ntohl (put->type);
1292   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1293   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1294     return GNUNET_SYSERR;
1295   if (GNUNET_OK !=
1296       GNUNET_BLOCK_get_key (GSF_block_ctx,
1297                             type,
1298                             &put[1],
1299                             dsize,
1300                             &query))
1301     {
1302       GNUNET_break_op (0);
1303       return GNUNET_SYSERR;
1304     }
1305   /* now, lookup 'query' */
1306   prq.data = (const void*) &put[1];
1307   if (NULL != cp)
1308     prq.sender = cp;
1309   else
1310     prq.sender = NULL;
1311   prq.size = dsize;
1312   prq.type = type;
1313   prq.expiration = expiration;
1314   prq.priority = 0;
1315   prq.anonymity_level = 1;
1316   prq.request_found = GNUNET_NO;
1317   GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
1318                                               &query,
1319                                               &process_reply,
1320                                               &prq);
1321   if (NULL != cp)
1322     {
1323       GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
1324       GSF_get_peer_performance_data_ (cp)->trust += prq.priority;
1325     }
1326   if ( (GNUNET_YES == active_to_migration) &&
1327        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
1328     {      
1329 #if DEBUG_FS
1330       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331                   "Replicating result for query `%s' with priority %u\n",
1332                   GNUNET_h2s (&query),
1333                   prq.priority);
1334 #endif
1335       pmc = GNUNET_malloc (sizeof (struct PutMigrationContext));
1336       pmc->start = GNUNET_TIME_absolute_get ();
1337       pmc->requested = prq.request_found;
1338       GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1339                            &pmc->origin);
1340       GNUNET_DATASTORE_put (GSF_dsh,
1341                             0, &query, dsize, &put[1],
1342                             type, prq.priority, 1 /* anonymity */, 
1343                             0 /* replication */,
1344                             expiration, 
1345                             1 + prq.priority, MAX_DATASTORE_QUEUE,
1346                             GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1347                             &put_migration_continuation, 
1348                             pmc);
1349     }
1350   else
1351     {
1352 #if DEBUG_FS
1353       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1354                   "Choosing not to keep content `%s' (%d/%d)\n",
1355                   GNUNET_h2s (&query),
1356                   active_to_migration,
1357                   test_put_load_too_high (prq.priority));
1358 #endif
1359     }
1360   putl = GNUNET_LOAD_get_load (datastore_put_load);
1361   if ( (NULL != (cp = prq.sender)) &&
1362        (GNUNET_NO == prq.request_found) &&
1363        ( (GNUNET_YES != active_to_migration) ||
1364          (putl > 2.5 * (1 + prq.priority)) ) ) 
1365     {
1366       if (GNUNET_YES != active_to_migration) 
1367         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
1368       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1369                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1370                                                                                    (unsigned int) (60000 * putl * putl)));
1371       GSF_block_peer_migration_ (cp, block_time);
1372     }  
1373   return GNUNET_OK;
1374 }
1375
1376
1377 /**
1378  * Setup the subsystem.
1379  */
1380 void
1381 GSF_pending_request_init_ ()
1382 {
1383   if (GNUNET_OK !=
1384       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1385                                              "fs",
1386                                              "MAX_PENDING_REQUESTS",
1387                                              &max_pending_requests))
1388     {
1389       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1390                   _("Configuration fails to specify `%s', assuming default value."),
1391                   "MAX_PENDING_REQUESTS");
1392     }
1393   active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
1394                                                               "FS",
1395                                                               "CONTENT_CACHING");
1396   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1397   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
1398   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
1399 }
1400
1401
1402 /**
1403  * Shutdown the subsystem.
1404  */
1405 void
1406 GSF_pending_request_done_ ()
1407 {
1408   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
1409                                          &clean_request,
1410                                          NULL);
1411   GNUNET_CONTAINER_multihashmap_destroy (pr_map);
1412   pr_map = NULL;
1413   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1414   requests_by_expiration_heap = NULL;
1415   GNUNET_LOAD_value_free (datastore_put_load);
1416   datastore_put_load = NULL;
1417 }
1418
1419
1420 /* end of gnunet-service-fs_pr.c */