2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_pr.c
23 * @brief API to handle pending requests
24 * @author Christian Grothoff
27 #include "gnunet-service-fs_pr.h"
33 struct GSF_PendingRequest
36 * Public data for the request.
38 struct GSF_PendingRequestData public_data;
41 * Function to call if we encounter a reply.
43 GSF_PendingRequestReplyHandler rh;
51 * Array of hash codes of replies we've already seen.
53 GNUNET_HashCode *replies_seen;
56 * Bloomfilter masking replies we've already seen.
58 struct GNUNET_CONTAINER_BloomFilter *bf;
61 * Number of valid entries in the 'replies_seen' array.
63 unsigned int replies_seen_count;
66 * Length of the 'replies_seen' array.
68 unsigned int replies_seen_size;
71 * Mingle value we currently use for the bf.
79 * All pending requests, ordered by the query. Entries
80 * are of type 'struct GSF_PendingRequest*'.
82 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
86 * Datastore 'PUT' load tracking.
88 static struct GNUNET_LOAD_Value *datastore_put_load;
92 * Are we allowed to migrate content to this peer.
94 static int active_to_migration;
98 * Heap with the request that will expire next at the top. Contains
99 * pointers of type "struct PendingRequest*"; these will *also* be
100 * aliased from the "requests_by_peer" data structures and the
101 * "requests_by_query" table. Note that requests from our clients
102 * don't expire and are thus NOT in the "requests_by_expiration"
103 * (or the "requests_by_peer" tables).
105 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
109 * How many bytes should a bloomfilter be if we have already seen
110 * entry_count responses? Note that BLOOMFILTER_K gives us the number
111 * of bits set per entry. Furthermore, we should not re-size the
112 * filter too often (to keep it cheap).
114 * Since other peers will also add entries but not resize the filter,
115 * we should generally pick a slightly larger size than what the
116 * strict math would suggest.
118 * @return must be a power of two and smaller or equal to 2^15.
121 compute_bloomfilter_size (unsigned int entry_count)
124 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
125 uint16_t max = 1 << 15;
127 if (entry_count > max)
130 while ((size < max) && (size < ideal))
139 * Recalculate our bloom filter for filtering replies. This function
140 * will create a new bloom filter from scratch, so it should only be
141 * called if we have no bloomfilter at all (and hence can create a
142 * fresh one of minimal size without problems) OR if our peer is the
143 * initiator (in which case we may resize to larger than mimimum size).
145 * @param pr request for which the BF is to be recomputed
146 * @return GNUNET_YES if a refresh actually happened
149 refresh_bloomfilter (struct GSF_PendingRequest *pr)
153 GNUNET_HashCode mhash;
155 nsize = compute_bloomfilter_size (pr->replies_seen_off);
157 (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
158 return GNUNET_NO; /* size not changed */
160 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
161 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
163 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
166 for (i=0;i<pr->replies_seen_count;i++)
168 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
171 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
178 * Create a new pending request.
180 * @param options request options
181 * @param type type of the block that is being requested
182 * @param query key for the lookup
183 * @param namespace namespace to lookup, NULL for no namespace
184 * @param target preferred target for the request, NULL for none
185 * @param bf bloom filter for known replies, can be NULL
186 * @param mingle mingle value for bf
187 * @param anonymity_level desired anonymity level
188 * @param priority maximum outgoing cummulative request priority to use
189 * @param replies_seen hash codes of known local replies
190 * @param replies_seen_count size of the 'replies_seen' array
191 * @param rh handle to call when we get a reply
192 * @param rh_cls closure for rh
193 * @return handle for the new pending request
195 struct GSF_PendingRequest *
196 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
197 enum GNUNET_BLOCK_Type type,
198 const GNUNET_HashCode *query,
199 const GNUNET_HashCode *namespace,
200 const struct GNUNET_PeerIdentity *target,
201 const struct GNUNET_CONTAINER_BloomFilter *bf,
203 uint32_t anonymity_level,
205 const GNUNET_HashCode *replies_seen,
206 unsigned int replies_seen_count,
207 GSF_PendingRequestReplyHandler rh,
210 struct GSF_PendingRequest *pr;
213 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
214 pr->public_data.query = *query;
215 if (GNUNET_BLOCK_TYPE_SBLOCK == type)
217 GNUNET_assert (NULL != namespace);
218 pr->public_data.namespace = *namespace;
222 pr->public_data.target = *target;
223 pr->has_target = GNUNET_YES;
225 pr->public_data.anonymity_level = anonymity_data;
226 pr->public_data.priority = priority;
227 pr->public_data.options = options;
228 pr->public_data.type = type;
231 if (replies_seen_count > 0)
233 pr->replies_seen_size = replies_seen_count;
234 pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
235 memcpy (pr->replies_seen,
237 replies_seen_count * sizeof (struct GNUNET_HashCode));
238 pr->replies_seen_count = replies_seen_count;
242 pr->bf = GNUNET_CONTAINER_bloomfilter_copy (bf);
245 else if ( (replies_seen_count > 0) &&
246 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
248 GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
250 GNUNET_CONTAINER_multihashmap_put (pr_map,
253 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
254 // FIXME: if not a local query, we also need to track the
255 // total number of external queries we currently have and
256 // bound it => need an additional heap!
262 * Update a given pending request with additional replies
263 * that have been seen.
265 * @param pr request to update
266 * @param replies_seen hash codes of replies that we've seen
267 * @param replies_seen_count size of the replies_seen array
270 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
271 const GNUNET_HashCode *replies_seen,
272 unsigned int replies_seen_count)
275 GNUNET_HashCode mhash;
277 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
278 return; /* integer overflow */
279 if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
281 /* we're responsible for the BF, full refresh */
282 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
283 GNUNET_array_grow (pr->replies_seen,
284 pr->replies_seen_size,
285 replies_seen_count + pr->replies_seen_count);
286 memcpy (&pr->replies_seen[pr->replies_seen_count],
288 sizeof (GNUNET_HashCode) * replies_seen_count);
289 pr->replies_seen_count += replies_seen;
290 if (GNUNET_NO == refresh_bloomfilter (pr))
292 /* bf not recalculated, simply extend it with new bits */
293 for (i=0;i<pr->replies_seen_count;i++)
295 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
298 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
306 /* we're not the initiator, but the initiator did not give us
307 any bloom-filter, so we need to create one on-the-fly */
308 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
310 pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
314 for (i=0;i<pr->replies_seen_count;i++)
316 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
319 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
326 * Generate the message corresponding to the given pending request for
327 * transmission to other peers (or at least determine its size).
329 * @param pr request to generate the message for
330 * @param do_route are we routing the reply
331 * @param buf_size number of bytes available in buf
332 * @param buf where to copy the message (can be NULL)
333 * @return number of bytes needed (if > buf_size) or used
336 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
341 struct PendingMessage *pm;
342 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
343 struct GetMessage *gm;
344 GNUNET_HashCode *ext;
354 if (GNUNET_YES != do_route)
356 bm |= GET_MESSAGE_BIT_RETURN_TO;
359 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
361 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
364 if (GNUNET_YES == pr->has_target)
366 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
369 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
370 msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
371 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
372 if (buf_size < msize)
374 gm = (struct GetMessage*) lbuf;
375 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
376 gm->header.size = htons (msize);
377 gm->type = htonl (pr->type);
378 if (GNUNET_YES == do_route)
379 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
380 pr->public_data.priority + 1);
383 pr->public_data.priority -= prio;
384 gm->priority = htonl (prio);
385 gm->ttl = htonl (pr->ttl);
386 gm->filter_mutator = htonl(pr->mingle);
387 gm->hash_bitmap = htonl (bm);
388 gm->query = pr->query;
389 ext = (GNUNET_HashCode*) &gm[1];
391 if (GNUNET_YES != do_route)
392 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
393 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
394 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
395 if (GNUNET_YES == pr->has_target)
396 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
398 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
401 memcpy (buf, gm, msize);
407 * Iterator to free pending requests.
409 * @param cls closure, unused
410 * @param key current key code
411 * @param value value in the hash map (pending request)
412 * @return GNUNET_YES (we should continue to iterate)
415 clean_request (void *cls,
416 const GNUNET_HashCode * key,
419 struct GSF_PendingRequest *pr = value;
421 GNUNET_free_non_null (pr->replies_seen);
423 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
430 * Explicitly cancel a pending request.
432 * @param pr request to cancel
435 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
437 GNUNET_assert (GNUNET_OK ==
438 GNUNET_CONTAINER_multihashmap_remove (pr_map,
439 &pr->public_data.query,
441 GNUNET_assert (GNUNET_YES ==
442 clean_request (NULL, &pr->public_data.query, pr));
447 * Iterate over all pending requests.
449 * @param it function to call for each request
450 * @param cls closure for it
453 GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
456 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
457 (GNUNET_CONTAINER_HashMapIterator) it,
465 * Closure for "process_reply" function.
467 struct ProcessReplyClosure
470 * The data for the reply.
475 * Who gave us this reply? NULL for local host (or DHT)
477 struct ConnectedPeer *sender;
480 * When the reply expires.
482 struct GNUNET_TIME_Absolute expiration;
492 enum GNUNET_BLOCK_Type type;
495 * How much was this reply worth to us?
500 * Anonymity requirements for this reply.
502 uint32_t anonymity_level;
505 * Evaluation result (returned).
507 enum GNUNET_BLOCK_EvaluationResult eval;
510 * Did we finish processing the associated request?
515 * Did we find a matching request?
522 * Update the performance data for the sender (if any) since
523 * the sender successfully answered one of our queries.
525 * @param prq information about the sender
526 * @param pr request that was satisfied
529 update_request_performance_data (struct ProcessReplyClosure *prq,
530 struct GSF_PendingRequest *pr)
533 struct GNUNET_TIME_Relative cur_delay;
535 if (prq->sender == NULL)
537 /* FIXME: adapt code to new API... */
538 for (i=0;i<pr->used_targets_off;i++)
539 if (pr->used_targets[i].pid == prq->sender->pid)
541 if (i < pr->used_targets_off)
543 cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
544 prq->sender->avg_delay.rel_value
545 = (prq->sender->avg_delay.rel_value *
546 (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N;
547 prq->sender->avg_priority
548 = (prq->sender->avg_priority *
549 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
553 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
554 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
556 GNUNET_PEER_change_rc (pr->cp->pid, 1);
557 prq->sender->last_p2p_replies
558 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
563 if (NULL != prq->sender->last_client_replies
564 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
565 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
566 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
567 prq->sender->last_client_replies
568 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
569 = pr->client_request_list->client_list->client;
570 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
577 * We have received a reply; handle it!
579 * @param cls response (struct ProcessReplyClosure)
580 * @param key our query
581 * @param value value in the hash map (info about the query)
582 * @return GNUNET_YES (we should continue to iterate)
585 process_reply (void *cls,
586 const GNUNET_HashCode * key,
589 struct ProcessReplyClosure *prq = cls;
590 struct GSF_PendingRequest *pr = value;
591 struct PendingMessage *reply;
592 struct ClientResponseMessage *creply;
593 struct ClientList *cl;
594 struct PutMessage *pm;
595 struct ConnectedPeer *cp;
599 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
600 "Matched result (type %u) for query `%s' with pending request\n",
601 (unsigned int) prq->type,
604 GNUNET_STATISTICS_update (stats,
605 gettext_noop ("# replies received and matched"),
608 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
613 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
618 case GNUNET_BLOCK_EVALUATION_OK_MORE:
619 update_request_performance_data (prq, pr);
621 case GNUNET_BLOCK_EVALUATION_OK_LAST:
622 update_request_performance_data (prq, pr);
623 /* FIXME: adapt code to new API! */
624 while (NULL != pr->pending_head)
625 destroy_pending_message_list_entry (pr->pending_head);
628 if (pr->client_request_list != NULL)
629 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
631 GNUNET_DATASTORE_cancel (pr->qe);
634 pr->do_remove = GNUNET_YES;
635 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
637 GNUNET_SCHEDULER_cancel (pr->task);
638 pr->task = GNUNET_SCHEDULER_NO_TASK;
640 GNUNET_break (GNUNET_YES ==
641 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
644 GNUNET_LOAD_update (rt_entry_lifetime,
645 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
647 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
648 GNUNET_STATISTICS_update (stats,
649 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
653 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
654 "Duplicate response `%s', discarding.\n",
655 GNUNET_h2s (&mhash));
657 return GNUNET_YES; /* duplicate */
658 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
659 return GNUNET_YES; /* wrong namespace */
660 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
663 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
666 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
667 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
668 _("Unsupported block type %u\n"),
672 /* FIXME: adapt code to new API! */
673 if (pr->client_request_list != NULL)
675 if (pr->replies_seen_size == pr->replies_seen_off)
676 GNUNET_array_grow (pr->replies_seen,
677 pr->replies_seen_size,
678 pr->replies_seen_size * 2 + 4);
679 GNUNET_CRYPTO_hash (prq->data,
681 &pr->replies_seen[pr->replies_seen_off++]);
682 refresh_bloomfilter (pr);
684 if (NULL == prq->sender)
687 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688 "Found result for query `%s' in local datastore\n",
691 GNUNET_STATISTICS_update (stats,
692 gettext_noop ("# results found locally"),
696 prq->priority += pr->remaining_priority;
697 pr->remaining_priority = 0;
699 prq->request_found = GNUNET_YES;
700 /* finally, pass on to other peers / local clients */
701 pr->rh (pr->rh_cls, pr, prq->data, prq->size);
707 * Continuation called to notify client about result of the
711 * @param success GNUNET_SYSERR on failure
712 * @param msg NULL on success, otherwise an error message
715 put_migration_continuation (void *cls,
719 struct GNUNET_TIME_Absolute *start = cls;
720 struct GNUNET_TIME_Relative delay;
722 delay = GNUNET_TIME_absolute_get_duration (*start);
724 /* FIXME: should we really update the load value on failure? */
725 GNUNET_LOAD_update (datastore_put_load,
727 if (GNUNET_OK == success)
729 GNUNET_STATISTICS_update (stats,
730 gettext_noop ("# datastore 'put' failures"),
737 * Test if the DATABASE (PUT) load on this peer is too high
738 * to even consider processing the query at
741 * @return GNUNET_YES if the load is too high to do anything (load high)
742 * GNUNET_NO to process normally (load normal or low)
745 test_put_load_too_high (uint32_t priority)
749 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
750 return GNUNET_NO; /* very fast */
751 ld = GNUNET_LOAD_get_load (datastore_put_load);
752 if (ld < 2.0 * (1 + priority))
754 GNUNET_STATISTICS_update (stats,
755 gettext_noop ("# storage requests dropped due to high load"),
763 * Iterator called on each result obtained for a DHT
764 * operation that expects a reply
767 * @param exp when will this value expire
768 * @param key key of the result
769 * @param get_path NULL-terminated array of pointers
770 * to the peers on reverse GET path (or NULL if not recorded)
771 * @param put_path NULL-terminated array of pointers
772 * to the peers on the PUT path (or NULL if not recorded)
773 * @param type type of the result
774 * @param size number of bytes in data
775 * @param data pointer to the result data
778 GSF_handle_dht_reply_ (void *cls,
779 struct GNUNET_TIME_Absolute exp,
780 const GNUNET_HashCode * key,
781 const struct GNUNET_PeerIdentity * const *get_path,
782 const struct GNUNET_PeerIdentity * const *put_path,
783 enum GNUNET_BLOCK_Type type,
787 struct GSF_PendingRequest *pr = cls;
788 struct ProcessReplyClosure prq;
790 memset (&prq, 0, sizeof (prq));
792 prq.expiration = exp;
795 process_reply (&prq, key, pr);
796 if ( (GNUNET_YES == active_to_migration) &&
797 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
800 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
801 "Replicating result for query `%s' with priority %u\n",
805 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
806 *start = GNUNET_TIME_absolute_get ();
807 GNUNET_DATASTORE_put (dsh,
808 0, &query, dsize, &put[1],
809 type, prq.priority, 1 /* anonymity */,
811 1 + prq.priority, MAX_DATASTORE_QUEUE,
812 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
813 &put_migration_continuation,
820 * Handle P2P "CONTENT" message. Checks that the message is
821 * well-formed and then checks if there are any pending requests for
822 * this content and possibly passes it on (to local clients or other
823 * peers). Does NOT perform migration (content caching at this peer).
825 * @param cp the other peer involved (sender or receiver, NULL
826 * for loopback messages where we are both sender and receiver)
827 * @param message the actual message
828 * @return GNUNET_OK if the message was well-formed,
829 * GNUNET_SYSERR if the message was malformed (close connection,
830 * do not cache under any circumstances)
833 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
834 const struct GNUNET_MessageHeader *message)
836 const struct PutMessage *put;
839 enum GNUNET_BLOCK_Type type;
840 struct GNUNET_TIME_Absolute expiration;
841 GNUNET_HashCode query;
842 struct ProcessReplyClosure prq;
843 struct GNUNET_TIME_Relative block_time;
845 struct GNUNET_TIME_Absolute *start;
847 msize = ntohs (message->size);
848 if (msize < sizeof (struct PutMessage))
851 return GNUNET_SYSERR;
853 put = (const struct PutMessage*) message;
854 dsize = msize - sizeof (struct PutMessage);
855 type = ntohl (put->type);
856 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
857 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
858 return GNUNET_SYSERR;
860 GNUNET_BLOCK_get_key (block_ctx,
867 return GNUNET_SYSERR;
869 /* now, lookup 'query' */
870 prq.data = (const void*) &put[1];
877 prq.expiration = expiration;
879 prq.anonymity_level = 1;
880 prq.finished = GNUNET_NO;
881 prq.request_found = GNUNET_NO;
882 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
888 GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
889 GSF_get_peer_performance_data (cp)->trust += prq.priority;
891 if ( (GNUNET_YES == active_to_migration) &&
892 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
895 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
896 "Replicating result for query `%s' with priority %u\n",
900 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
901 *start = GNUNET_TIME_absolute_get ();
902 GNUNET_DATASTORE_put (dsh,
903 0, &query, dsize, &put[1],
904 type, prq.priority, 1 /* anonymity */,
906 1 + prq.priority, MAX_DATASTORE_QUEUE,
907 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
908 &put_migration_continuation,
911 putl = GNUNET_LOAD_get_load (datastore_put_load);
912 if ( (NULL != (cp = prq.sender)) &&
913 (GNUNET_NO == prq.request_found) &&
914 ( (GNUNET_YES != active_to_migration) ||
915 (putl > 2.5 * (1 + prq.priority)) ) )
917 if (GNUNET_YES != active_to_migration)
918 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
919 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
920 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
921 (unsigned int) (60000 * putl * putl)));
922 GSF_block_peer_migration (cp, block_time);
929 * Setup the subsystem.
932 GSF_pending_request_init_ ()
934 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
939 * Shutdown the subsystem.
942 GSF_pending_request_done_ ()
944 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
947 GNUNET_CONTAINER_multihashmap_destroy (pr_map);
952 /* end of gnunet-service-fs_pr.c */