fix
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file fs/gnunet-service-fs_pr.c
23  * @brief API to handle pending requests
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_indexing.h"
31 #include "gnunet-service-fs_pe.h"
32 #include "gnunet-service-fs_pr.h"
33
34
35 /**
36  * An active request.
37  */
38 struct GSF_PendingRequest
39 {
40   /**
41    * Public data for the request.
42    */ 
43   struct GSF_PendingRequestData public_data;
44
45   /**
46    * Function to call if we encounter a reply.
47    */
48   GSF_PendingRequestReplyHandler rh;
49
50   /**
51    * Closure for 'rh'
52    */
53   void *rh_cls;
54
55   /**
56    * Array of hash codes of replies we've already seen.
57    */
58   GNUNET_HashCode *replies_seen;
59
60   /**
61    * Bloomfilter masking replies we've already seen.
62    */
63   struct GNUNET_CONTAINER_BloomFilter *bf;
64
65   /**
66    * Entry for this pending request in the expiration heap, or NULL.
67    */
68   struct GNUNET_CONTAINER_HeapNode *hnode;
69
70   /**
71    * Datastore queue entry for this request (or NULL for none).
72    */
73   struct GNUNET_DATASTORE_QueueEntry *qe;
74
75   /**
76    * DHT request handle for this request (or NULL for none).
77    */
78   struct GNUNET_DHT_GetHandle *gh;
79
80   /**
81    * Function to call upon completion of the local get
82    * request, or NULL for none.
83    */
84   GSF_LocalLookupContinuation llc_cont;
85
86   /**
87    * Closure for llc_cont.
88    */
89   void *llc_cont_cls;
90
91   /**
92    * Last result from the local datastore lookup evaluation.
93    */
94   enum GNUNET_BLOCK_EvaluationResult local_result;
95
96   /**
97    * Identity of the peer that we should use for the 'sender'
98    * (recipient of the response) when forwarding (0 for none).
99    */
100   GNUNET_PEER_Id sender_pid;
101
102   /**
103    * Time we started the last datastore lookup.
104    */
105   struct GNUNET_TIME_Absolute qe_start;
106
107   /**
108    * Task that warns us if the local datastore lookup takes too long.
109    */
110   GNUNET_SCHEDULER_TaskIdentifier warn_task;
111
112   /**
113    * Current offset for querying our local datastore for results.
114    * Starts at a random value, incremented until we get the same
115    * UID again (detected using 'first_uid'), which is then used
116    * to termiante the iteration.
117    */
118   uint64_t local_result_offset;
119
120   /**
121    * Unique ID of the first result from the local datastore;
122    * used to detect wrap-around of the offset.
123    */
124   uint64_t first_uid;
125
126   /**
127    * Number of valid entries in the 'replies_seen' array.
128    */
129   unsigned int replies_seen_count;
130
131   /**
132    * Length of the 'replies_seen' array.
133    */
134   unsigned int replies_seen_size;
135
136   /**
137    * Mingle value we currently use for the bf.
138    */
139   uint32_t mingle;
140
141   /**
142    * Do we have a first UID yet?
143    */
144   int have_first_uid;
145
146 };
147
148
149 /**
150  * All pending requests, ordered by the query.  Entries
151  * are of type 'struct GSF_PendingRequest*'.
152  */
153 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
154
155
156 /**
157  * Datastore 'PUT' load tracking.
158  */
159 static struct GNUNET_LOAD_Value *datastore_put_load;
160
161
162 /**
163  * Are we allowed to migrate content to this peer.
164  */
165 static int active_to_migration;
166
167
168 /**
169  * Heap with the request that will expire next at the top.  Contains
170  * pointers of type "struct PendingRequest*"; these will *also* be
171  * aliased from the "requests_by_peer" data structures and the
172  * "requests_by_query" table.  Note that requests from our clients
173  * don't expire and are thus NOT in the "requests_by_expiration"
174  * (or the "requests_by_peer" tables).
175  */
176 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
177
178
179 /**
180  * Maximum number of requests (from other peers, overall) that we're
181  * willing to have pending at any given point in time.  Can be changed
182  * via the configuration file (32k is just the default).
183  */
184 static unsigned long long max_pending_requests = (32 * 1024);
185
186
187 /**
188  * How many bytes should a bloomfilter be if we have already seen
189  * entry_count responses?  Note that BLOOMFILTER_K gives us the number
190  * of bits set per entry.  Furthermore, we should not re-size the
191  * filter too often (to keep it cheap).
192  *
193  * Since other peers will also add entries but not resize the filter,
194  * we should generally pick a slightly larger size than what the
195  * strict math would suggest.
196  *
197  * @return must be a power of two and smaller or equal to 2^15.
198  */
199 static size_t
200 compute_bloomfilter_size (unsigned int entry_count)
201 {
202   size_t size;
203   unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
204   uint16_t max = 1 << 15;
205
206   if (entry_count > max)
207     return max;
208   size = 8;
209   while ((size < max) && (size < ideal))
210     size *= 2;
211   if (size > max)
212     return max;
213   return size;
214 }
215
216
217 /**
218  * Recalculate our bloom filter for filtering replies.  This function
219  * will create a new bloom filter from scratch, so it should only be
220  * called if we have no bloomfilter at all (and hence can create a
221  * fresh one of minimal size without problems) OR if our peer is the
222  * initiator (in which case we may resize to larger than mimimum size).
223  *
224  * @param pr request for which the BF is to be recomputed
225  * @return GNUNET_YES if a refresh actually happened
226  */
227 static int
228 refresh_bloomfilter (struct GSF_PendingRequest *pr)
229 {
230   unsigned int i;
231   size_t nsize;
232   GNUNET_HashCode mhash;
233
234   nsize = compute_bloomfilter_size (pr->replies_seen_count);
235   if ( (pr->bf != NULL) &&
236        (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
237     return GNUNET_NO; /* size not changed */
238   if (pr->bf != NULL)
239     GNUNET_CONTAINER_bloomfilter_free (pr->bf);
240   pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
241                                          UINT32_MAX);
242   pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL, 
243                                               nsize,
244                                               BLOOMFILTER_K);
245   for (i=0;i<pr->replies_seen_count;i++)
246     {
247       GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
248                                 pr->mingle,
249                                 &mhash);
250       GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
251     }
252   return GNUNET_YES;
253 }
254
255
256 /**
257  * Create a new pending request.  
258  *
259  * @param options request options
260  * @param type type of the block that is being requested
261  * @param query key for the lookup
262  * @param namespace namespace to lookup, NULL for no namespace
263  * @param target preferred target for the request, NULL for none
264  * @param bf_data raw data for bloom filter for known replies, can be NULL
265  * @param bf_size number of bytes in bf_data
266  * @param mingle mingle value for bf
267  * @param anonymity_level desired anonymity level
268  * @param priority maximum outgoing cummulative request priority to use
269  * @param ttl current time-to-live for the request
270  * @param sender_pid peer ID to use for the sender when forwarding, 0 for none
271  * @param replies_seen hash codes of known local replies
272  * @param replies_seen_count size of the 'replies_seen' array
273  * @param rh handle to call when we get a reply
274  * @param rh_cls closure for rh
275  * @return handle for the new pending request
276  */
277 struct GSF_PendingRequest *
278 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
279                              enum GNUNET_BLOCK_Type type,
280                              const GNUNET_HashCode *query,
281                              const GNUNET_HashCode *namespace,
282                              const struct GNUNET_PeerIdentity *target,
283                              const char *bf_data,
284                              size_t bf_size,
285                              uint32_t mingle,
286                              uint32_t anonymity_level,
287                              uint32_t priority,
288                              int32_t ttl,
289                              GNUNET_PEER_Id sender_pid,
290                              const GNUNET_HashCode *replies_seen,
291                              unsigned int replies_seen_count,
292                              GSF_PendingRequestReplyHandler rh,
293                              void *rh_cls)
294 {
295   struct GSF_PendingRequest *pr;
296   struct GSF_PendingRequest *dpr;
297   
298 #if DEBUG_FS > 1
299   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
300               "Creating request handle for `%s' of type %d\n",
301               GNUNET_h2s (query),
302               type);
303 #endif 
304   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
305   pr->local_result_offset = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
306                                                       UINT64_MAX);                                                       
307   pr->public_data.query = *query;
308   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
309     {
310       GNUNET_assert (NULL != namespace);
311       pr->public_data.namespace = *namespace;
312     }
313   if (NULL != target)
314     {
315       pr->public_data.target = *target;
316       pr->public_data.has_target = GNUNET_YES;
317     }
318   pr->public_data.anonymity_level = anonymity_level;
319   pr->public_data.priority = priority;
320   pr->public_data.original_priority = priority;
321   pr->public_data.options = options;
322   pr->public_data.type = type;  
323   pr->public_data.start_time = GNUNET_TIME_absolute_get ();
324   pr->sender_pid = sender_pid;
325   pr->rh = rh;
326   pr->rh_cls = rh_cls;
327   if (ttl >= 0)
328     pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
329                                                                                            (uint32_t) ttl));
330   else
331     pr->public_data.ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
332                                                          GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
333                                                                                         (uint32_t) (- ttl)));
334   if (replies_seen_count > 0)
335     {
336       pr->replies_seen_size = replies_seen_count;
337       pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
338       memcpy (pr->replies_seen,
339               replies_seen,
340               replies_seen_count * sizeof (GNUNET_HashCode));
341       pr->replies_seen_count = replies_seen_count;
342     }
343   if (NULL != bf_data)    
344     {
345       pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
346                                                   bf_size,
347                                                   BLOOMFILTER_K);
348       pr->mingle = mingle;
349     }
350   else if ( (replies_seen_count > 0) &&
351             (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
352     {
353       GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
354     }
355   GNUNET_CONTAINER_multihashmap_put (pr_map,
356                                      query,
357                                      pr,
358                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
359   if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
360     {
361       pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
362                                                 pr,
363                                                 pr->public_data.ttl.abs_value);
364       /* make sure we don't track too many requests */
365       while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
366         {
367           dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
368           GNUNET_assert (dpr != NULL);
369           if (pr == dpr)
370             break; /* let the request live briefly... */
371           dpr->rh (dpr->rh_cls,
372                    GNUNET_BLOCK_EVALUATION_REQUEST_VALID,
373                    dpr,
374                    UINT32_MAX,
375                    GNUNET_TIME_UNIT_FOREVER_ABS,
376                    GNUNET_BLOCK_TYPE_ANY,
377                    NULL, 0);
378           GSF_pending_request_cancel_ (dpr);
379         }
380     }
381   return pr;
382 }
383
384
385 /**
386  * Obtain the public data associated with a pending request
387  * 
388  * @param pr pending request
389  * @return associated public data
390  */
391 struct GSF_PendingRequestData *
392 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
393 {
394   return &pr->public_data;
395 }
396
397
398 /**
399  * Update a given pending request with additional replies
400  * that have been seen.
401  *
402  * @param pr request to update
403  * @param replies_seen hash codes of replies that we've seen
404  * @param replies_seen_count size of the replies_seen array
405  */
406 void
407 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
408                              const GNUNET_HashCode *replies_seen,
409                              unsigned int replies_seen_count)
410 {
411   unsigned int i;
412   GNUNET_HashCode mhash;
413
414   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
415     return; /* integer overflow */
416   if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
417     {
418       /* we're responsible for the BF, full refresh */
419       if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
420         GNUNET_array_grow (pr->replies_seen,
421                            pr->replies_seen_size,
422                            replies_seen_count + pr->replies_seen_count);
423       memcpy (&pr->replies_seen[pr->replies_seen_count],
424               replies_seen,
425               sizeof (GNUNET_HashCode) * replies_seen_count);
426       pr->replies_seen_count += replies_seen_count;
427       if (GNUNET_NO == refresh_bloomfilter (pr))
428         {
429           /* bf not recalculated, simply extend it with new bits */
430           for (i=0;i<replies_seen_count;i++)
431             {
432               GNUNET_BLOCK_mingle_hash (&replies_seen[i],
433                                         pr->mingle,
434                                         &mhash);
435               GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
436             }
437         }
438     }
439   else
440     {
441       if (NULL == pr->bf)
442         {
443           /* we're not the initiator, but the initiator did not give us
444              any bloom-filter, so we need to create one on-the-fly */
445           pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 
446                                                  UINT32_MAX);
447           pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
448                                                       compute_bloomfilter_size (replies_seen_count),
449                                                       BLOOMFILTER_K);
450         }
451       for (i=0;i<pr->replies_seen_count;i++)
452         {
453           GNUNET_BLOCK_mingle_hash (&replies_seen[i],
454                                     pr->mingle,
455                                     &mhash);
456           GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
457         }
458     }
459 }
460
461
462 /**
463  * Generate the message corresponding to the given pending request for
464  * transmission to other peers (or at least determine its size).
465  *
466  * @param pr request to generate the message for
467  * @param buf_size number of bytes available in buf
468  * @param buf where to copy the message (can be NULL)
469  * @return number of bytes needed (if > buf_size) or used
470  */
471 size_t
472 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
473                                   size_t buf_size,
474                                   void *buf)
475 {
476   char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
477   struct GetMessage *gm;
478   GNUNET_HashCode *ext;
479   size_t msize;
480   unsigned int k;
481   uint32_t bm;
482   uint32_t prio;
483   size_t bf_size;
484   struct GNUNET_TIME_Absolute now;
485   int64_t ttl;
486   int do_route;
487
488 #if DEBUG_FS
489   if (buf_size > 0)
490     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
491                 "Building request message for `%s' of type %d\n",
492                 GNUNET_h2s (&pr->public_data.query),
493                 pr->public_data.type);
494 #endif 
495   k = 0;
496   bm = 0;
497   do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
498   if (! do_route)
499     {
500       bm |= GET_MESSAGE_BIT_RETURN_TO;
501       k++;      
502     }
503   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
504     {
505       bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
506       k++;
507     }
508   if (GNUNET_YES == pr->public_data.has_target)
509     {
510       bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
511       k++;
512     }
513   bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
514   msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
515   GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
516   if (buf_size < msize)
517     return msize;  
518   gm = (struct GetMessage*) lbuf;
519   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
520   gm->header.size = htons (msize);
521   gm->type = htonl (pr->public_data.type);
522   if (do_route)
523     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
524                                      pr->public_data.priority + 1);
525   else
526     prio = 0;
527   pr->public_data.priority -= prio;
528   gm->priority = htonl (prio);
529   now = GNUNET_TIME_absolute_get ();
530   ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
531   gm->ttl = htonl (ttl / 1000);
532   gm->filter_mutator = htonl(pr->mingle); 
533   gm->hash_bitmap = htonl (bm);
534   gm->query = pr->public_data.query;
535   ext = (GNUNET_HashCode*) &gm[1];
536   k = 0;  
537   if (! do_route)
538     GNUNET_PEER_resolve (pr->sender_pid, 
539                          (struct GNUNET_PeerIdentity*) &ext[k++]);
540   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
541     memcpy (&ext[k++], 
542             &pr->public_data.namespace, 
543             sizeof (GNUNET_HashCode));
544   if (GNUNET_YES == pr->public_data.has_target)
545     ext[k++] = pr->public_data.target.hashPubKey;
546   if (pr->bf != NULL)
547     GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
548                                                (char*) &ext[k],
549                                                bf_size);
550   memcpy (buf, gm, msize);
551   return msize;
552 }
553
554
555 /**
556  * Iterator to free pending requests.
557  *
558  * @param cls closure, unused
559  * @param key current key code
560  * @param value value in the hash map (pending request)
561  * @return GNUNET_YES (we should continue to iterate)
562  */
563 static int 
564 clean_request (void *cls,
565                const GNUNET_HashCode * key,
566                void *value)
567 {
568   struct GSF_PendingRequest *pr = value;
569   GSF_LocalLookupContinuation cont;
570
571 #if DEBUG_FS
572   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
573               "Cleaning up pending request for `%s'.\n",
574               GNUNET_h2s (key));
575 #endif  
576   if (NULL != (cont = pr->llc_cont))
577     {
578       pr->llc_cont = NULL;
579       cont (pr->llc_cont_cls,
580             pr,
581             pr->local_result);
582     } 
583   GSF_plan_notify_request_done_ (pr);
584   GNUNET_free_non_null (pr->replies_seen);
585   if (NULL != pr->bf)
586     {
587       GNUNET_CONTAINER_bloomfilter_free (pr->bf);
588       pr->bf = NULL;
589     }
590   GNUNET_PEER_change_rc (pr->sender_pid, -1);
591   if (NULL != pr->hnode)
592     {
593       GNUNET_CONTAINER_heap_remove_node (pr->hnode);
594       pr->hnode = NULL;
595     }
596   if (NULL != pr->qe)
597     {
598       GNUNET_DATASTORE_cancel (pr->qe);
599       pr->qe = NULL;
600     }
601   if (NULL != pr->gh)
602     {
603       GNUNET_DHT_get_stop (pr->gh);
604       pr->gh = NULL;
605     }
606   if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
607     {
608       GNUNET_SCHEDULER_cancel (pr->warn_task);
609       pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
610     }
611   GNUNET_free (pr);
612   return GNUNET_YES;
613 }
614
615
616 /**
617  * Explicitly cancel a pending request.
618  *
619  * @param pr request to cancel
620  */
621 void
622 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
623 {
624   if (NULL == pr_map) 
625     return; /* already cleaned up! */
626   GNUNET_assert (GNUNET_OK ==
627                  GNUNET_CONTAINER_multihashmap_remove (pr_map,
628                                                        &pr->public_data.query,
629                                                        pr));
630   GNUNET_assert (GNUNET_YES ==
631                  clean_request (NULL, &pr->public_data.query, pr));  
632 }
633
634
635 /**
636  * Iterate over all pending requests.
637  *
638  * @param it function to call for each request
639  * @param cls closure for it
640  */
641 void
642 GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it,
643                                void *cls)
644 {
645   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
646                                          (GNUNET_CONTAINER_HashMapIterator) it,
647                                          cls);
648 }
649
650
651
652
653 /**
654  * Closure for "process_reply" function.
655  */
656 struct ProcessReplyClosure
657 {
658   /**
659    * The data for the reply.
660    */
661   const void *data;
662
663   /**
664    * Who gave us this reply? NULL for local host (or DHT)
665    */
666   struct GSF_ConnectedPeer *sender;
667
668   /**
669    * When the reply expires.
670    */
671   struct GNUNET_TIME_Absolute expiration;
672
673   /**
674    * Size of data.
675    */
676   size_t size;
677
678   /**
679    * Type of the block.
680    */
681   enum GNUNET_BLOCK_Type type;
682
683   /**
684    * How much was this reply worth to us?
685    */
686   uint32_t priority;
687
688   /**
689    * Anonymity requirements for this reply.
690    */
691   uint32_t anonymity_level;
692
693   /**
694    * Evaluation result (returned).
695    */
696   enum GNUNET_BLOCK_EvaluationResult eval;
697
698   /**
699    * Did we find a matching request?
700    */
701   int request_found;
702 };
703
704
705 /**
706  * Update the performance data for the sender (if any) since
707  * the sender successfully answered one of our queries.
708  *
709  * @param prq information about the sender
710  * @param pr request that was satisfied
711  */
712 static void
713 update_request_performance_data (struct ProcessReplyClosure *prq,
714                                  struct GSF_PendingRequest *pr)
715 {
716   if (prq->sender == NULL)
717     return;      
718   GSF_peer_update_performance_ (prq->sender,
719                                 pr->public_data.start_time,
720                                 prq->priority);
721 }
722                                 
723
724 /**
725  * We have received a reply; handle it!
726  *
727  * @param cls response (struct ProcessReplyClosure)
728  * @param key our query
729  * @param value value in the hash map (info about the query)
730  * @return GNUNET_YES (we should continue to iterate)
731  */
732 static int
733 process_reply (void *cls,
734                const GNUNET_HashCode *key,
735                void *value)
736 {
737   struct ProcessReplyClosure *prq = cls;
738   struct GSF_PendingRequest *pr = value;
739   GNUNET_HashCode chash;
740
741 #if DEBUG_FS
742   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
743               "Matched result (type %u) for query `%s' with pending request\n",
744               (unsigned int) prq->type,
745               GNUNET_h2s (key));
746 #endif  
747   GNUNET_STATISTICS_update (GSF_stats,
748                             gettext_noop ("# replies received and matched"),
749                             1,
750                             GNUNET_NO);
751   prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
752                                      prq->type,
753                                      key,
754                                      &pr->bf,
755                                      pr->mingle,
756                                      &pr->public_data.namespace, 
757                                      (prq->type == GNUNET_BLOCK_TYPE_FS_SBLOCK) ? sizeof (GNUNET_HashCode) : 0,
758                                      prq->data,
759                                      prq->size);
760   switch (prq->eval)
761     {
762     case GNUNET_BLOCK_EVALUATION_OK_MORE:
763       update_request_performance_data (prq, pr);
764       break;
765     case GNUNET_BLOCK_EVALUATION_OK_LAST:
766       /* short cut: stop processing early, no BF-update, etc. */
767       update_request_performance_data (prq, pr);
768       GNUNET_LOAD_update (GSF_rt_entry_lifetime,
769                           GNUNET_TIME_absolute_get_duration (pr->public_data.start_time).rel_value);
770       /* pass on to other peers / local clients */
771       pr->rh (pr->rh_cls,             
772               prq->eval,
773               pr,
774               prq->anonymity_level,
775               prq->expiration,
776               prq->type,
777               prq->data, prq->size);
778       return GNUNET_YES;
779     case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
780       GNUNET_STATISTICS_update (GSF_stats,
781                                 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
782                                 1,
783                                 GNUNET_NO);
784 #if DEBUG_FS && 0
785       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
786                   "Duplicate response `%s', discarding.\n",
787                   GNUNET_h2s (&mhash));
788 #endif
789       return GNUNET_YES; /* duplicate */
790     case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
791       return GNUNET_YES; /* wrong namespace */  
792     case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
793       GNUNET_break (0);
794       return GNUNET_YES;
795     case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
796       GNUNET_break (0);
797       return GNUNET_YES;
798     case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
799       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
800                   _("Unsupported block type %u\n"),
801                   prq->type);
802       return GNUNET_NO;
803     }
804   /* update bloomfilter */
805   GNUNET_CRYPTO_hash (prq->data,
806                       prq->size,
807                       &chash);
808   GSF_pending_request_update_ (pr, &chash, 1);
809   if (NULL == prq->sender)
810     {
811 #if DEBUG_FS
812       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
813                   "Found result for query `%s' in local datastore\n",
814                   GNUNET_h2s (key));
815 #endif
816       GNUNET_STATISTICS_update (GSF_stats,
817                                 gettext_noop ("# results found locally"),
818                                 1,
819                                 GNUNET_NO);      
820     }
821   else
822     {     
823       GSF_dht_lookup_ (pr);
824     }
825   prq->priority += pr->public_data.original_priority;
826   pr->public_data.priority = 0;
827   pr->public_data.original_priority = 0;
828   pr->public_data.results_found++;
829   prq->request_found = GNUNET_YES;
830   /* finally, pass on to other peer / local client */
831   pr->rh (pr->rh_cls,
832           prq->eval,
833           pr, 
834           prq->anonymity_level,
835           prq->expiration,
836           prq->type,
837           prq->data, prq->size);
838   return GNUNET_YES;
839 }
840
841
842 /**
843  * Context for the 'put_migration_continuation'.
844  */
845 struct PutMigrationContext
846 {
847
848   /**
849    * Start time for the operation.
850    */
851   struct GNUNET_TIME_Absolute start;
852
853   /**
854    * Request origin.
855    */
856   struct GNUNET_PeerIdentity origin;
857
858   /**
859    * GNUNET_YES if we had a matching request for this block,
860    * GNUNET_NO if not.
861    */
862   int requested;
863 };
864
865
866 /**
867  * Continuation called to notify client about result of the
868  * operation.
869  *
870  * @param cls closure
871  * @param success GNUNET_SYSERR on failure
872  * @param msg NULL on success, otherwise an error message
873  */
874 static void 
875 put_migration_continuation (void *cls,
876                             int success,
877                             const char *msg)
878 {
879   struct PutMigrationContext *pmc = cls;
880   struct GNUNET_TIME_Relative delay;
881   struct GNUNET_TIME_Relative block_time;  
882   struct GSF_ConnectedPeer *cp;
883   struct GSF_PeerPerformanceData *ppd;
884                          
885   delay = GNUNET_TIME_absolute_get_duration (pmc->start);
886   cp = GSF_peer_get_ (&pmc->origin);
887   if ( (GNUNET_OK != success) &&
888        (GNUNET_NO == pmc->requested) )
889     {
890       /* block migration for a bit... */
891       if (NULL != cp)
892         {
893           ppd = GSF_get_peer_performance_data_ (cp);
894           ppd->migration_duplication++;
895           block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
896                                                       5 * ppd->migration_duplication + 
897                                                       GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5));
898           GSF_block_peer_migration_ (cp, block_time);
899         }
900     }
901   else
902     {
903       if (NULL != cp)
904         {
905           ppd = GSF_get_peer_performance_data_ (cp);
906           ppd->migration_duplication = 0; /* reset counter */
907         }
908     }
909   GNUNET_free (pmc);
910   /* FIXME: should we really update the load value on failure? */
911   GNUNET_LOAD_update (datastore_put_load,
912                       delay.rel_value);
913   if (GNUNET_OK == success)
914     return;
915   GNUNET_STATISTICS_update (GSF_stats,
916                             gettext_noop ("# datastore 'put' failures"),
917                             1,
918                             GNUNET_NO);
919 }
920
921
922 /**
923  * Test if the DATABASE (PUT) load on this peer is too high
924  * to even consider processing the query at
925  * all.  
926  * 
927  * @return GNUNET_YES if the load is too high to do anything (load high)
928  *         GNUNET_NO to process normally (load normal or low)
929  */
930 static int
931 test_put_load_too_high (uint32_t priority)
932 {
933   double ld;
934
935   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
936     return GNUNET_NO; /* very fast */
937   ld = GNUNET_LOAD_get_load (datastore_put_load);
938   if (ld < 2.0 * (1 + priority))
939     return GNUNET_NO;
940   GNUNET_STATISTICS_update (GSF_stats,
941                             gettext_noop ("# storage requests dropped due to high load"),
942                             1,
943                             GNUNET_NO);
944   return GNUNET_YES;
945 }
946
947
948 /**
949  * Iterator called on each result obtained for a DHT
950  * operation that expects a reply
951  *
952  * @param cls closure
953  * @param exp when will this value expire
954  * @param key key of the result
955  * @param get_path NULL-terminated array of pointers
956  *                 to the peers on reverse GET path (or NULL if not recorded)
957  * @param put_path NULL-terminated array of pointers
958  *                 to the peers on the PUT path (or NULL if not recorded)
959  * @param type type of the result
960  * @param size number of bytes in data
961  * @param data pointer to the result data
962  */
963 static void
964 handle_dht_reply (void *cls,
965                   struct GNUNET_TIME_Absolute exp,
966                   const GNUNET_HashCode *key,
967                   const struct GNUNET_PeerIdentity * const *get_path,
968                   const struct GNUNET_PeerIdentity * const *put_path,
969                   enum GNUNET_BLOCK_Type type,
970                   size_t size,
971                   const void *data)
972 {
973   struct GSF_PendingRequest *pr = cls;
974   struct ProcessReplyClosure prq;
975   struct PutMigrationContext *pmc;
976
977   GNUNET_STATISTICS_update (GSF_stats,
978                             gettext_noop ("# Replies received from DHT"),
979                             1,
980                             GNUNET_NO);
981   memset (&prq, 0, sizeof (prq));
982   prq.data = data;
983   prq.expiration = exp;
984   prq.size = size;  
985   prq.type = type;
986   process_reply (&prq, key, pr);
987   if ( (GNUNET_YES == active_to_migration) &&
988        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
989     {      
990 #if DEBUG_FS
991       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
992                   "Replicating result for query `%s' with priority %u\n",
993                   GNUNET_h2s (key),
994                   prq.priority);
995 #endif
996       pmc = GNUNET_malloc (sizeof (struct PutMigrationContext));
997       pmc->start = GNUNET_TIME_absolute_get ();
998       pmc->requested = GNUNET_YES;
999       if (NULL == 
1000           GNUNET_DATASTORE_put (GSF_dsh,
1001                                 0, key, size, data,
1002                                 type, prq.priority, 1 /* anonymity */, 
1003                                 0 /* replication */,
1004                                 exp, 
1005                                 1 + prq.priority, MAX_DATASTORE_QUEUE,
1006                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1007                                 &put_migration_continuation, 
1008                                 pmc))
1009         {
1010           put_migration_continuation (pmc, GNUNET_NO, NULL);    
1011         }
1012     }
1013 }
1014
1015
1016 /**
1017  * Consider looking up the data in the DHT (anonymity-level permitting).
1018  *
1019  * @param pr the pending request to process
1020  */
1021 void
1022 GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1023 {
1024   const void *xquery;
1025   size_t xquery_size;
1026   struct GNUNET_PeerIdentity pi;
1027   char buf[sizeof (GNUNET_HashCode) * 2];
1028
1029   if (0 != pr->public_data.anonymity_level)
1030     return;
1031   if (NULL != pr->gh)
1032     {
1033       GNUNET_DHT_get_stop (pr->gh);
1034       pr->gh = NULL;
1035     }
1036   xquery = NULL;
1037   xquery_size = 0;
1038   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
1039     {
1040       xquery = buf;
1041       memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode));
1042       xquery_size = sizeof (GNUNET_HashCode);
1043     }
1044   if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1045     {
1046       GNUNET_PEER_resolve (pr->sender_pid,
1047                            &pi);
1048       memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
1049       xquery_size += sizeof (struct GNUNET_PeerIdentity);
1050     }
1051   pr->gh = GNUNET_DHT_get_start (GSF_dht,
1052                                  GNUNET_TIME_UNIT_FOREVER_REL,
1053                                  pr->public_data.type,
1054                                  &pr->public_data.query,
1055                                  DEFAULT_GET_REPLICATION,
1056                                  GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1057                                  pr->bf,
1058                                  pr->mingle,
1059                                  xquery,
1060                                  xquery_size,
1061                                  &handle_dht_reply,
1062                                  pr);
1063 }
1064
1065
1066 /**
1067  * Task that issues a warning if the datastore lookup takes too long.
1068  * 
1069  * @param cls the 'struct GSF_PendingRequest'
1070  * @param tc task context
1071  */
1072 static void
1073 warn_delay_task (void *cls,
1074                  const struct GNUNET_SCHEDULER_TaskContext *tc)
1075 {
1076   struct GSF_PendingRequest *pr = cls;
1077
1078   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1079               _("Datastore lookup already took %llu ms!\n"),
1080               (unsigned long long) GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value);
1081   pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1082                                                 &warn_delay_task,
1083                                                 pr);
1084 }
1085
1086
1087 /**
1088  * Task that issues a warning if the datastore lookup takes too long.
1089  * 
1090  * @param cls the 'struct GSF_PendingRequest'
1091  * @param tc task context
1092  */
1093 static void
1094 odc_warn_delay_task (void *cls,
1095                      const struct GNUNET_SCHEDULER_TaskContext *tc)
1096 {
1097   struct GSF_PendingRequest *pr = cls;
1098
1099   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1100               _("On-demand lookup already took %llu ms!\n"),
1101               (unsigned long long) GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value);
1102   pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1103                                                 &odc_warn_delay_task,
1104                                                 pr);
1105 }
1106
1107
1108 /**
1109  * We're processing (local) results for a search request
1110  * from another peer.  Pass applicable results to the
1111  * peer and if we are done either clean up (operation
1112  * complete) or forward to other peers (more results possible).
1113  *
1114  * @param cls our closure (struct PendingRequest)
1115  * @param key key for the content
1116  * @param size number of bytes in data
1117  * @param data content stored
1118  * @param type type of the content
1119  * @param priority priority of the content
1120  * @param anonymity anonymity-level for the content
1121  * @param expiration expiration time for the content
1122  * @param uid unique identifier for the datum;
1123  *        maybe 0 if no unique identifier is available
1124  */
1125 static void
1126 process_local_reply (void *cls,
1127                      const GNUNET_HashCode *key,
1128                      size_t size,
1129                      const void *data,
1130                      enum GNUNET_BLOCK_Type type,
1131                      uint32_t priority,
1132                      uint32_t anonymity,
1133                      struct GNUNET_TIME_Absolute expiration, 
1134                      uint64_t uid)
1135 {
1136   struct GSF_PendingRequest *pr = cls;
1137   GSF_LocalLookupContinuation cont;
1138   struct ProcessReplyClosure prq;
1139   GNUNET_HashCode query;
1140   unsigned int old_rf;
1141   
1142   pr->qe = NULL;
1143   GNUNET_SCHEDULER_cancel (pr->warn_task);
1144   pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
1145   if (GNUNET_NO == pr->have_first_uid)
1146     {
1147       pr->first_uid = uid;
1148       pr->have_first_uid = GNUNET_YES;
1149     }
1150   else
1151     {
1152       if (uid == pr->first_uid)
1153         {
1154           GNUNET_STATISTICS_update (GSF_stats,
1155                                     gettext_noop ("# Datastore lookups concluded"),
1156                                     1,
1157                                     GNUNET_NO);
1158           key = NULL; /* all replies seen! */
1159         }
1160     }
1161   if (NULL == key)
1162     {
1163 #if DEBUG_FS > 1
1164       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1165                   "No further local responses available.\n");
1166 #endif
1167       if ( (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) ||
1168            (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK) )
1169         GNUNET_STATISTICS_update (GSF_stats,
1170                                   gettext_noop ("# requested DBLOCK or IBLOCK not found"),
1171                                   1,
1172                                   GNUNET_NO);
1173       goto check_error_and_continue;
1174     }
1175 #if DEBUG_FS
1176   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1177               "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1178               GNUNET_h2s (key),
1179               type,
1180               (unsigned long long) uid);
1181 #endif
1182   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1183     {
1184 #if DEBUG_FS > 1
1185       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1186                   "Found ONDEMAND block, performing on-demand encoding\n");
1187 #endif
1188       GNUNET_STATISTICS_update (GSF_stats,
1189                                 gettext_noop ("# on-demand blocks matched requests"),
1190                                 1,
1191                                 GNUNET_NO);
1192       pr->qe_start = GNUNET_TIME_absolute_get ();
1193       pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1194                                                     &odc_warn_delay_task,
1195                                                     pr);
1196       if (GNUNET_OK == 
1197           GNUNET_FS_handle_on_demand_block (key, size, data, type, priority, 
1198                                             anonymity, expiration, uid, 
1199                                             &process_local_reply,
1200                                             pr))
1201         {
1202           GNUNET_STATISTICS_update (GSF_stats,
1203                                     gettext_noop ("# on-demand lookups performed successfully"),
1204                                     1,
1205                                     GNUNET_NO);
1206           return; /* we're done */
1207         }
1208       GNUNET_STATISTICS_update (GSF_stats,
1209                                 gettext_noop ("# on-demand lookups failed"),
1210                                 1,
1211                                 GNUNET_NO);
1212       GNUNET_SCHEDULER_cancel (pr->warn_task);
1213       pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1214                                                     &warn_delay_task,
1215                                                     pr);        
1216       pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1217                                          pr->local_result_offset - 1,
1218                                          &pr->public_data.query,
1219                                          pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 
1220                                          ? GNUNET_BLOCK_TYPE_ANY 
1221                                          : pr->public_data.type, 
1222                                          (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1223                                          ? UINT_MAX
1224                                          : 1 /* queue priority */,
1225                                          (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1226                                          ? UINT_MAX
1227                                          : 1 /* max queue size */,
1228                                          GNUNET_TIME_UNIT_FOREVER_REL,
1229                                          &process_local_reply,
1230                                          pr);
1231       if (NULL != pr->qe)
1232         return; /* we're done */        
1233       goto check_error_and_continue;
1234     }
1235   old_rf = pr->public_data.results_found;
1236   memset (&prq, 0, sizeof (prq));
1237   prq.data = data;
1238   prq.expiration = expiration;
1239   prq.size = size;  
1240   if (GNUNET_OK != 
1241       GNUNET_BLOCK_get_key (GSF_block_ctx,
1242                             type,
1243                             data,
1244                             size,
1245                             &query))
1246     {
1247       GNUNET_break (0);
1248       GNUNET_DATASTORE_remove (GSF_dsh,
1249                                key,
1250                                size, data,
1251                                -1, -1, 
1252                                GNUNET_TIME_UNIT_FOREVER_REL,
1253                                NULL, NULL);
1254       pr->qe_start = GNUNET_TIME_absolute_get ();
1255       pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1256                                                     &warn_delay_task,
1257                                                     pr);
1258       pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1259                                          pr->local_result_offset - 1,
1260                                          &pr->public_data.query,
1261                                          pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 
1262                                          ? GNUNET_BLOCK_TYPE_ANY 
1263                                          : pr->public_data.type, 
1264                                          (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1265                                          ? UINT_MAX
1266                                          : 1 /* queue priority */,
1267                                          (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1268                                          ? UINT_MAX
1269                                          : 1 /* max queue size */,
1270                                          GNUNET_TIME_UNIT_FOREVER_REL,
1271                                          &process_local_reply,
1272                                          pr);
1273       if (pr->qe == NULL)       
1274         goto check_error_and_continue;  
1275       return;
1276     }
1277   prq.type = type;
1278   prq.priority = priority;  
1279   prq.request_found = GNUNET_NO;
1280   prq.anonymity_level = anonymity;
1281   if ( (old_rf == 0) &&
1282        (pr->public_data.results_found == 0) )
1283     GSF_update_datastore_delay_ (pr->public_data.start_time);
1284   process_reply (&prq, key, pr);
1285   pr->local_result = prq.eval;
1286   if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
1287     goto check_error_and_continue;
1288   if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1289        ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
1290          (pr->public_data.results_found > 5 + 2 * pr->public_data.priority) ) )
1291     {
1292 #if DEBUG_FS > 2
1293       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294                   "Load too high, done with request\n");
1295 #endif
1296       GNUNET_STATISTICS_update (GSF_stats,
1297                                 gettext_noop ("# processing result set cut short due to load"),
1298                                 1,
1299                                 GNUNET_NO);
1300       goto check_error_and_continue;
1301     }
1302   pr->qe_start = GNUNET_TIME_absolute_get ();
1303   pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1304                                                 &warn_delay_task,
1305                                                 pr);
1306   pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1307                                      pr->local_result_offset++,
1308                                      &pr->public_data.query,
1309                                      pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 
1310                                      ? GNUNET_BLOCK_TYPE_ANY 
1311                                      : pr->public_data.type, 
1312                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1313                                      ? UINT_MAX
1314                                      : 1 /* queue priority */,
1315                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1316                                      ? UINT_MAX
1317                                      : 1 /* max queue size */,
1318                                      GNUNET_TIME_UNIT_FOREVER_REL,
1319                                      &process_local_reply,
1320                                      pr);
1321   /* check if we successfully queued another datastore request;
1322      if so, return, otherwise call our continuation (if we have
1323      any) */
1324  check_error_and_continue:
1325   if (NULL != pr->qe)
1326     return;
1327   if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
1328     {
1329       GNUNET_SCHEDULER_cancel (pr->warn_task);
1330       pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
1331     }
1332   if (NULL == (cont = pr->llc_cont))
1333     return; /* no continuation */      
1334   pr->llc_cont = NULL;
1335   cont (pr->llc_cont_cls,
1336         pr,
1337         pr->local_result);
1338 }
1339
1340
1341 /**
1342  * Look up the request in the local datastore.
1343  *
1344  * @param pr the pending request to process
1345  * @param cont function to call at the end
1346  * @param cont_cls closure for cont
1347  */
1348 void
1349 GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1350                    GSF_LocalLookupContinuation cont,
1351                    void *cont_cls)
1352 {
1353   GNUNET_assert (NULL == pr->gh);
1354   GNUNET_assert (NULL == pr->llc_cont);
1355   pr->llc_cont = cont;
1356   pr->llc_cont_cls = cont_cls;
1357   pr->qe_start = GNUNET_TIME_absolute_get ();
1358   pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1359                                                 &warn_delay_task,
1360                                                 pr);
1361   GNUNET_STATISTICS_update (GSF_stats,
1362                             gettext_noop ("# Datastore lookups initiated"),
1363                             1,
1364                             GNUNET_NO);
1365   pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1366                                      pr->local_result_offset++,
1367                                      &pr->public_data.query,
1368                                      pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK 
1369                                      ? GNUNET_BLOCK_TYPE_ANY 
1370                                      : pr->public_data.type, 
1371                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1372                                      ? UINT_MAX
1373                                      : 1 /* queue priority */,
1374                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options))
1375                                      ? UINT_MAX
1376                                      : 1 /* max queue size */,
1377                                      GNUNET_TIME_UNIT_FOREVER_REL,
1378                                      &process_local_reply,
1379                                      pr);
1380   if (NULL != pr->qe)
1381     return;
1382   GNUNET_SCHEDULER_cancel (pr->warn_task);
1383   pr->warn_task = GNUNET_SCHEDULER_NO_TASK;
1384   pr->llc_cont = NULL;
1385   if (NULL != cont)
1386     cont (cont_cls, pr, pr->local_result);      
1387 }
1388
1389
1390
1391 /**
1392  * Handle P2P "CONTENT" message.  Checks that the message is
1393  * well-formed and then checks if there are any pending requests for
1394  * this content and possibly passes it on (to local clients or other
1395  * peers).  Does NOT perform migration (content caching at this peer).
1396  *
1397  * @param cp the other peer involved (sender or receiver, NULL
1398  *        for loopback messages where we are both sender and receiver)
1399  * @param message the actual message
1400  * @return GNUNET_OK if the message was well-formed,
1401  *         GNUNET_SYSERR if the message was malformed (close connection,
1402  *         do not cache under any circumstances)
1403  */
1404 int
1405 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1406                          const struct GNUNET_MessageHeader *message)
1407 {
1408   const struct PutMessage *put;
1409   uint16_t msize;
1410   size_t dsize;
1411   enum GNUNET_BLOCK_Type type;
1412   struct GNUNET_TIME_Absolute expiration;
1413   GNUNET_HashCode query;
1414   struct ProcessReplyClosure prq;
1415   struct GNUNET_TIME_Relative block_time;  
1416   double putl;
1417   struct PutMigrationContext *pmc;
1418
1419   msize = ntohs (message->size);
1420   if (msize < sizeof (struct PutMessage))
1421     {
1422       GNUNET_break_op(0);
1423       return GNUNET_SYSERR;
1424     }
1425   put = (const struct PutMessage*) message;
1426   dsize = msize - sizeof (struct PutMessage);
1427   type = ntohl (put->type);
1428   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1429   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1430     return GNUNET_SYSERR;
1431   if (GNUNET_OK !=
1432       GNUNET_BLOCK_get_key (GSF_block_ctx,
1433                             type,
1434                             &put[1],
1435                             dsize,
1436                             &query))
1437     {
1438       GNUNET_break_op (0);
1439       return GNUNET_SYSERR;
1440     }
1441   GNUNET_STATISTICS_update (GSF_stats,
1442                             gettext_noop ("# GAP PUT messages received"),
1443                             1,
1444                             GNUNET_NO);
1445   /* now, lookup 'query' */
1446   prq.data = (const void*) &put[1];
1447   if (NULL != cp)
1448     prq.sender = cp;
1449   else
1450     prq.sender = NULL;
1451   prq.size = dsize;
1452   prq.type = type;
1453   prq.expiration = expiration;
1454   prq.priority = 0;
1455   prq.anonymity_level = UINT32_MAX;
1456   prq.request_found = GNUNET_NO;
1457   GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
1458                                               &query,
1459                                               &process_reply,
1460                                               &prq);
1461   if (NULL != cp)
1462     {
1463       GSF_connected_peer_change_preference_ (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
1464       GSF_get_peer_performance_data_ (cp)->trust += prq.priority;
1465     }
1466   if ( (GNUNET_YES == active_to_migration) &&
1467        (GNUNET_NO == test_put_load_too_high (prq.priority)) )
1468     {      
1469 #if DEBUG_FS
1470       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1471                   "Replicating result for query `%s' with priority %u\n",
1472                   GNUNET_h2s (&query),
1473                   prq.priority);
1474 #endif
1475       pmc = GNUNET_malloc (sizeof (struct PutMigrationContext));
1476       pmc->start = GNUNET_TIME_absolute_get ();
1477       pmc->requested = prq.request_found;
1478       GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1479                            &pmc->origin);
1480       if (NULL ==
1481           GNUNET_DATASTORE_put (GSF_dsh,
1482                                 0, &query, dsize, &put[1],
1483                                 type, prq.priority, 1 /* anonymity */, 
1484                                 0 /* replication */,
1485                                 expiration, 
1486                                 1 + prq.priority, MAX_DATASTORE_QUEUE,
1487                                 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1488                                 &put_migration_continuation, 
1489                                 pmc))
1490         {
1491           put_migration_continuation (pmc, GNUNET_NO, NULL);      
1492         }
1493     }
1494   else
1495     {
1496 #if DEBUG_FS
1497       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1498                   "Choosing not to keep content `%s' (%d/%d)\n",
1499                   GNUNET_h2s (&query),
1500                   active_to_migration,
1501                   test_put_load_too_high (prq.priority));
1502 #endif
1503     }
1504   putl = GNUNET_LOAD_get_load (datastore_put_load);
1505   if ( (NULL != (cp = prq.sender)) &&
1506        (GNUNET_NO == prq.request_found) &&
1507        ( (GNUNET_YES != active_to_migration) ||
1508          (putl > 2.5 * (1 + prq.priority)) ) ) 
1509     {
1510       if (GNUNET_YES != active_to_migration) 
1511         putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
1512       block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1513                                                   5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1514                                                                                    (unsigned int) (60000 * putl * putl)));
1515       GSF_block_peer_migration_ (cp, block_time);
1516     }  
1517   return GNUNET_OK;
1518 }
1519
1520
1521 /**
1522  * Setup the subsystem.
1523  */
1524 void
1525 GSF_pending_request_init_ ()
1526 {
1527   if (GNUNET_OK !=
1528       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1529                                              "fs",
1530                                              "MAX_PENDING_REQUESTS",
1531                                              &max_pending_requests))
1532     {
1533       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1534                   _("Configuration fails to specify `%s', assuming default value."),
1535                   "MAX_PENDING_REQUESTS");
1536     }
1537   active_to_migration = GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg,
1538                                                               "FS",
1539                                                               "CONTENT_CACHING");
1540   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1541   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
1542   requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); 
1543 }
1544
1545
1546 /**
1547  * Shutdown the subsystem.
1548  */
1549 void
1550 GSF_pending_request_done_ ()
1551 {
1552   GNUNET_CONTAINER_multihashmap_iterate (pr_map,
1553                                          &clean_request,
1554                                          NULL);
1555   GNUNET_CONTAINER_multihashmap_destroy (pr_map);
1556   pr_map = NULL;
1557   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1558   requests_by_expiration_heap = NULL;
1559   GNUNET_LOAD_value_free (datastore_put_load);
1560   datastore_put_load = NULL;
1561 }
1562
1563
1564 /* end of gnunet-service-fs_pr.c */