2 This file is part of GNUnet.
3 Copyright (C) 2010-2015 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
18 SPDX-License-Identifier: AGPL3.0-or-later
21 * @file ats/ats_api_scheduling.c
22 * @brief automatic transport selection and outbound bandwidth determination
23 * @author Christian Grothoff
24 * @author Matthias Wachs
27 * - we could avoid a linear scan over the
28 * active addresses in some cases, so if
29 * there is need, we can still optimize here
30 * - we might want to split off the logic to
31 * determine LAN vs. WAN, as it has nothing
32 * to do with accessing the ATS service.
35 #include "gnunet_ats_service.h"
39 * How frequently do we scan the interfaces for changes to the addresses?
/* Interface rescan period: GNUNET_TIME_UNIT_MINUTES multiplied by 2. */
41 #define INTERFACE_PROCESSING_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2)
/* Logging shorthand: forwards to GNUNET_log_from() with the fixed
   component name ats-scheduling-api. */
43 #define LOG(kind, ...) GNUNET_log_from(kind, "ats-scheduling-api", __VA_ARGS__)
46 * Session ID we use if there is no session / slot.
52 * Information we track per address, incoming or outgoing. It also
53 * doesn't matter if we have a session, any address that ATS is
54 * allowed to suggest right now should be tracked.
/* Per-address bookkeeping kept by the scheduling API.
   NOTE(review): other code in this file reads ar->slot and
   ar->in_destroy, but neither member declaration is visible in this
   listing; presumably they follow the last two descriptions below --
   confirm against the complete source. */
56 struct GNUNET_ATS_AddressRecord {
58 * Scheduling handle this address record belongs to.
60 struct GNUNET_ATS_SchedulingHandle *sh;
/* Address described by this record; assigned from
   GNUNET_HELLO_address_copy() in GNUNET_ATS_address_add() and released
   with GNUNET_HELLO_address_free(). */
65 struct GNUNET_HELLO_Address *address;
68 * Session handle. NULL if we have an address but no
69 * active session for this address.
71 struct GNUNET_ATS_Session *session;
74 * Performance data about the address.
/* Filled via GNUNET_ATS_properties_hton() and copied unchanged into
   outgoing messages. */
76 struct GNUNET_ATS_PropertiesNBO properties;
79 * Which slot (index) in the session array does
80 * this record correspond to?
81 * FIXME: a linear search on this is really crappy!
82 * Maybe switch to a 64-bit global counter and be
83 * done with it? Or does that then cause too much
84 * trouble on the ATS-service side?
89 * We're about to destroy this address record, just ATS does
90 * not know this yet. Once ATS confirms its destruction,
98 * Handle to the ATS subsystem for bandwidth/transport scheduling information.
/* Client-side state for one connection to the ATS service: configuration,
   suggestion callback, message queue, session table and reconnect
   bookkeeping. */
100 struct GNUNET_ATS_SchedulingHandle {
/* Configuration handed to GNUNET_CLIENT_connect() when (re)connecting. */
104 const struct GNUNET_CONFIGURATION_Handle *cfg;
107 * Callback to invoke on suggestions.
109 GNUNET_ATS_AddressSuggestionCallback suggest_cb;
112 * Closure for @e suggest_cb.
114 void *suggest_cb_cls;
117 * Message queue for sending requests to the ATS service.
/* Asserted to be NULL when reconnect() starts; destroyed in
   force_reconnect() and GNUNET_ATS_scheduling_done(). */
119 struct GNUNET_MQ_Handle *mq;
122 * Array of session objects (we need to translate them to numbers and back
123 * for the protocol; the offset in the array is the session number on the
124 * network). Index 0 is always NULL and reserved to represent the NULL pointer.
125 * Unused entries are also NULL.
127 struct GNUNET_ATS_AddressRecord **session_array;
130 * Task to trigger reconnect.
/* Assigned from GNUNET_SCHEDULER_add_delayed() in force_reconnect();
   cancelled in GNUNET_ATS_scheduling_done() when non-NULL. */
132 struct GNUNET_SCHEDULER_Task *task;
135 * Reconnect backoff delay.
/* Advanced with GNUNET_TIME_STD_BACKOFF() on forced reconnects and reset
   to GNUNET_TIME_UNIT_ZERO when a suggestion is delivered. */
137 struct GNUNET_TIME_Relative backoff;
140 * Size of the @e session_array.
142 unsigned int session_array_size;
147 * Re-establish the connection to the ATS service.
149 * @param sh handle to use to re-connect.
/* Forward declaration; the definition appears further down in this file. */
152 reconnect(struct GNUNET_ATS_SchedulingHandle *sh);
156 * Re-establish the connection to the ATS service.
158 * @param cls handle to use to re-connect.
/* Task entry point whose closure is the scheduling handle.
   NOTE(review): the rest of the body is not visible in this listing;
   presumably it clears sh->task and calls reconnect() -- confirm. */
161 reconnect_task(void *cls)
163 struct GNUNET_ATS_SchedulingHandle *sh = cls;
171 * Disconnect from ATS and then reconnect.
173 * @param sh our handle
/* Drop the current connection and schedule a delayed reconnect.
   NOTE(review): lines between the signature and the first statement are
   missing from this listing; a guard on sh->mq may live there -- confirm. */
176 force_reconnect(struct GNUNET_ATS_SchedulingHandle *sh)
/* Tear down the existing message queue. */
180 GNUNET_MQ_destroy(sh->mq);
/* Invoke the suggestion callback with zero bandwidth as the last two
   arguments. NOTE(review): the arguments between the closure and the
   bandwidth values are not visible here. */
183 sh->suggest_cb(sh->suggest_cb_cls,
185 GNUNET_BANDWIDTH_ZERO,
186 GNUNET_BANDWIDTH_ZERO);
/* Advance the backoff delay, then schedule a delayed task using it and
   remember the task in sh->task. NOTE(review): the task callback argument
   is cut off; presumably reconnect_task -- confirm. */
187 sh->backoff = GNUNET_TIME_STD_BACKOFF(sh->backoff);
188 sh->task = GNUNET_SCHEDULER_add_delayed(sh->backoff,
195 * Find the session object corresponding to the given session ID.
197 * @param sh our handle
198 * @param session_id current session ID
199 * @param peer peer the session belongs to
200 * @return the session object (or NULL)
/* Translate a session ID into the address record stored in that slot.
   NOTE(review): the session_id parameter line and the statements guarded
   by each condition are not visible in this listing. */
202 static struct GNUNET_ATS_AddressRecord *
203 find_session(struct GNUNET_ATS_SchedulingHandle *sh,
205 const struct GNUNET_PeerIdentity *peer)
207 struct GNUNET_ATS_AddressRecord *ar;
/* IDs at or beyond the table size cannot name a slot. */
209 if (session_id >= sh->session_array_size)
/* Fetch whatever record currently occupies the slot. */
216 ar = sh->session_array[session_id];
/* Records whose address pointer is NULL are handled separately; see the
   original remark that follows. */
222 if (NULL == ar->address)
224 /* address was destroyed in the meantime, this can happen
225 as we communicate asynchronously with the ATS service. */
/* Compare the caller-supplied peer identity; the second operand is cut
   off here (presumably the peer stored in the record -- confirm). */
228 if (0 != GNUNET_memcmp(peer,
239 * Get an available session ID.
241 * @param sh our handle
242 * @return an unused slot, but never NOT_FOUND (0)
/* Pick a free slot in the session table, growing the table if needed.
   NOTE(review): the declarations of off and i, the loop body and both
   return statements are not visible in this listing. */
245 find_empty_session_slot(struct GNUNET_ATS_SchedulingHandle *sh)
/* The table size is used as a modulus below, so it must be non-zero. */
250 GNUNET_assert(0 != sh->session_array_size);
/* Keep searching while the candidate equals NOT_FOUND or its slot
   (off modulo the table size) is occupied, bounded by i staying below
   the table size. */
252 while (((NOT_FOUND == off) ||
253 (NULL != sh->session_array[off % sh->session_array_size])) &&
254 (i < sh->session_array_size))
/* Accept the candidate only if its reduced index is not the reserved
   NOT_FOUND value and the slot is empty.
   NOTE(review): verify that the value returned here is the reduced index
   (off % session_array_size); off is only ever used modulo the size in
   this function, which suggests it can exceed the table bounds. */
259 if ((NOT_FOUND != off % sh->session_array_size) &&
260 (NULL == sh->session_array[off % sh->session_array_size]))
/* Otherwise remember the old size in i and double the table. */
262 i = sh->session_array_size;
263 GNUNET_array_grow(sh->session_array,
264 sh->session_array_size,
265 sh->session_array_size * 2);
271 * Get the ID for the given session object.
273 * @param sh our handle
274 * @param session session object
275 * @param address the address we are looking for
276 * @return the session id or NOT_FOUND for error
/* Search the session table for the slot holding the given address and
   session. NOTE(review): the statements between the signature and the
   loop, and the loop result handling, are not visible in this listing. */
279 find_session_id(struct GNUNET_ATS_SchedulingHandle *sh,
280 struct GNUNET_ATS_Session *session,
281 const struct GNUNET_HELLO_Address *address)
/* Linear scan starting at index 1. A slot matches when it is occupied,
   not marked in_destroy, its session equals the requested one or is
   NULL, and both the peer identity and the full address compare equal. */
290 for (i = 1; i < sh->session_array_size; i++)
291 if ((NULL != sh->session_array[i]) &&
292 (GNUNET_NO == sh->session_array[i]->in_destroy) &&
293 ((session == sh->session_array[i]->session) ||
294 (NULL == sh->session_array[i]->session)) &&
295 (0 == GNUNET_memcmp(&address->peer,
296 &sh->session_array[i]->address->peer)) &&
297 (0 == GNUNET_HELLO_address_cmp(address,
298 sh->session_array[i]->address)))
305 * Release the session slot from the session table (ATS service is
306 * also done using it).
308 * @param sh our handle
309 * @param session_id identifies session that is no longer valid
/* Free the address record stored in a slot and clear the slot.
   NOTE(review): the session_id parameter line and the statements guarded
   by each condition are not visible in this listing. */
312 release_session(struct GNUNET_ATS_SchedulingHandle *sh,
315 struct GNUNET_ATS_AddressRecord *ar;
/* The reserved NOT_FOUND id is special-cased. */
317 if (NOT_FOUND == session_id)
/* IDs at or beyond the table size are special-cased as well. */
319 if (session_id >= sh->session_array_size)
325 /* this slot should have been removed from remove_session before */
326 ar = sh->session_array[session_id];
/* NOTE(review): ar is dereferenced on the line directly after the lookup
   without a NULL check; confirm an empty slot cannot reach this point. */
327 if (NULL != ar->session)
/* Release the copied address, then mark the slot as unused. */
333 GNUNET_HELLO_address_free(ar->address);
335 sh->session_array[session_id] = NULL;
340 * Type of a function to call when we receive a session release
341 * message from the service.
343 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
344 * @param srm message received
/* Handler for session release messages; the closure is the scheduling
   handle. */
347 handle_ats_session_release(void *cls,
348 const struct GNUNET_ATS_SessionReleaseMessage *srm)
350 struct GNUNET_ATS_SchedulingHandle *sh = cls;
352 /* Note: peer field in srm not necessary right now,
353 but might be good to have in the future */
/* The session id is converted with ntohl() before use.
   NOTE(review): the first line of this call is not visible; presumably
   it is release_session(sh, ...) -- confirm. */
355 ntohl(srm->session_id));
360 * Type of a function to call when we receive an address suggestion
361 * message from the service.
363 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
364 * @param m message received
/* Handler for address suggestion messages; the closure is the scheduling
   handle. NOTE(review): several statements, including the bodies of most
   conditions and the callback arguments, are missing from this listing. */
367 handle_ats_address_suggestion(void *cls,
368 const struct AddressSuggestionMessage *m)
370 struct GNUNET_ATS_SchedulingHandle *sh = cls;
371 struct GNUNET_ATS_AddressRecord *ar;
/* Session id converted with ntohl() before the table lookup. */
374 session_id = ntohl(m->session_id);
381 ar = find_session(sh,
/* A missing suggestion callback is special-cased. */
390 if (NULL == sh->suggest_cb)
/* Records already marked for destruction get special treatment. */
392 if (GNUNET_YES == ar->in_destroy)
394 /* ignore suggestion, as this address is dying, unless BW is 0,
395 in that case signal 'disconnect' via BW 0 */
396 if ((0 == ntohl(m->bandwidth_out.value__)) &&
397 (0 == ntohl(m->bandwidth_in.value__)))
399 LOG(GNUNET_ERROR_TYPE_DEBUG,
400 "ATS suggests disconnect from peer `%s' with BW %u/%u\n",
401 GNUNET_i2s(&ar->address->peer),
402 (unsigned int)ntohl(m->bandwidth_out.value__),
403 (unsigned int)ntohl(m->bandwidth_in.value__));
404 sh->suggest_cb(sh->suggest_cb_cls,
/* Inbound addresses without a session are special-cased (the guarded
   statement is not visible here). */
413 if ((NULL == ar->session) &&
414 (GNUNET_HELLO_address_check_option(ar->address,
415 GNUNET_HELLO_ADDRESS_INFO_INBOUND)))
/* Reset the reconnect backoff, log the suggestion and pass it to the
   callback. */
420 sh->backoff = GNUNET_TIME_UNIT_ZERO;
421 LOG(GNUNET_ERROR_TYPE_DEBUG,
422 "ATS suggests address slot %u for peer `%s' using plugin %s\n",
424 GNUNET_i2s(&ar->address->peer),
425 ar->address->transport_name);
426 sh->suggest_cb(sh->suggest_cb_cls,
436 * We encountered an error handling the MQ to the
437 * ATS service. Reconnect.
439 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
440 * @param error details about the error
/* Message queue error callback; the closure is the scheduling handle.
   NOTE(review): the statements after the log call are not visible;
   presumably force_reconnect(sh) is invoked -- confirm. */
443 error_handler(void *cls,
444 enum GNUNET_MQ_Error error)
446 struct GNUNET_ATS_SchedulingHandle *sh = cls;
/* Log the failure at debug level together with an integer code. */
448 LOG(GNUNET_ERROR_TYPE_DEBUG,
449 "ATS connection died (code %d), reconnecting\n",
456 * Generate and transmit the `struct AddressAddMessage` for the given address record.
459 * @param sh the scheduling handle to use for transmission
460 * @param ar the address to inform the ATS service about
/* Build an AddressAddMessage for the record and queue it on sh->mq.
   NOTE(review): the declarations of pm, namelen and msize and the
   condition guarding the early return are not visible in this listing. */
463 send_add_address_message(struct GNUNET_ATS_SchedulingHandle *sh,
464 const struct GNUNET_ATS_AddressRecord *ar)
466 struct GNUNET_MQ_Envelope *ev;
467 struct AddressAddMessage *m;
473 return; /* disconnected, skip for now */
474 GNUNET_break(GNUNET_NT_UNSPECIFIED != ar->properties.scope);
/* Payload size: raw address bytes plus the transport name including its
   terminating NUL.
   NOTE(review): strlen() is applied to transport_name here, while a NULL
   test on the same pointer only happens further down; confirm that the
   name can never be NULL. */
475 namelen = strlen(ar->address->transport_name) + 1;
476 msize = ar->address->address_length + namelen;
477 ev = GNUNET_MQ_msg_extra(m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_ADD);
/* Multi-byte integer fields are converted with htons()/htonl();
   properties are copied as stored in the record. */
478 m->peer = ar->address->peer;
479 m->address_length = htons(ar->address->address_length);
480 m->address_local_info = htonl((uint32_t)ar->address->local_info);
481 m->plugin_name_length = htons(namelen);
482 m->session_id = htonl(ar->slot);
483 m->properties = ar->properties;
485 LOG(GNUNET_ERROR_TYPE_DEBUG,
486 "Adding address for peer `%s', plugin `%s', session %p slot %u\n",
487 GNUNET_i2s(&ar->address->peer),
488 ar->address->transport_name,
/* Tail of a call whose first line is not visible; presumably it copies
   the raw address bytes into the payload -- confirm. */
493 ar->address->address,
494 ar->address->address_length);
/* The transport name is copied into the payload right after the address
   bytes. */
495 if (NULL != ar->address->transport_name)
496 GNUNET_memcpy(&pm[ar->address->address_length],
497 ar->address->transport_name,
499 GNUNET_MQ_send(sh->mq, ev);
504 * Re-establish the connection to the ATS service.
506 * @param sh handle to use to re-connect.
/* Open a new connection to the service, send the start message and
   resend address records from the session table.
   NOTE(review): the remaining GNUNET_CLIENT_connect() arguments, the
   handling of its result and the loop filter are not visible here. */
509 reconnect(struct GNUNET_ATS_SchedulingHandle *sh)
/* Handlers for the two fixed-size message types processed by this file:
   session release and address suggestion. */
511 struct GNUNET_MQ_MessageHandler handlers[] = {
512 GNUNET_MQ_hd_fixed_size(ats_session_release,
513 GNUNET_MESSAGE_TYPE_ATS_SESSION_RELEASE,
514 struct GNUNET_ATS_SessionReleaseMessage,
516 GNUNET_MQ_hd_fixed_size(ats_address_suggestion,
517 GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
518 struct AddressSuggestionMessage,
520 GNUNET_MQ_handler_end()
522 struct GNUNET_MQ_Envelope *ev;
523 struct ClientStartMessage *init;
525 struct GNUNET_ATS_AddressRecord *ar;
/* Reconnecting while a queue still exists is a programming error. */
527 GNUNET_assert(NULL == sh->mq);
528 sh->mq = GNUNET_CLIENT_connect(sh->cfg,
/* Send the start message with the scheduling flag. */
539 ev = GNUNET_MQ_msg(init,
540 GNUNET_MESSAGE_TYPE_ATS_START);
541 init->start_flag = htonl(START_FLAG_SCHEDULING);
542 GNUNET_MQ_send(sh->mq, ev);
/* Walk the session table and resend address records (presumably empty
   slots are skipped in the elided lines -- confirm). */
545 for (i = 0; i < sh->session_array_size; i++)
547 ar = sh->session_array[i];
550 send_add_address_message(sh, ar);
558 * Initialize the ATS subsystem.
560 * @param cfg configuration to use
561 * @param suggest_cb notification to call whenever the suggestion changed
562 * @param suggest_cb_cls closure for @a suggest_cb
563 * @return ats context
/* Allocate a scheduling handle and record the suggestion callback.
   NOTE(review): the assignment of cfg, the initial table size, the call
   that starts the connection and the return statement are not visible
   in this listing. */
565 struct GNUNET_ATS_SchedulingHandle *
566 GNUNET_ATS_scheduling_init(const struct GNUNET_CONFIGURATION_Handle *cfg,
567 GNUNET_ATS_AddressSuggestionCallback suggest_cb,
568 void *suggest_cb_cls)
570 struct GNUNET_ATS_SchedulingHandle *sh;
572 sh = GNUNET_new(struct GNUNET_ATS_SchedulingHandle);
574 sh->suggest_cb = suggest_cb;
575 sh->suggest_cb_cls = suggest_cb_cls;
/* Size the session table (the target size argument is cut off here). */
576 GNUNET_array_grow(sh->session_array,
577 sh->session_array_size,
585 * Client is done with ATS scheduling, release resources.
587 * @param sh handle to release
/* Shut down the handle: destroy the message queue, cancel a pending
   task and free every remaining address record.
   NOTE(review): guards around the queue destruction, the freeing of the
   records themselves and of the handle are not visible in this listing. */
590 GNUNET_ATS_scheduling_done(struct GNUNET_ATS_SchedulingHandle *sh)
592 struct GNUNET_ATS_AddressRecord *ar;
597 GNUNET_MQ_destroy(sh->mq);
/* Cancel a still-pending task, if any. */
600 if (NULL != sh->task)
602 GNUNET_SCHEDULER_cancel(sh->task);
/* Release the copied address of every occupied slot and clear the slot. */
605 for (i = 0; i < sh->session_array_size; i++)
607 if (NULL != (ar = sh->session_array[i]))
609 GNUNET_HELLO_address_free(ar->address);
611 sh->session_array[i] = NULL;
/* Resize the session table (the target size argument is cut off here;
   presumably 0 -- confirm). */
614 GNUNET_array_grow(sh->session_array,
615 sh->session_array_size,
622 * We have a new address ATS should know. Addresses have to be added
623 * with this function before they can be: updated, set in use and destroyed.
627 * @param address the address
628 * @param session session handle, can be NULL
629 * @param prop performance data for the address
630 * @return handle to the address representation inside ATS, NULL
631 * on error (i.e. ATS knows this exact address already)
/* Register a new address: validate its size, allocate a record in a free
   slot and announce it to the service.
   NOTE(review): local declarations, the bodies of the validation
   branches, the duplicate check and the return statement are not visible
   in this listing. */
633 struct GNUNET_ATS_AddressRecord *
634 GNUNET_ATS_address_add(struct GNUNET_ATS_SchedulingHandle *sh,
635 const struct GNUNET_HELLO_Address *address,
636 struct GNUNET_ATS_Session *session,
637 const struct GNUNET_ATS_Properties *prop)
639 struct GNUNET_ATS_AddressRecord *ar;
646 /* we need a valid address */
650 GNUNET_break(GNUNET_NT_UNSPECIFIED != prop->scope);
/* Payload size: raw address bytes plus the transport name including its
   terminating NUL. */
651 namelen = strlen(address->transport_name) + 1;
652 msize = address->address_length + namelen;
/* NOTE(review): the bound uses sizeof(struct AddressUpdateMessage),
   while send_add_address_message() builds a struct AddressAddMessage;
   confirm the intended header size. */
653 if ((msize + sizeof(struct AddressUpdateMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
654 (address->address_length >= GNUNET_MAX_MESSAGE_SIZE) ||
655 (namelen >= GNUNET_MAX_MESSAGE_SIZE))
657 /* address too large for us, this should not happen */
667 /* Already existing, nothing todo, but this should not happen */
/* Reserve a slot, then fill a freshly allocated record: the session
   pointer is stored as given, the address is copied and the properties
   are converted with GNUNET_ATS_properties_hton(). */
671 s = find_empty_session_slot(sh);
672 ar = GNUNET_new(struct GNUNET_ATS_AddressRecord);
675 ar->session = session;
676 ar->address = GNUNET_HELLO_address_copy(address);
677 GNUNET_ATS_properties_hton(&ar->properties,
/* Publish the record in the table and notify the service. */
679 sh->session_array[s] = ar;
680 send_add_address_message(sh, ar);
686 * An address was used to initiate a session.
688 * @param ar address record to update information for
689 * @param session session handle
/* Attach a session to an existing address record. */
692 GNUNET_ATS_address_add_session(struct GNUNET_ATS_AddressRecord *ar,
693 struct GNUNET_ATS_Session *session)
/* The record is expected to have no session yet; a violation is only
   flagged via GNUNET_break() and the pointer is overwritten regardless. */
695 GNUNET_break(NULL == ar->session);
696 ar->session = session;
701 * A session was destroyed, disassociate it from the
702 * given address record. If this was an incoming
703 * address, destroy the address as well.
705 * @param ar address record to update information for
706 * @param session session handle
707 * @return #GNUNET_YES if the @a ar was destroyed because
708 * it was an incoming address,
709 * #GNUNET_NO if the @a ar was kept because we can
710 * use it still to establish a new session
/* Detach a session from an address record.
   NOTE(review): the statement clearing ar->session and the return
   statements are not visible in this listing. */
713 GNUNET_ATS_address_del_session(struct GNUNET_ATS_AddressRecord *ar,
714 struct GNUNET_ATS_Session *session)
/* The caller must pass the session currently stored in the record. */
716 GNUNET_assert(session == ar->session);
/* Addresses flagged as inbound are destroyed together with the session. */
718 if (GNUNET_HELLO_address_check_option(ar->address,
719 GNUNET_HELLO_ADDRESS_INFO_INBOUND))
721 GNUNET_ATS_address_destroy(ar);
729 * We have updated performance statistics for a given address. Note
730 * that this function can be called for addresses that are currently
731 * in use as well as addresses that are valid but not actively in use.
732 * Furthermore, the peer may not even be connected to us right now (in
733 * which case the call may be ignored or the information may be stored
734 * for later use). Update bandwidth assignments.
736 * @param ar address record to update information for
737 * @param prop performance data for the address
/* Store new properties in the record and send them to the service.
   NOTE(review): the trailing log arguments, the second argument of
   GNUNET_ATS_properties_hton(), the condition guarding the early return
   and the last argument of GNUNET_MQ_send() are not visible here. */
740 GNUNET_ATS_address_update(struct GNUNET_ATS_AddressRecord *ar,
741 const struct GNUNET_ATS_Properties *prop)
743 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
744 struct GNUNET_MQ_Envelope *ev;
745 struct AddressUpdateMessage *m;
747 LOG(GNUNET_ERROR_TYPE_DEBUG,
748 "Updating address for peer `%s', plugin `%s', session %p slot %u\n",
749 GNUNET_i2s(&ar->address->peer),
750 ar->address->transport_name,
753 GNUNET_break(GNUNET_NT_UNSPECIFIED != prop->scope);
/* The record is updated before the early return below, so the stored
   properties change even when nothing is sent. */
754 GNUNET_ATS_properties_hton(&ar->properties,
757 return; /* disconnected, skip for now */
/* Build the update message from the record slot, peer and properties. */
758 ev = GNUNET_MQ_msg(m, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
759 m->session_id = htonl(ar->slot);
760 m->peer = ar->address->peer;
761 m->properties = ar->properties;
762 GNUNET_MQ_send(sh->mq,
768 * An address got destroyed, stop using it as a valid address.
770 * @param ar address to destroy
/* Mark the record as being destroyed and tell the service about it.
   NOTE(review): the trailing log arguments and the lines between setting
   in_destroy and building the message are not visible in this listing;
   a disconnected-queue guard may live there -- confirm. */
773 GNUNET_ATS_address_destroy(struct GNUNET_ATS_AddressRecord *ar)
775 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
776 struct GNUNET_MQ_Envelope *ev;
777 struct AddressDestroyedMessage *m;
779 LOG(GNUNET_ERROR_TYPE_DEBUG,
780 "Deleting address for peer `%s', plugin `%s', slot %u session %p\n",
781 GNUNET_i2s(&ar->address->peer),
782 ar->address->transport_name,
/* The record is expected to have no session at this point; a violation
   is only flagged via GNUNET_break(). */
785 GNUNET_break(NULL == ar->session);
/* The record itself is not freed in the visible lines of this function;
   it is only flagged via in_destroy. */
787 ar->in_destroy = GNUNET_YES;
/* Build and queue the destroyed message with the record slot and peer. */
790 ev = GNUNET_MQ_msg(m, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED);
791 m->session_id = htonl(ar->slot);
792 m->peer = ar->address->peer;
793 GNUNET_MQ_send(sh->mq, ev);
797 /* end of ats_api_scheduling.c */