error handling
[oweals/gnunet.git] / src / fs / gnunet-service-fs_pr.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19  */
20
21 /**
22  * @file fs/gnunet-service-fs_pr.c
23  * @brief API to handle pending requests
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_indexing.h"
32 #include "gnunet-service-fs_pe.h"
33 #include "gnunet-service-fs_pr.h"
34 #include "gnunet-service-fs_cadet.h"
35
36
37 /**
38  * Desired replication level for GETs.
39  */
40 #define DHT_GET_REPLICATION 5
41
42 /**
43  * Maximum size of the datastore queue for P2P operations.  Needs to
44  * be large enough to queue #MAX_QUEUE_PER_PEER operations for roughly
45  * the number of active (connected) peers.
46  */
47 #define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER)
48
49 /**
50  * Bandwidth value of a 0-priority content (must be fairly high
51  * compared to query since content is typically significantly larger
52  * -- and more valueable since it can take many queries to get one
53  * piece of content).
54  */
55 #define CONTENT_BANDWIDTH_VALUE 800
56
57 /**
58  * Hard limit on the number of results we may get from the datastore per query.
59  */
60 #define MAX_RESULTS (100 * 1024)
61
62 /**
63  * Collect an instane number of statistics?  May cause excessive IPC.
64  */
65 #define INSANE_STATISTICS GNUNET_NO
66
67 /**
68  * If obtaining a block via cadet fails, how often do we retry it before
69  * giving up for good (and sticking to non-anonymous transfer)?
70  */
71 #define CADET_RETRY_MAX 3
72
73
74 /**
75  * An active request.
76  */
77 struct GSF_PendingRequest
78 {
79   /**
80    * Public data for the request.
81    */
82   struct GSF_PendingRequestData public_data;
83
84   /**
85    * Function to call if we encounter a reply.
86    */
87   GSF_PendingRequestReplyHandler rh;
88
89   /**
90    * Closure for @e rh
91    */
92   void *rh_cls;
93
94   /**
95    * Array of hash codes of replies we've already seen.
96    */
97   struct GNUNET_HashCode *replies_seen;
98
99   /**
100    * Block group for filtering replies we've already seen.
101    */
102   struct GNUNET_BLOCK_Group *bg;
103
104   /**
105    * Entry for this pending request in the expiration heap, or NULL.
106    */
107   struct GNUNET_CONTAINER_HeapNode *hnode;
108
109   /**
110    * Datastore queue entry for this request (or NULL for none).
111    */
112   struct GNUNET_DATASTORE_QueueEntry *qe;
113
114   /**
115    * DHT request handle for this request (or NULL for none).
116    */
117   struct GNUNET_DHT_GetHandle *gh;
118
119   /**
120    * Cadet request handle for this request (or NULL for none).
121    */
122   struct GSF_CadetRequest *cadet_request;
123
124   /**
125    * Function to call upon completion of the local get
126    * request, or NULL for none.
127    */
128   GSF_LocalLookupContinuation llc_cont;
129
130   /**
131    * Closure for @e llc_cont.
132    */
133   void *llc_cont_cls;
134
135   /**
136    * Last result from the local datastore lookup evaluation.
137    */
138   enum GNUNET_BLOCK_EvaluationResult local_result;
139
140   /**
141    * Identity of the peer that we should use for the 'sender'
142    * (recipient of the response) when forwarding (0 for none).
143    */
144   GNUNET_PEER_Id sender_pid;
145
146   /**
147    * Identity of the peer that we should never forward this query
148    * to since it originated this query (0 for none).
149    */
150   GNUNET_PEER_Id origin_pid;
151
152   /**
153    * Time we started the last datastore lookup.
154    */
155   struct GNUNET_TIME_Absolute qe_start;
156
157   /**
158    * Task that warns us if the local datastore lookup takes too long.
159    */
160   struct GNUNET_SCHEDULER_Task *warn_task;
161
162   /**
163    * Do we have a first UID yet?
164    */
165   bool have_first_uid;
166
167   /**
168    * Have we seen a NULL result yet?
169    */
170   bool seen_null;
171
172   /**
173    * Unique ID of the first result from the local datastore;
174    * used to terminate the loop.
175    */
176   uint64_t first_uid;
177
178   /**
179    * Result count.
180    */
181   size_t result_count;
182
183   /**
184    * How often have we retried this request via 'cadet'?
185    * (used to bound overall retries).
186    */
187   unsigned int cadet_retry_count;
188
189   /**
190    * Number of valid entries in the 'replies_seen' array.
191    */
192   unsigned int replies_seen_count;
193
194   /**
195    * Length of the 'replies_seen' array.
196    */
197   unsigned int replies_seen_size;
198 };
199
200
201 /**
202  * All pending requests, ordered by the query.  Entries
203  * are of type 'struct GSF_PendingRequest*'.
204  */
205 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
206
207
208 /**
209  * Datastore 'PUT' load tracking.
210  */
211 static struct GNUNET_LOAD_Value *datastore_put_load;
212
213
214 /**
215  * Are we allowed to migrate content to this peer.
216  */
217 static int active_to_migration;
218
219
220 /**
221  * Heap with the request that will expire next at the top.  Contains
222  * pointers of type "struct PendingRequest*"; these will *also* be
223  * aliased from the "requests_by_peer" data structures and the
224  * "requests_by_query" table.  Note that requests from our clients
225  * don't expire and are thus NOT in the "requests_by_expiration"
226  * (or the "requests_by_peer" tables).
227  */
228 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
229
230
231 /**
232  * Maximum number of requests (from other peers, overall) that we're
233  * willing to have pending at any given point in time.  Can be changed
234  * via the configuration file (32k is just the default).
235  */
236 static unsigned long long max_pending_requests = (32 * 1024);
237
238
239 /**
240  * Recalculate our bloom filter for filtering replies.  This function
241  * will create a new bloom filter from scratch, so it should only be
242  * called if we have no bloomfilter at all (and hence can create a
243  * fresh one of minimal size without problems) OR if our peer is the
244  * initiator (in which case we may resize to larger than mimimum size).
245  *
246  * @param type type of the request
247  * @param pr request for which the BF is to be recomputed
248  */
249 static void
250 refresh_bloomfilter (enum GNUNET_BLOCK_Type type, struct GSF_PendingRequest *pr)
251 {
252   if (NULL != pr->bg)
253   {
254     GNUNET_BLOCK_group_destroy (pr->bg);
255     pr->bg = NULL;
256   }
257   if (GNUNET_BLOCK_TYPE_FS_UBLOCK != type)
258     return; /* no need */
259   pr->bg =
260     GNUNET_BLOCK_group_create (GSF_block_ctx,
261                                type,
262                                GNUNET_CRYPTO_random_u32 (
263                                  GNUNET_CRYPTO_QUALITY_WEAK,
264                                  UINT32_MAX),
265                                NULL,
266                                0,
267                                "seen-set-size",
268                                pr->replies_seen_count,
269                                NULL);
270   if (NULL == pr->bg)
271     return;
272   GNUNET_break (GNUNET_OK ==
273                 GNUNET_BLOCK_group_set_seen (pr->bg,
274                                              pr->replies_seen,
275                                              pr->replies_seen_count));
276 }
277
278
279 /**
280  * Create a new pending request.
281  *
282  * @param options request options
283  * @param type type of the block that is being requested
284  * @param query key for the lookup
285  * @param target preferred target for the request, NULL for none
286  * @param bf_data raw data for bloom filter for known replies, can be NULL
287  * @param bf_size number of bytes in @a bf_data
288  * @param mingle mingle value for bf
289  * @param anonymity_level desired anonymity level
290  * @param priority maximum outgoing cummulative request priority to use
291  * @param ttl current time-to-live for the request
292  * @param sender_pid peer ID to use for the sender when forwarding, 0 for none
293  * @param origin_pid peer ID of origin of query (do not loop back)
294  * @param replies_seen hash codes of known local replies
295  * @param replies_seen_count size of the @a replies_seen array
296  * @param rh handle to call when we get a reply
297  * @param rh_cls closure for @a rh
298  * @return handle for the new pending request
299  */
300 struct GSF_PendingRequest *
301 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
302                              enum GNUNET_BLOCK_Type type,
303                              const struct GNUNET_HashCode *query,
304                              const struct GNUNET_PeerIdentity *target,
305                              const char *bf_data,
306                              size_t bf_size,
307                              uint32_t mingle,
308                              uint32_t anonymity_level,
309                              uint32_t priority,
310                              int32_t ttl,
311                              GNUNET_PEER_Id sender_pid,
312                              GNUNET_PEER_Id origin_pid,
313                              const struct GNUNET_HashCode *replies_seen,
314                              unsigned int replies_seen_count,
315                              GSF_PendingRequestReplyHandler rh,
316                              void *rh_cls)
317 {
318   struct GSF_PendingRequest *pr;
319   struct GSF_PendingRequest *dpr;
320   size_t extra;
321   struct GNUNET_HashCode *eptr;
322
323   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
324               "Creating request handle for `%s' of type %d\n",
325               GNUNET_h2s (query),
326               type);
327 #if INSANE_STATISTICS
328   GNUNET_STATISTICS_update (GSF_stats,
329                             gettext_noop ("# Pending requests created"),
330                             1,
331                             GNUNET_NO);
332 #endif
333   extra = 0;
334   if (NULL != target)
335     extra += sizeof(struct GNUNET_PeerIdentity);
336   pr = GNUNET_malloc (sizeof(struct GSF_PendingRequest) + extra);
337   pr->public_data.query = *query;
338   eptr = (struct GNUNET_HashCode *) &pr[1];
339   if (NULL != target)
340   {
341     pr->public_data.target = (struct GNUNET_PeerIdentity *) eptr;
342     GNUNET_memcpy (eptr, target, sizeof(struct GNUNET_PeerIdentity));
343   }
344   pr->public_data.anonymity_level = anonymity_level;
345   pr->public_data.priority = priority;
346   pr->public_data.original_priority = priority;
347   pr->public_data.options = options;
348   pr->public_data.type = type;
349   pr->public_data.start_time = GNUNET_TIME_absolute_get ();
350   pr->sender_pid = sender_pid;
351   pr->origin_pid = origin_pid;
352   pr->rh = rh;
353   pr->rh_cls = rh_cls;
354   GNUNET_assert ((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY)));
355   if (ttl >= 0)
356     pr->public_data.ttl = GNUNET_TIME_relative_to_absolute (
357       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, (uint32_t) ttl));
358   else
359     pr->public_data.ttl = GNUNET_TIME_absolute_subtract (
360       pr->public_data.start_time,
361       GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
362                                      (uint32_t) (-ttl)));
363   if (replies_seen_count > 0)
364   {
365     pr->replies_seen_size = replies_seen_count;
366     pr->replies_seen =
367       GNUNET_new_array (pr->replies_seen_size, struct GNUNET_HashCode);
368     GNUNET_memcpy (pr->replies_seen,
369                    replies_seen,
370                    replies_seen_count * sizeof(struct GNUNET_HashCode));
371     pr->replies_seen_count = replies_seen_count;
372   }
373   if ((NULL != bf_data) &&
374       (GNUNET_BLOCK_TYPE_FS_UBLOCK == pr->public_data.type))
375   {
376     pr->bg = GNUNET_BLOCK_group_create (GSF_block_ctx,
377                                         pr->public_data.type,
378                                         mingle,
379                                         bf_data,
380                                         bf_size,
381                                         "seen-set-size",
382                                         0,
383                                         NULL);
384   }
385   else if ((replies_seen_count > 0) &&
386            (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
387   {
388     refresh_bloomfilter (pr->public_data.type, pr);
389   }
390   GNUNET_CONTAINER_multihashmap_put (pr_map,
391                                      &pr->public_data.query,
392                                      pr,
393                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
394   if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
395   {
396     pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
397                                               pr,
398                                               pr->public_data.ttl.abs_value_us);
399     /* make sure we don't track too many requests */
400     while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
401            max_pending_requests)
402     {
403       dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
404       GNUNET_assert (NULL != dpr);
405       if (pr == dpr)
406         break;     /* let the request live briefly... */
407       if (NULL != dpr->rh)
408         dpr->rh (dpr->rh_cls,
409                  GNUNET_BLOCK_EVALUATION_REQUEST_VALID,
410                  dpr,
411                  UINT32_MAX,
412                  GNUNET_TIME_UNIT_FOREVER_ABS,
413                  GNUNET_TIME_UNIT_FOREVER_ABS,
414                  GNUNET_BLOCK_TYPE_ANY,
415                  NULL,
416                  0);
417       GSF_pending_request_cancel_ (dpr, GNUNET_YES);
418     }
419   }
420   GNUNET_STATISTICS_update (GSF_stats,
421                             gettext_noop ("# Pending requests active"),
422                             1,
423                             GNUNET_NO);
424   return pr;
425 }
426
427
428 /**
429  * Obtain the public data associated with a pending request
430  *
431  * @param pr pending request
432  * @return associated public data
433  */
434 struct GSF_PendingRequestData *
435 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
436 {
437   return &pr->public_data;
438 }
439
440
441 /**
442  * Test if two pending requests are compatible (would generate
443  * the same query modulo filters and should thus be processed
444  * jointly).
445  *
446  * @param pra a pending request
447  * @param prb another pending request
448  * @return #GNUNET_OK if the requests are compatible
449  */
450 int
451 GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
452                                     struct GSF_PendingRequest *prb)
453 {
454   if ((pra->public_data.type != prb->public_data.type) ||
455       (0 != memcmp (&pra->public_data.query,
456                     &prb->public_data.query,
457                     sizeof(struct GNUNET_HashCode))))
458     return GNUNET_NO;
459   return GNUNET_OK;
460 }
461
462
463 /**
464  * Update a given pending request with additional replies
465  * that have been seen.
466  *
467  * @param pr request to update
468  * @param replies_seen hash codes of replies that we've seen
469  * @param replies_seen_count size of the replies_seen array
470  */
471 void
472 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
473                              const struct GNUNET_HashCode *replies_seen,
474                              unsigned int replies_seen_count)
475 {
476   if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
477     return; /* integer overflow */
478   if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
479   {
480     /* we're responsible for the BF, full refresh */
481     if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
482       GNUNET_array_grow (pr->replies_seen,
483                          pr->replies_seen_size,
484                          replies_seen_count + pr->replies_seen_count);
485     GNUNET_memcpy (&pr->replies_seen[pr->replies_seen_count],
486                    replies_seen,
487                    sizeof(struct GNUNET_HashCode) * replies_seen_count);
488     pr->replies_seen_count += replies_seen_count;
489     refresh_bloomfilter (pr->public_data.type, pr);
490   }
491   else
492   {
493     if (NULL == pr->bg)
494     {
495       /* we're not the initiator, but the initiator did not give us
496        * any bloom-filter, so we need to create one on-the-fly */
497       refresh_bloomfilter (pr->public_data.type, pr);
498     }
499     else
500     {
501       GNUNET_break (GNUNET_OK ==
502                     GNUNET_BLOCK_group_set_seen (pr->bg,
503                                                  replies_seen,
504                                                  pr->replies_seen_count));
505     }
506   }
507   if (NULL != pr->gh)
508     GNUNET_DHT_get_filter_known_results (pr->gh,
509                                          replies_seen_count,
510                                          replies_seen);
511 }
512
513
514 /**
515  * Generate the message corresponding to the given pending request for
516  * transmission to other peers.
517  *
518  * @param pr request to generate the message for
519  * @return envelope with the request message
520  */
521 struct GNUNET_MQ_Envelope *
522 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr)
523 {
524   struct GNUNET_MQ_Envelope *env;
525   struct GetMessage *gm;
526   struct GNUNET_PeerIdentity *ext;
527   unsigned int k;
528   uint32_t bm;
529   uint32_t prio;
530   size_t bf_size;
531   struct GNUNET_TIME_Absolute now;
532   int64_t ttl;
533   int do_route;
534   void *bf_data;
535   uint32_t bf_nonce;
536
537   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538               "Building request message for `%s' of type %d\n",
539               GNUNET_h2s (&pr->public_data.query),
540               pr->public_data.type);
541   k = 0;
542   bm = 0;
543   do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
544   if ((! do_route) && (pr->sender_pid == 0))
545   {
546     GNUNET_break (0);
547     do_route = GNUNET_YES;
548   }
549   if (! do_route)
550   {
551     bm |= GET_MESSAGE_BIT_RETURN_TO;
552     k++;
553   }
554   if (NULL != pr->public_data.target)
555   {
556     bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
557     k++;
558   }
559   if (GNUNET_OK !=
560       GNUNET_BLOCK_group_serialize (pr->bg, &bf_nonce, &bf_data, &bf_size))
561   {
562     bf_size = 0;
563     bf_data = NULL;
564   }
565   env = GNUNET_MQ_msg_extra (gm,
566                              bf_size + k * sizeof(struct GNUNET_PeerIdentity),
567                              GNUNET_MESSAGE_TYPE_FS_GET);
568   gm->type = htonl (pr->public_data.type);
569   if (do_route)
570     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
571                                      pr->public_data.priority + 1);
572   else
573     prio = 0;
574   pr->public_data.priority -= prio;
575   pr->public_data.num_transmissions++;
576   pr->public_data.respect_offered += prio;
577   gm->priority = htonl (prio);
578   now = GNUNET_TIME_absolute_get ();
579   ttl = (int64_t) (pr->public_data.ttl.abs_value_us - now.abs_value_us);
580   gm->ttl = htonl (ttl / 1000LL / 1000LL);
581   gm->filter_mutator = htonl (bf_nonce);
582   gm->hash_bitmap = htonl (bm);
583   gm->query = pr->public_data.query;
584   ext = (struct GNUNET_PeerIdentity *) &gm[1];
585   k = 0;
586   if (! do_route)
587     GNUNET_PEER_resolve (pr->sender_pid, &ext[k++]);
588   if (NULL != pr->public_data.target)
589     ext[k++] = *pr->public_data.target;
590   GNUNET_memcpy (&ext[k], bf_data, bf_size);
591   GNUNET_free_non_null (bf_data);
592   return env;
593 }
594
595
596 /**
597  * Iterator to free pending requests.
598  *
599  * @param cls closure, unused
600  * @param key current key code
601  * @param value value in the hash map (pending request)
602  * @return #GNUNET_YES (we should continue to iterate)
603  */
604 static int
605 clean_request (void *cls, const struct GNUNET_HashCode *key, void *value)
606 {
607   struct GSF_PendingRequest *pr = value;
608   GSF_LocalLookupContinuation cont;
609
610   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
611               "Cleaning up pending request for `%s'.\n",
612               GNUNET_h2s (key));
613   if (NULL != pr->cadet_request)
614   {
615     pr->cadet_retry_count = CADET_RETRY_MAX;
616     GSF_cadet_query_cancel (pr->cadet_request);
617     pr->cadet_request = NULL;
618   }
619   if (NULL != (cont = pr->llc_cont))
620   {
621     pr->llc_cont = NULL;
622     cont (pr->llc_cont_cls, pr, pr->local_result);
623   }
624   GSF_plan_notify_request_done_ (pr);
625   GNUNET_free_non_null (pr->replies_seen);
626   GNUNET_BLOCK_group_destroy (pr->bg);
627   pr->bg = NULL;
628   GNUNET_PEER_change_rc (pr->sender_pid, -1);
629   pr->sender_pid = 0;
630   GNUNET_PEER_change_rc (pr->origin_pid, -1);
631   pr->origin_pid = 0;
632   if (NULL != pr->hnode)
633   {
634     GNUNET_CONTAINER_heap_remove_node (pr->hnode);
635     pr->hnode = NULL;
636   }
637   if (NULL != pr->qe)
638   {
639     GNUNET_DATASTORE_cancel (pr->qe);
640     pr->qe = NULL;
641   }
642   if (NULL != pr->gh)
643   {
644     GNUNET_DHT_get_stop (pr->gh);
645     pr->gh = NULL;
646   }
647   if (NULL != pr->warn_task)
648   {
649     GNUNET_SCHEDULER_cancel (pr->warn_task);
650     pr->warn_task = NULL;
651   }
652   GNUNET_assert (
653     GNUNET_OK ==
654     GNUNET_CONTAINER_multihashmap_remove (pr_map, &pr->public_data.query, pr));
655   GNUNET_STATISTICS_update (GSF_stats,
656                             gettext_noop ("# Pending requests active"),
657                             -1,
658                             GNUNET_NO);
659   GNUNET_free (pr);
660   return GNUNET_YES;
661 }
662
663
664 /**
665  * Explicitly cancel a pending request.
666  *
667  * @param pr request to cancel
668  * @param full_cleanup fully purge the request
669  */
670 void
671 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
672 {
673   GSF_LocalLookupContinuation cont;
674
675   if (NULL == pr_map)
676     return; /* already cleaned up! */
677   if (GNUNET_NO == full_cleanup)
678   {
679     /* make request inactive (we're no longer interested in more results),
680      * but do NOT remove from our data-structures, we still need it there
681      * to prevent the request from looping */
682     pr->rh = NULL;
683     if (NULL != pr->cadet_request)
684     {
685       pr->cadet_retry_count = CADET_RETRY_MAX;
686       GSF_cadet_query_cancel (pr->cadet_request);
687       pr->cadet_request = NULL;
688     }
689     if (NULL != (cont = pr->llc_cont))
690     {
691       pr->llc_cont = NULL;
692       cont (pr->llc_cont_cls, pr, pr->local_result);
693     }
694     GSF_plan_notify_request_done_ (pr);
695     if (NULL != pr->qe)
696     {
697       GNUNET_DATASTORE_cancel (pr->qe);
698       pr->qe = NULL;
699     }
700     if (NULL != pr->gh)
701     {
702       GNUNET_DHT_get_stop (pr->gh);
703       pr->gh = NULL;
704     }
705     if (NULL != pr->warn_task)
706     {
707       GNUNET_SCHEDULER_cancel (pr->warn_task);
708       pr->warn_task = NULL;
709     }
710     return;
711   }
712   GNUNET_assert (GNUNET_YES ==
713                  clean_request (NULL, &pr->public_data.query, pr));
714 }
715
716
717 /**
718  * Iterate over all pending requests.
719  *
720  * @param it function to call for each request
721  * @param cls closure for @a it
722  */
723 void
724 GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls)
725 {
726   GNUNET_CONTAINER_multihashmap_iterate (
727     pr_map,
728     (GNUNET_CONTAINER_MulitHashMapIteratorCallback) it,
729     cls);
730 }
731
732
733 /**
734  * Closure for process_reply() function.
735  */
736 struct ProcessReplyClosure
737 {
738   /**
739    * The data for the reply.
740    */
741   const void *data;
742
743   /**
744    * Who gave us this reply? NULL for local host (or DHT)
745    */
746   struct GSF_ConnectedPeer *sender;
747
748   /**
749    * When the reply expires.
750    */
751   struct GNUNET_TIME_Absolute expiration;
752
753   /**
754    * Size of data.
755    */
756   size_t size;
757
758   /**
759    * Type of the block.
760    */
761   enum GNUNET_BLOCK_Type type;
762
763   /**
764    * Control flags for evaluation.
765    */
766   enum GNUNET_BLOCK_EvaluationOptions eo;
767
768   /**
769    * How much was this reply worth to us?
770    */
771   uint32_t priority;
772
773   /**
774    * Anonymity requirements for this reply.
775    */
776   uint32_t anonymity_level;
777
778   /**
779    * Evaluation result (returned).
780    */
781   enum GNUNET_BLOCK_EvaluationResult eval;
782
783   /**
784    * Did we find a matching request?
785    */
786   int request_found;
787 };
788
789
790 /**
791  * Update the performance data for the sender (if any) since
792  * the sender successfully answered one of our queries.
793  *
794  * @param prq information about the sender
795  * @param pr request that was satisfied
796  */
797 static void
798 update_request_performance_data (struct ProcessReplyClosure *prq,
799                                  struct GSF_PendingRequest *pr)
800 {
801   if (prq->sender == NULL)
802     return;
803   GSF_peer_update_performance_ (prq->sender,
804                                 pr->public_data.start_time,
805                                 prq->priority);
806 }
807
808
809 /**
810  * We have received a reply; handle it!
811  *
812  * @param cls response (a `struct ProcessReplyClosure`)
813  * @param key our query
814  * @param value value in the hash map (info about the query)
815  * @return #GNUNET_YES (we should continue to iterate)
816  */
817 static int
818 process_reply (void *cls, const struct GNUNET_HashCode *key, void *value)
819 {
820   struct ProcessReplyClosure *prq = cls;
821   struct GSF_PendingRequest *pr = value;
822   struct GNUNET_HashCode chash;
823   struct GNUNET_TIME_Absolute last_transmission;
824
825   if (NULL == pr->rh)
826     return GNUNET_YES;
827   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
828               "Matched result (type %u) for query `%s' with pending request\n",
829               (unsigned int) prq->type,
830               GNUNET_h2s (key));
831   GNUNET_STATISTICS_update (GSF_stats,
832                             gettext_noop ("# replies received and matched"),
833                             1,
834                             GNUNET_NO);
835   prq->eval = GNUNET_BLOCK_evaluate (GSF_block_ctx,
836                                      prq->type,
837                                      pr->bg,
838                                      prq->eo,
839                                      key,
840                                      NULL,
841                                      0,
842                                      prq->data,
843                                      prq->size);
844   switch (prq->eval)
845   {
846   case GNUNET_BLOCK_EVALUATION_OK_MORE:
847     update_request_performance_data (prq, pr);
848     break;
849
850   case GNUNET_BLOCK_EVALUATION_OK_LAST:
851     /* short cut: stop processing early, no BF-update, etc. */
852     update_request_performance_data (prq, pr);
853     GNUNET_LOAD_update (GSF_rt_entry_lifetime,
854                         GNUNET_TIME_absolute_get_duration (
855                           pr->public_data.start_time)
856                         .rel_value_us);
857     if (GNUNET_YES !=
858         GSF_request_plan_reference_get_last_transmission_ (pr->public_data
859                                                            .pr_head,
860                                                            prq->sender,
861                                                            &last_transmission))
862       last_transmission.abs_value_us =
863         GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
864     /* pass on to other peers / local clients */
865     pr->rh (pr->rh_cls,
866             prq->eval,
867             pr,
868             prq->anonymity_level,
869             prq->expiration,
870             last_transmission,
871             prq->type,
872             prq->data,
873             prq->size);
874     return GNUNET_YES;
875
876   case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
877 #if INSANE_STATISTICS
878     GNUNET_STATISTICS_update (GSF_stats,
879                               gettext_noop (
880                                 "# duplicate replies discarded (bloomfilter)"),
881                               1,
882                               GNUNET_NO);
883 #endif
884     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Duplicate response, discarding.\n");
885     return GNUNET_YES;   /* duplicate */
886
887   case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
888     GNUNET_STATISTICS_update (GSF_stats,
889                               gettext_noop ("# irrelevant replies discarded"),
890                               1,
891                               GNUNET_NO);
892     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Irrelevant response, ignoring.\n");
893     return GNUNET_YES;
894
895   case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
896     return GNUNET_YES;   /* wrong namespace */
897
898   case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
899     GNUNET_break (0);
900     return GNUNET_YES;
901
902   case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
903     GNUNET_break (0);
904     return GNUNET_YES;
905
906   case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
907     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
908                 _ ("Unsupported block type %u\n"),
909                 prq->type);
910     return GNUNET_NO;
911   }
912   /* update bloomfilter */
913   GNUNET_CRYPTO_hash (prq->data, prq->size, &chash);
914   GSF_pending_request_update_ (pr, &chash, 1);
915   if (NULL == prq->sender)
916   {
917     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
918                 "Found result for query `%s' in local datastore\n",
919                 GNUNET_h2s (key));
920     GNUNET_STATISTICS_update (GSF_stats,
921                               gettext_noop ("# results found locally"),
922                               1,
923                               GNUNET_NO);
924   }
925   else
926   {
927     GSF_dht_lookup_ (pr);
928   }
929   prq->priority += pr->public_data.original_priority;
930   pr->public_data.priority = 0;
931   pr->public_data.original_priority = 0;
932   pr->public_data.results_found++;
933   prq->request_found = GNUNET_YES;
934   /* finally, pass on to other peer / local client */
935   if (! GSF_request_plan_reference_get_last_transmission_ (pr->public_data
936                                                            .pr_head,
937                                                            prq->sender,
938                                                            &last_transmission))
939     last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
940   pr->rh (pr->rh_cls,
941           prq->eval,
942           pr,
943           prq->anonymity_level,
944           prq->expiration,
945           last_transmission,
946           prq->type,
947           prq->data,
948           prq->size);
949   return GNUNET_YES;
950 }
951
952
953 /**
954  * Context for put_migration_continuation().
955  */
956 struct PutMigrationContext
957 {
958   /**
959    * Start time for the operation.
960    */
961   struct GNUNET_TIME_Absolute start;
962
963   /**
964    * Request origin.
965    */
966   struct GNUNET_PeerIdentity origin;
967
968   /**
969    * #GNUNET_YES if we had a matching request for this block,
970    * #GNUNET_NO if not.
971    */
972   int requested;
973 };
974
975
976 /**
977  * Continuation called to notify client about result of the
978  * operation.
979  *
980  * @param cls closure
981  * @param success #GNUNET_SYSERR on failure
982  * @param min_expiration minimum expiration time required for content to be stored
983  * @param msg NULL on success, otherwise an error message
984  */
985 static void
986 put_migration_continuation (void *cls,
987                             int success,
988                             struct GNUNET_TIME_Absolute min_expiration,
989                             const char *msg)
990 {
991   struct PutMigrationContext *pmc = cls;
992   struct GSF_ConnectedPeer *cp;
993   struct GNUNET_TIME_Relative mig_pause;
994   struct GSF_PeerPerformanceData *ppd;
995
996   if (NULL != datastore_put_load)
997   {
998     if (GNUNET_SYSERR != success)
999     {
1000       GNUNET_LOAD_update (datastore_put_load,
1001                           GNUNET_TIME_absolute_get_duration (pmc->start)
1002                           .rel_value_us);
1003     }
1004     else
1005     {
1006       /* on queue failure / timeout, increase the put load dramatically */
1007       GNUNET_LOAD_update (datastore_put_load,
1008                           GNUNET_TIME_UNIT_MINUTES.rel_value_us);
1009     }
1010   }
1011   cp = GSF_peer_get_ (&pmc->origin);
1012   if (GNUNET_OK == success)
1013   {
1014     if (NULL != cp)
1015     {
1016       ppd = GSF_get_peer_performance_data_ (cp);
1017       ppd->migration_delay.rel_value_us /= 2;
1018     }
1019     GNUNET_free (pmc);
1020     return;
1021   }
1022   if ((GNUNET_NO == success) && (GNUNET_NO == pmc->requested) && (NULL != cp))
1023   {
1024     ppd = GSF_get_peer_performance_data_ (cp);
1025     if (min_expiration.abs_value_us > 0)
1026     {
1027       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1028                   "Asking to stop migration for %s because datastore is full\n",
1029                   GNUNET_STRINGS_relative_time_to_string (
1030                     GNUNET_TIME_absolute_get_remaining (min_expiration),
1031                     GNUNET_YES));
1032       GSF_block_peer_migration_ (cp, min_expiration);
1033     }
1034     else
1035     {
1036       ppd->migration_delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_SECONDS,
1037                                                        ppd->migration_delay);
1038       ppd->migration_delay =
1039         GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS, ppd->migration_delay);
1040       mig_pause.rel_value_us =
1041         GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1042                                   ppd->migration_delay.rel_value_us);
1043       ppd->migration_delay =
1044         GNUNET_TIME_relative_saturating_multiply (ppd->migration_delay, 2);
1045       GNUNET_log (
1046         GNUNET_ERROR_TYPE_DEBUG,
1047         "Replicated content already exists locally, asking to stop migration for %s\n",
1048         GNUNET_STRINGS_relative_time_to_string (mig_pause, GNUNET_YES));
1049       GSF_block_peer_migration_ (cp,
1050                                  GNUNET_TIME_relative_to_absolute (mig_pause));
1051     }
1052   }
1053   GNUNET_free (pmc);
1054   GNUNET_STATISTICS_update (GSF_stats,
1055                             gettext_noop ("# Datastore `PUT' failures"),
1056                             1,
1057                             GNUNET_NO);
1058 }
1059
1060
1061 /**
1062  * Test if the DATABASE (PUT) load on this peer is too high
1063  * to even consider processing the query at
1064  * all.
1065  *
1066  * @param priority the priority of the item
1067  * @return #GNUNET_YES if the load is too high to do anything (load high)
1068  *         #GNUNET_NO to process normally (load normal or low)
1069  */
1070 static int
1071 test_put_load_too_high (uint32_t priority)
1072 {
1073   double ld;
1074
1075   if (NULL == datastore_put_load)
1076     return GNUNET_NO;
1077   if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
1078     return GNUNET_NO; /* very fast */
1079   ld = GNUNET_LOAD_get_load (datastore_put_load);
1080   if (ld < 2.0 * (1 + priority))
1081     return GNUNET_NO;
1082   GNUNET_STATISTICS_update (GSF_stats,
1083                             gettext_noop (
1084                               "# storage requests dropped due to high load"),
1085                             1,
1086                             GNUNET_NO);
1087   return GNUNET_YES;
1088 }
1089
1090
1091 /**
1092  * Iterator called on each result obtained for a DHT
1093  * operation that expects a reply
1094  *
1095  * @param cls closure
1096  * @param exp when will this value expire
1097  * @param key key of the result
1098  * @param get_path peers on reply path (or NULL if not recorded)
1099  * @param get_path_length number of entries in @a get_path
1100  * @param put_path peers on the PUT path (or NULL if not recorded)
1101  * @param put_path_length number of entries in @a get_path
1102  * @param type type of the result
1103  * @param size number of bytes in @a data
1104  * @param data pointer to the result data
1105  */
1106 static void
1107 handle_dht_reply (void *cls,
1108                   struct GNUNET_TIME_Absolute exp,
1109                   const struct GNUNET_HashCode *key,
1110                   const struct GNUNET_PeerIdentity *get_path,
1111                   unsigned int get_path_length,
1112                   const struct GNUNET_PeerIdentity *put_path,
1113                   unsigned int put_path_length,
1114                   enum GNUNET_BLOCK_Type type,
1115                   size_t size,
1116                   const void *data)
1117 {
1118   struct GSF_PendingRequest *pr = cls;
1119   struct ProcessReplyClosure prq;
1120   struct PutMigrationContext *pmc;
1121
1122   GNUNET_STATISTICS_update (GSF_stats,
1123                             gettext_noop ("# Replies received from DHT"),
1124                             1,
1125                             GNUNET_NO);
1126   memset (&prq, 0, sizeof(prq));
1127   prq.data = data;
1128   prq.expiration = exp;
1129   /* do not allow migrated content to live longer than 1 year */
1130   prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (
1131                                                GNUNET_TIME_UNIT_YEARS),
1132                                              prq.expiration);
1133   prq.size = size;
1134   prq.type = type;
1135   prq.eo = GNUNET_BLOCK_EO_NONE;
1136   process_reply (&prq, key, pr);
1137   if ((GNUNET_YES == active_to_migration) &&
1138       (GNUNET_NO == test_put_load_too_high (prq.priority)))
1139   {
1140     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1141                 "Replicating result for query `%s' with priority %u\n",
1142                 GNUNET_h2s (key),
1143                 prq.priority);
1144     pmc = GNUNET_new (struct PutMigrationContext);
1145     pmc->start = GNUNET_TIME_absolute_get ();
1146     pmc->requested = GNUNET_YES;
1147     if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1148                                       0,
1149                                       key,
1150                                       size,
1151                                       data,
1152                                       type,
1153                                       prq.priority,
1154                                       1 /* anonymity */,
1155                                       0 /* replication */,
1156                                       exp,
1157                                       1 + prq.priority,
1158                                       MAX_DATASTORE_QUEUE,
1159                                       &put_migration_continuation,
1160                                       pmc))
1161     {
1162       put_migration_continuation (pmc,
1163                                   GNUNET_SYSERR,
1164                                   GNUNET_TIME_UNIT_ZERO_ABS,
1165                                   NULL);
1166     }
1167   }
1168 }
1169
1170
1171 /**
1172  * Consider looking up the data in the DHT (anonymity-level permitting).
1173  *
1174  * @param pr the pending request to process
1175  */
1176 void
1177 GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1178 {
1179   const void *xquery;
1180   size_t xquery_size;
1181   struct GNUNET_PeerIdentity pi;
1182   char buf[sizeof(struct GNUNET_HashCode) * 2] GNUNET_ALIGN;
1183
1184   if (0 != pr->public_data.anonymity_level)
1185     return;
1186   if (NULL != pr->gh)
1187   {
1188     GNUNET_DHT_get_stop (pr->gh);
1189     pr->gh = NULL;
1190   }
1191   xquery = NULL;
1192   xquery_size = 0;
1193   if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1194   {
1195     GNUNET_assert (0 != pr->sender_pid);
1196     GNUNET_PEER_resolve (pr->sender_pid, &pi);
1197     GNUNET_memcpy (&buf[xquery_size], &pi, sizeof(struct GNUNET_PeerIdentity));
1198     xquery_size += sizeof(struct GNUNET_PeerIdentity);
1199   }
1200   pr->gh = GNUNET_DHT_get_start (GSF_dht,
1201                                  pr->public_data.type,
1202                                  &pr->public_data.query,
1203                                  DHT_GET_REPLICATION,
1204                                  GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1205                                  xquery,
1206                                  xquery_size,
1207                                  &handle_dht_reply,
1208                                  pr);
1209   if ((NULL != pr->gh) && (0 != pr->replies_seen_count))
1210     GNUNET_DHT_get_filter_known_results (pr->gh,
1211                                          pr->replies_seen_count,
1212                                          pr->replies_seen);
1213 }
1214
1215
1216 /**
1217  * Function called with a reply from the cadet.
1218  *
1219  * @param cls the pending request struct
1220  * @param type type of the block, ANY on error
1221  * @param expiration expiration time for the block
1222  * @param data_size number of bytes in @a data, 0 on error
1223  * @param data reply block data, NULL on error
1224  */
1225 static void
1226 cadet_reply_proc (void *cls,
1227                   enum GNUNET_BLOCK_Type type,
1228                   struct GNUNET_TIME_Absolute expiration,
1229                   size_t data_size,
1230                   const void *data)
1231 {
1232   struct GSF_PendingRequest *pr = cls;
1233   struct ProcessReplyClosure prq;
1234   struct GNUNET_HashCode query;
1235
1236   pr->cadet_request = NULL;
1237   if (GNUNET_BLOCK_TYPE_ANY == type)
1238   {
1239     GNUNET_break (NULL == data);
1240     GNUNET_break (0 == data_size);
1241     pr->cadet_retry_count++;
1242     if (pr->cadet_retry_count >= CADET_RETRY_MAX)
1243       return;   /* give up on cadet */
1244     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Error retrieiving block via cadet\n");
1245     /* retry -- without delay, as this is non-anonymous
1246        and cadet/cadet connect will take some time anyway */
1247     pr->cadet_request = GSF_cadet_query (pr->public_data.target,
1248                                          &pr->public_data.query,
1249                                          pr->public_data.type,
1250                                          &cadet_reply_proc,
1251                                          pr);
1252     return;
1253   }
1254   if (GNUNET_YES !=
1255       GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, data_size, &query))
1256   {
1257     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1258                 "Failed to derive key for block of type %d\n",
1259                 (int) type);
1260     GNUNET_break_op (0);
1261     return;
1262   }
1263   GNUNET_STATISTICS_update (GSF_stats,
1264                             gettext_noop ("# Replies received from CADET"),
1265                             1,
1266                             GNUNET_NO);
1267   memset (&prq, 0, sizeof(prq));
1268   prq.data = data;
1269   prq.expiration = expiration;
1270   /* do not allow migrated content to live longer than 1 year */
1271   prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (
1272                                                GNUNET_TIME_UNIT_YEARS),
1273                                              prq.expiration);
1274   prq.size = data_size;
1275   prq.type = type;
1276   prq.eo = GNUNET_BLOCK_EO_NONE;
1277   process_reply (&prq, &query, pr);
1278 }
1279
1280
1281 /**
1282  * Consider downloading via cadet (if possible)
1283  *
1284  * @param pr the pending request to process
1285  */
1286 void
1287 GSF_cadet_lookup_ (struct GSF_PendingRequest *pr)
1288 {
1289   if (0 != pr->public_data.anonymity_level)
1290     return;
1291   if (0 == pr->public_data.target)
1292   {
1293     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1294                 "Cannot do cadet-based download, target peer not known\n");
1295     return;
1296   }
1297   if (NULL != pr->cadet_request)
1298     return;
1299   pr->cadet_request = GSF_cadet_query (pr->public_data.target,
1300                                        &pr->public_data.query,
1301                                        pr->public_data.type,
1302                                        &cadet_reply_proc,
1303                                        pr);
1304 }
1305
1306
1307 /**
1308  * Task that issues a warning if the datastore lookup takes too long.
1309  *
1310  * @param cls the `struct GSF_PendingRequest`
1311  */
1312 static void
1313 warn_delay_task (void *cls)
1314 {
1315   struct GSF_PendingRequest *pr = cls;
1316
1317   GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1318               _ ("Datastore lookup already took %s!\n"),
1319               GNUNET_STRINGS_relative_time_to_string (
1320                 GNUNET_TIME_absolute_get_duration (pr->qe_start),
1321                 GNUNET_YES));
1322   pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1323                                                 &warn_delay_task,
1324                                                 pr);
1325 }
1326
1327
1328 /**
1329  * Task that issues a warning if the datastore lookup takes too long.
1330  *
1331  * @param cls the `struct GSF_PendingRequest`
1332  */
1333 static void
1334 odc_warn_delay_task (void *cls)
1335 {
1336   struct GSF_PendingRequest *pr = cls;
1337
1338   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1339               _ ("On-demand lookup already took %s!\n"),
1340               GNUNET_STRINGS_relative_time_to_string (
1341                 GNUNET_TIME_absolute_get_duration (pr->qe_start),
1342                 GNUNET_YES));
1343   pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1344                                                 &odc_warn_delay_task,
1345                                                 pr);
1346 }
1347
1348
1349 /* Call our continuation (if we have any) */
1350 static void
1351 call_continuation (struct GSF_PendingRequest *pr)
1352 {
1353   GSF_LocalLookupContinuation cont = pr->llc_cont;
1354
1355   GNUNET_assert (NULL == pr->qe);
1356   if (NULL != pr->warn_task)
1357   {
1358     GNUNET_SCHEDULER_cancel (pr->warn_task);
1359     pr->warn_task = NULL;
1360   }
1361   if (NULL == cont)
1362     return; /* no continuation */
1363   pr->llc_cont = NULL;
1364   if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1365   {
1366     if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
1367     {
1368       /* Signal that we are done and that there won't be any
1369          additional results to allow client to clean up state. */
1370       pr->rh (pr->rh_cls,
1371               GNUNET_BLOCK_EVALUATION_OK_LAST,
1372               pr,
1373               UINT32_MAX,
1374               GNUNET_TIME_UNIT_ZERO_ABS,
1375               GNUNET_TIME_UNIT_FOREVER_ABS,
1376               GNUNET_BLOCK_TYPE_ANY,
1377               NULL,
1378               0);
1379     }
1380     /* Finally, call our continuation to signal that we are
1381        done with local processing of this request; i.e. to
1382        start reading again from the client. */
1383     cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
1384     return;
1385   }
1386
1387   cont (pr->llc_cont_cls, pr, pr->local_result);
1388 }
1389
1390
1391 /* Update stats and call continuation */
1392 static void
1393 no_more_local_results (struct GSF_PendingRequest *pr)
1394 {
1395   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1396               "No further local responses available.\n");
1397 #if INSANE_STATISTICS
1398   if ((GNUNET_BLOCK_TYPE_FS_DBLOCK == pr->public_data.type) ||
1399       (GNUNET_BLOCK_TYPE_FS_IBLOCK == pr->public_data.type))
1400     GNUNET_STATISTICS_update (GSF_stats,
1401                               gettext_noop (
1402                                 "# requested DBLOCK or IBLOCK not found"),
1403                               1,
1404                               GNUNET_NO);
1405 #endif
1406   call_continuation (pr);
1407 }
1408
1409
1410 /* forward declaration */
1411 static void
1412 process_local_reply (void *cls,
1413                      const struct GNUNET_HashCode *key,
1414                      size_t size,
1415                      const void *data,
1416                      enum GNUNET_BLOCK_Type type,
1417                      uint32_t priority,
1418                      uint32_t anonymity,
1419                      uint32_t replication,
1420                      struct GNUNET_TIME_Absolute expiration,
1421                      uint64_t uid);
1422
1423
1424 /* Start a local query */
1425 static void
1426 start_local_query (struct GSF_PendingRequest *pr,
1427                    uint64_t next_uid,
1428                    bool random)
1429 {
1430   pr->qe_start = GNUNET_TIME_absolute_get ();
1431   pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1432                                                 &warn_delay_task,
1433                                                 pr);
1434   pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
1435                                      next_uid,
1436                                      random,
1437                                      &pr->public_data.query,
1438                                      pr->public_data.type ==
1439                                      GNUNET_BLOCK_TYPE_FS_DBLOCK
1440                                      ? GNUNET_BLOCK_TYPE_ANY
1441                                      : pr->public_data.type,
1442                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED
1443                                             & pr->public_data.options))
1444                                      ? UINT_MAX
1445                                      : 1
1446                                      /* queue priority */,
1447                                      (0 != (GSF_PRO_PRIORITY_UNLIMITED
1448                                             & pr->public_data.options))
1449                                      ? UINT_MAX
1450                                      : GSF_datastore_queue_size
1451                                      /* max queue size */,
1452                                      &process_local_reply,
1453                                      pr);
1454   if (NULL != pr->qe)
1455     return;
1456   GNUNET_log (
1457     GNUNET_ERROR_TYPE_DEBUG,
1458     "ERROR Requesting `%s' of type %d with next_uid %llu from datastore.\n",
1459     GNUNET_h2s (&pr->public_data.query),
1460     pr->public_data.type,
1461     (unsigned long long) next_uid);
1462   GNUNET_STATISTICS_update (GSF_stats,
1463                             gettext_noop (
1464                               "# Datastore lookups concluded (error queueing)"),
1465                             1,
1466                             GNUNET_NO);
1467   call_continuation (pr);
1468 }
1469
1470
1471 /**
1472  * We're processing (local) results for a search request
1473  * from another peer.  Pass applicable results to the
1474  * peer and if we are done either clean up (operation
1475  * complete) or forward to other peers (more results possible).
1476  *
1477  * @param cls our closure (`struct GSF_PendingRequest *`)
1478  * @param key key for the content
1479  * @param size number of bytes in @a data
1480  * @param data content stored
1481  * @param type type of the content
1482  * @param priority priority of the content
1483  * @param anonymity anonymity-level for the content
1484  * @param replication replication-level for the content
1485  * @param expiration expiration time for the content
1486  * @param uid unique identifier for the datum;
1487  *        maybe 0 if no unique identifier is available
1488  */
1489 static void
1490 process_local_reply (void *cls,
1491                      const struct GNUNET_HashCode *key,
1492                      size_t size,
1493                      const void *data,
1494                      enum GNUNET_BLOCK_Type type,
1495                      uint32_t priority,
1496                      uint32_t anonymity,
1497                      uint32_t replication,
1498                      struct GNUNET_TIME_Absolute expiration,
1499                      uint64_t uid)
1500 {
1501   struct GSF_PendingRequest *pr = cls;
1502   struct ProcessReplyClosure prq;
1503   struct GNUNET_HashCode query;
1504   unsigned int old_rf;
1505
1506   GNUNET_SCHEDULER_cancel (pr->warn_task);
1507   pr->warn_task = NULL;
1508   if (NULL == pr->qe)
1509     goto called_from_on_demand;
1510   pr->qe = NULL;
1511   if (
1512     (NULL == key) && pr->seen_null &&
1513     ! pr->have_first_uid)  /* We have hit the end for the 2nd time with no results */
1514   {
1515     /* No results */
1516 #if INSANE_STATISTICS
1517     GNUNET_STATISTICS_update (GSF_stats,
1518                               gettext_noop (
1519                                 "# Datastore lookups concluded (no results)"),
1520                               1,
1521                               GNUNET_NO);
1522 #endif
1523     no_more_local_results (pr);
1524     return;
1525   }
1526   if (((NULL == key) &&
1527        pr->seen_null) || /* We have hit the end for the 2nd time OR */
1528       (pr->seen_null && pr->have_first_uid &&
1529        (uid >= pr->first_uid))) /* We have hit the end and past first UID */
1530   {
1531     /* Seen all results */
1532     GNUNET_STATISTICS_update (GSF_stats,
1533                               gettext_noop (
1534                                 "# Datastore lookups concluded (seen all)"),
1535                               1,
1536                               GNUNET_NO);
1537     no_more_local_results (pr);
1538     return;
1539   }
1540   if (NULL == key)
1541   {
1542     GNUNET_assert (! pr->seen_null);
1543     pr->seen_null = true;
1544     start_local_query (pr, 0 /* next_uid */, false /* random */);
1545     return;
1546   }
1547   if (! pr->have_first_uid)
1548   {
1549     pr->first_uid = uid;
1550     pr->have_first_uid = true;
1551   }
1552   pr->result_count++;
1553   if (pr->result_count > MAX_RESULTS)
1554   {
1555     GNUNET_STATISTICS_update (
1556       GSF_stats,
1557       gettext_noop ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1558       1,
1559       GNUNET_NO);
1560     no_more_local_results (pr);
1561     return;
1562   }
1563   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1564               "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1565               GNUNET_h2s (key),
1566               type,
1567               (unsigned long long) uid);
1568   if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1569   {
1570     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1571                 "Found ONDEMAND block, performing on-demand encoding\n");
1572     GNUNET_STATISTICS_update (GSF_stats,
1573                               gettext_noop (
1574                                 "# on-demand blocks matched requests"),
1575                               1,
1576                               GNUNET_NO);
1577     pr->qe_start = GNUNET_TIME_absolute_get ();
1578     pr->warn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1579                                                   &odc_warn_delay_task,
1580                                                   pr);
1581     if (GNUNET_OK == GNUNET_FS_handle_on_demand_block (key,
1582                                                        size,
1583                                                        data,
1584                                                        type,
1585                                                        priority,
1586                                                        anonymity,
1587                                                        replication,
1588                                                        expiration,
1589                                                        uid,
1590                                                        &process_local_reply,
1591                                                        pr))
1592     {
1593       GNUNET_STATISTICS_update (GSF_stats,
1594                                 gettext_noop (
1595                                   "# on-demand lookups performed successfully"),
1596                                 1,
1597                                 GNUNET_NO);
1598       return;     /* we're done */
1599     }
1600     GNUNET_STATISTICS_update (GSF_stats,
1601                               gettext_noop ("# on-demand lookups failed"),
1602                               1,
1603                               GNUNET_NO);
1604     GNUNET_SCHEDULER_cancel (pr->warn_task);
1605     start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1606     return;
1607   }
1608 called_from_on_demand:
1609   old_rf = pr->public_data.results_found;
1610   memset (&prq, 0, sizeof(prq));
1611   prq.data = data;
1612   prq.expiration = expiration;
1613   prq.size = size;
1614   if (GNUNET_OK !=
1615       GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query))
1616   {
1617     GNUNET_break (0);
1618     GNUNET_DATASTORE_remove (GSF_dsh,
1619                              key,
1620                              size,
1621                              data,
1622                              UINT_MAX,
1623                              UINT_MAX,
1624                              NULL,
1625                              NULL);
1626     start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1627     return;
1628   }
1629   prq.type = type;
1630   prq.priority = priority;
1631   prq.request_found = GNUNET_NO;
1632   prq.anonymity_level = anonymity;
1633   if ((0 == old_rf) && (0 == pr->public_data.results_found))
1634     GSF_update_datastore_delay_ (pr->public_data.start_time);
1635   prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO;
1636   process_reply (&prq, key, pr);
1637   pr->local_result = prq.eval;
1638   if (GNUNET_BLOCK_EVALUATION_OK_LAST == prq.eval)
1639   {
1640     GNUNET_STATISTICS_update (
1641       GSF_stats,
1642       gettext_noop ("# Datastore lookups concluded (found last result)"),
1643       1,
1644       GNUNET_NO);
1645     call_continuation (pr);
1646     return;
1647   }
1648   if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1649       ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
1650        (pr->public_data.results_found > 5 + 2 * pr->public_data.priority)))
1651   {
1652     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Load too high, done with request\n");
1653     GNUNET_STATISTICS_update (GSF_stats,
1654                               gettext_noop (
1655                                 "# Datastore lookups concluded (load too high)"),
1656                               1,
1657                               GNUNET_NO);
1658     call_continuation (pr);
1659     return;
1660   }
1661   start_local_query (pr, uid + 1 /* next_uid */, false /* random */);
1662 }
1663
1664
1665 /**
1666  * Is the given target a legitimate peer for forwarding the given request?
1667  *
1668  * @param pr request
1669  * @param target
1670  * @return #GNUNET_YES if this request could be forwarded to the given peer
1671  */
1672 int
1673 GSF_pending_request_test_target_ (struct GSF_PendingRequest *pr,
1674                                   const struct GNUNET_PeerIdentity *target)
1675 {
1676   struct GNUNET_PeerIdentity pi;
1677
1678   if (0 == pr->origin_pid)
1679     return GNUNET_YES;
1680   GNUNET_PEER_resolve (pr->origin_pid, &pi);
1681   return (0 == memcmp (&pi, target, sizeof(struct GNUNET_PeerIdentity)))
1682          ? GNUNET_NO
1683          : GNUNET_YES;
1684 }
1685
1686
1687 /**
1688  * Look up the request in the local datastore.
1689  *
1690  * @param pr the pending request to process
1691  * @param cont function to call at the end
1692  * @param cont_cls closure for @a cont
1693  */
1694 void
1695 GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1696                    GSF_LocalLookupContinuation cont,
1697                    void *cont_cls)
1698 {
1699   GNUNET_assert (NULL == pr->gh);
1700   GNUNET_assert (NULL == pr->cadet_request);
1701   GNUNET_assert (NULL == pr->llc_cont);
1702   pr->llc_cont = cont;
1703   pr->llc_cont_cls = cont_cls;
1704 #if INSANE_STATISTICS
1705   GNUNET_STATISTICS_update (GSF_stats,
1706                             gettext_noop ("# Datastore lookups initiated"),
1707                             1,
1708                             GNUNET_NO);
1709 #endif
1710   start_local_query (pr, 0 /* next_uid */, true /* random */);
1711 }
1712
1713
1714 /**
1715  * Handle P2P "CONTENT" message.  Checks that the message is
1716  * well-formed and then checks if there are any pending requests for
1717  * this content and possibly passes it on (to local clients or other
1718  * peers).  Does NOT perform migration (content caching at this peer).
1719  *
1720  * @param cls the other peer involved
1721  * @param put the actual message
1722  */
1723 void
1724 handle_p2p_put (void *cls, const struct PutMessage *put)
1725 {
1726   struct GSF_ConnectedPeer *cp = cls;
1727   uint16_t msize;
1728   size_t dsize;
1729   enum GNUNET_BLOCK_Type type;
1730   struct GNUNET_TIME_Absolute expiration;
1731   struct GNUNET_HashCode query;
1732   struct ProcessReplyClosure prq;
1733   struct GNUNET_TIME_Relative block_time;
1734   double putl;
1735   struct PutMigrationContext *pmc;
1736
1737   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1738               "Received P2P PUT from %s\n",
1739               GNUNET_i2s (GSF_get_peer_performance_data_ (cp)->peer));
1740   GSF_cover_content_count++;
1741   msize = ntohs (put->header.size);
1742   dsize = msize - sizeof(struct PutMessage);
1743   type = ntohl (put->type);
1744   expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1745   /* do not allow migrated content to live longer than 1 year */
1746   expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (
1747                                            GNUNET_TIME_UNIT_YEARS),
1748                                          expiration);
1749   if (GNUNET_OK !=
1750       GNUNET_BLOCK_get_key (GSF_block_ctx, type, &put[1], dsize, &query))
1751   {
1752     GNUNET_break_op (0);
1753     return;
1754   }
1755   GNUNET_STATISTICS_update (GSF_stats,
1756                             gettext_noop ("# GAP PUT messages received"),
1757                             1,
1758                             GNUNET_NO);
1759   /* now, lookup 'query' */
1760   prq.data = (const void *) &put[1];
1761   prq.sender = cp;
1762   prq.size = dsize;
1763   prq.type = type;
1764   prq.expiration = expiration;
1765   prq.priority = 0;
1766   prq.anonymity_level = UINT32_MAX;
1767   prq.request_found = GNUNET_NO;
1768   prq.eo = GNUNET_BLOCK_EO_NONE;
1769   GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
1770                                               &query,
1771                                               &process_reply,
1772                                               &prq);
1773   if (NULL != cp)
1774   {
1775     GSF_connected_peer_change_preference_ (cp,
1776                                            CONTENT_BANDWIDTH_VALUE
1777                                            + 1000 * prq.priority);
1778     GSF_get_peer_performance_data_ (cp)->respect += prq.priority;
1779   }
1780   if ((GNUNET_YES == active_to_migration) && (NULL != cp) &&
1781       (GNUNET_NO == test_put_load_too_high (prq.priority)))
1782   {
1783     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1784                 "Replicating result for query `%s' with priority %u\n",
1785                 GNUNET_h2s (&query),
1786                 prq.priority);
1787     pmc = GNUNET_new (struct PutMigrationContext);
1788     pmc->start = GNUNET_TIME_absolute_get ();
1789     pmc->requested = prq.request_found;
1790     GNUNET_assert (0 != GSF_get_peer_performance_data_ (cp)->pid);
1791     GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
1792                          &pmc->origin);
1793     if (NULL == GNUNET_DATASTORE_put (GSF_dsh,
1794                                       0,
1795                                       &query,
1796                                       dsize,
1797                                       &put[1],
1798                                       type,
1799                                       prq.priority,
1800                                       1 /* anonymity */,
1801                                       0 /* replication */,
1802                                       expiration,
1803                                       1 + prq.priority,
1804                                       MAX_DATASTORE_QUEUE,
1805                                       &put_migration_continuation,
1806                                       pmc))
1807     {
1808       put_migration_continuation (pmc,
1809                                   GNUNET_SYSERR,
1810                                   GNUNET_TIME_UNIT_ZERO_ABS,
1811                                   NULL);
1812     }
1813   }
1814   else if (NULL != cp)
1815   {
1816     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1817                 "Choosing not to keep content `%s' (%d/%d)\n",
1818                 GNUNET_h2s (&query),
1819                 active_to_migration,
1820                 test_put_load_too_high (prq.priority));
1821   }
1822   putl = GNUNET_LOAD_get_load (datastore_put_load);
1823   if ((NULL != cp) && (GNUNET_NO == prq.request_found) &&
1824       ((GNUNET_YES != active_to_migration) ||
1825        (putl > 2.5 * (1 + prq.priority))))
1826   {
1827     if (GNUNET_YES != active_to_migration)
1828       putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
1829     block_time = GNUNET_TIME_relative_multiply (
1830       GNUNET_TIME_UNIT_MILLISECONDS,
1831       5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1832                                        (unsigned int) (60000 * putl * putl)));
1833     GNUNET_log (
1834       GNUNET_ERROR_TYPE_DEBUG,
1835       "Asking to stop migration for %s because of load %f and events %d/%d\n",
1836       GNUNET_STRINGS_relative_time_to_string (block_time, GNUNET_YES),
1837       putl,
1838       active_to_migration,
1839       (GNUNET_NO == prq.request_found));
1840     GSF_block_peer_migration_ (cp,
1841                                GNUNET_TIME_relative_to_absolute (block_time));
1842   }
1843 }
1844
1845
1846 /**
1847  * Check if the given request is still active.
1848  *
1849  * @param pr pending request
1850  * @return #GNUNET_YES if the request is still active
1851  */
1852 int
1853 GSF_pending_request_test_active_ (struct GSF_PendingRequest *pr)
1854 {
1855   return (NULL != pr->rh) ? GNUNET_YES : GNUNET_NO;
1856 }
1857
1858
1859 /**
1860  * Setup the subsystem.
1861  */
1862 void
1863 GSF_pending_request_init_ ()
1864 {
1865   if (GNUNET_OK !=
1866       GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
1867                                              "fs",
1868                                              "MAX_PENDING_REQUESTS",
1869                                              &max_pending_requests))
1870   {
1871     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
1872                                "fs",
1873                                "MAX_PENDING_REQUESTS");
1874   }
1875   active_to_migration =
1876     GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
1877   datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
1878   pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024, GNUNET_YES);
1879   requests_by_expiration_heap =
1880     GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1881 }
1882
1883
1884 /**
1885  * Shutdown the subsystem.
1886  */
1887 void
1888 GSF_pending_request_done_ ()
1889 {
1890   GNUNET_CONTAINER_multihashmap_iterate (pr_map, &clean_request, NULL);
1891   GNUNET_CONTAINER_multihashmap_destroy (pr_map);
1892   pr_map = NULL;
1893   GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1894   requests_by_expiration_heap = NULL;
1895   GNUNET_LOAD_value_free (datastore_put_load);
1896   datastore_put_load = NULL;
1897 }
1898
1899
1900 /* end of gnunet-service-fs_pr.c */