2 This file is part of GNUnet.
3 Copyright (C) 2010-2015 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 * @file ats/ats_api_scheduling.c
20 * @brief automatic transport selection and outbound bandwidth determination
21 * @author Christian Grothoff
22 * @author Matthias Wachs
25 * - we could avoid a linear scan over the
26 * active addresses in some cases, so if
27 * there is need, we can still optimize here
28 * - we might want to split off the logic to
29 * determine LAN vs. WAN, as it has nothing
30 * to do with accessing the ATS service.
33 #include "gnunet_ats_service.h"
37 * How frequently do we scan the interfaces for changes to the addresses?
/* Two-minute interval, built with GNUNET_TIME_relative_multiply() from
   GNUNET_TIME_UNIT_MINUTES.  NOTE(review): no use of this macro is visible
   in this listing -- confirm that it is still needed. */
39 #define INTERFACE_PROCESSING_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
/* Logging shorthand: forwards to GNUNET_log_from() with the fixed component
   name "ats-scheduling-api"; all further arguments are passed through. */
41 #define LOG(kind,...) GNUNET_log_from(kind, "ats-scheduling-api", __VA_ARGS__)
44 * Session ID we use if there is no session / slot.
50 * Information we track per address, incoming or outgoing. It also
51 * doesn't matter if we have a session, any address that ATS is
52 * allowed to suggest right now should be tracked.
/* Per-address bookkeeping record.  Records are allocated in
   GNUNET_ATS_address_add() and stored in the owning handle's session_array. */
54 struct GNUNET_ATS_AddressRecord
58 * Scheduling handle this address record belongs to.
60 struct GNUNET_ATS_SchedulingHandle *sh;
/* Address this record describes.  GNUNET_ATS_address_add() stores a private
   copy made with GNUNET_HELLO_address_copy(); the copy is released with
   GNUNET_HELLO_address_free() when the record is dropped. */
65 struct GNUNET_HELLO_Address *address;
68 * Session handle. NULL if we have an address but no
69 * active session for this address.
71 struct GNUNET_ATS_Session *session;
74 * Performance data about the address.
/* Filled by GNUNET_ATS_properties_hton() and copied unchanged into the
   messages sent to the service. */
76 struct GNUNET_ATS_PropertiesNBO properties;
79 * Which slot (index) in the session array does
80 * this record correspond to?
81 * FIXME: a linear search on this is really crappy!
82 * Maybe switch to a 64-bit global counter and be
83 * done with it? Or does that then cause too much
84 * trouble on the ATS-service side?
/* NOTE(review): the member declaration documented by the text above is not
   visible in this listing; other code in this file accesses it as `slot`. */
89 * We're about to destroy this address record, just ATS does
90 * not know this yet. Once ATS confirms its destruction,
/* NOTE(review): the end of the sentence above and the member declaration are
   not visible in this listing; other code in this file accesses the flag as
   `in_destroy` and compares it against GNUNET_YES / GNUNET_NO. */
98 * Handle to the ATS subsystem for bandwidth/transport scheduling information.
/* State kept for one client connection of this API to the ATS service. */
100 struct GNUNET_ATS_SchedulingHandle
/* Configuration handed to GNUNET_CLIENT_connect() by reconnect(). */
106 const struct GNUNET_CONFIGURATION_Handle *cfg;
109 * Callback to invoke on suggestions.
111 GNUNET_ATS_AddressSuggestionCallback suggest_cb;
114 * Closure for @e suggest_cb.
116 void *suggest_cb_cls;
119 * Message queue for sending requests to the ATS service.
/* reconnect() asserts that this is NULL before it opens a new connection. */
121 struct GNUNET_MQ_Handle *mq;
124 * Array of session objects (we need to translate them to numbers and back
125 * for the protocol; the offset in the array is the session number on the
126 * network). Index 0 is always NULL and reserved to represent the NULL pointer.
127 * Unused entries are also NULL.
129 struct GNUNET_ATS_AddressRecord **session_array;
132 * Task to trigger reconnect.
/* Set by force_reconnect() via GNUNET_SCHEDULER_add_delayed(); cancelled in
   GNUNET_ATS_scheduling_done() when it is non-NULL. */
134 struct GNUNET_SCHEDULER_Task *task;
137 * Reconnect backoff delay.
/* Advanced with GNUNET_TIME_STD_BACKOFF() on every forced reconnect and reset
   to GNUNET_TIME_UNIT_ZERO in handle_ats_address_suggestion(). */
139 struct GNUNET_TIME_Relative backoff;
142 * Size of the @e session_array.
/* find_empty_session_slot() doubles this value when it grows the array. */
144 unsigned int session_array_size;
150 * Re-establish the connection to the ATS service.
152 * @param sh handle to use to re-connect.
/* Forward declaration of reconnect(); the definition appears later in this
   file. */
155 reconnect (struct GNUNET_ATS_SchedulingHandle *sh);
159 * Re-establish the connection to the ATS service.
161 * @param cls handle to use to re-connect.
/* Task wrapper around reconnecting: recovers the scheduling handle from the
   closure pointer.  NOTE(review): the statements that follow are not visible
   in this listing; presumably they clear sh->task and call reconnect() --
   confirm against the full file. */
164 reconnect_task (void *cls)
166 struct GNUNET_ATS_SchedulingHandle *sh = cls;
174 * Disconnect from ATS and then reconnect.
176 * @param sh our handle
/* Destroy the message queue, invoke the suggestion callback with zero
   bandwidth in both directions, and schedule a delayed reconnect. */
179 force_reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
183 GNUNET_MQ_destroy (sh->mq);
/* NOTE(review): no NULL test on sh->suggest_cb is visible here, although
   handle_ats_address_suggestion() does test it for NULL -- confirm that a
   NULL callback cannot reach this call.  The arguments between the closure
   and the bandwidth values are not visible in this listing. */
186 sh->suggest_cb (sh->suggest_cb_cls,
188 GNUNET_BANDWIDTH_ZERO,
189 GNUNET_BANDWIDTH_ZERO);
/* Advance the backoff first, then use the new value as the delay of the
   scheduled task. */
190 sh->backoff = GNUNET_TIME_STD_BACKOFF (sh->backoff);
191 sh->task = GNUNET_SCHEDULER_add_delayed (sh->backoff,
198 * Find the session object corresponding to the given session ID.
200 * @param sh our handle
201 * @param session_id current session ID
202 * @param peer peer the session belongs to
203 * @return the session object (or NULL)
/* Look up the address record stored under a session ID and check it against
   the peer identity supplied by the caller. */
205 static struct GNUNET_ATS_AddressRecord *
206 find_session (struct GNUNET_ATS_SchedulingHandle *sh,
208 const struct GNUNET_PeerIdentity *peer)
210 struct GNUNET_ATS_AddressRecord *ar;
/* IDs at or beyond the end of the table cannot refer to a record. */
212 if (session_id >= sh->session_array_size)
/* The session ID is used directly as the index into the session table. */
219 ar = sh->session_array[session_id];
225 if (NULL == ar->address)
227 /* address was destroyed in the meantime, this can happen
228 as we communicate asynchronously with the ATS service. */
/* Byte-wise comparison of peer identities.  NOTE(review): the second
   memcmp() argument is not visible in this listing; presumably it is the
   peer stored in the record's address. */
231 if (0 != memcmp (peer,
233 sizeof (struct GNUNET_PeerIdentity)))
243 * Get an available session ID.
245 * @param sh our handle
246 * @return an unused slot, but never NOT_FOUND (0)
/* Search the session table for an unused slot and double the table when the
   search does not yield one. */
249 find_empty_session_slot (struct GNUNET_ATS_SchedulingHandle *sh)
/* The modulo operations below require a non-empty table. */
254 GNUNET_assert (0 != sh->session_array_size);
/* Continue while the candidate `off` equals the reserved value NOT_FOUND or
   its slot (taken modulo the table size) is occupied, bounded by `i` staying
   below the table size.  NOTE(review): the loop body and the declarations of
   `off` and `i` are not visible in this listing. */
256 while ( ( (NOT_FOUND == off) ||
257 (NULL != sh->session_array[off % sh->session_array_size]) ) &&
258 (i < sh->session_array_size) )
/* Candidate maps to a non-reserved, empty slot.  NOTE(review): the statement
   executed in this case is not visible in this listing. */
263 if ( (NOT_FOUND != off % sh->session_array_size) &&
264 (NULL == sh->session_array[off % sh->session_array_size]) )
/* Remember the old table size in `i`, then double the table. */
266 i = sh->session_array_size;
267 GNUNET_array_grow (sh->session_array,
268 sh->session_array_size,
269 sh->session_array_size * 2);
275 * Get the ID for the given session object.
277 * @param sh our handle
278 * @param session session object
279 * @param address the address we are looking for
280 * @return the session id or NOT_FOUND for error
/* Linear scan of the session table for a live record that matches the given
   session and address. */
283 find_session_id (struct GNUNET_ATS_SchedulingHandle *sh,
284 struct GNUNET_ATS_Session *session,
285 const struct GNUNET_HELLO_Address *address)
/* The scan starts at index 1, so slot 0 is never examined.  A slot matches
   when it holds a record that is not marked in_destroy, whose session is
   either the given one or NULL, whose peer identity is byte-identical, and
   for which GNUNET_HELLO_address_cmp() returns 0.  NOTE(review): the
   statement executed on a match is not visible in this listing. */
294 for (i = 1; i < sh->session_array_size; i++)
295 if ( (NULL != sh->session_array[i]) &&
296 (GNUNET_NO == sh->session_array[i]->in_destroy) &&
297 ( (session == sh->session_array[i]->session) ||
298 (NULL == sh->session_array[i]->session) ) &&
299 (0 == memcmp (&address->peer,
300 &sh->session_array[i]->address->peer,
301 sizeof (struct GNUNET_PeerIdentity))) &&
302 (0 == GNUNET_HELLO_address_cmp (address,
303 sh->session_array[i]->address)) )
310 * Release the session slot from the session table (ATS service is
311 * also done using it).
313 * @param sh our handle
314 * @param session_id identifies session that is no longer valid
/* Free the record stored under a session ID and clear its table slot. */
317 release_session (struct GNUNET_ATS_SchedulingHandle *sh,
320 struct GNUNET_ATS_AddressRecord *ar;
/* NOTE(review): the statement executed for the reserved ID NOT_FOUND is not
   visible in this listing. */
322 if (NOT_FOUND == session_id)
/* An ID outside the table is followed by a forced reconnect. */
324 if (session_id >= sh->session_array_size)
327 force_reconnect (sh);
330 /* this slot should have been removed from remove_session before */
/* NOTE(review): `ar` is dereferenced right after the lookup with no NULL
   test in between; an ID that refers to an empty slot would dereference
   NULL here -- confirm whether that case can occur. */
331 ar = sh->session_array[session_id];
/* A record that still has a session attached is followed by a forced
   reconnect. */
332 if (NULL != ar->session)
335 force_reconnect (sh);
/* Release the address copy owned by the record, then empty the slot.
   NOTE(review): the statement freeing the record itself is not visible in
   this listing. */
338 GNUNET_HELLO_address_free (ar->address);
340 sh->session_array[session_id] = NULL;
345 * Type of a function to call when we receive a session release
346 * message from the service.
348 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
349 * @param srm message received
/* Message handler for session-release messages; the closure is the
   scheduling handle. */
352 handle_ats_session_release (void *cls,
353 const struct GNUNET_ATS_SessionReleaseMessage *srm)
355 struct GNUNET_ATS_SchedulingHandle *sh = cls;
357 /* Note: peer field in srm not necessary right now,
358 but might be good to have in the future */
/* The session ID is converted from network byte order with ntohl().
   NOTE(review): the call that receives it is not visible in this listing;
   presumably release_session() -- confirm. */
360 ntohl (srm->session_id));
365 * Type of a function to call when we receive an address suggestion
366 * message from the service.
368 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
369 * @param m message received
/* Message handler for address suggestions: resolves the session ID to an
   address record and passes the suggestion on to the client callback. */
372 handle_ats_address_suggestion (void *cls,
373 const struct AddressSuggestionMessage *m)
375 struct GNUNET_ATS_SchedulingHandle *sh = cls;
376 struct GNUNET_ATS_AddressRecord *ar;
/* The session ID is converted from network byte order. */
379 session_id = ntohl (m->session_id);
/* NOTE(review): the condition guarding this reconnect is not visible in this
   listing. */
383 force_reconnect (sh);
386 ar = find_session (sh,
/* NOTE(review): presumably reached when find_session() yielded no record;
   the guarding condition is not visible in this listing. */
392 force_reconnect (sh);
/* NOTE(review): the statement executed when no callback is registered is not
   visible in this listing. */
395 if (NULL == sh->suggest_cb)
397 if (GNUNET_YES == ar->in_destroy)
399 /* ignore suggestion, as this address is dying, unless BW is 0,
400 in that case signal 'disconnect' via BW 0 */
/* Both bandwidth values are converted from network byte order before they
   are compared with zero. */
401 if ( (0 == ntohl (m->bandwidth_out.value__)) &&
402 (0 == ntohl (m->bandwidth_in.value__)) )
404 LOG (GNUNET_ERROR_TYPE_DEBUG,
405 "ATS suggests disconnect from peer `%s' with BW %u/%u\n",
406 GNUNET_i2s (&ar->address->peer),
407 (unsigned int) ntohl (m->bandwidth_out.value__),
408 (unsigned int) ntohl (m->bandwidth_in.value__));
409 sh->suggest_cb (sh->suggest_cb_cls,
/* Tests for a record without a session whose address carries the INBOUND
   option.  NOTE(review): the action taken in that case is not visible in
   this listing. */
418 if ( (NULL == ar->session) &&
419 (GNUNET_HELLO_address_check_option (ar->address,
420 GNUNET_HELLO_ADDRESS_INFO_INBOUND)) )
/* Reset the reconnect backoff before the suggestion is handed to the
   callback. */
425 sh->backoff = GNUNET_TIME_UNIT_ZERO;
426 LOG (GNUNET_ERROR_TYPE_DEBUG,
427 "ATS suggests address slot %u for peer `%s' using plugin %s\n",
429 GNUNET_i2s (&ar->address->peer),
430 ar->address->transport_name);
431 sh->suggest_cb (sh->suggest_cb_cls,
441 * We encountered an error handling the MQ to the
442 * ATS service. Reconnect.
444 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
445 * @param error details about the error
/* Message-queue error callback: logs the failure at debug level and forces a
   reconnect.  The closure is the scheduling handle. */
448 error_handler (void *cls,
449 enum GNUNET_MQ_Error error)
451 struct GNUNET_ATS_SchedulingHandle *sh = cls;
453 LOG (GNUNET_ERROR_TYPE_DEBUG,
454 "ATS connection died (code %d), reconnecting\n",
456 force_reconnect (sh);
461 * Generate and transmit the `struct AddressAddMessage` for the given
464 * @param sh the scheduling handle to use for transmission
465 * @param ar the address to inform the ATS service about
/* Build a GNUNET_MESSAGE_TYPE_ATS_ADDRESS_ADD message for `ar` and queue it
   on the handle's message queue. */
468 send_add_address_message (struct GNUNET_ATS_SchedulingHandle *sh,
469 const struct GNUNET_ATS_AddressRecord *ar)
471 struct GNUNET_MQ_Envelope *ev;
472 struct AddressAddMessage *m;
478 return; /* disconnected, skip for now */
479 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != ar->properties.scope);
/* Extra payload size: the raw address bytes plus the transport name with its
   terminating NUL.
   NOTE(review): strlen() dereferences transport_name here, yet the same
   pointer is only tested for NULL further down -- confirm which of the two
   reflects the real contract. */
480 namelen = strlen (ar->address->transport_name) + 1;
481 msize = ar->address->address_length + namelen;
482 ev = GNUNET_MQ_msg_extra (m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_ADD);
483 m->peer = ar->address->peer;
/* Multi-byte header fields are converted to network byte order. */
484 m->address_length = htons (ar->address->address_length);
485 m->address_local_info = htonl ((uint32_t) ar->address->local_info);
486 m->plugin_name_length = htons (namelen);
487 m->session_id = htonl (ar->slot);
/* The record's properties are copied into the message as stored. */
488 m->properties = ar->properties;
490 LOG (GNUNET_ERROR_TYPE_DEBUG,
491 "Adding address for peer `%s', plugin `%s', session %p slot %u\n",
492 GNUNET_i2s (&ar->address->peer),
493 ar->address->transport_name,
/* NOTE(review): the head of the call taking the next two arguments is not
   visible in this listing; presumably it copies the address bytes to the
   start of the payload `pm`. */
498 ar->address->address,
499 ar->address->address_length);
/* The transport name is written directly behind the address bytes. */
500 if (NULL != ar->address->transport_name)
501 GNUNET_memcpy (&pm[ar->address->address_length],
502 ar->address->transport_name,
504 GNUNET_MQ_send (sh->mq, ev);
509 * Re-establish the connection to the ATS service.
511 * @param sh handle to use to re-connect.
/* Open a new connection to the ATS service, send the start message with the
   scheduling flag, and send an add message for the entries of the session
   table. */
514 reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
/* Fixed-size handlers for the two message types this API receives. */
516 struct GNUNET_MQ_MessageHandler handlers[] = {
517 GNUNET_MQ_hd_fixed_size (ats_session_release,
518 GNUNET_MESSAGE_TYPE_ATS_SESSION_RELEASE,
519 struct GNUNET_ATS_SessionReleaseMessage,
521 GNUNET_MQ_hd_fixed_size (ats_address_suggestion,
522 GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
523 struct AddressSuggestionMessage,
525 GNUNET_MQ_handler_end ()
527 struct GNUNET_MQ_Envelope *ev;
528 struct ClientStartMessage *init;
530 struct GNUNET_ATS_AddressRecord *ar;
/* Any previous message queue must already have been torn down. */
532 GNUNET_assert (NULL == sh->mq);
533 sh->mq = GNUNET_CLIENT_connect (sh->cfg,
/* NOTE(review): presumably the path taken when connecting failed; the
   guarding condition is not visible in this listing. */
541 force_reconnect (sh);
/* Announce this client to the service with START_FLAG_SCHEDULING. */
544 ev = GNUNET_MQ_msg (init,
545 GNUNET_MESSAGE_TYPE_ATS_START);
546 init->start_flag = htonl (START_FLAG_SCHEDULING);
547 GNUNET_MQ_send (sh->mq, ev);
/* Walk the whole session table and send an add message for its records.
   NOTE(review): any filtering of empty slots between the lookup and the
   send is not visible in this listing. */
550 for (i=0;i<sh->session_array_size;i++)
552 ar = sh->session_array[i];
555 send_add_address_message (sh, ar);
563 * Initialize the ATS subsystem.
565 * @param cfg configuration to use
566 * @param suggest_cb notification to call whenever the suggestion changed
567 * @param suggest_cb_cls closure for @a suggest_cb
568 * @return ats context
/* Allocate a scheduling handle, store the suggestion callback and its
   closure, and size the session table.  NOTE(review): storing `cfg`, the
   initial table size, the first connect and the return statement are not
   visible in this listing. */
570 struct GNUNET_ATS_SchedulingHandle *
571 GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
572 GNUNET_ATS_AddressSuggestionCallback suggest_cb,
573 void *suggest_cb_cls)
575 struct GNUNET_ATS_SchedulingHandle *sh;
577 sh = GNUNET_new (struct GNUNET_ATS_SchedulingHandle);
579 sh->suggest_cb = suggest_cb;
580 sh->suggest_cb_cls = suggest_cb_cls;
581 GNUNET_array_grow (sh->session_array,
582 sh->session_array_size,
590 * Client is done with ATS scheduling, release resources.
592 * @param sh handle to release
/* Shut down the handle: destroy the message queue, cancel a pending
   reconnect task, release every remaining address record and resize the
   session table. */
595 GNUNET_ATS_scheduling_done (struct GNUNET_ATS_SchedulingHandle *sh)
597 struct GNUNET_ATS_AddressRecord *ar;
602 GNUNET_MQ_destroy (sh->mq);
605 if (NULL != sh->task)
607 GNUNET_SCHEDULER_cancel (sh->task);
/* Release the address copy of every record still in the table and clear its
   slot.  NOTE(review): the statement freeing the record itself is not
   visible in this listing. */
610 for (i=0;i<sh->session_array_size;i++)
612 if (NULL != (ar = sh->session_array[i]))
614 GNUNET_HELLO_address_free (ar->address);
616 sh->session_array[i] = NULL;
/* NOTE(review): the new size passed to this call is not visible in this
   listing; presumably 0, which would release the table -- confirm. */
619 GNUNET_array_grow (sh->session_array,
620 sh->session_array_size,
627 * We have a new address ATS should know. Addresses have to be added
628 * with this function before they can be: updated, set in use and
632 * @param address the address
633 * @param session session handle, can be NULL
634 * @param prop performance data for the address
635 * @return handle to the address representation inside ATS, NULL
636 * on error (i.e. ATS knows this exact address already)
/* Create an address record for `address`, place it in a free slot of the
   session table and announce it to the service. */
638 struct GNUNET_ATS_AddressRecord *
639 GNUNET_ATS_address_add (struct GNUNET_ATS_SchedulingHandle *sh,
640 const struct GNUNET_HELLO_Address *address,
641 struct GNUNET_ATS_Session *session,
642 const struct GNUNET_ATS_Properties *prop)
644 struct GNUNET_ATS_AddressRecord *ar;
651 /* we need a valid address */
655 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != prop->scope);
/* Payload size: address bytes plus the transport name with its NUL. */
656 namelen = strlen (address->transport_name) + 1;
657 msize = address->address_length + namelen;
/* NOTE(review): the bound uses sizeof (struct AddressUpdateMessage), while
   send_add_address_message() builds a struct AddressAddMessage -- confirm
   that the intended header size is checked. */
658 if ((msize + sizeof (struct AddressUpdateMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
659 (address->address_length >= GNUNET_MAX_MESSAGE_SIZE) ||
660 (namelen >= GNUNET_MAX_MESSAGE_SIZE) )
662 /* address too large for us, this should not happen */
672 /* Already existing, nothing to do, but this should not happen */
/* Reserve a slot, then fill in a freshly allocated record.  The record owns
   a copy of the address and stores the properties as converted by
   GNUNET_ATS_properties_hton().  NOTE(review): the assignments of the
   record's `sh` and `slot` members and the return statement are not visible
   in this listing. */
676 s = find_empty_session_slot (sh);
677 ar = GNUNET_new (struct GNUNET_ATS_AddressRecord);
680 ar->session = session;
681 ar->address = GNUNET_HELLO_address_copy (address);
682 GNUNET_ATS_properties_hton (&ar->properties,
684 sh->session_array[s] = ar;
685 send_add_address_message (sh, ar);
691 * An address was used to initiate a session.
693 * @param ar address record to update information for
694 * @param session session handle
/* Attach `session` to the record.  GNUNET_break() flags the case where the
   record already has a session; the assignment is performed either way. */
697 GNUNET_ATS_address_add_session (struct GNUNET_ATS_AddressRecord *ar,
698 struct GNUNET_ATS_Session *session)
700 GNUNET_break (NULL == ar->session);
701 ar->session = session;
706 * A session was destroyed, disassociate it from the
707 * given address record. If this was an incoming
708 * address, destroy the address as well.
710 * @param ar address record to update information for
711 * @param session session handle
712 * @return #GNUNET_YES if the @a ar was destroyed because
713 * it was an incoming address,
714 * #GNUNET_NO if the @a ar was kept because we can
715 * use it still to establish a new session
/* Detach a session from its address record; records whose address carries
   the INBOUND option are destroyed as well. */
718 GNUNET_ATS_address_del_session (struct GNUNET_ATS_AddressRecord *ar,
719 struct GNUNET_ATS_Session *session)
/* The caller must pass the session currently attached to the record. */
721 GNUNET_assert (session == ar->session);
/* NOTE(review): the statement clearing ar->session and the return statements
   are not visible in this listing.  GNUNET_ATS_address_destroy() expects
   ar->session to be NULL, so the session is presumably cleared before this
   test -- confirm. */
723 if (GNUNET_HELLO_address_check_option (ar->address,
724 GNUNET_HELLO_ADDRESS_INFO_INBOUND))
726 GNUNET_ATS_address_destroy (ar);
734 * We have updated performance statistics for a given address. Note
735 * that this function can be called for addresses that are currently
736 * in use as well as addresses that are valid but not actively in use.
737 * Furthermore, the peer may not even be connected to us right now (in
738 * which case the call may be ignored or the information may be stored
739 * for later use). Update bandwidth assignments.
741 * @param ar address record to update information for
742 * @param prop performance data for the address
/* Store new properties in the record and, when connected, send them to the
   service in a GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE message. */
745 GNUNET_ATS_address_update (struct GNUNET_ATS_AddressRecord *ar,
746 const struct GNUNET_ATS_Properties *prop)
748 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
749 struct GNUNET_MQ_Envelope *ev;
750 struct AddressUpdateMessage *m;
752 LOG (GNUNET_ERROR_TYPE_DEBUG,
753 "Updating address for peer `%s', plugin `%s', session %p slot %u\n",
754 GNUNET_i2s (&ar->address->peer),
755 ar->address->transport_name,
758 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != prop->scope);
/* The record is updated before the early return below, so the stored
   properties change even while disconnected. */
759 GNUNET_ATS_properties_hton (&ar->properties,
762 return; /* disconnected, skip for now */
/* Fixed-size message: slot number in network byte order, peer identity and
   the stored properties. */
763 ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
764 m->session_id = htonl (ar->slot);
765 m->peer = ar->address->peer;
766 m->properties = ar->properties;
767 GNUNET_MQ_send (sh->mq,
773 * An address got destroyed, stop using it as a valid address.
775 * @param ar address to destroy
/* Mark the record as being destroyed and notify the service with a
   GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED message.  The record is not
   freed here; release_session() is the function that frees the address and
   clears the slot. */
778 GNUNET_ATS_address_destroy (struct GNUNET_ATS_AddressRecord *ar)
780 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
781 struct GNUNET_MQ_Envelope *ev;
782 struct AddressDestroyedMessage *m;
784 LOG (GNUNET_ERROR_TYPE_DEBUG,
785 "Deleting address for peer `%s', plugin `%s', slot %u session %p\n",
786 GNUNET_i2s (&ar->address->peer),
787 ar->address->transport_name,
/* A record should no longer have a session attached when it is destroyed. */
790 GNUNET_break (NULL == ar->session);
/* From here on find_session_id() skips this record. */
792 ar->in_destroy = GNUNET_YES;
/* NOTE(review): a disconnected-state check before building the message is
   not visible in this listing. */
795 ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED);
796 m->session_id = htonl (ar->slot);
797 m->peer = ar->address->peer;
798 GNUNET_MQ_send (sh->mq, ev);
802 /* end of ats_api_scheduling.c */