2 This file is part of GNUnet.
3 Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
22 * @file fs/gnunet-service-fs_pr.c
23 * @brief API to handle pending requests
24 * @author Christian Grothoff
27 #include "gnunet_util_lib.h"
28 #include "gnunet_load_lib.h"
29 #include "gnunet-service-fs.h"
30 #include "gnunet-service-fs_cp.h"
31 #include "gnunet-service-fs_indexing.h"
32 #include "gnunet-service-fs_pe.h"
33 #include "gnunet-service-fs_pr.h"
34 #include "gnunet-service-fs_cadet.h"
38 * Desired replication level for GETs.
40 #define DHT_GET_REPLICATION 5
43 * Maximum size of the datastore queue for P2P operations. Needs to
44 * be large enough to queue #MAX_QUEUE_PER_PEER operations for roughly
45 * the number of active (connected) peers.
47 #define MAX_DATASTORE_QUEUE (16 * MAX_QUEUE_PER_PEER)
50 * Bandwidth value of a 0-priority content (must be fairly high
51 * compared to query since content is typically significantly larger
52 * -- and more valuable since it can take many queries to get one
55 #define CONTENT_BANDWIDTH_VALUE 800
58 * Hard limit on the number of results we may get from the datastore per query.
60 #define MAX_RESULTS (100 * 1024)
63 * Collect an insane number of statistics? May cause excessive IPC.
65 #define INSANE_STATISTICS GNUNET_NO
68 * If obtaining a block via cadet fails, how often do we retry it before
69 * giving up for good (and sticking to non-anonymous transfer)?
71 #define CADET_RETRY_MAX 3
/* Per-request state kept by the pending-request module.  Besides the
 * public data handed out by GSF_pending_request_get_data_() it holds the
 * reply handler, the duplicate-reply tracking (replies_seen array and
 * bloom filter) and the handles of the lookups that may be running for
 * the request (datastore, DHT, cadet).
 * NOTE(review): code in this file also uses pr->rh_cls, pr->llc_cont_cls,
 * pr->first_uid and pr->mingle, but the declarations of those members are
 * not visible in this listing -- confirm against the full file. */
77 struct GSF_PendingRequest
80 * Public data for the request.
82 struct GSF_PendingRequestData public_data;
85 * Function to call if we encounter a reply.
87 GSF_PendingRequestReplyHandler rh;
95 * Array of hash codes of replies we've already seen.
97 struct GNUNET_HashCode *replies_seen;
100 * Bloomfilter masking replies we've already seen.
102 struct GNUNET_CONTAINER_BloomFilter *bf;
105 * Entry for this pending request in the expiration heap, or NULL.
107 struct GNUNET_CONTAINER_HeapNode *hnode;
110 * Datastore queue entry for this request (or NULL for none).
112 struct GNUNET_DATASTORE_QueueEntry *qe;
115 * DHT request handle for this request (or NULL for none).
117 struct GNUNET_DHT_GetHandle *gh;
120 * Cadet request handle for this request (or NULL for none).
122 struct GSF_CadetRequest *cadet_request;
125 * Function to call upon completion of the local get
126 * request, or NULL for none.
128 GSF_LocalLookupContinuation llc_cont;
131 * Closure for @e llc_cont.
136 * Last result from the local datastore lookup evaluation.
138 enum GNUNET_BLOCK_EvaluationResult local_result;
141 * Identity of the peer that we should use for the 'sender'
142 * (recipient of the response) when forwarding (0 for none).
144 GNUNET_PEER_Id sender_pid;
147 * Identity of the peer that we should never forward this query
148 * to since it originated this query (0 for none).
150 GNUNET_PEER_Id origin_pid;
153 * Time we started the last datastore lookup.
155 struct GNUNET_TIME_Absolute qe_start;
158 * Task that warns us if the local datastore lookup takes too long.
160 struct GNUNET_SCHEDULER_Task * warn_task;
163 * Current offset for querying our local datastore for results.
164 * Starts at a random value, incremented until we get the same
165 * UID again (detected using 'first_uid'), which is then used
166 * to termiante the iteration.
168 uint64_t local_result_offset;
171 * Unique ID of the first result from the local datastore;
172 * used to detect wrap-around of the offset.
177 * How often have we retried this request via 'cadet'?
178 * (used to bound overall retries).
180 unsigned int cadet_retry_count;
183 * Number of valid entries in the 'replies_seen' array.
185 unsigned int replies_seen_count;
188 * Length of the 'replies_seen' array.
190 unsigned int replies_seen_size;
193 * Mingle value we currently use for the bf.
198 * Do we have a first UID yet?
200 unsigned int have_first_uid;
206 * All pending requests, ordered by the query. Entries
207 * are of type 'struct GSF_PendingRequest*'.
209 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
213 * Datastore 'PUT' load tracking.
215 static struct GNUNET_LOAD_Value *datastore_put_load;
219 * Are we allowed to migrate content to this peer.
221 static int active_to_migration;
225 * Heap with the request that will expire next at the top. Contains
226 * pointers of type "struct GSF_PendingRequest*"; these will *also* be
227 * aliased from the "requests_by_peer" data structures and the
228 * "requests_by_query" table. Note that requests from our clients
229 * don't expire and are thus NOT in the "requests_by_expiration"
230 * (or the "requests_by_peer" tables).
232 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
236 * Maximum number of requests (from other peers, overall) that we're
237 * willing to have pending at any given point in time. Can be changed
238 * via the configuration file (32k is just the default).
240 static unsigned long long max_pending_requests = (32 * 1024);
245 * Recalculate our bloom filter for filtering replies. This function
246 * will create a new bloom filter from scratch, so it should only be
247 * called if we have no bloomfilter at all (and hence can create a
248 * fresh one of minimal size without problems) OR if our peer is the
249 * initiator (in which case we may resize to larger than minimum size).
251 * @param pr request for which the BF is to be recomputed
/* Discards the request's current bloom filter and constructs a new one
 * from the hashes in pr->replies_seen (pr->replies_seen_count entries),
 * mingled with pr->mingle.
 * NOTE(review): the left-hand sides of the random_u32 and
 * construct_bloomfilter calls are not visible in this listing; presumably
 * they are pr->mingle and pr->bf -- confirm against the full file. */
254 refresh_bloomfilter (struct GSF_PendingRequest *pr)
/* Release the previous filter before building the replacement. */
257 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
/* Draw a fresh weak-quality random 32-bit value. */
259 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
/* Build the filter over every reply hash recorded so far. */
261 GNUNET_BLOCK_construct_bloomfilter (pr->mingle, pr->replies_seen,
262 pr->replies_seen_count);
267 * Create a new pending request.
269 * @param options request options
270 * @param type type of the block that is being requested
271 * @param query key for the lookup
272 * @param target preferred target for the request, NULL for none
273 * @param bf_data raw data for bloom filter for known replies, can be NULL
274 * @param bf_size number of bytes in @a bf_data
275 * @param mingle mingle value for bf
276 * @param anonymity_level desired anonymity level
277 * @param priority maximum outgoing cumulative request priority to use
278 * @param ttl current time-to-live for the request
279 * @param sender_pid peer ID to use for the sender when forwarding, 0 for none
280 * @param origin_pid peer ID of origin of query (do not loop back)
281 * @param replies_seen hash codes of known local replies
282 * @param replies_seen_count size of the @a replies_seen array
283 * @param rh handle to call when we get a reply
284 * @param rh_cls closure for @a rh
285 * @return handle for the new pending request
/* Allocates a new pending request, fills in its public data from the
 * arguments, copies the caller's replies_seen hashes, sets up the reply
 * bloom filter, registers the request in pr_map under its query and, for
 * requests that may expire, inserts it into the expiration heap.  While
 * the heap holds more than max_pending_requests entries, the entry at the
 * top of the heap is reported to its handler and cancelled.
 * NOTE(review): several short lines of this function are not visible in
 * this listing (among them the conditions guarding some statements and
 * the final return); comments below describe the visible lines only. */
287 struct GSF_PendingRequest *
288 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
289 enum GNUNET_BLOCK_Type type,
290 const struct GNUNET_HashCode *query,
291 const struct GNUNET_PeerIdentity *target,
292 const char *bf_data, size_t bf_size,
293 uint32_t mingle, uint32_t anonymity_level,
294 uint32_t priority, int32_t ttl,
295 GNUNET_PEER_Id sender_pid,
296 GNUNET_PEER_Id origin_pid,
297 const struct GNUNET_HashCode *replies_seen,
298 unsigned int replies_seen_count,
299 GSF_PendingRequestReplyHandler rh, void *rh_cls)
301 struct GSF_PendingRequest *pr;
302 struct GSF_PendingRequest *dpr;
304 struct GNUNET_HashCode *eptr;
306 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
307 "Creating request handle for `%s' of type %d\n",
308 GNUNET_h2s (query), type);
309 #if INSANE_STATISTICS
310 GNUNET_STATISTICS_update (GSF_stats,
311 gettext_noop ("# Pending requests created"), 1,
/* Reserve room behind the struct for a copy of a peer identity.
 * NOTE(review): the condition guarding this line is not visible here;
 * presumably it applies only when a target was given -- confirm. */
316 extra += sizeof (struct GNUNET_PeerIdentity);
317 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra);
/* Start the local datastore iteration at a random offset. */
318 pr->local_result_offset =
319 GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
320 pr->public_data.query = *query;
/* eptr addresses the extra bytes allocated directly after the struct. */
321 eptr = (struct GNUNET_HashCode *) &pr[1];
/* The target identity is copied into that trailing storage, so the
 * request does not keep a pointer into caller-owned memory. */
324 pr->public_data.target = (struct GNUNET_PeerIdentity *) eptr;
325 memcpy (eptr, target, sizeof (struct GNUNET_PeerIdentity));
327 pr->public_data.anonymity_level = anonymity_level;
328 pr->public_data.priority = priority;
329 pr->public_data.original_priority = priority;
330 pr->public_data.options = options;
331 pr->public_data.type = type;
332 pr->public_data.start_time = GNUNET_TIME_absolute_get ();
333 pr->sender_pid = sender_pid;
334 pr->origin_pid = origin_pid;
/* A request flagged GSF_PRO_FORWARD_ONLY must carry a non-zero sender. */
337 GNUNET_assert ((sender_pid != 0) || (0 == (options & GSF_PRO_FORWARD_ONLY)));
/* Two alternative ways of deriving the absolute expiration follow: one
 * converts a relative number of seconds into an absolute time, the other
 * subtracts a number of seconds from the start time.
 * NOTE(review): the selecting condition and the multipliers are not
 * visible; presumably the sign of ttl decides -- confirm. */
339 pr->public_data.ttl =
340 GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply
341 (GNUNET_TIME_UNIT_SECONDS,
344 pr->public_data.ttl =
345 GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
346 GNUNET_TIME_relative_multiply
347 (GNUNET_TIME_UNIT_SECONDS,
/* Keep a private copy of the caller's array of already-seen replies. */
349 if (replies_seen_count > 0)
351 pr->replies_seen_size = replies_seen_count;
353 GNUNET_malloc (sizeof (struct GNUNET_HashCode) * pr->replies_seen_size);
354 memcpy (pr->replies_seen, replies_seen,
355 replies_seen_count * sizeof (struct GNUNET_HashCode));
356 pr->replies_seen_count = replies_seen_count;
/* Bloom filter setup: either initialise from the raw filter data passed
 * in, or -- when we own the filter (FULL_REFRESH) and already know some
 * replies -- build one from the replies_seen array. */
361 GNUNET_CONTAINER_bloomfilter_init (bf_data, bf_size,
362 GNUNET_CONSTANTS_BLOOMFILTER_K);
365 else if ((replies_seen_count > 0) &&
366 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)))
368 refresh_bloomfilter (pr);
/* Register the request under its query hash (MULTIPLE map option). */
370 GNUNET_CONTAINER_multihashmap_put (pr_map,
371 &pr->public_data.query, pr,
372 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
/* Only requests that may expire are tracked in the expiration heap; the
 * heap cost is the absolute expiration time in microseconds. */
373 if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
376 GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap, pr,
377 pr->public_data.ttl.abs_value_us);
378 /* make sure we don't track too many requests */
379 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) >
380 max_pending_requests)
382 dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
383 GNUNET_assert (dpr != NULL);
385 break; /* let the request live briefly... */
/* Tell the evicted request's handler that no data is coming (NULL data,
 * size 0), then purge the request completely. */
387 dpr->rh (dpr->rh_cls, GNUNET_BLOCK_EVALUATION_REQUEST_VALID, dpr,
388 UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_TIME_UNIT_FOREVER_ABS,
389 GNUNET_BLOCK_TYPE_ANY, NULL, 0);
390 GSF_pending_request_cancel_ (dpr, GNUNET_YES);
393 GNUNET_STATISTICS_update (GSF_stats,
394 gettext_noop ("# Pending requests active"), 1,
400 * Obtain the public data associated with a pending request
402 * @param pr pending request
403 * @return associated public data
/* Accessor: returns a pointer to the public_data member embedded in the
 * given request; nothing is copied, so the pointer refers to memory that
 * is part of pr itself. */
405 struct GSF_PendingRequestData *
406 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
408 return &pr->public_data;
413 * Test if two pending requests are compatible (would generate
414 * the same query modulo filters and should thus be processed
417 * @param pra a pending request
418 * @param prb another pending request
419 * @return #GNUNET_OK if the requests are compatible
/* Compares two requests by block type and by query hash (bytewise memcmp
 * over the whole GNUNET_HashCode).
 * NOTE(review): the return statements are not visible in this listing;
 * per the function's documentation GNUNET_OK is returned for compatible
 * requests -- confirm the value returned on mismatch. */
422 GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
423 struct GSF_PendingRequest *prb)
/* Mismatch if either the type or the query differs. */
425 if ( (pra->public_data.type != prb->public_data.type) ||
426 (0 != memcmp (&pra->public_data.query,
427 &prb->public_data.query,
428 sizeof (struct GNUNET_HashCode))))
436 * Update a given pending request with additional replies
437 * that have been seen.
439 * @param pr request to update
440 * @param replies_seen hash codes of replies that we've seen
441 * @param replies_seen_count size of the replies_seen array
444 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
445 const struct GNUNET_HashCode * replies_seen,
446 unsigned int replies_seen_count)
449 struct GNUNET_HashCode mhash;
451 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
452 return; /* integer overflow */
453 if (0 != (pr->public_data.options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
455 /* we're responsible for the BF, full refresh */
456 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
457 GNUNET_array_grow (pr->replies_seen, pr->replies_seen_size,
458 replies_seen_count + pr->replies_seen_count);
459 memcpy (&pr->replies_seen[pr->replies_seen_count], replies_seen,
460 sizeof (struct GNUNET_HashCode) * replies_seen_count);
461 pr->replies_seen_count += replies_seen_count;
462 refresh_bloomfilter (pr);
468 /* we're not the initiator, but the initiator did not give us
469 * any bloom-filter, so we need to create one on-the-fly */
471 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX);
473 GNUNET_BLOCK_construct_bloomfilter (pr->mingle, replies_seen,
478 for (i = 0; i < pr->replies_seen_count; i++)
480 GNUNET_BLOCK_mingle_hash (&replies_seen[i], pr->mingle, &mhash);
481 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
486 GNUNET_DHT_get_filter_known_results (pr->gh,
493 * Generate the message corresponding to the given pending request for
494 * transmission to other peers (or at least determine its size).
496 * @param pr request to generate the message for
497 * @param buf_size number of bytes available in @a buf
498 * @param buf where to copy the message (can be NULL)
499 * @return number of bytes needed (if `>` @a buf_size) or used
/* Serialises the request as a GNUNET_MESSAGE_TYPE_FS_GET message.  The
 * message is assembled in the stack buffer lbuf (fixed GetMessage header,
 * then optional peer identities, then the raw bloom filter) and finally
 * copied into buf.  Building the message also spends part of the
 * request's remaining priority and counts one more transmission.
 * NOTE(review): several short lines (local declarations, some guards and
 * the return statements) are not visible in this listing. */
502 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
503 size_t buf_size, void *buf)
505 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
506 struct GetMessage *gm;
507 struct GNUNET_PeerIdentity *ext;
513 struct GNUNET_TIME_Absolute now;
518 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519 "Building request message for `%s' of type %d\n",
520 GNUNET_h2s (&pr->public_data.query), pr->public_data.type);
/* do_route is set unless the request is forward-only; a forward-only
 * request without a sender is switched to do_route as well. */
523 do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
524 if ((!do_route) && (pr->sender_pid == 0))
527 do_route = GNUNET_YES;
/* bm collects the GET_MESSAGE_BIT_* flags announcing which optional peer
 * identities follow the fixed header. */
531 bm |= GET_MESSAGE_BIT_RETURN_TO;
534 if (NULL != pr->public_data.target)
536 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
/* Total size: header + k peer identities + raw bloom filter bytes. */
539 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
540 msize = sizeof (struct GetMessage) + bf_size + k * sizeof (struct GNUNET_PeerIdentity);
541 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
/* NOTE(review): the statement controlled by this check is not visible;
 * per the function's documentation the required size is reported when
 * the caller's buffer is too small -- confirm. */
542 if (buf_size < msize)
544 gm = (struct GetMessage *) lbuf;
545 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
546 gm->header.size = htons (msize);
547 gm->type = htonl (pr->public_data.type);
/* A random value below (remaining priority + 1) is drawn.
 * NOTE(review): the assignment target is not visible; presumably prio. */
550 GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
551 pr->public_data.priority + 1);
/* Deduct the offered priority from the remaining budget and book it. */
554 pr->public_data.priority -= prio;
555 pr->public_data.num_transmissions++;
556 pr->public_data.respect_offered += prio;
557 gm->priority = htonl (prio);
558 now = GNUNET_TIME_absolute_get ();
/* Remaining lifetime in microseconds (may be negative), sent in seconds. */
559 ttl = (int64_t) (pr->public_data.ttl.abs_value_us - now.abs_value_us);
560 gm->ttl = htonl (ttl / 1000LL / 1000LL);
561 gm->filter_mutator = htonl (pr->mingle);
562 gm->hash_bitmap = htonl (bm);
563 gm->query = pr->public_data.query;
/* Optional peer identities start right behind the fixed header. */
564 ext = (struct GNUNET_PeerIdentity *) &gm[1];
567 GNUNET_PEER_resolve (pr->sender_pid,
569 if (NULL != pr->public_data.target)
570 ext[k++] = *pr->public_data.target;
/* Append the raw bloom filter data; failure to export it is fatal. */
572 GNUNET_assert (GNUNET_SYSERR !=
573 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
576 memcpy (buf, gm, msize);
582 * Iterator to free pending requests.
584 * @param cls closure, unused
585 * @param key current key code
586 * @param value value in the hash map (pending request)
587 * @return #GNUNET_YES (we should continue to iterate)
/* Hash-map iterator that tears down one pending request completely:
 * stops a running cadet query, runs a pending local-lookup continuation,
 * informs the planner, releases the replies_seen array and bloom filter,
 * drops the peer reference counts, removes the heap node, cancels
 * datastore / DHT operations and the warning task, and finally removes
 * the request from pr_map.
 * NOTE(review): some guards and the final free/return lines are not
 * visible in this listing. */
590 clean_request (void *cls, const struct GNUNET_HashCode *key, void *value)
592 struct GSF_PendingRequest *pr = value;
593 GSF_LocalLookupContinuation cont;
595 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
596 "Cleaning up pending request for `%s'.\n",
598 if (NULL != pr->cadet_request)
/* Setting the retry counter to its maximum first presumably keeps
 * cadet_reply_proc() from starting another attempt should the cancel
 * trigger it -- TODO confirm against GSF_cadet_query_cancel(). */
600 pr->cadet_retry_count = CADET_RETRY_MAX;
601 GSF_cadet_query_cancel (pr->cadet_request);
602 pr->cadet_request = NULL;
/* A still-pending local lookup continuation is invoked with the last
 * local evaluation result. */
604 if (NULL != (cont = pr->llc_cont))
607 cont (pr->llc_cont_cls, pr, pr->local_result);
609 GSF_plan_notify_request_done_ (pr);
610 GNUNET_free_non_null (pr->replies_seen);
613 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
/* Release the references this request held on the interned peer IDs. */
616 GNUNET_PEER_change_rc (pr->sender_pid, -1);
618 GNUNET_PEER_change_rc (pr->origin_pid, -1);
620 if (NULL != pr->hnode)
622 GNUNET_CONTAINER_heap_remove_node (pr->hnode);
627 GNUNET_DATASTORE_cancel (pr->qe);
632 GNUNET_DHT_get_stop (pr->gh);
635 if (NULL != pr->warn_task)
637 GNUNET_SCHEDULER_cancel (pr->warn_task);
638 pr->warn_task = NULL;
/* The request must have been registered under its query. */
640 GNUNET_assert (GNUNET_OK ==
641 GNUNET_CONTAINER_multihashmap_remove (pr_map,
642 &pr->public_data.query,
644 GNUNET_STATISTICS_update (GSF_stats,
645 gettext_noop ("# Pending requests active"), -1,
653 * Explicitly cancel a pending request.
655 * @param pr request to cancel
656 * @param full_cleanup fully purge the request
/* Cancels a pending request.  Without full_cleanup the request is only
 * made inactive: running cadet, datastore and DHT operations and the
 * warning task are stopped, a pending local-lookup continuation is run
 * and the planner is informed, but the request stays in the module's
 * data structures.  With full_cleanup the remaining lines assert that an
 * operation keyed by the request's query returns GNUNET_YES.
 * NOTE(review): the call inside that assertion, several guards and the
 * second parameter's declaration are not visible in this listing;
 * presumably the full path ends up in clean_request() -- confirm. */
659 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr,
662 GSF_LocalLookupContinuation cont;
665 return; /* already cleaned up! */
666 if (GNUNET_YES != full_cleanup)
668 /* make request inactive (we're no longer interested in more results),
669 * but do NOT remove from our data-structures, we still need it there
670 * to prevent the request from looping */
672 if (NULL != pr->cadet_request)
/* Same retry-counter trick as in clean_request(): mark the cadet retries
 * as exhausted before cancelling. */
674 pr->cadet_retry_count = CADET_RETRY_MAX;
675 GSF_cadet_query_cancel (pr->cadet_request);
676 pr->cadet_request = NULL;
678 if (NULL != (cont = pr->llc_cont))
681 cont (pr->llc_cont_cls,
685 GSF_plan_notify_request_done_ (pr);
688 GNUNET_DATASTORE_cancel (pr->qe);
693 GNUNET_DHT_get_stop (pr->gh);
696 if (NULL != pr->warn_task)
698 GNUNET_SCHEDULER_cancel (pr->warn_task);
699 pr->warn_task = NULL;
/* Full cleanup path. */
703 GNUNET_assert (GNUNET_YES ==
705 &pr->public_data.query,
711 * Iterate over all pending requests.
713 * @param it function to call for each request
714 * @param cls closure for it
/* Walks all entries of pr_map and calls `it` for each one.  The iterator
 * is cast to the generic hash-map iterator type, so it is invoked with
 * the hash map's (closure, key, value) argument convention.
 * NOTE(review): the last argument of the call is not visible in this
 * listing; presumably it is cls -- confirm. */
717 GSF_iterate_pending_requests_ (GSF_PendingRequestIterator it, void *cls)
719 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
720 (GNUNET_CONTAINER_HashMapIterator) it,
726 * Closure for process_reply() function.
/* Bundles everything process_reply() needs to know about one received
 * reply (where it came from, its expiration, type and evaluation flags)
 * together with the values process_reply() reports back (evaluation
 * result, accumulated priority, whether a request matched).
 * NOTE(review): process_reply() and its callers also access the members
 * data, size, priority and request_found, whose declarations are not
 * visible in this listing -- confirm against the full file. */
728 struct ProcessReplyClosure
731 * The data for the reply.
736 * Who gave us this reply? NULL for local host (or DHT)
738 struct GSF_ConnectedPeer *sender;
741 * When the reply expires.
743 struct GNUNET_TIME_Absolute expiration;
753 enum GNUNET_BLOCK_Type type;
756 * Control flags for evaluation.
758 enum GNUNET_BLOCK_EvaluationOptions eo;
761 * How much was this reply worth to us?
766 * Anonymity requirements for this reply.
768 uint32_t anonymity_level;
771 * Evaluation result (returned).
773 enum GNUNET_BLOCK_EvaluationResult eval;
776 * Did we find a matching request?
783 * Update the performance data for the sender (if any) since
784 * the sender successfully answered one of our queries.
786 * @param prq information about the sender
787 * @param pr request that was satisfied
/* Credits the peer that delivered a reply: forwards the request's start
 * time (and further arguments not visible in this listing) to
 * GSF_peer_update_performance_().  Replies without a sender are checked
 * for first.
 * NOTE(review): the statement under the NULL check is not visible;
 * presumably an early return -- confirm. */
790 update_request_performance_data (struct ProcessReplyClosure *prq,
791 struct GSF_PendingRequest *pr)
793 if (prq->sender == NULL)
795 GSF_peer_update_performance_ (prq->sender, pr->public_data.start_time,
801 * We have received a reply; handle it!
803 * @param cls response (a `struct ProcessReplyClosure`)
804 * @param key our query
805 * @param value value in the hash map (info about the query)
806 * @return #GNUNET_YES (we should continue to iterate)
/* Matches one received reply against one pending request.  The reply is
 * evaluated with GNUNET_BLOCK_evaluate() and handled per result: a final
 * answer (OK_LAST) is passed to the request's handler right away;
 * duplicates, irrelevant and invalid replies are dropped; for OK_MORE
 * the code after the switch records the reply as seen, updates the
 * priority accounting and then passes the reply on.
 * NOTE(review): many short lines (break/return statements, call
 * arguments, guards) are not visible in this listing; comments below
 * describe the visible lines only. */
809 process_reply (void *cls,
810 const struct GNUNET_HashCode *key,
813 struct ProcessReplyClosure *prq = cls;
814 struct GSF_PendingRequest *pr = value;
815 struct GNUNET_HashCode chash;
816 struct GNUNET_TIME_Absolute last_transmission;
820 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
821 "Matched result (type %u) for query `%s' with pending request\n",
822 (unsigned int) prq->type,
824 GNUNET_STATISTICS_update (GSF_stats,
825 gettext_noop ("# replies received and matched"), 1,
828 GNUNET_BLOCK_evaluate (GSF_block_ctx,
/* More results may follow: credit the sender, then continue below. */
840 case GNUNET_BLOCK_EVALUATION_OK_MORE:
841 update_request_performance_data (prq, pr);
843 case GNUNET_BLOCK_EVALUATION_OK_LAST:
844 /* short cut: stop processing early, no BF-update, etc. */
845 update_request_performance_data (prq, pr);
/* Feed how long this request lived into the routing-entry lifetime load. */
846 GNUNET_LOAD_update (GSF_rt_entry_lifetime,
847 GNUNET_TIME_absolute_get_duration (pr->
848 public_data.start_time).rel_value_us);
/* If no last transmission time is known, report "forever". */
850 GSF_request_plan_reference_get_last_transmission_ (pr->public_data.pr_head,
853 last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
854 /* pass on to other peers / local clients */
855 pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
856 last_transmission, prq->type, prq->data, prq->size);
858 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
859 #if INSANE_STATISTICS
860 GNUNET_STATISTICS_update (GSF_stats,
862 ("# duplicate replies discarded (bloomfilter)"),
865 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
866 "Duplicate response, discarding.\n");
867 return GNUNET_YES; /* duplicate */
868 case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
869 GNUNET_STATISTICS_update (GSF_stats,
871 ("# irrelevant replies discarded"),
873 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
874 "Irrelevant response, ignoring.\n");
876 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
877 return GNUNET_YES; /* wrong namespace */
/* The REQUEST_* results describe a request, not a reply.
 * NOTE(review): their handling is not visible in this listing. */
878 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
881 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
884 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
885 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
886 _("Unsupported block type %u\n"),
890 /* update bloomfilter */
/* The reply is identified by the hash of its data; that hash is handed
 * to GSF_pending_request_update_() so the same reply is filtered later. */
891 GNUNET_CRYPTO_hash (prq->data,
894 GSF_pending_request_update_ (pr,
/* A reply without sender came from the local datastore. */
897 if (NULL == prq->sender)
899 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
900 "Found result for query `%s' in local datastore\n",
902 GNUNET_STATISTICS_update (GSF_stats,
903 gettext_noop ("# results found locally"),
/* NOTE(review): the condition guarding this DHT lookup is not visible. */
909 GSF_dht_lookup_ (pr);
/* The reply earns the request's original priority; the request itself
 * has nothing left to offer afterwards. */
911 prq->priority += pr->public_data.original_priority;
912 pr->public_data.priority = 0;
913 pr->public_data.original_priority = 0;
914 pr->public_data.results_found++;
915 prq->request_found = GNUNET_YES;
916 /* finally, pass on to other peer / local client */
917 if (! GSF_request_plan_reference_get_last_transmission_ (pr->public_data.pr_head,
920 last_transmission.abs_value_us = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value_us;
924 prq->anonymity_level,
935 * Context for put_migration_continuation().
/* State handed to put_migration_continuation() for one datastore PUT:
 * when the PUT was started, which peer the content came from, and
 * whether a pending request had asked for the block.
 * NOTE(review): the declaration of the `requested` member (used as
 * pmc->requested) is not visible in this listing. */
937 struct PutMigrationContext
941 * Start time for the operation.
943 struct GNUNET_TIME_Absolute start;
948 struct GNUNET_PeerIdentity origin;
951 * #GNUNET_YES if we had a matching request for this block,
959 * Continuation called to notify client about result of the
963 * @param success #GNUNET_SYSERR on failure
964 * @param min_expiration minimum expiration time required for content to be stored
965 * @param msg NULL on success, otherwise an error message
/* Completion callback of a datastore PUT for migrated/replicated content.
 * It (1) feeds the PUT's duration -- or a one-minute penalty on failure --
 * into the datastore_put_load tracker, (2) halves the origin peer's
 * migration delay when the PUT succeeded, and (3) when the PUT returned
 * GNUNET_NO for content nobody requested, asks the origin peer to pause
 * migration, either until min_expiration or for a randomised,
 * exponentially growing back-off period.
 * NOTE(review): the else keywords, some conditions, the use of `msg` and
 * the freeing of pmc are not visible in this listing. */
968 put_migration_continuation (void *cls, int success,
969 struct GNUNET_TIME_Absolute min_expiration,
972 struct PutMigrationContext *pmc = cls;
973 struct GSF_ConnectedPeer *cp;
974 struct GNUNET_TIME_Relative mig_pause;
975 struct GSF_PeerPerformanceData *ppd;
977 if (NULL != datastore_put_load)
979 if (GNUNET_SYSERR != success)
/* Normal case: record how long the datastore took. */
981 GNUNET_LOAD_update (datastore_put_load,
982 GNUNET_TIME_absolute_get_duration (pmc->start).rel_value_us);
986 /* on queue failure / timeout, increase the put load dramatically */
987 GNUNET_LOAD_update (datastore_put_load,
988 GNUNET_TIME_UNIT_MINUTES.rel_value_us);
991 cp = GSF_peer_get_ (&pmc->origin);
992 if (GNUNET_OK == success)
/* Content was stored: let this peer migrate to us sooner next time. */
996 ppd = GSF_get_peer_performance_data_ (cp);
997 ppd->migration_delay.rel_value_us /= 2;
1002 if ( (GNUNET_NO == success) &&
1003 (GNUNET_NO == pmc->requested) &&
1006 ppd = GSF_get_peer_performance_data_ (cp);
/* The datastore named a minimum expiration: block migration until then. */
1007 if (min_expiration.abs_value_us > 0)
1009 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1010 "Asking to stop migration for %s because datastore is full\n",
1011 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_expiration), GNUNET_YES));
1012 GSF_block_peer_migration_ (cp, min_expiration);
/* Otherwise (presumably the else branch -- not visible here): clamp the
 * delay to [1 second, 1 hour], pause for a random time below that delay
 * and double the delay for the next round. */
1016 ppd->migration_delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_SECONDS,
1017 ppd->migration_delay);
1018 ppd->migration_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS,
1019 ppd->migration_delay);
1020 mig_pause.rel_value_us = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
1021 ppd->migration_delay.rel_value_us);
1022 ppd->migration_delay = GNUNET_TIME_relative_multiply (ppd->migration_delay, 2);
1023 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1024 "Replicated content already exists locally, asking to stop migration for %s\n",
1025 GNUNET_STRINGS_relative_time_to_string (mig_pause,
1027 GSF_block_peer_migration_ (cp,
1028 GNUNET_TIME_relative_to_absolute (mig_pause));
1032 GNUNET_STATISTICS_update (GSF_stats,
1033 gettext_noop ("# Datastore `PUT' failures"), 1,
1039 * Test if the DATABASE (PUT) load on this peer is too high
1040 * to even consider processing the query at all.
1043 * @param priority the priority of the item
1044 * @return #GNUNET_YES if the load is too high to do anything (load high)
1045 * #GNUNET_NO to process normally (load normal or low)
/* Decides from the datastore_put_load tracker whether a PUT of the given
 * priority should be attempted.  A low average load always passes; above
 * that, the tolerated load grows linearly with the priority
 * (2.0 * (1 + priority)).  When the load exceeds that bound the
 * "# storage requests dropped due to high load" statistic is bumped.
 * NOTE(review): three of the return statements and the declaration of ld
 * are not visible in this listing. */
1048 test_put_load_too_high (uint32_t priority)
/* No load tracker available. */
1052 if (NULL == datastore_put_load)
1054 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
1055 return GNUNET_NO; /* very fast */
1056 ld = GNUNET_LOAD_get_load (datastore_put_load);
/* Higher-priority content tolerates a higher load. */
1057 if (ld < 2.0 * (1 + priority))
1059 GNUNET_STATISTICS_update (GSF_stats,
1061 ("# storage requests dropped due to high load"), 1,
1068 * Iterator called on each result obtained for a DHT
1069 * operation that expects a reply
1071 * @param cls closure
1072 * @param exp when will this value expire
1073 * @param key key of the result
1074 * @param get_path peers on reply path (or NULL if not recorded)
1075 * @param get_path_length number of entries in @a get_path
1076 * @param put_path peers on the PUT path (or NULL if not recorded)
1077 * @param put_path_length number of entries in @a put_path
1078 * @param type type of the result
1079 * @param size number of bytes in @a data
1080 * @param data pointer to the result data
/* DHT result callback for a pending request (cls).  The result is wrapped
 * in a zeroed ProcessReplyClosure, its expiration is capped, and it is
 * run through process_reply() for this request.  If content migration is
 * active and the datastore is not overloaded, the block is additionally
 * stored locally via GNUNET_DATASTORE_put(); when the put cannot be
 * queued, put_migration_continuation() is invoked directly.
 * NOTE(review): several assignments to prq/pmc members and trailing call
 * arguments are not visible in this listing. */
1083 handle_dht_reply (void *cls,
1084 struct GNUNET_TIME_Absolute exp,
1085 const struct GNUNET_HashCode *key,
1086 const struct GNUNET_PeerIdentity *get_path,
1087 unsigned int get_path_length,
1088 const struct GNUNET_PeerIdentity *put_path,
1089 unsigned int put_path_length,
1090 enum GNUNET_BLOCK_Type type,
1094 struct GSF_PendingRequest *pr = cls;
1095 struct ProcessReplyClosure prq;
1096 struct PutMigrationContext *pmc;
1098 GNUNET_STATISTICS_update (GSF_stats,
1099 gettext_noop ("# Replies received from DHT"), 1,
1101 memset (&prq, 0, sizeof (prq));
1103 prq.expiration = exp;
1104 /* do not allow migrated content to live longer than 1 year */
1105 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
1109 prq.eo = GNUNET_BLOCK_EO_NONE;
/* prq.priority is filled in by process_reply() and used below. */
1110 process_reply (&prq, key, pr);
1111 if ((GNUNET_YES == active_to_migration) &&
1112 (GNUNET_NO == test_put_load_too_high (prq.priority)))
1114 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1115 "Replicating result for query `%s' with priority %u\n",
1116 GNUNET_h2s (key), prq.priority);
1117 pmc = GNUNET_new (struct PutMigrationContext);
1118 pmc->start = GNUNET_TIME_absolute_get ();
/* We asked the DHT for this block, so it counts as requested. */
1119 pmc->requested = GNUNET_YES;
/* NOTE(review): the put below passes the original `exp`, not the capped
 * prq.expiration computed above, so the one-year limit does not seem to
 * apply to the stored copy -- confirm whether that is intended. */
1121 GNUNET_DATASTORE_put (GSF_dsh, 0, key, size, data, type, prq.priority,
1123 0 /* replication */ ,
1124 exp, 1 + prq.priority, MAX_DATASTORE_QUEUE,
1125 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1126 &put_migration_continuation, pmc))
/* Queueing failed: run the continuation ourselves. */
1128 put_migration_continuation (pmc,
1130 GNUNET_TIME_UNIT_ZERO_ABS,
1138 * Consider looking up the data in the DHT (anonymity-level permitting).
1140 * @param pr the pending request to process
/* Starts (or restarts) a DHT GET for the request's type and query with
 * handle_dht_reply() as result callback.  For forward-only requests the
 * sender's peer identity is appended to the extended query.  If the GET
 * was started and replies are already known, they are passed to the DHT
 * as known results.
 * NOTE(review): the declarations/initialisation of xquery and
 * xquery_size, the guard around GNUNET_DHT_get_stop, the statement under
 * the anonymity check (presumably an early return) and the assignment of
 * the new handle to pr->gh are not visible in this listing. */
1143 GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
1147 struct GNUNET_PeerIdentity pi;
/* Scratch space for the extended query. */
1148 char buf[sizeof (struct GNUNET_HashCode) * 2] GNUNET_ALIGN;
/* The check below concerns requests with a non-zero anonymity level. */
1150 if (0 != pr->public_data.anonymity_level)
/* Stop a previous GET for this request. */
1154 GNUNET_DHT_get_stop (pr->gh);
1159 if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
1161 GNUNET_assert (0 != pr->sender_pid);
1162 GNUNET_PEER_resolve (pr->sender_pid, &pi);
1163 memcpy (&buf[xquery_size], &pi, sizeof (struct GNUNET_PeerIdentity));
1164 xquery_size += sizeof (struct GNUNET_PeerIdentity);
1167 GNUNET_DHT_get_start (GSF_dht,
1168 pr->public_data.type, &pr->public_data.query,
1169 DHT_GET_REPLICATION,
1170 GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
1171 xquery, xquery_size, &handle_dht_reply, pr);
1172 if ( (NULL != pr->gh) &&
1173 (0 != pr->replies_seen_count) )
1174 GNUNET_DHT_get_filter_known_results (pr->gh,
1175 pr->replies_seen_count,
1181 * Function called with a reply from the cadet.
1183 * @param cls the pending request struct
1184 * @param type type of the block, ANY on error
1185 * @param expiration expiration time for the block
1186 * @param data_size number of bytes in @a data, 0 on error
1187 * @param data reply block data, NULL on error
/* Callback for a cadet query belonging to the pending request in cls.
 * An error is signalled by type == GNUNET_BLOCK_TYPE_ANY; in that case
 * the query is re-issued until CADET_RETRY_MAX attempts have failed.
 * For a real block the query key is derived from the block with
 * GNUNET_BLOCK_get_key() and the block is run through process_reply().
 * NOTE(review): the data/data_size parameters, some prq assignments,
 * the check on the get_key result and the return statements are not
 * visible in this listing. */
1190 cadet_reply_proc (void *cls,
1191 enum GNUNET_BLOCK_Type type,
1192 struct GNUNET_TIME_Absolute expiration,
1196 struct GSF_PendingRequest *pr = cls;
1197 struct ProcessReplyClosure prq;
1198 struct GNUNET_HashCode query;
/* The handle is cleared first; no cadet query is recorded as running
 * from here on unless the retry below starts a new one. */
1200 pr->cadet_request = NULL;
1201 if (GNUNET_BLOCK_TYPE_ANY == type)
1203 GNUNET_break (NULL == data);
1204 GNUNET_break (0 == data_size);
/* Count the failure; stop retrying once the limit is reached. */
1205 pr->cadet_retry_count++;
1206 if (pr->cadet_retry_count >= CADET_RETRY_MAX)
1207 return; /* give up on cadet */
1208 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1209 "Error retrieiving block via cadet\n");
1210 /* retry -- without delay, as this is non-anonymous
1211 and cadet/cadet connect will take some time anyway */
1212 pr->cadet_request = GSF_cadet_query (pr->public_data.target,
1213 &pr->public_data.query,
1214 pr->public_data.type,
/* Derive the key of the received block instead of trusting the request. */
1220 GNUNET_BLOCK_get_key (GSF_block_ctx,
1222 data, data_size, &query))
1224 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1225 "Failed to derive key for block of type %d\n",
1227 GNUNET_break_op (0);
1230 GNUNET_STATISTICS_update (GSF_stats,
1231 gettext_noop ("# Replies received from CADET"), 1,
1233 memset (&prq, 0, sizeof (prq));
1235 prq.expiration = expiration;
1236 /* do not allow migrated content to live longer than 1 year */
1237 prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
1239 prq.size = data_size;
1241 prq.eo = GNUNET_BLOCK_EO_NONE;
1242 process_reply (&prq, &query, pr);
1247 * Consider downloading via cadet (if possible)
1249 * @param pr the pending request to process
/* Issues a cadet query for the request's query and type to the request's
 * target peer, after checking the anonymity level, that a target is
 * known, and whether a cadet query is already recorded for the request.
 * NOTE(review): the statements under the three checks (presumably early
 * returns) and the trailing arguments of GSF_cadet_query() are not
 * visible in this listing. */
1252 GSF_cadet_lookup_ (struct GSF_PendingRequest *pr)
1254 if (0 != pr->public_data.anonymity_level)
/* target is a pointer; it is compared against 0 (i.e. NULL) here. */
1256 if (0 == pr->public_data.target)
1258 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1259 "Cannot do cadet-based download, target peer not known\n");
/* A query is already in flight for this request. */
1262 if (NULL != pr->cadet_request)
1264 pr->cadet_request = GSF_cadet_query (pr->public_data.target,
1265 &pr->public_data.query,
1266 pr->public_data.type,
1273 * Task that issues a warning if the datastore lookup takes too long.
1275 * @param cls the 'struct GSF_PendingRequest'
1276 * @param tc task context
/* Scheduler task: logs a (bulk) warning stating how long the datastore
 * lookup started at pr->qe_start has been running, then schedules a
 * delayed task one minute later.
 * NOTE(review): the arguments of the add_delayed call and the variable
 * receiving the new task handle are not visible in this listing;
 * presumably the task re-arms itself into pr->warn_task -- confirm. */
1279 warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1281 struct GSF_PendingRequest *pr = cls;
1283 GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK,
1284 _("Datastore lookup already took %s!\n"),
1285 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pr->qe_start),
1288 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1295 * Task that issues a warning if the on-demand lookup takes too long.
1297 * @param cls the 'struct GSF_PendingRequest'
1298 * @param tc task context
/* Scheduler task for on-demand lookups: logs a warning stating how long
 * the lookup started at pr->qe_start has been running, then re-schedules
 * itself to run again one minute later.
 * NOTE(review): the variable receiving the new task handle is not
 * visible in this listing; presumably pr->warn_task -- confirm. */
1301 odc_warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1303 struct GSF_PendingRequest *pr = cls;
1305 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1306 _("On-demand lookup already took %s!\n"),
1307 GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pr->qe_start), GNUNET_YES));
1309 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1310 &odc_warn_delay_task, pr);
1315 * We're processing (local) results for a search request
1316 * from another peer. Pass applicable results to the
1317 * peer and if we are done either clean up (operation
1318 * complete) or forward to other peers (more results possible).
1320 * @param cls our closure (`struct GSF_PendingRequest *`)
1321 * @param key key for the content
1322 * @param size number of bytes in @a data
1323 * @param data content stored
1324 * @param type type of the content
1325 * @param priority priority of the content
1326 * @param anonymity anonymity-level for the content
1327 * @param expiration expiration time for the content
1328 * @param uid unique identifier for the datum;
1329 * may be 0 if no unique identifier is available
/* Result callback for local datastore lookups.
 *
 * This function is handed to GNUNET_DATASTORE_get_key() and to
 * GNUNET_FS_handle_on_demand_block() as the result processor, with the
 * pending request as closure.  For each result it
 *   - stops the "lookup is slow" warning task,
 *   - tracks the first UID seen and the number of results, to detect that
 *     all replies were seen or that MAX_RESULTS was exceeded,
 *   - hands ONDEMAND blocks to the on-demand encoder,
 *   - passes other blocks to process_reply(), and
 *   - either queues the next datastore lookup or falls through to the
 *     `check_error_and_continue` label, where the stored local-lookup
 *     continuation is invoked.
 *
 * NOTE(review): this listing omits several parameters (the embedded
 * numbering skips lines in the signature), all braces, and the left-hand
 * sides / condition prefixes of several statements.  Comments below
 * describe only what is visible and mark the rest as assumptions. */
1332 process_local_reply (void *cls,
1333 const struct GNUNET_HashCode *key,
1336 enum GNUNET_BLOCK_Type type,
1339 struct GNUNET_TIME_Absolute expiration,
1342 struct GSF_PendingRequest *pr = cls;
1343 GSF_LocalLookupContinuation cont;
1344 struct ProcessReplyClosure prq;
1345 struct GNUNET_HashCode query;
1346 unsigned int old_rf;
/* A reply arrived, so the pending "lookup is slow" warning is cancelled. */
1348 GNUNET_SCHEDULER_cancel (pr->warn_task);
1349 pr->warn_task = NULL;
/* Optional statistic; only compiled in when INSANE_STATISTICS is enabled. */
1355 #if INSANE_STATISTICS
1356 GNUNET_STATISTICS_update (GSF_stats,
1358 ("# Datastore lookups concluded (no results)"),
/* Remember the UID of the first result.  From here on have_first_uid is
   also used as a result counter (it is incremented and compared with
   MAX_RESULTS below). */
1362 if (GNUNET_NO == pr->have_first_uid)
1364 pr->first_uid = uid;
1365 pr->have_first_uid = 1;
/* The first UID came around again: treat it as the end of the results by
   clearing `key`. */
1369 if ((uid == pr->first_uid) && (key != NULL))
1371 GNUNET_STATISTICS_update (GSF_stats,
1373 ("# Datastore lookups concluded (seen all)"),
1375 key = NULL; /* all replies seen! */
1377 pr->have_first_uid++;
/* Hard cap on the number of results processed per query. */
1378 if ((pr->have_first_uid > MAX_RESULTS) && (key != NULL))
1380 GNUNET_STATISTICS_update (GSF_stats,
1382 ("# Datastore lookups aborted (more than MAX_RESULTS)"),
1384 key = NULL; /* all replies seen! */
/* End-of-results path: log, optionally count a missing DBLOCK/IBLOCK, and
   jump to the continuation handling.  NOTE(review): the guarding condition
   is not visible; presumably `NULL == key` -- confirm. */
1390 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1391 "No further local responses available.\n");
1392 #if INSANE_STATISTICS
1393 if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) ||
1394 (pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK))
1395 GNUNET_STATISTICS_update (GSF_stats,
1397 ("# requested DBLOCK or IBLOCK not found"), 1,
1400 goto check_error_and_continue;
1402 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1403 "Received reply for `%s' of type %d with UID %llu from datastore.\n",
1404 GNUNET_h2s (key), type, (unsigned long long) uid);
/* ONDEMAND blocks are passed to the on-demand encoder, which is given this
   same function as its result callback. */
1405 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
1407 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1408 "Found ONDEMAND block, performing on-demand encoding\n");
1409 GNUNET_STATISTICS_update (GSF_stats,
1411 ("# on-demand blocks matched requests"), 1,
/* Restart the timing reference and arm the on-demand warning task. */
1413 pr->qe_start = GNUNET_TIME_absolute_get ();
1415 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1416 &odc_warn_delay_task, pr);
/* NOTE(review): the comparison applied to this call's result is not visible
   in this listing. */
1418 GNUNET_FS_handle_on_demand_block (key, size, data, type, priority,
1419 anonymity, expiration, uid,
1420 &process_local_reply, pr))
1422 GNUNET_STATISTICS_update (GSF_stats,
1424 ("# on-demand lookups performed successfully"),
1426 return; /* we're done */
/* On-demand handling failed: swap the warning task for the datastore one
   and repeat the datastore lookup at offset `local_result_offset - 1`. */
1428 GNUNET_STATISTICS_update (GSF_stats,
1429 gettext_noop ("# on-demand lookups failed"), 1,
1431 GNUNET_SCHEDULER_cancel (pr->warn_task);
1433 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1434 &warn_delay_task, pr);
/* A request of type DBLOCK is looked up with type ANY; requests flagged
   GSF_PRO_PRIORITY_UNLIMITED use UINT_MAX for queue priority and size. */
1436 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
1437 &pr->public_data.query,
1438 pr->public_data.type ==
1439 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1440 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1442 (GSF_PRO_PRIORITY_UNLIMITED &
1443 pr->public_data.options)) ? UINT_MAX : 1
1444 /* queue priority */ ,
1446 (GSF_PRO_PRIORITY_UNLIMITED &
1447 pr->public_data.options)) ? UINT_MAX :
1448 GSF_datastore_queue_size
1449 /* max queue size */ ,
1450 GNUNET_TIME_UNIT_FOREVER_REL,
1451 &process_local_reply, pr);
/* NOTE(review): the test guarding this return is not visible; presumably it
   checks that the lookup was queued -- confirm. */
1453 return; /* we're done */
1454 GNUNET_STATISTICS_update (GSF_stats,
1456 ("# Datastore lookups concluded (error queueing)"),
1458 goto check_error_and_continue;
/* Regular block: snapshot the result count and start from a zeroed reply
   closure. */
1460 old_rf = pr->public_data.results_found;
1461 memset (&prq, 0, sizeof (prq));
1463 prq.expiration = expiration;
/* Derive the query hash from the block.  NOTE(review): the comparison on
   the result is not visible; the statements that follow remove the entry
   from the datastore and retry the lookup, so this is presumably the
   failure branch -- confirm. */
1466 GNUNET_BLOCK_get_key (GSF_block_ctx, type, data, size, &query))
1469 GNUNET_DATASTORE_remove (GSF_dsh, key, size, data, -1, -1,
1470 GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL);
1471 pr->qe_start = GNUNET_TIME_absolute_get ();
1473 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1474 &warn_delay_task, pr);
1476 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset - 1,
1477 &pr->public_data.query,
1478 pr->public_data.type ==
1479 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1480 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1482 (GSF_PRO_PRIORITY_UNLIMITED &
1483 pr->public_data.options)) ? UINT_MAX : 1
1484 /* queue priority */ ,
1486 (GSF_PRO_PRIORITY_UNLIMITED &
1487 pr->public_data.options)) ? UINT_MAX :
1488 GSF_datastore_queue_size
1489 /* max queue size */ ,
1490 GNUNET_TIME_UNIT_FOREVER_REL,
1491 &process_local_reply, pr);
1494 GNUNET_STATISTICS_update (GSF_stats,
1496 ("# Datastore lookups concluded (error queueing)"),
1498 goto check_error_and_continue;
/* Fill in the remaining reply fields that are visible in this listing. */
1503 prq.priority = priority;
1504 prq.request_found = GNUNET_NO;
1505 prq.anonymity_level = anonymity;
/* Only when no result had been found so far is the datastore delay updated
   from the request's start time. */
1506 if ((0 == old_rf) && (0 == pr->public_data.results_found))
1507 GSF_update_datastore_delay_ (pr->public_data.start_time);
/* Evaluate the reply with the LOCAL_SKIP_CRYPTO evaluation option and keep
   the evaluation result on the request. */
1508 prq.eo = GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO;
1509 process_reply (&prq, key, pr);
1510 pr->local_result = prq.eval;
/* The evaluation says this was the last result: stop looking. */
1511 if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
1513 GNUNET_STATISTICS_update (GSF_stats,
1515 ("# Datastore lookups concluded (found last result)"),
1518 goto check_error_and_continue;
/* Unless the request has unlimited priority, stop when the load test
   reports GNUNET_YES or when more than 5 + 2 * priority results were
   found. */
1520 if ((0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
1521 ((GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
1522 (pr->public_data.results_found > 5 + 2 * pr->public_data.priority)))
1524 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1525 "Load too high, done with request\n");
1526 GNUNET_STATISTICS_update (GSF_stats,
1527 gettext_noop ("# Datastore lookups concluded (load too high)"),
1530 goto check_error_and_continue;
/* Ask for the next result: restart the timing reference, schedule a delayed
   task and queue the lookup, post-incrementing local_result_offset. */
1532 pr->qe_start = GNUNET_TIME_absolute_get ();
1534 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
1538 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
1539 &pr->public_data.query,
1540 pr->public_data.type ==
1541 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1542 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1544 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1545 public_data.options)) ? UINT_MAX : 1
1546 /* queue priority */ ,
1548 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1549 public_data.options)) ? UINT_MAX :
1550 GSF_datastore_queue_size
1551 /* max queue size */ ,
1552 GNUNET_TIME_UNIT_FOREVER_REL,
1553 &process_local_reply, pr);
1554 /* check if we successfully queued another datastore request;
1555 * if so, return, otherwise call our continuation (if we have
1557 check_error_and_continue:
/* Common exit: stop any warning task that is still armed. */
1560 if (NULL != pr->warn_task)
1562 GNUNET_SCHEDULER_cancel (pr->warn_task);
1563 pr->warn_task = NULL;
/* Take the continuation off the request before calling it, so it is invoked
   at most once from here. */
1565 if (NULL == (cont = pr->llc_cont))
1566 return; /* no continuation */
1567 pr->llc_cont = NULL;
/* Requests flagged LOCAL_ONLY whose local result was not OK_LAST get an
   explicit final signal.  NOTE(review): the callee and some arguments of
   that signalling call are not visible in this listing. */
1568 if (0 != (GSF_PRO_LOCAL_ONLY & pr->public_data.options))
1570 if (GNUNET_BLOCK_EVALUATION_OK_LAST != pr->local_result)
1572 /* Signal that we are done and that there won't be any
1573 additional results to allow client to clean up state. */
1575 GNUNET_BLOCK_EVALUATION_OK_LAST,
1578 GNUNET_TIME_UNIT_ZERO_ABS,
1579 GNUNET_TIME_UNIT_FOREVER_ABS,
1580 GNUNET_BLOCK_TYPE_ANY,
1583 /* Finally, call our continuation to signal that we are
1584 done with local processing of this request; i.e. to
1585 start reading again from the client. */
1586 cont (pr->llc_cont_cls, NULL, GNUNET_BLOCK_EVALUATION_OK_LAST);
/* Second continuation call, passing the request and its local result.
   NOTE(review): the control flow separating it from the call above is not
   visible -- confirm. */
1590 cont (pr->llc_cont_cls, pr, pr->local_result);
1595 * Is the given target a legitimate peer for forwarding the given request?
1599 * @return #GNUNET_YES if this request could be forwarded to the given peer
/* Check a candidate forwarding target against the origin of the request.
 *
 * The request's origin_pid is resolved into a peer identity, which is then
 * compared byte-wise (memcmp over sizeof (struct GNUNET_PeerIdentity)) with
 * `target`; the comparison feeds a conditional expression whose first
 * alternative is GNUNET_NO.
 *
 * NOTE(review): the statement guarded by the `0 == pr->origin_pid` test, the
 * value the memcmp result is compared against, and the second alternative of
 * the conditional are not present in this listing -- confirm. */
1602 GSF_pending_request_test_target_ (struct GSF_PendingRequest *pr,
1603 const struct GNUNET_PeerIdentity *target)
1605 struct GNUNET_PeerIdentity pi;
/* Special case for an origin_pid of 0, handled before resolving it. */
1607 if (0 == pr->origin_pid)
/* Translate the interned origin id into a full peer identity. */
1609 GNUNET_PEER_resolve (pr->origin_pid, &pi);
1611 memcmp (&pi, target,
1612 sizeof (struct GNUNET_PeerIdentity))) ? GNUNET_NO :
1618 * Look up the request in the local datastore.
1620 * @param pr the pending request to process
1621 * @param cont function to call at the end
1622 * @param cont_cls closure for @a cont
/* Begin the local datastore lookup for a pending request.
 *
 * Preconditions enforced by assertion: the request has no `gh` handle, no
 * cadet request and no stored local-lookup continuation.  The continuation
 * and its closure are stored on the request, the timing reference qe_start
 * is set to now, a delayed task is scheduled, and the first lookup is queued
 * with process_local_reply() as result callback.
 *
 * NOTE(review): the third parameter, the arguments of the scheduling call
 * and the condition that selects the error path at the end are not present
 * in this listing -- confirm. */
1625 GSF_local_lookup_ (struct GSF_PendingRequest *pr,
1626 GSF_LocalLookupContinuation cont,
1629 GNUNET_assert (NULL == pr->gh);
1630 GNUNET_assert (NULL == pr->cadet_request);
1631 GNUNET_assert (NULL == pr->llc_cont);
/* Remember whom to notify once local processing is finished. */
1632 pr->llc_cont = cont;
1633 pr->llc_cont_cls = cont_cls;
1634 pr->qe_start = GNUNET_TIME_absolute_get ();
1636 GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
/* Optional statistic; only compiled in when INSANE_STATISTICS is enabled. */
1639 #if INSANE_STATISTICS
1640 GNUNET_STATISTICS_update (GSF_stats,
1641 gettext_noop ("# Datastore lookups initiated"), 1,
/* Queue the lookup, post-incrementing local_result_offset.  A request of
   type DBLOCK is looked up with type ANY; requests flagged
   GSF_PRO_PRIORITY_UNLIMITED use UINT_MAX for queue priority and size. */
1645 GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
1646 &pr->public_data.query,
1647 pr->public_data.type ==
1648 GNUNET_BLOCK_TYPE_FS_DBLOCK ?
1649 GNUNET_BLOCK_TYPE_ANY : pr->public_data.type,
1651 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1652 public_data.options)) ? UINT_MAX : 1
1653 /* queue priority */ ,
1655 (GSF_PRO_PRIORITY_UNLIMITED & pr->
1656 public_data.options)) ? UINT_MAX :
1657 GSF_datastore_queue_size
1658 /* max queue size */ ,
1659 GNUNET_TIME_UNIT_FOREVER_REL,
1660 &process_local_reply, pr);
/* Error path: count the failure, stop the warning task, drop the stored
   continuation and invoke `cont` directly with the request's current local
   result. */
1663 GNUNET_STATISTICS_update (GSF_stats,
1665 ("# Datastore lookups concluded (error queueing)"),
1667 GNUNET_SCHEDULER_cancel (pr->warn_task);
1668 pr->warn_task = NULL;
1669 pr->llc_cont = NULL;
1671 cont (cont_cls, pr, pr->local_result);
1677 * Handle P2P "CONTENT" message. Checks that the message is
1678 * well-formed and then checks if there are any pending requests for
1679 * this content and possibly passes it on (to local clients or other
1680 * peers). Does NOT perform migration (content caching at this peer).
1682 * @param cp the other peer involved (sender or receiver, NULL
1683 * for loopback messages where we are both sender and receiver)
1684 * @param message the actual message
1685 * @return #GNUNET_OK if the message was well-formed,
1686 * #GNUNET_SYSERR if the message was malformed (close connection,
1687 * do not cache under any circumstances)
/* Handle a content (PUT) message received from a peer.
 *
 * Visible stages:
 *   1. validate the message size and parse the header fields;
 *   2. cap the expiration time and reject ONDEMAND blocks;
 *   3. derive the query hash with GNUNET_BLOCK_get_key();
 *   4. match the block against pending requests in pr_map;
 *   5. adjust the sending peer's preference and respect;
 *   6. possibly store the block in the datastore (migration);
 *   7. possibly ask the peer to stop migrating content for a while.
 *
 * NOTE(review): this listing omits short lines, including some local
 * declarations (`msize`, `dsize` and `putl` are used but not declared in the
 * visible lines), several guards and call arguments, and the final return.
 * Comments below describe only what is visible. */
1690 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
1691 const struct GNUNET_MessageHeader *message)
1693 const struct PutMessage *put;
1696 enum GNUNET_BLOCK_Type type;
1697 struct GNUNET_TIME_Absolute expiration;
1698 struct GNUNET_HashCode query;
1699 struct ProcessReplyClosure prq;
1700 struct GNUNET_TIME_Relative block_time;
1702 struct PutMigrationContext *pmc;
/* A message shorter than the fixed PutMessage header is malformed. */
1704 msize = ntohs (message->size);
1705 if (msize < sizeof (struct PutMessage))
1707 GNUNET_break_op (0);
1708 return GNUNET_SYSERR;
/* Payload size is whatever follows the fixed header; type and expiration
   are converted from network byte order. */
1710 put = (const struct PutMessage *) message;
1711 dsize = msize - sizeof (struct PutMessage);
1712 type = ntohl (put->type);
1713 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
1714 /* do not allow migrated content to live longer than 1 year */
1715 expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
/* ONDEMAND blocks are rejected when they arrive in a message. */
1717 if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
1718 return GNUNET_SYSERR;
/* Derive the query hash from the block.  NOTE(review): the comparison on
   the result and the call's arguments are not visible; the protocol-break
   and GNUNET_SYSERR below are presumably the failure branch -- confirm. */
1720 GNUNET_BLOCK_get_key (GSF_block_ctx,
1726 GNUNET_break_op (0);
1727 return GNUNET_SYSERR;
1729 GNUNET_STATISTICS_update (GSF_stats,
1730 gettext_noop ("# GAP PUT messages received"),
1733 /* now, lookup 'query' */
/* The block data starts right after the PutMessage header. */
1734 prq.data = (const void *) &put[1];
1738 prq.expiration = expiration;
1740 prq.anonymity_level = UINT32_MAX;
1741 prq.request_found = GNUNET_NO;
1742 prq.eo = GNUNET_BLOCK_EO_NONE;
/* Look up entries in pr_map.  NOTE(review): the key, iterator and closure
   arguments are not visible in this listing. */
1743 GNUNET_CONTAINER_multihashmap_get_multiple (pr_map,
/* Adjust the peer's preference by a base value plus 1000 per unit of
   prq.priority, and add prq.priority to its respect. */
1749 GSF_connected_peer_change_preference_ (cp,
1750 CONTENT_BANDWIDTH_VALUE +
1751 1000 * prq.priority);
1752 GSF_get_peer_performance_data_ (cp)->respect += prq.priority;
/* Migration: keep a copy of the block when content caching is active and
   the put-load test reports GNUNET_NO.  NOTE(review): one conjunct of this
   condition is not visible. */
1754 if ((GNUNET_YES == active_to_migration) &&
1756 (GNUNET_NO == test_put_load_too_high (prq.priority)))
1758 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1759 "Replicating result for query `%s' with priority %u\n",
1760 GNUNET_h2s (&query),
/* Context handed to put_migration_continuation(); records the start time
   and whether any pending request matched. */
1762 pmc = GNUNET_new (struct PutMigrationContext);
1763 pmc->start = GNUNET_TIME_absolute_get ();
1764 pmc->requested = prq.request_found;
1765 GNUNET_assert (0 != GSF_get_peer_performance_data_ (cp)->pid);
1766 GNUNET_PEER_resolve (GSF_get_peer_performance_data_ (cp)->pid,
/* Store the block with anonymity 1 and replication 0; queue priority grows
   with prq.priority. */
1769 GNUNET_DATASTORE_put (GSF_dsh, 0, &query, dsize, &put[1], type,
1770 prq.priority, 1 /* anonymity */ ,
1771 0 /* replication */ ,
1772 expiration, 1 + prq.priority, MAX_DATASTORE_QUEUE,
1773 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
1774 &put_migration_continuation, pmc))
/* The continuation is invoked directly here.  NOTE(review): presumably
   this happens when the put could not be queued; the guard is not
   visible. */
1776 put_migration_continuation (pmc,
1778 GNUNET_TIME_UNIT_ZERO_ABS,
1782 else if (NULL != cp)
1784 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1785 "Choosing not to keep content `%s' (%d/%d)\n",
1786 GNUNET_h2s (&query), active_to_migration,
1787 test_put_load_too_high (prq.priority));
/* Back-pressure: for a known peer whose content matched no pending request,
   ask it to stop migrating when caching is off or when the datastore put
   load exceeds 2.5 * (1 + prq.priority). */
1789 putl = GNUNET_LOAD_get_load (datastore_put_load);
1790 if ( (NULL != cp) &&
1791 (GNUNET_NO == prq.request_found) &&
1792 ( (GNUNET_YES != active_to_migration) ||
1793 (putl > 2.5 * (1 + prq.priority)) ) )
/* With caching off, the measured load is replaced by 1.0 plus a weak random
   number drawn with bound argument 5. */
1795 if (GNUNET_YES != active_to_migration)
1796 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
/* The block time is a multiple of one millisecond and involves a weak
   random number with bound 60000 * putl * putl.  NOTE(review): the
   assignment target and part of the multiplier are not visible. */
1798 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1800 GNUNET_CRYPTO_random_u32
1801 (GNUNET_CRYPTO_QUALITY_WEAK,
1802 (unsigned int) (60000 * putl * putl)));
1803 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1804 "Asking to stop migration for %s because of load %f and events %d/%d\n",
1805 GNUNET_STRINGS_relative_time_to_string (block_time,
1808 active_to_migration,
1809 (GNUNET_NO == prq.request_found));
1810 GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (block_time));
1817 * Setup the subsystem.
/* Initialise the module-level state of the pending-request subsystem:
 * the configured request limit, the content-caching flag, the datastore
 * put-load tracker, the request map and the expiration heap. */
1820 GSF_pending_request_init_ ()
/* Read the numeric option MAX_PENDING_REQUESTS from section "fs" into
   max_pending_requests.  NOTE(review): the comparison on the result is not
   visible; the INFO-level "config missing" log below is presumably the
   failure branch -- confirm. */
1823 GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs",
1824 "MAX_PENDING_REQUESTS",
1825 &max_pending_requests))
1827 GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
1828 "fs", "MAX_PENDING_REQUESTS");
/* Yes/no option CONTENT_CACHING from section "FS" controls migration. */
1830 active_to_migration =
1831 GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
1832 datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
/* Request map created with an initial size argument of 32 * 1024. */
1833 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024, GNUNET_YES);
/* Heap created with MIN ordering. */
1834 requests_by_expiration_heap =
1835 GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1840 * Shutdown the subsystem.
/* Tear down the pending-request subsystem: run clean_request over every
 * entry of pr_map, destroy the map, destroy the expiration heap and free
 * the datastore put-load tracker.  The heap and load pointers are reset to
 * NULL afterwards.
 *
 * NOTE(review): the embedded numbering skips a line after the map is
 * destroyed; presumably `pr_map` is reset to NULL there -- confirm. */
1843 GSF_pending_request_done_ ()
/* Clean up every request in the map before the map itself is destroyed. */
1845 GNUNET_CONTAINER_multihashmap_iterate (pr_map, &clean_request, NULL);
1846 GNUNET_CONTAINER_multihashmap_destroy (pr_map);
1848 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
1849 requests_by_expiration_heap = NULL;
1850 GNUNET_LOAD_value_free (datastore_put_load);
1851 datastore_put_load = NULL;
1855 /* end of gnunet-service-fs_pr.c */