2 This file is part of GNUnet.
3 Copyright (C) 2011, 2016 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file fs/gnunet-service-fs_cp.c
22 * @brief API to handle 'connected peers'
23 * @author Christian Grothoff
26 #include "gnunet_util_lib.h"
27 #include "gnunet_load_lib.h"
28 #include "gnunet-service-fs.h"
29 #include "gnunet-service-fs_cp.h"
30 #include "gnunet-service-fs_pe.h"
31 #include "gnunet-service-fs_pr.h"
32 #include "gnunet-service-fs_push.h"
33 #include "gnunet_peerstore_service.h"
37 * Ratio for moving average delay calculation. The previous
38 * average goes in with a factor of (n-1) into the calculation.
41 #define RUNAVG_DELAY_N 16
44 * How often do we flush respect values to disk?
46 #define RESPECT_FLUSH_FREQ GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
49 * After how long do we discard a reply?
51 #define REPLY_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
54 * Collect an instane number of statistics? May cause excessive IPC.
56 #define INSANE_STATISTICS GNUNET_NO
60 * Handle to cancel a transmission request.
62 struct GSF_PeerTransmitHandle {
64 * Kept in a doubly-linked list.
66 struct GSF_PeerTransmitHandle *next;
69 * Kept in a doubly-linked list.
71 struct GSF_PeerTransmitHandle *prev;
74 * Time when this transmission request was issued.
76 struct GNUNET_TIME_Absolute transmission_request_start_time;
79 * Envelope with the actual message.
81 struct GNUNET_MQ_Envelope *env;
84 * Peer this request targets.
86 struct GSF_ConnectedPeer *cp;
89 * #GNUNET_YES if this is a query, #GNUNET_NO for content.
94 * Did we get a reservation already?
99 * Priority of this request.
106 * Handle for an entry in our delay list.
108 struct GSF_DelayedHandle {
110 * Kept in a doubly-linked list.
112 struct GSF_DelayedHandle *next;
115 * Kept in a doubly-linked list.
117 struct GSF_DelayedHandle *prev;
120 * Peer this transmission belongs to.
122 struct GSF_ConnectedPeer *cp;
125 * Envelope of the message that was delayed.
127 struct GNUNET_MQ_Envelope *env;
130 * Task for the delay.
132 struct GNUNET_SCHEDULER_Task *delay_task;
135 * Size of the message.
142 * Information per peer and request.
146 * Handle to generic request (generic: from peer or local client).
148 struct GSF_PendingRequest *pr;
151 * Which specific peer issued this request?
153 struct GSF_ConnectedPeer *cp;
156 * Task for asynchronous stopping of this request.
158 struct GNUNET_SCHEDULER_Task *kill_task;
165 struct GSF_ConnectedPeer {
167 * Performance data for this peer.
169 struct GSF_PeerPerformanceData ppd;
172 * Time until when we blocked this peer from migrating
175 struct GNUNET_TIME_Absolute last_migration_block;
178 * Task scheduled to revive migration to this peer.
180 struct GNUNET_SCHEDULER_Task *mig_revive_task;
183 * Messages (replies, queries, content migration) we would like to
184 * send to this peer in the near future. Sorted by priority, head.
186 struct GSF_PeerTransmitHandle *pth_head;
189 * Messages (replies, queries, content migration) we would like to
190 * send to this peer in the near future. Sorted by priority, tail.
192 struct GSF_PeerTransmitHandle *pth_tail;
195 * Messages (replies, queries, content migration) we would like to
196 * send to this peer in the near future. Sorted by priority, head.
198 struct GSF_DelayedHandle *delayed_head;
201 * Messages (replies, queries, content migration) we would like to
202 * send to this peer in the near future. Sorted by priority, tail.
204 struct GSF_DelayedHandle *delayed_tail;
207 * Context of our GNUNET_ATS_reserve_bandwidth call (or NULL).
209 struct GNUNET_ATS_ReservationContext *rc;
212 * Task scheduled if we need to retry bandwidth reservation later.
214 struct GNUNET_SCHEDULER_Task *rc_delay_task;
217 * Active requests from this neighbour, map of query to `struct PeerRequest`.
219 struct GNUNET_CONTAINER_MultiHashMap *request_map;
222 * Handle for an active request for transmission to this
225 struct GNUNET_MQ_Handle *mq;
228 * Increase in traffic preference still to be submitted
229 * to the core service for this peer.
231 uint64_t inc_preference;
234 * Number of entries in @e delayed_head DLL.
236 unsigned int delay_queue_size;
239 * Respect rating for this peer on disk.
241 uint32_t disk_respect;
244 * Which offset in @e last_p2p_replies will be updated next?
245 * (we go round-robin).
247 unsigned int last_p2p_replies_woff;
250 * Which offset in @e last_client_replies will be updated next?
251 * (we go round-robin).
253 unsigned int last_client_replies_woff;
256 * Current offset into @e last_request_times ring buffer.
258 unsigned int last_request_times_off;
261 * #GNUNET_YES if we did successfully reserve 32k bandwidth,
267 * Handle to the PEERSTORE iterate request for peer respect value
269 struct GNUNET_PEERSTORE_IterateContext *respect_iterate_req;
274 * Map from peer identities to `struct GSF_ConnectPeer` entries.
276 static struct GNUNET_CONTAINER_MultiPeerMap *cp_map;
279 * Handle to peerstore service.
281 static struct GNUNET_PEERSTORE_Handle *peerstore;
284 * Task used to flush respect values to disk.
286 static struct GNUNET_SCHEDULER_Task *fr_task;
290 * Update the latency information kept for the given peer.
292 * @param id peer record to update
293 * @param latency current latency value
296 GSF_update_peer_latency_(const struct GNUNET_PeerIdentity *id,
297 struct GNUNET_TIME_Relative latency)
299 struct GSF_ConnectedPeer *cp;
301 cp = GSF_peer_get_(id);
303 return; /* we're not yet connected at the core level, ignore */
304 GNUNET_LOAD_value_set_decline(cp->ppd.transmission_delay,
310 * Return the performance data record for the given peer
312 * @param cp peer to query
313 * @return performance data record for the peer
315 struct GSF_PeerPerformanceData *
316 GSF_get_peer_performance_data_(struct GSF_ConnectedPeer *cp)
323 * Core is ready to transmit to a peer, get the message.
325 * @param cp which peer to send a message to
328 peer_transmit(struct GSF_ConnectedPeer *cp);
332 * Function called by core upon success or failure of our bandwidth reservation request.
334 * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
335 * @param peer identifies the peer
336 * @param amount set to the amount that was actually reserved or unreserved;
337 * either the full requested amount or zero (no partial reservations)
338 * @param res_delay if the reservation could not be satisfied (amount was 0), how
339 * long should the client wait until re-trying?
342 ats_reserve_callback(void *cls,
343 const struct GNUNET_PeerIdentity *peer,
345 struct GNUNET_TIME_Relative res_delay);
349 * If ready (bandwidth reserved), try to schedule transmission via
350 * core for the given handle.
352 * @param pth transmission handle to schedule
355 schedule_transmission(struct GSF_PeerTransmitHandle *pth)
357 struct GSF_ConnectedPeer *cp;
358 struct GNUNET_PeerIdentity target;
361 GNUNET_assert(0 != cp->ppd.pid);
362 GNUNET_PEER_resolve(cp->ppd.pid, &target);
364 if (0 != cp->inc_preference)
366 GNUNET_ATS_performance_change_preference(GSF_ats,
368 GNUNET_ATS_PREFERENCE_BANDWIDTH,
369 (double)cp->inc_preference,
370 GNUNET_ATS_PREFERENCE_END);
371 cp->inc_preference = 0;
374 if ((GNUNET_YES == pth->is_query) &&
375 (GNUNET_YES != pth->was_reserved))
377 /* query, need reservation */
378 if (GNUNET_YES != cp->did_reserve)
379 return; /* not ready */
380 cp->did_reserve = GNUNET_NO;
381 /* reservation already done! */
382 pth->was_reserved = GNUNET_YES;
383 cp->rc = GNUNET_ATS_reserve_bandwidth(GSF_ats,
386 &ats_reserve_callback,
395 * Core is ready to transmit to a peer, get the message.
397 * @param cp which peer to send a message to
400 peer_transmit(struct GSF_ConnectedPeer *cp)
402 struct GSF_PeerTransmitHandle *pth = cp->pth_head;
403 struct GSF_PeerTransmitHandle *pos;
407 GNUNET_CONTAINER_DLL_remove(cp->pth_head,
410 if (GNUNET_YES == pth->is_query)
412 cp->ppd.last_request_times[(cp->last_request_times_off++) %
413 MAX_QUEUE_PER_PEER] =
414 GNUNET_TIME_absolute_get();
415 GNUNET_assert(0 < cp->ppd.pending_queries--);
417 else if (GNUNET_NO == pth->is_query)
419 GNUNET_assert(0 < cp->ppd.pending_replies--);
421 GNUNET_LOAD_update(cp->ppd.transmission_delay,
422 GNUNET_TIME_absolute_get_duration
423 (pth->transmission_request_start_time).rel_value_us);
424 GNUNET_MQ_send(cp->mq,
427 if (NULL != (pos = cp->pth_head))
429 GNUNET_assert(pos != pth);
430 schedule_transmission(pos);
436 * (re)try to reserve bandwidth from the given peer.
438 * @param cls the `struct GSF_ConnectedPeer` to reserve from
441 retry_reservation(void *cls)
443 struct GSF_ConnectedPeer *cp = cls;
444 struct GNUNET_PeerIdentity target;
446 GNUNET_PEER_resolve(cp->ppd.pid, &target);
447 cp->rc_delay_task = NULL;
449 GNUNET_ATS_reserve_bandwidth(GSF_ats,
452 &ats_reserve_callback, cp);
457 * Function called by core upon success or failure of our bandwidth reservation request.
459 * @param cls the `struct GSF_ConnectedPeer` of the peer for which we made the request
460 * @param peer identifies the peer
461 * @param amount set to the amount that was actually reserved or unreserved;
462 * either the full requested amount or zero (no partial reservations)
463 * @param res_delay if the reservation could not be satisfied (amount was 0), how
464 * long should the client wait until re-trying?
467 ats_reserve_callback(void *cls,
468 const struct GNUNET_PeerIdentity *peer,
470 struct GNUNET_TIME_Relative res_delay)
472 struct GSF_ConnectedPeer *cp = cls;
473 struct GSF_PeerTransmitHandle *pth;
475 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
476 "Reserved %d bytes / need to wait %s for reservation\n",
478 GNUNET_STRINGS_relative_time_to_string(res_delay, GNUNET_YES));
483 GNUNET_SCHEDULER_add_delayed(res_delay,
488 cp->did_reserve = GNUNET_YES;
492 /* reservation success, try transmission now! */
499 * Function called by PEERSTORE with peer respect record
501 * @param cls handle to connected peer entry
502 * @param record peerstore record information
503 * @param emsg error message, or NULL if no errors
506 peer_respect_cb(void *cls,
507 const struct GNUNET_PEERSTORE_Record *record,
510 struct GSF_ConnectedPeer *cp = cls;
512 GNUNET_assert(NULL != cp->respect_iterate_req);
513 if ((NULL != record) &&
514 (sizeof(cp->disk_respect) == record->value_size))
516 cp->disk_respect = *((uint32_t *)record->value);
517 cp->ppd.respect += *((uint32_t *)record->value);
521 GNUNET_PEERSTORE_iterate_cancel(cp->respect_iterate_req);
522 cp->respect_iterate_req = NULL;
527 * Function called for each pending request whenever a new
528 * peer connects, giving us a chance to decide about submitting
529 * the existing request to the new peer.
531 * @param cls the `struct GSF_ConnectedPeer` of the new peer
532 * @param key query for the request
533 * @param pr handle to the pending request
534 * @return #GNUNET_YES to continue to iterate
537 consider_peer_for_forwarding(void *cls,
538 const struct GNUNET_HashCode *key,
539 struct GSF_PendingRequest *pr)
541 struct GSF_ConnectedPeer *cp = cls;
542 struct GNUNET_PeerIdentity pid;
545 GSF_pending_request_test_active_(pr))
546 return GNUNET_YES; /* request is not actually active, skip! */
547 GSF_connected_peer_get_identity_(cp, &pid);
549 GSF_pending_request_test_target_(pr, &pid))
551 GNUNET_STATISTICS_update(GSF_stats,
552 gettext_noop("# Loopback routes suppressed"),
557 GSF_plan_add_(cp, pr);
563 * A peer connected to us. Setup the connected peer
567 * @param peer identity of peer that connected
568 * @param mq message queue for talking to @a peer
569 * @return our internal handle for the peer
572 GSF_peer_connect_handler(void *cls,
573 const struct GNUNET_PeerIdentity *peer,
574 struct GNUNET_MQ_Handle *mq)
576 struct GSF_ConnectedPeer *cp;
579 GNUNET_memcmp(&GSF_my_id,
582 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
583 "Connected to peer %s\n",
585 cp = GNUNET_new(struct GSF_ConnectedPeer);
586 cp->ppd.pid = GNUNET_PEER_intern(peer);
589 cp->ppd.transmission_delay = GNUNET_LOAD_value_init(GNUNET_TIME_UNIT_ZERO);
591 GNUNET_ATS_reserve_bandwidth(GSF_ats,
594 &ats_reserve_callback, cp);
595 cp->request_map = GNUNET_CONTAINER_multihashmap_create(128,
597 GNUNET_break(GNUNET_OK ==
598 GNUNET_CONTAINER_multipeermap_put(cp_map,
599 GSF_connected_peer_get_identity2_(cp),
601 GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
602 GNUNET_STATISTICS_set(GSF_stats,
603 gettext_noop("# peers connected"),
604 GNUNET_CONTAINER_multipeermap_size(cp_map),
606 cp->respect_iterate_req
607 = GNUNET_PEERSTORE_iterate(peerstore,
613 GSF_iterate_pending_requests_(&consider_peer_for_forwarding,
620 * It may be time to re-start migrating content to this
621 * peer. Check, and if so, restart migration.
623 * @param cls the `struct GSF_ConnectedPeer`
626 revive_migration(void *cls)
628 struct GSF_ConnectedPeer *cp = cls;
629 struct GNUNET_TIME_Relative bt;
631 cp->mig_revive_task = NULL;
632 bt = GNUNET_TIME_absolute_get_remaining(cp->ppd.migration_blocked_until);
633 if (0 != bt.rel_value_us)
635 /* still time left... */
636 cp->mig_revive_task =
637 GNUNET_SCHEDULER_add_delayed(bt, &revive_migration, cp);
645 * Get a handle for a connected peer.
647 * @param peer peer's identity
648 * @return NULL if the peer is not currently connected
650 struct GSF_ConnectedPeer *
651 GSF_peer_get_(const struct GNUNET_PeerIdentity *peer)
655 return GNUNET_CONTAINER_multipeermap_get(cp_map, peer);
660 * Handle P2P #GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP message.
662 * @param cls closure, the `struct GSF_ConnectedPeer`
663 * @param msm the actual message
666 handle_p2p_migration_stop(void *cls,
667 const struct MigrationStopMessage *msm)
669 struct GSF_ConnectedPeer *cp = cls;
670 struct GNUNET_TIME_Relative bt;
672 GNUNET_STATISTICS_update(GSF_stats,
673 gettext_noop("# migration stop messages received"),
675 bt = GNUNET_TIME_relative_ntoh(msm->duration);
676 GNUNET_log(GNUNET_ERROR_TYPE_INFO,
677 _("Migration of content to peer `%s' blocked for %s\n"),
678 GNUNET_i2s(cp->ppd.peer),
679 GNUNET_STRINGS_relative_time_to_string(bt, GNUNET_YES));
680 cp->ppd.migration_blocked_until = GNUNET_TIME_relative_to_absolute(bt);
681 if ((NULL == cp->mig_revive_task) &&
682 (NULL == cp->respect_iterate_req))
685 cp->mig_revive_task =
686 GNUNET_SCHEDULER_add_delayed(bt,
687 &revive_migration, cp);
693 * Free resources associated with the given peer request.
695 * @param peerreq request to free
698 free_pending_request(struct PeerRequest *peerreq)
700 struct GSF_ConnectedPeer *cp = peerreq->cp;
701 struct GSF_PendingRequestData *prd;
703 prd = GSF_pending_request_get_data_(peerreq->pr);
704 if (NULL != peerreq->kill_task)
706 GNUNET_SCHEDULER_cancel(peerreq->kill_task);
707 peerreq->kill_task = NULL;
709 GNUNET_STATISTICS_update(GSF_stats,
710 gettext_noop("# P2P searches active"),
713 GNUNET_break(GNUNET_YES ==
714 GNUNET_CONTAINER_multihashmap_remove(cp->request_map,
717 GNUNET_free(peerreq);
722 * Cancel all requests associated with the peer.
725 * @param query hash code of the request
726 * @param value the `struct GSF_PendingRequest`
727 * @return #GNUNET_YES (continue to iterate)
730 cancel_pending_request(void *cls,
731 const struct GNUNET_HashCode *query,
734 struct PeerRequest *peerreq = value;
735 struct GSF_PendingRequest *pr = peerreq->pr;
737 free_pending_request(peerreq);
738 GSF_pending_request_cancel_(pr,
745 * Free the given request.
747 * @param cls the request to free
750 peer_request_destroy(void *cls)
752 struct PeerRequest *peerreq = cls;
753 struct GSF_PendingRequest *pr = peerreq->pr;
754 struct GSF_PendingRequestData *prd;
756 peerreq->kill_task = NULL;
757 prd = GSF_pending_request_get_data_(pr);
758 cancel_pending_request(NULL,
765 * The artificial delay is over, transmit the message now.
767 * @param cls the `struct GSF_DelayedHandle` with the message
770 transmit_delayed_now(void *cls)
772 struct GSF_DelayedHandle *dh = cls;
773 struct GSF_ConnectedPeer *cp = dh->cp;
775 GNUNET_CONTAINER_DLL_remove(cp->delayed_head,
778 cp->delay_queue_size--;
779 GSF_peer_transmit_(cp,
788 * Get the randomized delay a response should be subjected to.
790 * @return desired delay
792 static struct GNUNET_TIME_Relative
793 get_randomized_delay()
795 struct GNUNET_TIME_Relative ret;
798 GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MILLISECONDS,
799 GNUNET_CRYPTO_random_u32
800 (GNUNET_CRYPTO_QUALITY_WEAK,
801 2 * GSF_avg_latency.rel_value_us + 1));
802 #if INSANE_STATISTICS
803 GNUNET_STATISTICS_update(GSF_stats,
805 ("# artificial delays introduced (ms)"),
806 ret.rel_value_us / 1000LL, GNUNET_NO);
813 * Handle a reply to a pending request. Also called if a request
814 * expires (then with data == NULL). The handler may be called
815 * many times (depending on the request type), but will not be
816 * called during or after a call to GSF_pending_request_cancel
817 * and will also not be called anymore after a call signalling
820 * @param cls `struct PeerRequest` this is an answer for
821 * @param eval evaluation of the result
822 * @param pr handle to the original pending request
823 * @param reply_anonymity_level anonymity level for the reply, UINT32_MAX for "unknown"
824 * @param expiration when does @a data expire?
825 * @param last_transmission when did we last transmit a request for this block
826 * @param type type of the block
827 * @param data response data, NULL on request expiration
828 * @param data_len number of bytes in @a data
831 handle_p2p_reply(void *cls,
832 enum GNUNET_BLOCK_EvaluationResult eval,
833 struct GSF_PendingRequest *pr,
834 uint32_t reply_anonymity_level,
835 struct GNUNET_TIME_Absolute expiration,
836 struct GNUNET_TIME_Absolute last_transmission,
837 enum GNUNET_BLOCK_Type type,
841 struct PeerRequest *peerreq = cls;
842 struct GSF_ConnectedPeer *cp = peerreq->cp;
843 struct GSF_PendingRequestData *prd;
844 struct GNUNET_MQ_Envelope *env;
845 struct PutMessage *pm;
848 GNUNET_assert(data_len + sizeof(struct PutMessage) <
849 GNUNET_MAX_MESSAGE_SIZE);
850 GNUNET_assert(peerreq->pr == pr);
851 prd = GSF_pending_request_get_data_(pr);
854 free_pending_request(peerreq);
857 GNUNET_break(GNUNET_BLOCK_TYPE_ANY != type);
858 if ((prd->type != type) && (GNUNET_BLOCK_TYPE_ANY != prd->type))
860 GNUNET_STATISTICS_update(GSF_stats,
862 ("# replies dropped due to type mismatch"),
866 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
867 "Transmitting result for query `%s' to peer\n",
868 GNUNET_h2s(&prd->query));
869 GNUNET_STATISTICS_update(GSF_stats,
870 gettext_noop("# replies received for other peers"),
872 msize = sizeof(struct PutMessage) + data_len;
873 if (msize >= GNUNET_MAX_MESSAGE_SIZE)
878 if ((UINT32_MAX != reply_anonymity_level) && (reply_anonymity_level > 1))
880 if (reply_anonymity_level - 1 > GSF_cover_content_count)
882 GNUNET_STATISTICS_update(GSF_stats,
884 ("# replies dropped due to insufficient cover traffic"),
888 GSF_cover_content_count -= (reply_anonymity_level - 1);
891 env = GNUNET_MQ_msg_extra(pm,
893 GNUNET_MESSAGE_TYPE_FS_PUT);
894 pm->type = htonl(type);
895 pm->expiration = GNUNET_TIME_absolute_hton(expiration);
896 GNUNET_memcpy(&pm[1],
899 if ((UINT32_MAX != reply_anonymity_level) &&
900 (0 != reply_anonymity_level) &&
901 (GNUNET_YES == GSF_enable_randomized_delays))
903 struct GSF_DelayedHandle *dh;
905 dh = GNUNET_new(struct GSF_DelayedHandle);
909 GNUNET_CONTAINER_DLL_insert(cp->delayed_head,
912 cp->delay_queue_size++;
914 GNUNET_SCHEDULER_add_delayed(get_randomized_delay(),
915 &transmit_delayed_now,
920 GSF_peer_transmit_(cp,
925 if (GNUNET_BLOCK_EVALUATION_OK_LAST != eval)
927 if (NULL == peerreq->kill_task)
929 GNUNET_STATISTICS_update(GSF_stats,
931 ("# P2P searches destroyed due to ultimate reply"),
935 GNUNET_SCHEDULER_add_now(&peer_request_destroy,
942 * Increase the peer's respect by a value.
944 * @param cp which peer to change the respect value on
945 * @param value is the int value by which the
946 * peer's credit is to be increased or decreased
947 * @returns the actual change in respect (positive or negative)
950 change_peer_respect(struct GSF_ConnectedPeer *cp, int value)
954 GNUNET_assert(NULL != cp);
957 if (cp->ppd.respect + value < cp->ppd.respect)
959 value = UINT32_MAX - cp->ppd.respect;
960 cp->ppd.respect = UINT32_MAX;
963 cp->ppd.respect += value;
967 if (cp->ppd.respect < -value)
969 value = -cp->ppd.respect;
973 cp->ppd.respect += value;
980 * We've received a request with the specified priority. Bound it
981 * according to how much we respect the given peer.
983 * @param prio_in requested priority
984 * @param cp the peer making the request
985 * @return effective priority
988 bound_priority(uint32_t prio_in,
989 struct GSF_ConnectedPeer *cp)
991 #define N ((double)128.0)
996 ld = GSF_test_get_load_too_high_(0);
997 if (GNUNET_SYSERR == ld)
999 #if INSANE_STATISTICS
1000 GNUNET_STATISTICS_update(GSF_stats,
1002 ("# requests done for free (low load)"), 1,
1005 return 0; /* excess resources */
1007 if (prio_in > INT32_MAX)
1008 prio_in = INT32_MAX;
1009 ret = -change_peer_respect(cp, -(int)prio_in);
1012 if (ret > GSF_current_priorities + N)
1013 rret = GSF_current_priorities + N;
1016 GSF_current_priorities = (GSF_current_priorities * (N - 1) + rret) / N;
1018 if ((GNUNET_YES == ld) && (ret > 0))
1020 /* try with charging */
1021 ld = GSF_test_get_load_too_high_(ret);
1023 if (GNUNET_YES == ld)
1025 GNUNET_STATISTICS_update(GSF_stats,
1027 ("# request dropped, priority insufficient"), 1,
1030 change_peer_respect(cp, (int)ret);
1031 return -1; /* not enough resources */
1035 GNUNET_STATISTICS_update(GSF_stats,
1037 ("# requests done for a price (normal load)"), 1,
1046 * The priority level imposes a bound on the maximum
1047 * value for the ttl that can be requested.
1049 * @param ttl_in requested ttl
1050 * @param prio given priority
1051 * @return @a ttl_in if @a ttl_in is below the limit,
1052 * otherwise the ttl-limit for the given @a prio
1055 bound_ttl(int32_t ttl_in,
1058 unsigned long long allowed;
1062 allowed = ((unsigned long long)prio) * TTL_DECREMENT / 1000;
1063 if (ttl_in > allowed)
1065 if (allowed >= (1 << 30))
1074 * Closure for #test_exist_cb().
1076 struct TestExistClosure {
1078 * Priority of the incoming request.
1083 * Relative TTL of the incoming request.
1088 * Type of the incoming request.
1090 enum GNUNET_BLOCK_Type type;
1093 * Set to #GNUNET_YES if we are done handling the query.
1100 * Test if the query already exists. If so, merge it, otherwise
1101 * keep `finished` at #GNUNET_NO.
1103 * @param cls our `struct TestExistClosure`
1104 * @param hc the key of the query
1105 * @param value the existing `struct PeerRequest`.
1106 * @return #GNUNET_YES to continue to iterate,
1107 * #GNUNET_NO if we successfully merged
1110 test_exist_cb(void *cls,
1111 const struct GNUNET_HashCode *hc,
1114 struct TestExistClosure *tec = cls;
1115 struct PeerRequest *peerreq = value;
1116 struct GSF_PendingRequest *pr;
1117 struct GSF_PendingRequestData *prd;
1120 prd = GSF_pending_request_get_data_(pr);
1121 if (prd->type != tec->type)
1123 if (prd->ttl.abs_value_us >=
1124 GNUNET_TIME_absolute_get().abs_value_us + tec->ttl * 1000LL)
1126 /* existing request has higher TTL, drop new one! */
1127 prd->priority += tec->priority;
1128 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1129 "Have existing request with higher TTL, dropping new request.\n");
1130 GNUNET_STATISTICS_update(GSF_stats,
1132 ("# requests dropped due to higher-TTL request"),
1134 tec->finished = GNUNET_YES;
1137 /* existing request has lower TTL, drop old one! */
1138 tec->priority += prd->priority;
1139 free_pending_request(peerreq);
1140 GSF_pending_request_cancel_(pr,
1147 * Handle P2P "QUERY" message. Creates the pending request entry
1148 * and sets up all of the data structures to that we will
1149 * process replies properly. Does not initiate forwarding or
1150 * local database lookups.
1152 * @param cls the other peer involved (sender of the message)
1153 * @param gm the GET message
1156 handle_p2p_get(void *cls,
1157 const struct GetMessage *gm)
1159 struct GSF_ConnectedPeer *cps = cls;
1160 struct PeerRequest *peerreq;
1161 struct GSF_PendingRequest *pr;
1162 struct GSF_ConnectedPeer *cp;
1163 const struct GNUNET_PeerIdentity *target;
1164 enum GSF_PendingRequestOptions options;
1167 const struct GNUNET_PeerIdentity *opt;
1170 uint32_t ttl_decrement;
1171 struct TestExistClosure tec;
1172 GNUNET_PEER_Id spid;
1173 const struct GSF_PendingRequestData *prd;
1175 msize = ntohs(gm->header.size);
1176 tec.type = ntohl(gm->type);
1177 bm = ntohl(gm->hash_bitmap);
1185 opt = (const struct GNUNET_PeerIdentity *)&gm[1];
1186 bfsize = msize - sizeof(struct GetMessage) - bits * sizeof(struct GNUNET_PeerIdentity);
1187 GNUNET_STATISTICS_update(GSF_stats,
1189 ("# GET requests received (from other peers)"),
1192 GSF_cover_query_count++;
1193 bm = ntohl(gm->hash_bitmap);
1195 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1196 cp = GSF_peer_get_(&opt[bits++]);
1201 if (0 != (bm & GET_MESSAGE_BIT_RETURN_TO))
1202 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1203 "Failed to find RETURN-TO peer `%s' in connection set. Dropping query.\n",
1204 GNUNET_i2s(&opt[bits - 1]));
1207 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1208 "Failed to find peer `%s' in connection set. Dropping query.\n",
1209 GNUNET_i2s(cps->ppd.peer));
1210 GNUNET_STATISTICS_update(GSF_stats,
1212 ("# requests dropped due to missing reverse route"),
1217 unsigned int queue_size = GNUNET_MQ_get_length(cp->mq);
1218 queue_size += cp->ppd.pending_replies + cp->delay_queue_size;
1219 if (queue_size > MAX_QUEUE_PER_PEER)
1221 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1222 "Peer `%s' has too many replies queued already. Dropping query.\n",
1223 GNUNET_i2s(cps->ppd.peer));
1224 GNUNET_STATISTICS_update(GSF_stats,
1225 gettext_noop("# requests dropped due to full reply queue"),
1230 /* note that we can really only check load here since otherwise
1231 * peers could find out that we are overloaded by not being
1232 * disconnected after sending us a malformed query... */
1233 tec.priority = bound_priority(ntohl(gm->priority),
1235 if (tec.priority < 0)
1237 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1238 "Dropping query from `%s', this peer is too busy.\n",
1239 GNUNET_i2s(cps->ppd.peer));
1242 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1243 "Received request for `%s' of type %u from peer `%s' with flags %u\n",
1244 GNUNET_h2s(&gm->query),
1245 (unsigned int)tec.type,
1246 GNUNET_i2s(cps->ppd.peer),
1250 (bm & GET_MESSAGE_BIT_TRANSMIT_TO)) ? (&opt[bits++]) : NULL;
1251 options = GSF_PRO_DEFAULTS;
1253 if ((GNUNET_LOAD_get_load(cp->ppd.transmission_delay) > 3 * (1 + tec.priority))
1254 || (GNUNET_LOAD_get_average(cp->ppd.transmission_delay) >
1255 GNUNET_CONSTANTS_MAX_CORK_DELAY.rel_value_us * 2 +
1256 GNUNET_LOAD_get_average(GSF_rt_entry_lifetime)))
1258 /* don't have BW to send to peer, or would likely take longer than we have for it,
1259 * so at best indirect the query */
1261 options |= GSF_PRO_FORWARD_ONLY;
1262 spid = GNUNET_PEER_intern(cps->ppd.peer);
1263 GNUNET_assert(0 != spid);
1265 tec.ttl = bound_ttl(ntohl(gm->ttl),
1267 /* decrement ttl (always) */
1269 2 * TTL_DECREMENT + GNUNET_CRYPTO_random_u32(GNUNET_CRYPTO_QUALITY_WEAK,
1271 if ((tec.ttl < 0) &&
1272 (((int32_t)(tec.ttl - ttl_decrement)) > 0))
1274 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1275 "Dropping query from `%s' due to TTL underflow (%d - %u).\n",
1276 GNUNET_i2s(cps->ppd.peer),
1279 GNUNET_STATISTICS_update(GSF_stats,
1281 ("# requests dropped due TTL underflow"), 1,
1283 /* integer underflow => drop (should be very rare)! */
1286 tec.ttl -= ttl_decrement;
1288 /* test if the request already exists */
1289 tec.finished = GNUNET_NO;
1290 GNUNET_CONTAINER_multihashmap_get_multiple(cp->request_map,
1294 if (GNUNET_YES == tec.finished)
1295 return; /* merged into existing request, we're done */
1297 peerreq = GNUNET_new(struct PeerRequest);
1299 pr = GSF_pending_request_create_(options,
1304 ? (const char *)&opt[bits]
1307 ntohl(gm->filter_mutator),
1309 (uint32_t)tec.priority,
1312 GNUNET_PEER_intern(cps->ppd.peer),
1313 NULL, 0, /* replies_seen */
1316 GNUNET_assert(NULL != pr);
1317 prd = GSF_pending_request_get_data_(pr);
1319 GNUNET_break(GNUNET_OK ==
1320 GNUNET_CONTAINER_multihashmap_put(cp->request_map,
1323 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
1324 GNUNET_STATISTICS_update(GSF_stats,
1325 gettext_noop("# P2P query messages received and processed"),
1328 GNUNET_STATISTICS_update(GSF_stats,
1329 gettext_noop("# P2P searches active"),
1332 GSF_pending_request_get_data_(pr)->has_started = GNUNET_YES;
1333 GSF_local_lookup_(pr,
1334 &GSF_consider_forwarding,
1340 * Transmit a message to the given peer as soon as possible.
1341 * If the peer disconnects before the transmission can happen,
1342 * the callback is invoked with a `NULL` @a buffer.
1344 * @param cp target peer
1345 * @param is_query is this a query (#GNUNET_YES) or content (#GNUNET_NO) or neither (#GNUNET_SYSERR)
1346 * @param priority how important is this request?
1347 * @param timeout when does this request timeout
1348 * @param size number of bytes we would like to send to the peer
1349 * @param env message to send
1352 GSF_peer_transmit_(struct GSF_ConnectedPeer *cp,
1355 struct GNUNET_MQ_Envelope *env)
1357 struct GSF_PeerTransmitHandle *pth;
1358 struct GSF_PeerTransmitHandle *pos;
1359 struct GSF_PeerTransmitHandle *prev;
1361 pth = GNUNET_new(struct GSF_PeerTransmitHandle);
1362 pth->transmission_request_start_time = GNUNET_TIME_absolute_get();
1364 pth->is_query = is_query;
1365 pth->priority = priority;
1367 /* insertion sort (by priority, descending) */
1370 while ((NULL != pos) && (pos->priority > priority))
1375 GNUNET_CONTAINER_DLL_insert_after(cp->pth_head,
1379 if (GNUNET_YES == is_query)
1380 cp->ppd.pending_queries++;
1381 else if (GNUNET_NO == is_query)
1382 cp->ppd.pending_replies++;
1383 schedule_transmission(pth);
1388 * Report on receiving a reply; update the performance record of the given peer.
1390 * @param cp responding peer (will be updated)
1391 * @param request_time time at which the original query was transmitted
1392 * @param request_priority priority of the original request
1395 GSF_peer_update_performance_(struct GSF_ConnectedPeer *cp,
1396 struct GNUNET_TIME_Absolute request_time,
1397 uint32_t request_priority)
1399 struct GNUNET_TIME_Relative delay;
1401 delay = GNUNET_TIME_absolute_get_duration(request_time);
1402 cp->ppd.avg_reply_delay.rel_value_us =
1403 (cp->ppd.avg_reply_delay.rel_value_us * (RUNAVG_DELAY_N - 1) +
1404 delay.rel_value_us) / RUNAVG_DELAY_N;
1405 cp->ppd.avg_priority =
1406 (cp->ppd.avg_priority * (RUNAVG_DELAY_N - 1) +
1407 request_priority) / RUNAVG_DELAY_N;
1412 * Report on receiving a reply in response to an initiating client.
1413 * Remember that this peer is good for this client.
1415 * @param cp responding peer (will be updated)
1416 * @param initiator_client local client on responsible for query
1419 GSF_peer_update_responder_client_(struct GSF_ConnectedPeer *cp,
1420 struct GSF_LocalClient *initiator_client)
1422 cp->ppd.last_client_replies[cp->last_client_replies_woff++ %
1423 CS2P_SUCCESS_LIST_SIZE] = initiator_client;
1428 * Report on receiving a reply in response to an initiating peer.
1429 * Remember that this peer is good for this initiating peer.
1431 * @param cp responding peer (will be updated)
1432 * @param initiator_peer other peer responsible for query
1435 GSF_peer_update_responder_peer_(struct GSF_ConnectedPeer *cp,
1436 const struct GSF_ConnectedPeer *initiator_peer)
1440 woff = cp->last_p2p_replies_woff % P2P_SUCCESS_LIST_SIZE;
1441 GNUNET_PEER_change_rc(cp->ppd.last_p2p_replies[woff], -1);
1442 cp->ppd.last_p2p_replies[woff] = initiator_peer->ppd.pid;
1443 GNUNET_PEER_change_rc(initiator_peer->ppd.pid, 1);
1444 cp->last_p2p_replies_woff = (woff + 1) % P2P_SUCCESS_LIST_SIZE;
1449 * Write peer-respect information to a file - flush the buffer entry!
1452 * @param key peer identity
1453 * @param value the `struct GSF_ConnectedPeer` to flush
1454 * @return #GNUNET_OK to continue iteration
1457 flush_respect(void *cls,
1458 const struct GNUNET_PeerIdentity *key,
1461 struct GSF_ConnectedPeer *cp = value;
1462 struct GNUNET_PeerIdentity pid;
1464 if (cp->ppd.respect == cp->disk_respect)
1465 return GNUNET_OK; /* unchanged */
1466 GNUNET_assert(0 != cp->ppd.pid);
1467 GNUNET_PEER_resolve(cp->ppd.pid, &pid);
1468 GNUNET_PEERSTORE_store(peerstore, "fs", &pid, "respect", &cp->ppd.respect,
1469 sizeof(cp->ppd.respect),
1470 GNUNET_TIME_UNIT_FOREVER_ABS,
1471 GNUNET_PEERSTORE_STOREOPTION_REPLACE,
1479 * A peer disconnected from us. Tear down the connected peer
1483 * @param peer identity of peer that disconnected
1484 * @param internal_cls the corresponding `struct GSF_ConnectedPeer`
1487 GSF_peer_disconnect_handler(void *cls,
1488 const struct GNUNET_PeerIdentity *peer,
1491 struct GSF_ConnectedPeer *cp = internal_cls;
1492 struct GSF_PeerTransmitHandle *pth;
1493 struct GSF_DelayedHandle *dh;
1496 return; /* must have been disconnect from core with
1497 * 'peer' == my_id, ignore */
1501 GNUNET_assert(GNUNET_YES ==
1502 GNUNET_CONTAINER_multipeermap_remove(cp_map,
1505 GNUNET_STATISTICS_set(GSF_stats,
1506 gettext_noop("# peers connected"),
1507 GNUNET_CONTAINER_multipeermap_size(cp_map),
1509 if (NULL != cp->respect_iterate_req)
1511 GNUNET_PEERSTORE_iterate_cancel(cp->respect_iterate_req);
1512 cp->respect_iterate_req = NULL;
1516 GNUNET_ATS_reserve_bandwidth_cancel(cp->rc);
1519 if (NULL != cp->rc_delay_task)
1521 GNUNET_SCHEDULER_cancel(cp->rc_delay_task);
1522 cp->rc_delay_task = NULL;
1524 GNUNET_CONTAINER_multihashmap_iterate(cp->request_map,
1525 &cancel_pending_request,
1527 GNUNET_CONTAINER_multihashmap_destroy(cp->request_map);
1528 cp->request_map = NULL;
1529 GSF_plan_notify_peer_disconnect_(cp);
1530 GNUNET_LOAD_value_free(cp->ppd.transmission_delay);
1531 GNUNET_PEER_decrement_rcs(cp->ppd.last_p2p_replies,
1532 P2P_SUCCESS_LIST_SIZE);
1533 memset(cp->ppd.last_p2p_replies,
1535 sizeof(cp->ppd.last_p2p_replies));
1537 while (NULL != (pth = cp->pth_head))
1539 GNUNET_CONTAINER_DLL_remove(cp->pth_head,
1542 if (GNUNET_YES == pth->is_query)
1543 GNUNET_assert(0 < cp->ppd.pending_queries--);
1544 else if (GNUNET_NO == pth->is_query)
1545 GNUNET_assert(0 < cp->ppd.pending_replies--);
1548 while (NULL != (dh = cp->delayed_head))
1550 GNUNET_CONTAINER_DLL_remove(cp->delayed_head,
1553 GNUNET_MQ_discard(dh->env);
1554 cp->delay_queue_size--;
1555 GNUNET_SCHEDULER_cancel(dh->delay_task);
1558 GNUNET_PEER_change_rc(cp->ppd.pid, -1);
1559 if (NULL != cp->mig_revive_task)
1561 GNUNET_SCHEDULER_cancel(cp->mig_revive_task);
1562 cp->mig_revive_task = NULL;
1564 GNUNET_break(0 == cp->ppd.pending_queries);
1565 GNUNET_break(0 == cp->ppd.pending_replies);
1571 * Closure for #call_iterator().
1573 struct IterationContext {
1575 * Function to call on each entry.
1577 GSF_ConnectedPeerIterator it;
1580 * Closure for @e it.
1587 * Function that calls the callback for each peer.
1589 * @param cls the `struct IterationContext *`
1590 * @param key identity of the peer
1591 * @param value the `struct GSF_ConnectedPeer *`
1592 * @return #GNUNET_YES to continue iteration
1595 call_iterator(void *cls,
1596 const struct GNUNET_PeerIdentity *key,
1599 struct IterationContext *ic = cls;
1600 struct GSF_ConnectedPeer *cp = value;
1610 * Iterate over all connected peers.
1612 * @param it function to call for each peer
1613 * @param it_cls closure for @a it
1616 GSF_iterate_connected_peers_(GSF_ConnectedPeerIterator it,
1619 struct IterationContext ic;
1623 GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1630 * Obtain the identity of a connected peer.
1632 * @param cp peer to get identity of
1633 * @param id identity to set (written to)
1636 GSF_connected_peer_get_identity_(const struct GSF_ConnectedPeer *cp,
1637 struct GNUNET_PeerIdentity *id)
1639 GNUNET_assert(0 != cp->ppd.pid);
1640 GNUNET_PEER_resolve(cp->ppd.pid, id);
1645 * Obtain the identity of a connected peer.
1647 * @param cp peer to get identity of
1648 * @return reference to peer identity, valid until peer disconnects (!)
1650 const struct GNUNET_PeerIdentity *
1651 GSF_connected_peer_get_identity2_(const struct GSF_ConnectedPeer *cp)
1653 GNUNET_assert(0 != cp->ppd.pid);
1654 return GNUNET_PEER_resolve2(cp->ppd.pid);
1659 * Ask a peer to stop migrating data to us until the given point
1662 * @param cp peer to ask
1663 * @param block_time until when to block
1666 GSF_block_peer_migration_(struct GSF_ConnectedPeer *cp,
1667 struct GNUNET_TIME_Absolute block_time)
1669 struct GNUNET_MQ_Envelope *env;
1670 struct MigrationStopMessage *msm;
1672 if (cp->last_migration_block.abs_value_us > block_time.abs_value_us)
1674 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG,
1675 "Migration already blocked for another %s\n",
1676 GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining
1677 (cp->last_migration_block), GNUNET_YES));
1678 return; /* already blocked */
1680 GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Asking to stop migration for %s\n",
1681 GNUNET_STRINGS_relative_time_to_string(GNUNET_TIME_absolute_get_remaining(block_time),
1683 cp->last_migration_block = block_time;
1684 env = GNUNET_MQ_msg(msm,
1685 GNUNET_MESSAGE_TYPE_FS_MIGRATION_STOP);
1686 msm->reserved = htonl(0);
1688 = GNUNET_TIME_relative_hton(GNUNET_TIME_absolute_get_remaining
1689 (cp->last_migration_block));
1690 GNUNET_STATISTICS_update(GSF_stats,
1691 gettext_noop("# migration stop messages sent"),
1694 GSF_peer_transmit_(cp,
1702 * Notify core about a preference we have for the given peer
1703 * (to allocate more resources towards it). The change will
1704 * be communicated the next time we reserve bandwidth with
1705 * core (not instantly).
1707 * @param cp peer to reserve bandwidth from
1708 * @param pref preference change
1711 GSF_connected_peer_change_preference_(struct GSF_ConnectedPeer *cp,
1714 cp->inc_preference += pref;
1719 * Call this method periodically to flush respect information to disk.
1721 * @param cls closure, not used
1724 cron_flush_respect(void *cls)
1727 GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1730 fr_task = GNUNET_SCHEDULER_add_delayed_with_priority(RESPECT_FLUSH_FREQ,
1731 GNUNET_SCHEDULER_PRIORITY_HIGH,
1732 &cron_flush_respect, NULL);
1737 * Initialize peer management subsystem.
1740 GSF_connected_peer_init_()
1742 cp_map = GNUNET_CONTAINER_multipeermap_create(128, GNUNET_YES);
1743 peerstore = GNUNET_PEERSTORE_connect(GSF_cfg);
1744 fr_task = GNUNET_SCHEDULER_add_with_priority(GNUNET_SCHEDULER_PRIORITY_HIGH,
1745 &cron_flush_respect, NULL);
1750 * Shutdown peer management subsystem.
1753 GSF_connected_peer_done_()
1755 GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1758 GNUNET_SCHEDULER_cancel(fr_task);
1760 GNUNET_CONTAINER_multipeermap_destroy(cp_map);
1762 GNUNET_PEERSTORE_disconnect(peerstore,
1768 * Iterator to remove references to LC entry.
1770 * @param cls the `struct GSF_LocalClient *` to look for
1771 * @param key current key code
1772 * @param value value in the hash map (peer entry)
1773 * @return #GNUNET_YES (we should continue to iterate)
1776 clean_local_client(void *cls,
1777 const struct GNUNET_PeerIdentity *key,
1780 const struct GSF_LocalClient *lc = cls;
1781 struct GSF_ConnectedPeer *cp = value;
1784 for (i = 0; i < CS2P_SUCCESS_LIST_SIZE; i++)
1785 if (cp->ppd.last_client_replies[i] == lc)
1786 cp->ppd.last_client_replies[i] = NULL;
1792 * Notification that a local client disconnected. Clean up all of our
1793 * references to the given handle.
1795 * @param lc handle to the local client (henceforth invalid)
1798 GSF_handle_local_client_disconnect_(const struct GSF_LocalClient *lc)
1801 return; /* already cleaned up */
1802 GNUNET_CONTAINER_multipeermap_iterate(cp_map,
1803 &clean_local_client,
1808 /* end of gnunet-service-fs_cp.c */