2 This file is part of GNUnet.
3 (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
22 * @file fs/gnunet-service-fs_pr.c
23 * @brief API to handle pending requests
24 * @author Christian Grothoff
27 #include "gnunet-service-fs_pr.h"
/* State kept for one pending request: the caller-visible public_data,
   the reply handler, the hashes of replies already seen (with fill
   count and capacity), the bloom filter built from those hashes, and
   the request's node in the expiration heap.
   NOTE(review): other functions in this file also access rh_cls,
   mingle, ttl and has_target through this struct; their declarations
   are not visible here -- confirm they exist. */
33 struct GSF_PendingRequest
36 * Public data for the request.
38 struct GSF_PendingRequestData public_data;
41 * Function to call if we encounter a reply.
43 GSF_PendingRequestReplyHandler rh;
51 * Array of hash codes of replies we've already seen.
53 GNUNET_HashCode *replies_seen;
56 * Bloomfilter masking replies we've already seen.
58 struct GNUNET_CONTAINER_BloomFilter *bf;
61 * Entry for this pending request in the expiration heap, or NULL.
63 struct GNUNET_CONTAINER_HeapNode *hnode;
/* replies_seen_count is the fill level, replies_seen_size the capacity */
66 * Number of valid entries in the 'replies_seen' array.
68 unsigned int replies_seen_count;
71 * Length of the 'replies_seen' array.
73 unsigned int replies_seen_size;
76 * Mingle value we currently use for the bf.
84 * All pending requests, ordered by the query. Entries
85 * are of type 'struct GSF_PendingRequest*'.
/* Created in GSF_pending_request_init_ and destroyed in
   GSF_pending_request_done_; entries are removed by query hash in
   GSF_pending_request_cancel_. */
87 static struct GNUNET_CONTAINER_MultiHashMap *pr_map;
91 * Datastore 'PUT' load tracking.
/* Updated by put_migration_continuation, read by test_put_load_too_high. */
93 static struct GNUNET_LOAD_Value *datastore_put_load;
97 * Are we allowed to migrate content to this peer.
/* Compared against GNUNET_YES before a reply is stored via GNUNET_DATASTORE_put. */
99 static int active_to_migration;
103 * Heap with the request that will expire next at the top. Contains
104 * pointers of type "struct PendingRequest*"; these will *also* be
105 * aliased from the "requests_by_peer" data structures and the
106 * "requests_by_query" table. Note that requests from our clients
107 * don't expire and are thus NOT in the "requests_by_expiration"
108 * (or the "requests_by_peer" tables).
/* Created as a GNUNET_CONTAINER_HEAP_ORDER_MIN heap in GSF_pending_request_init_. */
110 static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
114 * How many bytes should a bloomfilter be if we have already seen
115 * entry_count responses? Note that BLOOMFILTER_K gives us the number
116 * of bits set per entry. Furthermore, we should not re-size the
117 * filter too often (to keep it cheap).
119 * Since other peers will also add entries but not resize the filter,
120 * we should generally pick a slightly larger size than what the
121 * strict math would suggest.
123 * @return must be a power of two and smaller or equal to 2^15.
/* Maps a count of already-seen replies to a bloom filter size.
   'ideal' is entry_count * BLOOMFILTER_K / 4 and 'max' is 2^15; the
   loop keeps going while 'size' is below both.
   NOTE(review): the declaration and starting value of 'size', the loop
   body and the return statements are not visible here -- confirm. */
126 compute_bloomfilter_size (unsigned int entry_count)
129 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
130 uint16_t max = 1 << 15;
/* NOTE(review): this guard tests entry_count, not ideal, against max --
   presumably to bound the multiplication above; confirm intent. */
132 if (entry_count > max)
135 while ((size < max) && (size < ideal))
144 * Recalculate our bloom filter for filtering replies. This function
145 * will create a new bloom filter from scratch, so it should only be
146 * called if we have no bloomfilter at all (and hence can create a
147 * fresh one of minimal size without problems) OR if our peer is the
148 * initiator (in which case we may resize to larger than minimum size).
150 * @param pr request for which the BF is to be recomputed
151 * @return GNUNET_YES if a refresh actually happened
154 refresh_bloomfilter (struct GSF_PendingRequest *pr)
158 GNUNET_HashCode mhash;
160 nsize = compute_bloomfilter_size (pr->replies_seen_off);
162 (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
163 return GNUNET_NO; /* size not changed */
165 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
166 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
168 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
171 for (i=0;i<pr->replies_seen_count;i++)
173 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
176 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
183 * Create a new pending request.
185 * @param options request options
186 * @param type type of the block that is being requested
187 * @param query key for the lookup
188 * @param namespace namespace to lookup, NULL for no namespace
189 * @param target preferred target for the request, NULL for none
190 * @param bf_data raw data for bloom filter for known replies, can be NULL
191 * @param bf_size number of bytes in bf_data
192 * @param mingle mingle value for bf
193 * @param anonymity_level desired anonymity level
194 * @param priority maximum outgoing cumulative request priority to use
195 * @param ttl current time-to-live for the request
196 * @param replies_seen hash codes of known local replies
197 * @param replies_seen_count size of the 'replies_seen' array
198 * @param rh handle to call when we get a reply
199 * @param rh_cls closure for rh
200 * @return handle for the new pending request
/* Allocates a pending request, copies the query (and, for SBLOCK
   requests, the namespace) into its public data, records priority,
   options, type and start time, copies the caller's list of seen
   replies, sets up the bloom filter, registers the request in pr_map
   and -- when GSF_PRO_REQUEST_EXPIRES is set -- in the expiration
   heap, evicting heap entries while the heap holds more than
   max_pending_requests.
   NOTE(review): several parameters and call arguments are not visible
   in this block; the notes below cover only what can be seen. */
202 struct GSF_PendingRequest *
203 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
204 enum GNUNET_BLOCK_Type type,
205 const GNUNET_HashCode *query,
206 const GNUNET_HashCode *namespace,
207 const struct GNUNET_PeerIdentity *target,
211 uint32_t anonymity_level,
214 const GNUNET_HashCode *replies_seen,
215 unsigned int replies_seen_count,
216 GSF_PendingRequestReplyHandler rh,
219 struct GSF_PendingRequest *pr;
220 struct GSF_PendingRequest *dpr;
222 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
223 pr->public_data.query = *query;
/* a namespace is mandatory for SBLOCK requests */
224 if (GNUNET_BLOCK_TYPE_SBLOCK == type)
226 GNUNET_assert (NULL != namespace);
227 pr->public_data.namespace = *namespace;
231 pr->public_data.target = *target;
232 pr->has_target = GNUNET_YES;
/* NOTE(review): the visible parameter is 'anonymity_level';
   'anonymity_data' is not declared in the visible part of this
   function -- likely a typo, confirm. */
234 pr->public_data.anonymity_level = anonymity_data;
235 pr->public_data.priority = priority;
236 pr->public_data.original_priority = priority;
237 pr->public_data.options = options;
238 pr->public_data.type = type;
239 pr->public_data.start_time = GNUNET_TIME_absolute_get ();
/* two ttl computations: one relative to now, one subtracting -ttl
   seconds from the start time (presumably for negative ttl -- the
   selecting condition is not visible) */
243 pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
246 pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
247 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
248 (uint32_t) (- ttl)));
249 if (replies_seen_count > 0)
251 pr->replies_seen_size = replies_seen_count;
252 pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
253 memcpy (pr->replies_seen,
/* NOTE(review): the allocation above uses sizeof (GNUNET_HashCode);
   'struct GNUNET_HashCode' is spelled differently here -- confirm both
   name the same type. */
255 replies_seen_count * sizeof (struct GNUNET_HashCode));
256 pr->replies_seen_count = replies_seen_count;
/* either adopt the caller-supplied filter data, or build one from the
   seen replies when this request owns the filter */
260 pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
265 else if ( (replies_seen_count > 0) &&
266 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
268 GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
270 GNUNET_CONTAINER_multihashmap_put (pr_map,
273 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
274 if (0 != (options & GSF_PRO_REQUEST_EXPIRES))
276 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
279 /* make sure we don't track too many requests */
280 while (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
282 dpr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
283 GNUNET_assert (dpr != NULL);
285 break; /* let the request live briefly... */
/* notify the evicted request's handler, then drop the request */
286 dpr->rh (dpr->rh_cls,
288 GNUNET_TIME_UNIT_FOREVER_ABS,
291 GSF_pending_request_cancel_ (dpr);
299 * Obtain the public data associated with a pending request
301 * @param pr pending request
302 * @return associated public data
/* Returns the address of the public_data member embedded in pr; the
   pointer refers to storage inside the request itself. */
304 struct GSF_PendingRequestData *
305 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
307 return &pr->public_data;
312 * Update a given pending request with additional replies
313 * that have been seen.
315 * @param pr request to update
316 * @param replies_seen hash codes of replies that we've seen
317 * @param replies_seen_count size of the replies_seen array
320 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
321 const GNUNET_HashCode *replies_seen,
322 unsigned int replies_seen_count)
325 GNUNET_HashCode mhash;
327 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
328 return; /* integer overflow */
329 if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
331 /* we're responsible for the BF, full refresh */
332 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
333 GNUNET_array_grow (pr->replies_seen,
334 pr->replies_seen_size,
335 replies_seen_count + pr->replies_seen_count);
336 memcpy (&pr->replies_seen[pr->replies_seen_count],
338 sizeof (GNUNET_HashCode) * replies_seen_count);
339 pr->replies_seen_count += replies_seen;
340 if (GNUNET_NO == refresh_bloomfilter (pr))
342 /* bf not recalculated, simply extend it with new bits */
343 for (i=0;i<pr->replies_seen_count;i++)
345 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
348 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
356 /* we're not the initiator, but the initiator did not give us
357 any bloom-filter, so we need to create one on-the-fly */
358 pr->mingle = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
360 pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
364 for (i=0;i<pr->replies_seen_count;i++)
366 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
369 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
376 * Generate the message corresponding to the given pending request for
377 * transmission to other peers (or at least determine its size).
379 * @param pr request to generate the message for
380 * @param do_route are we routing the reply
381 * @param buf_size number of bytes available in buf
382 * @param buf where to copy the message (can be NULL)
383 * @return number of bytes needed (if > buf_size) or used
/* Assembles a GET message for pr in the local buffer lbuf and copies
   it to buf.  A bitmap (bm) records which optional hash-sized fields
   follow the fixed header: a return-to peer when not routing, the
   namespace for SBLOCK requests, and a transmit-to peer when the
   request has a target.  Part of the request's remaining priority is
   moved into the message.
   NOTE(review): declarations of bm, k, msize, bf_size and prio, the
   remaining parameters and the return statements are not visible here. */
386 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
391 struct PendingMessage *pm;
392 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
393 struct GetMessage *gm;
394 GNUNET_HashCode *ext;
404 if (GNUNET_YES != do_route)
406 bm |= GET_MESSAGE_BIT_RETURN_TO;
/* NOTE(review): GSF_pending_request_create_ stores type, query and
   namespace under pr->public_data; this function reads pr->type,
   pr->query and pr->namespace directly -- confirm those members exist. */
409 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
411 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
414 if (GNUNET_YES == pr->has_target)
416 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
/* NOTE(review): create only sets pr->bf conditionally; confirm
   bloomfilter_get_size tolerates NULL or that bf is always set here. */
419 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
420 msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
421 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
422 if (buf_size < msize)
424 gm = (struct GetMessage*) lbuf;
425 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
426 gm->header.size = htons (msize);
427 gm->type = htonl (pr->type);
/* when routing, a random share (0..priority) of the remaining priority
   is picked */
428 if (GNUNET_YES == do_route)
429 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
430 pr->public_data.priority + 1);
433 pr->public_data.priority -= prio;
434 gm->priority = htonl (prio);
/* NOTE(review): create assigns pr->ttl from GNUNET_TIME_* helpers that
   return absolute times; passing it to htonl looks like a type
   mismatch -- confirm. */
435 gm->ttl = htonl (pr->ttl);
436 gm->filter_mutator = htonl(pr->mingle);
437 gm->hash_bitmap = htonl (bm);
438 gm->query = pr->query;
/* optional fields are laid out directly behind the fixed header */
439 ext = (GNUNET_HashCode*) &gm[1];
441 if (GNUNET_YES != do_route)
442 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
443 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
444 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
445 if (GNUNET_YES == pr->has_target)
446 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
448 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
/* NOTE(review): the function's documentation allows buf to be NULL; no
   NULL guard is visible before this copy -- confirm. */
451 memcpy (buf, gm, msize);
457 * Iterator to free pending requests.
459 * @param cls closure, unused
460 * @param key current key code
461 * @param value value in the hash map (pending request)
462 * @return GNUNET_YES (we should continue to iterate)
/* Hash-map iterator that releases what a pending request holds: the
   replies_seen array, the bloom filter, and its node in the expiration
   heap if it has one.  cls and key are not used by the visible code.
   NOTE(review): whether the request struct itself is freed, and
   whether a NULL check guards the bloomfilter_free call, is not
   visible here. */
465 clean_request (void *cls,
466 const GNUNET_HashCode * key,
469 struct GSF_PendingRequest *pr = value;
471 GNUNET_free_non_null (pr->replies_seen);
473 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
474 if (NULL != pr->hnode)
475 GNUNET_CONTAINER_heap_remove_node (requests_by_expiration_heap,
483 * Explicitly cancel a pending request.
485 * @param pr request to cancel
/* Removes pr from pr_map (keyed by its query hash) and then hands it to
   clean_request; both steps are asserted to succeed. */
488 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
490 GNUNET_assert (GNUNET_OK ==
491 GNUNET_CONTAINER_multihashmap_remove (pr_map,
492 &pr->public_data.query,
494 GNUNET_assert (GNUNET_YES ==
495 clean_request (NULL, &pr->public_data.query, pr));
500 * Iterate over all pending requests.
502 * @param it function to call for each request
503 * @param cls closure for it
/* Walks every entry of pr_map, invoking 'it' through a cast to the
   generic hash-map iterator type.
   NOTE(review): the cast assumes GSF_PendingRequestIterator is
   call-compatible with GNUNET_CONTAINER_HashMapIterator -- confirm. */
506 GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
509 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
510 (GNUNET_CONTAINER_HashMapIterator) it,
518 * Closure for "process_reply" function.
/* Carries one received reply, plus the outcome of matching it, through
   process_reply.
   NOTE(review): process_reply and its callers also use data, size,
   priority, finished and request_found on this struct; those member
   declarations are not visible here. */
520 struct ProcessReplyClosure
523 * The data for the reply.
528 * Who gave us this reply? NULL for local host (or DHT)
530 struct GSF_ConnectedPeer *sender;
533 * When the reply expires.
535 struct GNUNET_TIME_Absolute expiration;
545 enum GNUNET_BLOCK_Type type;
548 * How much was this reply worth to us?
553 * Anonymity requirements for this reply.
555 uint32_t anonymity_level;
558 * Evaluation result (returned).
560 enum GNUNET_BLOCK_EvaluationResult eval;
563 * Did we finish processing the associated request?
568 * Did we find a matching request?
575 * Update the performance data for the sender (if any) since
576 * the sender successfully answered one of our queries.
578 * @param prq information about the sender
579 * @param pr request that was satisfied
/* Reports to the sending peer's performance record that it answered
   request pr.  The visible code tests for a NULL sender before calling
   GSF_peer_update_performance_.
   NOTE(review): how cur_delay is computed and the remaining call
   arguments are not visible here. */
582 update_request_performance_data (struct ProcessReplyClosure *prq,
583 struct GSF_PendingRequest *pr)
586 struct GNUNET_TIME_Relative cur_delay;
588 if (prq->sender == NULL)
590 GSF_peer_update_performance_ (prq->sender,
597 * We have received a reply; handle it!
599 * @param cls response (struct ProcessReplyClosure)
600 * @param key our query
601 * @param value value in the hash map (info about the query)
602 * @return GNUNET_YES (we should continue to iterate)
/* Hash-map iterator run for each pending request whose query matches a
   received reply.  It evaluates the reply with GNUNET_BLOCK_evaluate
   and switches on the result: OK_LAST passes the reply on and cancels
   the request; duplicates and invalid results return GNUNET_YES
   without further work.  In the code after the switch, the reply's
   hash is recorded via GSF_pending_request_update_, the request's
   original priority is credited to the reply, and the reply is handed
   to the request's handler.
   NOTE(review): many lines (break/return statements, call arguments)
   are not visible; the notes below cover only what can be seen. */
605 process_reply (void *cls,
606 const GNUNET_HashCode * key,
609 struct ProcessReplyClosure *prq = cls;
610 struct GSF_PendingRequest *pr = value;
611 struct PendingMessage *reply;
612 struct ClientResponseMessage *creply;
613 struct ClientList *cl;
614 struct PutMessage *pm;
615 struct ConnectedPeer *cp;
617 GNUNET_HashCode chash;
620 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
621 "Matched result (type %u) for query `%s' with pending request\n",
622 (unsigned int) prq->type,
625 GNUNET_STATISTICS_update (stats,
626 gettext_noop ("# replies received and matched"),
629 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
634 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
639 case GNUNET_BLOCK_EVALUATION_OK_MORE:
640 update_request_performance_data (prq, pr);
642 case GNUNET_BLOCK_EVALUATION_OK_LAST:
643 /* short cut: stop processing early, no BF-update, etc. */
644 update_request_performance_data (prq, pr);
/* NOTE(review): create stores the start time as
   pr->public_data.start_time; this reads pr->start_time -- confirm. */
645 GNUNET_LOAD_update (rt_entry_lifetime,
646 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
647 /* pass on to other peers / local clients */
648 pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_NO);
649 /* destroy request, we're done */
/* pr must not be touched after this call */
650 GSF_pending_request_cancel_ (pr);
652 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
653 GNUNET_STATISTICS_update (stats,
654 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
/* NOTE(review): the visible locals declare 'chash' but no 'mhash' --
   confirm 'mhash' is declared in a line not shown. */
658 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
659 "Duplicate response `%s', discarding.\n",
660 GNUNET_h2s (&mhash));
662 return GNUNET_YES; /* duplicate */
663 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
664 return GNUNET_YES; /* wrong namespace */
665 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
668 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
671 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
672 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
673 _("Unsupported block type %u\n"),
677 /* update bloomfilter */
678 GNUNET_CRYPTO_hash (prq->data,
681 GSF_pending_request_update_ (pr, &chash, 1);
/* a NULL sender means the reply came from the local host or DHT */
682 if (NULL == prq->sender)
685 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
686 "Found result for query `%s' in local datastore\n",
689 GNUNET_STATISTICS_update (stats,
690 gettext_noop ("# results found locally"),
/* credit the request's original priority to the reply, then zero it so
   it is not credited again */
694 prq->priority += pr->public_data.original_priority;
/* NOTE(review): create initialises public_data.priority; a
   'remaining_priority' member is not set anywhere else in this file --
   confirm it exists. */
695 pr->public_data.remaining_priority = 0;
696 pr->public_data.original_priority = 0;
697 pr->public_data.results_found++;
698 prq->request_found = GNUNET_YES;
699 /* finally, pass on to other peer / local client */
700 pr->rh (pr->rh_cls, pr, prq->data, prq->size, GNUNET_YES);
706 * Continuation called to notify client about result of the datastore 'put' operation.
710 * @param success GNUNET_SYSERR on failure
711 * @param msg NULL on success, otherwise an error message
/* Completion callback passed to GNUNET_DATASTORE_put.  cls points to
   the time at which the put was started; the elapsed time since then
   feeds the datastore_put_load tracker, and the
   "# datastore 'put' failures" statistic is updated.
   NOTE(review): callers allocate *start with GNUNET_malloc; the
   matching free is not visible here.  The statement controlled by the
   GNUNET_OK test is also not visible -- presumably an early return
   before the failure counter; confirm. */
714 put_migration_continuation (void *cls,
718 struct GNUNET_TIME_Absolute *start = cls;
719 struct GNUNET_TIME_Relative delay;
721 delay = GNUNET_TIME_absolute_get_duration (*start);
723 /* FIXME: should we really update the load value on failure? */
724 GNUNET_LOAD_update (datastore_put_load,
726 if (GNUNET_OK == success)
728 GNUNET_STATISTICS_update (stats,
729 gettext_noop ("# datastore 'put' failures"),
736 * Test if the DATABASE (PUT) load on this peer is too high
737 * to even consider processing the query at all.
740 * @return GNUNET_YES if the load is too high to do anything (load high)
741 * GNUNET_NO to process normally (load normal or low)
/* Decides whether datastore PUT load is too high to store content of
   the given priority.  An average load below 50 returns GNUNET_NO at
   once; otherwise the current load is compared against
   2.0 * (1 + priority), so higher priorities tolerate more load.
   NOTE(review): the declaration of 'ld' and the return statements after
   the comparison are not visible here. */
744 test_put_load_too_high (uint32_t priority)
748 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
749 return GNUNET_NO; /* very fast */
750 ld = GNUNET_LOAD_get_load (datastore_put_load);
751 if (ld < 2.0 * (1 + priority))
753 GNUNET_STATISTICS_update (stats,
754 gettext_noop ("# storage requests dropped due to high load"),
762 * Iterator called on each result obtained for a DHT
763 * operation that expects a reply
766 * @param exp when will this value expire
767 * @param key key of the result
768 * @param get_path NULL-terminated array of pointers
769 * to the peers on reverse GET path (or NULL if not recorded)
770 * @param put_path NULL-terminated array of pointers
771 * to the peers on the PUT path (or NULL if not recorded)
772 * @param type type of the result
773 * @param size number of bytes in data
774 * @param data pointer to the result data
/* DHT result callback.  cls is the pending request the DHT lookup was
   started for.  The reply is wrapped in a zeroed ProcessReplyClosure
   and run through process_reply for that request; if migration is
   active and PUT load permits, the result is also stored via
   GNUNET_DATASTORE_put.
   NOTE(review): get_path and put_path are not used in the visible code. */
777 GSF_handle_dht_reply_ (void *cls,
778 struct GNUNET_TIME_Absolute exp,
779 const GNUNET_HashCode * key,
780 const struct GNUNET_PeerIdentity * const *get_path,
781 const struct GNUNET_PeerIdentity * const *put_path,
782 enum GNUNET_BLOCK_Type type,
786 struct GSF_PendingRequest *pr = cls;
787 struct ProcessReplyClosure prq;
789 memset (&prq, 0, sizeof (prq));
791 prq.expiration = exp;
794 process_reply (&prq, key, pr);
795 if ( (GNUNET_YES == active_to_migration) &&
796 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
799 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
800 "Replicating result for query `%s' with priority %u\n",
/* NOTE(review): 'start', 'query', 'dsize' and 'put' are not declared in
   the visible part of this function; they match the locals of
   GSF_handle_p2p_content_ -- confirm this is not a copy/paste leftover
   (here the key and payload arrive as 'key', 'size', 'data'). */
804 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
805 *start = GNUNET_TIME_absolute_get ();
806 GNUNET_DATASTORE_put (dsh,
807 0, &query, dsize, &put[1],
808 type, prq.priority, 1 /* anonymity */,
810 1 + prq.priority, MAX_DATASTORE_QUEUE,
811 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
812 &put_migration_continuation,
819 * Handle P2P "CONTENT" message. Checks that the message is
820 * well-formed and then checks if there are any pending requests for
821 * this content and possibly passes it on (to local clients or other
822 * peers). Does NOT perform migration (content caching at this peer).
824 * @param cp the other peer involved (sender or receiver, NULL
825 * for loopback messages where we are both sender and receiver)
826 * @param message the actual message
827 * @return GNUNET_OK if the message was well-formed,
828 * GNUNET_SYSERR if the message was malformed (close connection,
829 * do not cache under any circumstances)
/* Handles a P2P content message.  Messages shorter than a PutMessage
   or of type GNUNET_BLOCK_TYPE_FS_ONDEMAND are rejected with
   GNUNET_SYSERR.  Otherwise the block's key is derived, the reply is
   matched against pending requests, the sender's preference and trust
   are raised by the earned priority, the content is stored when
   migration is active and PUT load permits, and a sender whose content
   matched no request may be blocked from migrating for a randomised
   period.
   NOTE(review): the function's header comment says it does NOT perform
   migration, yet the visible code calls GNUNET_DATASTORE_put --
   confirm which is intended. */
832 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
833 const struct GNUNET_MessageHeader *message)
835 const struct PutMessage *put;
838 enum GNUNET_BLOCK_Type type;
839 struct GNUNET_TIME_Absolute expiration;
840 GNUNET_HashCode query;
841 struct ProcessReplyClosure prq;
842 struct GNUNET_TIME_Relative block_time;
844 struct GNUNET_TIME_Absolute *start;
/* reject anything too short to hold the fixed PutMessage header */
846 msize = ntohs (message->size);
847 if (msize < sizeof (struct PutMessage))
850 return GNUNET_SYSERR;
852 put = (const struct PutMessage*) message;
853 dsize = msize - sizeof (struct PutMessage);
854 type = ntohl (put->type);
855 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
856 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
857 return GNUNET_SYSERR;
859 GNUNET_BLOCK_get_key (block_ctx,
866 return GNUNET_SYSERR;
868 /* now, lookup 'query' */
869 prq.data = (const void*) &put[1];
876 prq.expiration = expiration;
878 prq.anonymity_level = 1;
879 prq.finished = GNUNET_NO;
880 prq.request_found = GNUNET_NO;
/* NOTE(review): the request map declared in this file is 'pr_map';
   'query_request_map' is not declared in the visible source -- confirm. */
881 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
/* NOTE(review): cp is documented as NULL for loopback messages; no
   NULL guard is visible around these two uses -- confirm. */
887 GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
888 GSF_get_peer_performance_data (cp)->trust += prq.priority;
890 if ( (GNUNET_YES == active_to_migration) &&
891 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
894 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
895 "Replicating result for query `%s' with priority %u\n",
/* the start time is heap-allocated so it can travel as the closure of
   put_migration_continuation */
899 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
900 *start = GNUNET_TIME_absolute_get ();
901 GNUNET_DATASTORE_put (dsh,
902 0, &query, dsize, &put[1],
903 type, prq.priority, 1 /* anonymity */,
905 1 + prq.priority, MAX_DATASTORE_QUEUE,
906 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
907 &put_migration_continuation,
/* unsolicited content while migration is off or load is high: ask the
   sender to stop migrating for 5 s plus a random time scaled by putl^2 */
910 putl = GNUNET_LOAD_get_load (datastore_put_load);
911 if ( (NULL != (cp = prq.sender)) &&
912 (GNUNET_NO == prq.request_found) &&
913 ( (GNUNET_YES != active_to_migration) ||
914 (putl > 2.5 * (1 + prq.priority)) ) )
916 if (GNUNET_YES != active_to_migration)
917 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
918 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
919 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
920 (unsigned int) (60000 * putl * putl)));
921 GSF_block_peer_migration (cp, block_time);
928 * Setup the subsystem.
/* Creates the request map (initial size argument 32 * 1024) and the
   min-ordered expiration heap.
   NOTE(review): datastore_put_load is not initialised in the visible
   lines -- confirm where it is set up. */
931 GSF_pending_request_init_ ()
933 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
934 requests_by_expiration_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
939 * Shutdown the subsystem.
/* Iterates over pr_map (the iterator argument is not visible here;
   presumably clean_request), then destroys the map and the expiration
   heap and clears the heap pointer.
   NOTE(review): the visible lines do not reset pr_map to NULL after
   destroying it -- confirm. */
942 GSF_pending_request_done_ ()
944 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
947 GNUNET_CONTAINER_multihashmap_destroy (pr_map);
949 GNUNET_CONTAINER_heap_destroy (requests_by_expiration_heap);
950 requests_by_expiration_heap = NULL;
954 /* end of gnunet-service-fs_pr.c */