/*
     This file is part of GNUnet.
     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)

     GNUnet is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License as published
     by the Free Software Foundation; either version 3, or (at your
     option) any later version.

     GNUnet is distributed in the hope that it will be useful, but
     WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     General Public License for more details.

     You should have received a copy of the GNU General Public License
     along with GNUnet; see the file COPYING.  If not, write to the
     Free Software Foundation, Inc., 59 Temple Place - Suite 330,
     Boston, MA 02111-1307, USA.
*/
/**
 * @file fs/gnunet-service-fs_pr.c
 * @brief API to handle pending requests
 * @author Christian Grothoff
 */
27 #include "gnunet-service-fs_pr.h"
33 struct GSF_PendingRequest
36 * Public data for the request.
38 struct GSF_PendingRequestData public_data;
41 * Function to call if we encounter a reply.
43 GSF_PendingRequestReplyHandler rh;
51 * Array of hash codes of replies we've already seen.
53 GNUNET_HashCode *replies_seen;
56 * Bloomfilter masking replies we've already seen.
58 struct GNUNET_CONTAINER_BloomFilter *bf;
61 * Number of valid entries in the 'replies_seen' array.
63 unsigned int replies_seen_count;
66 * Length of the 'replies_seen' array.
68 unsigned int replies_seen_size;
71 * Mingle value we currently use for the bf.
/**
 * Map with all pending requests, keyed by the query hash.
 * Values are of type 'struct GSF_PendingRequest*'.
 */
static struct GNUNET_CONTAINER_MultiHashMap *pr_map;

/**
 * Load tracker for datastore 'PUT' operations.
 */
static struct GNUNET_LOAD_Value *datastore_put_load;

/**
 * Are we allowed to migrate content to this peer?  Compared against
 * GNUNET_YES before replies are replicated into the datastore.
 */
static int active_to_migration;

/**
 * Heap whose top is the request that expires next.  Holds
 * 'struct GSF_PendingRequest*' pointers that are also referenced
 * from 'pr_map'.  Requests from our own clients do not expire and
 * are therefore not supposed to be stored here.
 *
 * NOTE(review): the original comment also mentioned
 * "requests_by_peer" and "requests_by_query" tables, which do not
 * exist in this file.
 */
static struct GNUNET_CONTAINER_Heap *requests_by_expiration_heap;
109 * How many bytes should a bloomfilter be if we have already seen
110 * entry_count responses? Note that BLOOMFILTER_K gives us the number
111 * of bits set per entry. Furthermore, we should not re-size the
112 * filter too often (to keep it cheap).
114 * Since other peers will also add entries but not resize the filter,
115 * we should generally pick a slightly larger size than what the
116 * strict math would suggest.
118 * @return must be a power of two and smaller or equal to 2^15.
121 compute_bloomfilter_size (unsigned int entry_count)
124 unsigned int ideal = (entry_count * BLOOMFILTER_K) / 4;
125 uint16_t max = 1 << 15;
127 if (entry_count > max)
130 while ((size < max) && (size < ideal))
139 * Recalculate our bloom filter for filtering replies. This function
140 * will create a new bloom filter from scratch, so it should only be
141 * called if we have no bloomfilter at all (and hence can create a
142 * fresh one of minimal size without problems) OR if our peer is the
143 * initiator (in which case we may resize to larger than mimimum size).
145 * @param pr request for which the BF is to be recomputed
146 * @return GNUNET_YES if a refresh actually happened
149 refresh_bloomfilter (struct GSF_PendingRequest *pr)
153 GNUNET_HashCode mhash;
155 nsize = compute_bloomfilter_size (pr->replies_seen_off);
157 (nsize == GNUNET_CONTAINER_bloomfilter_get_size (pr->bf)) )
158 return GNUNET_NO; /* size not changed */
160 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
161 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
163 pr->bf = GNUNET_CONTAINER_bloomfilter_init (NULL,
166 for (i=0;i<pr->replies_seen_count;i++)
168 GNUNET_BLOCK_mingle_hash (&pr->replies_seen[i],
171 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
178 * Create a new pending request.
180 * @param options request options
181 * @param type type of the block that is being requested
182 * @param query key for the lookup
183 * @param namespace namespace to lookup, NULL for no namespace
184 * @param target preferred target for the request, NULL for none
185 * @param bf_data raw data for bloom filter for known replies, can be NULL
186 * @param bf_size number of bytes in bf_data
187 * @param mingle mingle value for bf
188 * @param anonymity_level desired anonymity level
189 * @param priority maximum outgoing cummulative request priority to use
190 * @param ttl current time-to-live for the request
191 * @param replies_seen hash codes of known local replies
192 * @param replies_seen_count size of the 'replies_seen' array
193 * @param rh handle to call when we get a reply
194 * @param rh_cls closure for rh
195 * @return handle for the new pending request
197 struct GSF_PendingRequest *
198 GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
199 enum GNUNET_BLOCK_Type type,
200 const GNUNET_HashCode *query,
201 const GNUNET_HashCode *namespace,
202 const struct GNUNET_PeerIdentity *target,
206 uint32_t anonymity_level,
209 const GNUNET_HashCode *replies_seen,
210 unsigned int replies_seen_count,
211 GSF_PendingRequestReplyHandler rh,
214 struct GSF_PendingRequest *pr;
217 pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
218 pr->public_data.query = *query;
219 if (GNUNET_BLOCK_TYPE_SBLOCK == type)
221 GNUNET_assert (NULL != namespace);
222 pr->public_data.namespace = *namespace;
226 pr->public_data.target = *target;
227 pr->has_target = GNUNET_YES;
229 pr->public_data.anonymity_level = anonymity_data;
230 pr->public_data.priority = priority;
231 pr->public_data.options = options;
232 pr->public_data.type = type;
233 pr->public_data.start_time = GNUNET_TIME_absolute_get ();
237 pr->ttl = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
240 pr->ttl = GNUNET_TIME_absolute_subtract (pr->public_data.start_time,
241 GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
242 (uint32_t) (- ttl)));
243 if (replies_seen_count > 0)
245 pr->replies_seen_size = replies_seen_count;
246 pr->replies_seen = GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
247 memcpy (pr->replies_seen,
249 replies_seen_count * sizeof (struct GNUNET_HashCode));
250 pr->replies_seen_count = replies_seen_count;
254 pr->bf = GNUNET_CONTAINER_bloomfilter_init (bf_data,
259 else if ( (replies_seen_count > 0) &&
260 (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH)) )
262 GNUNET_assert (GNUNET_YES == refresh_bloomfilter (pr));
264 GNUNET_CONTAINER_multihashmap_put (pr_map,
267 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
268 // FIXME: if not a local query, we also need to track the
269 // total number of external queries we currently have and
270 // bound it => need an additional heap!
272 pr->hnode = GNUNET_CONTAINER_heap_insert (requests_by_expiration_heap,
274 pr->start_time.abs_value + pr->ttl);
278 /* make sure we don't track too many requests */
279 if (GNUNET_CONTAINER_heap_get_size (requests_by_expiration_heap) > max_pending_requests)
281 pr = GNUNET_CONTAINER_heap_peek (requests_by_expiration_heap);
282 GNUNET_assert (pr != NULL);
283 destroy_pending_request (pr);
292 * Obtain the public data associated with a pending request
294 * @param pr pending request
295 * @return associated public data
297 struct GSF_PendingRequestData *
298 GSF_pending_request_get_data_ (struct GSF_PendingRequest *pr)
300 return &pr->public_data;
305 * Update a given pending request with additional replies
306 * that have been seen.
308 * @param pr request to update
309 * @param replies_seen hash codes of replies that we've seen
310 * @param replies_seen_count size of the replies_seen array
313 GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
314 const GNUNET_HashCode *replies_seen,
315 unsigned int replies_seen_count)
318 GNUNET_HashCode mhash;
320 if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
321 return; /* integer overflow */
322 if (0 != (options & GSF_PRO_BLOOMFILTER_FULL_REFRESH))
324 /* we're responsible for the BF, full refresh */
325 if (replies_seen_count + pr->replies_seen_count > pr->replies_seen_size)
326 GNUNET_array_grow (pr->replies_seen,
327 pr->replies_seen_size,
328 replies_seen_count + pr->replies_seen_count);
329 memcpy (&pr->replies_seen[pr->replies_seen_count],
331 sizeof (GNUNET_HashCode) * replies_seen_count);
332 pr->replies_seen_count += replies_seen;
333 if (GNUNET_NO == refresh_bloomfilter (pr))
335 /* bf not recalculated, simply extend it with new bits */
336 for (i=0;i<pr->replies_seen_count;i++)
338 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
341 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
349 /* we're not the initiator, but the initiator did not give us
350 any bloom-filter, so we need to create one on-the-fly */
351 pr->mingle = (int32_t) GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
353 pr->bf = GNUNET_CONTAINER_bloomfilter_init (compute_bloomfilter_size (replies_seen_count),
357 for (i=0;i<pr->replies_seen_count;i++)
359 GNUNET_BLOCK_mingle_hash (&replies_seen[i],
362 GNUNET_CONTAINER_bloomfilter_add (pr->bf, &mhash);
369 * Generate the message corresponding to the given pending request for
370 * transmission to other peers (or at least determine its size).
372 * @param pr request to generate the message for
373 * @param do_route are we routing the reply
374 * @param buf_size number of bytes available in buf
375 * @param buf where to copy the message (can be NULL)
376 * @return number of bytes needed (if > buf_size) or used
379 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
384 struct PendingMessage *pm;
385 char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
386 struct GetMessage *gm;
387 GNUNET_HashCode *ext;
397 if (GNUNET_YES != do_route)
399 bm |= GET_MESSAGE_BIT_RETURN_TO;
402 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
404 bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
407 if (GNUNET_YES == pr->has_target)
409 bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
412 bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
413 msize = sizeof (struct GetMessage) + bf_size + k * sizeof(GNUNET_HashCode);
414 GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
415 if (buf_size < msize)
417 gm = (struct GetMessage*) lbuf;
418 gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
419 gm->header.size = htons (msize);
420 gm->type = htonl (pr->type);
421 if (GNUNET_YES == do_route)
422 prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
423 pr->public_data.priority + 1);
426 pr->public_data.priority -= prio;
427 gm->priority = htonl (prio);
428 gm->ttl = htonl (pr->ttl);
429 gm->filter_mutator = htonl(pr->mingle);
430 gm->hash_bitmap = htonl (bm);
431 gm->query = pr->query;
432 ext = (GNUNET_HashCode*) &gm[1];
434 if (GNUNET_YES != do_route)
435 GNUNET_PEER_resolve (pr->cp->pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
436 if (GNUNET_BLOCK_TYPE_SBLOCK == pr->type)
437 memcpy (&ext[k++], pr->namespace, sizeof (GNUNET_HashCode));
438 if (GNUNET_YES == pr->has_target)
439 GNUNET_PEER_resolve (pr->target_pid, (struct GNUNET_PeerIdentity*) &ext[k++]);
441 GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
444 memcpy (buf, gm, msize);
450 * Iterator to free pending requests.
452 * @param cls closure, unused
453 * @param key current key code
454 * @param value value in the hash map (pending request)
455 * @return GNUNET_YES (we should continue to iterate)
458 clean_request (void *cls,
459 const GNUNET_HashCode * key,
462 struct GSF_PendingRequest *pr = value;
464 GNUNET_free_non_null (pr->replies_seen);
466 GNUNET_CONTAINER_bloomfilter_free (pr->bf);
473 * Explicitly cancel a pending request.
475 * @param pr request to cancel
478 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
480 GNUNET_assert (GNUNET_OK ==
481 GNUNET_CONTAINER_multihashmap_remove (pr_map,
482 &pr->public_data.query,
484 GNUNET_assert (GNUNET_YES ==
485 clean_request (NULL, &pr->public_data.query, pr));
490 * Iterate over all pending requests.
492 * @param it function to call for each request
493 * @param cls closure for it
496 GSF_iterate_pending_pr_map_ (GSF_PendingRequestIterator it,
499 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
500 (GNUNET_CONTAINER_HashMapIterator) it,
508 * Closure for "process_reply" function.
510 struct ProcessReplyClosure
513 * The data for the reply.
518 * Who gave us this reply? NULL for local host (or DHT)
520 struct ConnectedPeer *sender;
523 * When the reply expires.
525 struct GNUNET_TIME_Absolute expiration;
535 enum GNUNET_BLOCK_Type type;
538 * How much was this reply worth to us?
543 * Anonymity requirements for this reply.
545 uint32_t anonymity_level;
548 * Evaluation result (returned).
550 enum GNUNET_BLOCK_EvaluationResult eval;
553 * Did we finish processing the associated request?
558 * Did we find a matching request?
565 * Update the performance data for the sender (if any) since
566 * the sender successfully answered one of our queries.
568 * @param prq information about the sender
569 * @param pr request that was satisfied
572 update_request_performance_data (struct ProcessReplyClosure *prq,
573 struct GSF_PendingRequest *pr)
576 struct GNUNET_TIME_Relative cur_delay;
578 if (prq->sender == NULL)
580 /* FIXME: adapt code to new API... */
581 for (i=0;i<pr->used_targets_off;i++)
582 if (pr->used_targets[i].pid == prq->sender->pid)
584 if (i < pr->used_targets_off)
586 cur_delay = GNUNET_TIME_absolute_get_duration (pr->used_targets[i].last_request_time);
587 prq->sender->avg_delay.rel_value
588 = (prq->sender->avg_delay.rel_value *
589 (RUNAVG_DELAY_N - 1) + cur_delay.rel_value) / RUNAVG_DELAY_N;
590 prq->sender->avg_priority
591 = (prq->sender->avg_priority *
592 (RUNAVG_DELAY_N - 1) + pr->priority) / (double) RUNAVG_DELAY_N;
596 GNUNET_PEER_change_rc (prq->sender->last_p2p_replies
597 [prq->sender->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE],
599 GNUNET_PEER_change_rc (pr->cp->pid, 1);
600 prq->sender->last_p2p_replies
601 [(prq->sender->last_p2p_replies_woff++) % P2P_SUCCESS_LIST_SIZE]
606 if (NULL != prq->sender->last_client_replies
607 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE])
608 GNUNET_SERVER_client_drop (prq->sender->last_client_replies
609 [(prq->sender->last_client_replies_woff) % CS2P_SUCCESS_LIST_SIZE]);
610 prq->sender->last_client_replies
611 [(prq->sender->last_client_replies_woff++) % CS2P_SUCCESS_LIST_SIZE]
612 = pr->client_request_list->client_list->client;
613 GNUNET_SERVER_client_keep (pr->client_request_list->client_list->client);
620 * We have received a reply; handle it!
622 * @param cls response (struct ProcessReplyClosure)
623 * @param key our query
624 * @param value value in the hash map (info about the query)
625 * @return GNUNET_YES (we should continue to iterate)
628 process_reply (void *cls,
629 const GNUNET_HashCode * key,
632 struct ProcessReplyClosure *prq = cls;
633 struct GSF_PendingRequest *pr = value;
634 struct PendingMessage *reply;
635 struct ClientResponseMessage *creply;
636 struct ClientList *cl;
637 struct PutMessage *pm;
638 struct ConnectedPeer *cp;
642 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
643 "Matched result (type %u) for query `%s' with pending request\n",
644 (unsigned int) prq->type,
647 GNUNET_STATISTICS_update (stats,
648 gettext_noop ("# replies received and matched"),
651 prq->eval = GNUNET_BLOCK_evaluate (block_ctx,
656 pr->namespace, (pr->namespace != NULL) ? sizeof (GNUNET_HashCode) : 0,
661 case GNUNET_BLOCK_EVALUATION_OK_MORE:
662 update_request_performance_data (prq, pr);
664 case GNUNET_BLOCK_EVALUATION_OK_LAST:
665 update_request_performance_data (prq, pr);
666 /* FIXME: adapt code to new API! */
667 while (NULL != pr->pending_head)
668 destroy_pending_message_list_entry (pr->pending_head);
671 if (pr->client_request_list != NULL)
672 GNUNET_SERVER_receive_done (pr->client_request_list->client_list->client,
674 GNUNET_DATASTORE_cancel (pr->qe);
677 pr->do_remove = GNUNET_YES;
678 if (pr->task != GNUNET_SCHEDULER_NO_TASK)
680 GNUNET_SCHEDULER_cancel (pr->task);
681 pr->task = GNUNET_SCHEDULER_NO_TASK;
683 GNUNET_break (GNUNET_YES ==
684 GNUNET_CONTAINER_multihashmap_remove (query_request_map,
687 GNUNET_LOAD_update (rt_entry_lifetime,
688 GNUNET_TIME_absolute_get_duration (pr->start_time).rel_value);
690 case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
691 GNUNET_STATISTICS_update (stats,
692 gettext_noop ("# duplicate replies discarded (bloomfilter)"),
696 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
697 "Duplicate response `%s', discarding.\n",
698 GNUNET_h2s (&mhash));
700 return GNUNET_YES; /* duplicate */
701 case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
702 return GNUNET_YES; /* wrong namespace */
703 case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
706 case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
709 case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED:
710 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
711 _("Unsupported block type %u\n"),
715 /* FIXME: adapt code to new API! */
716 if (pr->client_request_list != NULL)
718 if (pr->replies_seen_size == pr->replies_seen_off)
719 GNUNET_array_grow (pr->replies_seen,
720 pr->replies_seen_size,
721 pr->replies_seen_size * 2 + 4);
722 GNUNET_CRYPTO_hash (prq->data,
724 &pr->replies_seen[pr->replies_seen_off++]);
725 refresh_bloomfilter (pr);
727 if (NULL == prq->sender)
730 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
731 "Found result for query `%s' in local datastore\n",
734 GNUNET_STATISTICS_update (stats,
735 gettext_noop ("# results found locally"),
739 prq->priority += pr->remaining_priority;
740 pr->remaining_priority = 0;
742 prq->request_found = GNUNET_YES;
743 /* finally, pass on to other peers / local clients */
744 pr->rh (pr->rh_cls, pr, prq->data, prq->size);
750 * Continuation called to notify client about result of the
754 * @param success GNUNET_SYSERR on failure
755 * @param msg NULL on success, otherwise an error message
758 put_migration_continuation (void *cls,
762 struct GNUNET_TIME_Absolute *start = cls;
763 struct GNUNET_TIME_Relative delay;
765 delay = GNUNET_TIME_absolute_get_duration (*start);
767 /* FIXME: should we really update the load value on failure? */
768 GNUNET_LOAD_update (datastore_put_load,
770 if (GNUNET_OK == success)
772 GNUNET_STATISTICS_update (stats,
773 gettext_noop ("# datastore 'put' failures"),
780 * Test if the DATABASE (PUT) load on this peer is too high
781 * to even consider processing the query at
784 * @return GNUNET_YES if the load is too high to do anything (load high)
785 * GNUNET_NO to process normally (load normal or low)
788 test_put_load_too_high (uint32_t priority)
792 if (GNUNET_LOAD_get_average (datastore_put_load) < 50)
793 return GNUNET_NO; /* very fast */
794 ld = GNUNET_LOAD_get_load (datastore_put_load);
795 if (ld < 2.0 * (1 + priority))
797 GNUNET_STATISTICS_update (stats,
798 gettext_noop ("# storage requests dropped due to high load"),
806 * Iterator called on each result obtained for a DHT
807 * operation that expects a reply
810 * @param exp when will this value expire
811 * @param key key of the result
812 * @param get_path NULL-terminated array of pointers
813 * to the peers on reverse GET path (or NULL if not recorded)
814 * @param put_path NULL-terminated array of pointers
815 * to the peers on the PUT path (or NULL if not recorded)
816 * @param type type of the result
817 * @param size number of bytes in data
818 * @param data pointer to the result data
821 GSF_handle_dht_reply_ (void *cls,
822 struct GNUNET_TIME_Absolute exp,
823 const GNUNET_HashCode * key,
824 const struct GNUNET_PeerIdentity * const *get_path,
825 const struct GNUNET_PeerIdentity * const *put_path,
826 enum GNUNET_BLOCK_Type type,
830 struct GSF_PendingRequest *pr = cls;
831 struct ProcessReplyClosure prq;
833 memset (&prq, 0, sizeof (prq));
835 prq.expiration = exp;
838 process_reply (&prq, key, pr);
839 if ( (GNUNET_YES == active_to_migration) &&
840 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
843 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
844 "Replicating result for query `%s' with priority %u\n",
848 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
849 *start = GNUNET_TIME_absolute_get ();
850 GNUNET_DATASTORE_put (dsh,
851 0, &query, dsize, &put[1],
852 type, prq.priority, 1 /* anonymity */,
854 1 + prq.priority, MAX_DATASTORE_QUEUE,
855 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
856 &put_migration_continuation,
863 * Handle P2P "CONTENT" message. Checks that the message is
864 * well-formed and then checks if there are any pending requests for
865 * this content and possibly passes it on (to local clients or other
866 * peers). Does NOT perform migration (content caching at this peer).
868 * @param cp the other peer involved (sender or receiver, NULL
869 * for loopback messages where we are both sender and receiver)
870 * @param message the actual message
871 * @return GNUNET_OK if the message was well-formed,
872 * GNUNET_SYSERR if the message was malformed (close connection,
873 * do not cache under any circumstances)
876 GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
877 const struct GNUNET_MessageHeader *message)
879 const struct PutMessage *put;
882 enum GNUNET_BLOCK_Type type;
883 struct GNUNET_TIME_Absolute expiration;
884 GNUNET_HashCode query;
885 struct ProcessReplyClosure prq;
886 struct GNUNET_TIME_Relative block_time;
888 struct GNUNET_TIME_Absolute *start;
890 msize = ntohs (message->size);
891 if (msize < sizeof (struct PutMessage))
894 return GNUNET_SYSERR;
896 put = (const struct PutMessage*) message;
897 dsize = msize - sizeof (struct PutMessage);
898 type = ntohl (put->type);
899 expiration = GNUNET_TIME_absolute_ntoh (put->expiration);
900 if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
901 return GNUNET_SYSERR;
903 GNUNET_BLOCK_get_key (block_ctx,
910 return GNUNET_SYSERR;
912 /* now, lookup 'query' */
913 prq.data = (const void*) &put[1];
920 prq.expiration = expiration;
922 prq.anonymity_level = 1;
923 prq.finished = GNUNET_NO;
924 prq.request_found = GNUNET_NO;
925 GNUNET_CONTAINER_multihashmap_get_multiple (query_request_map,
931 GSF_connected_peer_change_preference (cp, CONTENT_BANDWIDTH_VALUE + 1000 * prq.priority);
932 GSF_get_peer_performance_data (cp)->trust += prq.priority;
934 if ( (GNUNET_YES == active_to_migration) &&
935 (GNUNET_NO == test_put_load_too_high (prq.priority)) )
938 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
939 "Replicating result for query `%s' with priority %u\n",
943 start = GNUNET_malloc (sizeof (struct GNUNET_TIME_Absolute));
944 *start = GNUNET_TIME_absolute_get ();
945 GNUNET_DATASTORE_put (dsh,
946 0, &query, dsize, &put[1],
947 type, prq.priority, 1 /* anonymity */,
949 1 + prq.priority, MAX_DATASTORE_QUEUE,
950 GNUNET_CONSTANTS_SERVICE_TIMEOUT,
951 &put_migration_continuation,
954 putl = GNUNET_LOAD_get_load (datastore_put_load);
955 if ( (NULL != (cp = prq.sender)) &&
956 (GNUNET_NO == prq.request_found) &&
957 ( (GNUNET_YES != active_to_migration) ||
958 (putl > 2.5 * (1 + prq.priority)) ) )
960 if (GNUNET_YES != active_to_migration)
961 putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
962 block_time = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
963 5000 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
964 (unsigned int) (60000 * putl * putl)));
965 GSF_block_peer_migration (cp, block_time);
972 * Setup the subsystem.
975 GSF_pending_request_init_ ()
977 pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
982 * Shutdown the subsystem.
985 GSF_pending_request_done_ ()
987 GNUNET_CONTAINER_multihashmap_iterate (pr_map,
990 GNUNET_CONTAINER_multihashmap_destroy (pr_map);
/* end of gnunet-service-fs_pr.c */