2 This file is part of GNUnet.
3 Copyright (C) 2010-2015 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
16 * @file ats/ats_api_scheduling.c
17 * @brief automatic transport selection and outbound bandwidth determination
18 * @author Christian Grothoff
19 * @author Matthias Wachs
22 * - we could avoid a linear scan over the
23 * active addresses in some cases, so if
24 * there is need, we can still optimize here
25 * - we might want to split off the logic to
26 * determine LAN vs. WAN, as it has nothing
27 * to do with accessing the ATS service.
30 #include "gnunet_ats_service.h"
34 * How frequently do we scan the interfaces for changes to the addresses?
36 #define INTERFACE_PROCESSING_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
38 #define LOG(kind,...) GNUNET_log_from(kind, "ats-scheduling-api", __VA_ARGS__)
41 * Session ID we use if there is no session / slot.
47 * Information we track per address, incoming or outgoing. It also
48 * doesn't matter if we have a session, any address that ATS is
49 * allowed to suggest right now should be tracked.
51 struct GNUNET_ATS_AddressRecord
55 * Scheduling handle this address record belongs to.
57 struct GNUNET_ATS_SchedulingHandle *sh;
/* Address this record describes.  GNUNET_ATS_address_add() stores a
   copy made with GNUNET_HELLO_address_copy() here; release_session()
   and GNUNET_ATS_scheduling_done() free it with
   GNUNET_HELLO_address_free(). */
62 struct GNUNET_HELLO_Address *address;
65 * Session handle. NULL if we have an address but no
66 * active session for this address.
68 struct GNUNET_ATS_Session *session;
71 * Performance data about the address.
/* Kept as `struct GNUNET_ATS_PropertiesNBO`; it is filled through
   GNUNET_ATS_properties_hton() and copied unchanged into outgoing
   messages. */
73 struct GNUNET_ATS_PropertiesNBO properties;
76 * Which slot (index) in the session array does
77 * this record correspond to?
78 * FIXME: a linear search on this is really crappy!
79 * Maybe switch to a 64-bit global counter and be
80 * done with it? Or does that then cause too much
81 * trouble on the ATS-service side?
86 * We're about to destroy this address record, just ATS does
87 * not know this yet. Once ATS confirms its destruction,
95 * Handle to the ATS subsystem for bandwidth/transport scheduling information.
97 struct GNUNET_ATS_SchedulingHandle
/* Configuration handed to GNUNET_CLIENT_connect() by reconnect(). */
103 const struct GNUNET_CONFIGURATION_Handle *cfg;
106 * Callback to invoke on suggestions.
108 GNUNET_ATS_AddressSuggestionCallback suggest_cb;
111 * Closure for @e suggest_cb.
113 void *suggest_cb_cls;
116 * Message queue for sending requests to the ATS service.
/* reconnect() asserts this is NULL before it connects. */
118 struct GNUNET_MQ_Handle *mq;
121 * Array of session objects (we need to translate them to numbers and back
122 * for the protocol; the offset in the array is the session number on the
123 * network). Index 0 is always NULL and reserved to represent the NULL pointer.
124 * Unused entries are also NULL.
126 struct GNUNET_ATS_AddressRecord **session_array;
129 * Task to trigger reconnect.
/* Set by force_reconnect(); cancelled in GNUNET_ATS_scheduling_done()
   when non-NULL. */
131 struct GNUNET_SCHEDULER_Task *task;
134 * Reconnect backoff delay.
/* Updated in force_reconnect() and reset to GNUNET_TIME_UNIT_ZERO in
   handle_ats_address_suggestion(). */
136 struct GNUNET_TIME_Relative backoff;
139 * Size of the @e session_array.
141 unsigned int session_array_size;
147 * Re-establish the connection to the ATS service.
149 * @param sh handle to use to re-connect.
/* Forward declaration; the definition of reconnect() appears later in
   this file. */
152 reconnect (struct GNUNET_ATS_SchedulingHandle *sh);
156 * Re-establish the connection to the ATS service.
158 * @param cls handle to use to re-connect.
161 reconnect_task (void *cls)
/* The closure pointer is the scheduling handle. */
163 struct GNUNET_ATS_SchedulingHandle *sh = cls;
171 * Disconnect from ATS and then reconnect.
173 * @param sh our handle
176 force_reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
/* Drop the current message queue to the service. */
180 GNUNET_MQ_destroy (sh->mq);
/* Invoke the client's suggestion callback with both bandwidth
   arguments set to GNUNET_BANDWIDTH_ZERO. */
183 sh->suggest_cb (sh->suggest_cb_cls,
185 GNUNET_BANDWIDTH_ZERO,
186 GNUNET_BANDWIDTH_ZERO);
/* Update the backoff via GNUNET_TIME_STD_BACKOFF, then schedule a task
   delayed by that new backoff and remember it in sh->task. */
187 sh->backoff = GNUNET_TIME_STD_BACKOFF (sh->backoff);
188 sh->task = GNUNET_SCHEDULER_add_delayed (sh->backoff,
195 * Find the session object corresponding to the given session ID.
197 * @param sh our handle
198 * @param session_id current session ID
199 * @param peer peer the session belongs to
200 * @return the session object (or NULL)
202 static struct GNUNET_ATS_AddressRecord *
203 find_session (struct GNUNET_ATS_SchedulingHandle *sh,
205 const struct GNUNET_PeerIdentity *peer)
207 struct GNUNET_ATS_AddressRecord *ar;
/* IDs at or beyond the array size cannot name a valid slot. */
209 if (session_id >= sh->session_array_size)
/* The session ID is used directly as an index into the session array. */
216 ar = sh->session_array[session_id];
222 if (NULL == ar->address)
224 /* address was destroyed in the meantime, this can happen
225 as we communicate asynchronously with the ATS service. */
/* Byte-wise comparison over a full `struct GNUNET_PeerIdentity`;
   a non-zero result means the identities differ. */
228 if (0 != memcmp (peer,
230 sizeof (struct GNUNET_PeerIdentity)))
240 * Get an available session ID.
242 * @param sh our handle
243 * @return an unused slot, but never NOT_FOUND (0)
246 find_empty_session_slot (struct GNUNET_ATS_SchedulingHandle *sh)
/* The modulo operations below require a non-empty array. */
251 GNUNET_assert (0 != sh->session_array_size);
/* Keep searching while the candidate is the reserved NOT_FOUND value or
   its slot is occupied, as long as i stays below the array size. */
253 while ( ( (NOT_FOUND == off) ||
254 (NULL != sh->session_array[off % sh->session_array_size]) ) &&
255 (i < sh->session_array_size) )
/* Candidate is usable if it does not map to the reserved slot and the
   slot it maps to is empty. */
260 if ( (NOT_FOUND != off % sh->session_array_size) &&
261 (NULL == sh->session_array[off % sh->session_array_size]) )
/* Remember the current size in i, then double the array. */
263 i = sh->session_array_size;
264 GNUNET_array_grow (sh->session_array,
265 sh->session_array_size,
266 sh->session_array_size * 2);
272 * Get the ID for the given session object.
274 * @param sh our handle
275 * @param session session object
276 * @param address the address we are looking for
277 * @return the session id or NOT_FOUND for error
280 find_session_id (struct GNUNET_ATS_SchedulingHandle *sh,
281 struct GNUNET_ATS_Session *session,
282 const struct GNUNET_HELLO_Address *address)
/* Linear scan starting at index 1 (index 0 is reserved, see the
   session_array documentation).  A slot matches when it is in use, is
   not marked in_destroy, has either the same session or no session,
   belongs to the same peer and holds an address for which
   GNUNET_HELLO_address_cmp() returns 0. */
291 for (i = 1; i < sh->session_array_size; i++)
292 if ( (NULL != sh->session_array[i]) &&
293 (GNUNET_NO == sh->session_array[i]->in_destroy) &&
294 ( (session == sh->session_array[i]->session) ||
295 (NULL == sh->session_array[i]->session) ) &&
296 (0 == memcmp (&address->peer,
297 &sh->session_array[i]->address->peer,
298 sizeof (struct GNUNET_PeerIdentity))) &&
299 (0 == GNUNET_HELLO_address_cmp (address,
300 sh->session_array[i]->address)) )
307 * Release the session slot from the session table (ATS service is
308 * also done using it).
310 * @param sh our handle
311 * @param session_id identifies session that is no longer valid
314 release_session (struct GNUNET_ATS_SchedulingHandle *sh,
317 struct GNUNET_ATS_AddressRecord *ar;
/* NOT_FOUND is the reserved ID and is tested first. */
319 if (NOT_FOUND == session_id)
/* An ID outside the array leads to force_reconnect(). */
321 if (session_id >= sh->session_array_size)
324 force_reconnect (sh);
327 /* this slot should have been removed from remove_session before */
/* NOTE(review): ar is dereferenced on the next line without a visible
   NULL check; confirm that an empty slot can never be released here. */
328 ar = sh->session_array[session_id];
/* A record that still has a session attached leads to force_reconnect(). */
329 if (NULL != ar->session)
332 force_reconnect (sh);
/* Free the copied address and mark the slot as unused again. */
335 GNUNET_HELLO_address_free (ar->address);
337 sh->session_array[session_id] = NULL;
342 * Type of a function to call when we receive a session release
343 * message from the service.
345 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
346 * @param srm message received
349 handle_ats_session_release (void *cls,
350 const struct GNUNET_ATS_SessionReleaseMessage *srm)
/* The closure pointer is the scheduling handle. */
352 struct GNUNET_ATS_SchedulingHandle *sh = cls;
354 /* Note: peer field in srm not necessary right now,
355 but might be good to have in the future */
/* The session ID in the message is converted with ntohl() before use. */
357 ntohl (srm->session_id));
362 * Type of a function to call when we receive an address suggestion
363 * message from the service.
365 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
366 * @param m message received
369 handle_ats_address_suggestion (void *cls,
370 const struct AddressSuggestionMessage *m)
/* The closure pointer is the scheduling handle. */
372 struct GNUNET_ATS_SchedulingHandle *sh = cls;
373 struct GNUNET_ATS_AddressRecord *ar;
/* The session ID in the message is converted with ntohl() before use. */
376 session_id = ntohl (m->session_id);
380 force_reconnect (sh);
/* Translate the session ID back into our address record. */
383 ar = find_session (sh,
389 force_reconnect (sh);
/* Test for a missing suggestion callback. */
392 if (NULL == sh->suggest_cb)
/* Records already marked for destruction are handled separately. */
394 if (GNUNET_YES == ar->in_destroy)
396 /* ignore suggestion, as this address is dying, unless BW is 0,
397 in that case signal 'disconnect' via BW 0 */
398 if ( (0 == ntohl (m->bandwidth_out.value__)) &&
399 (0 == ntohl (m->bandwidth_in.value__)) )
401 LOG (GNUNET_ERROR_TYPE_DEBUG,
402 "ATS suggests disconnect from peer `%s' with BW %u/%u\n",
403 GNUNET_i2s (&ar->address->peer),
404 (unsigned int) ntohl (m->bandwidth_out.value__),
405 (unsigned int) ntohl (m->bandwidth_in.value__));
406 sh->suggest_cb (sh->suggest_cb_cls,
/* Special case: the address carries the INBOUND option and currently
   has no session. */
415 if ( (NULL == ar->session) &&
416 (GNUNET_HELLO_address_check_option (ar->address,
417 GNUNET_HELLO_ADDRESS_INFO_INBOUND)) )
/* Reset the reconnect backoff. */
422 sh->backoff = GNUNET_TIME_UNIT_ZERO;
423 LOG (GNUNET_ERROR_TYPE_DEBUG,
424 "ATS suggests address slot %u for peer `%s' using plugin %s\n",
426 GNUNET_i2s (&ar->address->peer),
427 ar->address->transport_name);
/* Hand the suggestion to the client's callback. */
428 sh->suggest_cb (sh->suggest_cb_cls,
438 * We encountered an error handling the MQ to the
439 * ATS service. Reconnect.
441 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
442 * @param error details about the error
445 error_handler (void *cls,
446 enum GNUNET_MQ_Error error)
/* The closure pointer is the scheduling handle. */
448 struct GNUNET_ATS_SchedulingHandle *sh = cls;
450 LOG (GNUNET_ERROR_TYPE_DEBUG,
451 "ATS connection died (code %d), reconnecting\n",
/* Tear down the queue and schedule a new connection attempt. */
453 force_reconnect (sh);
458 * Generate and transmit the `struct AddressAddMessage` for the given
461 * @param sh the scheduling handle to use for transmission
462 * @param ar the address to inform the ATS service about
465 send_add_address_message (struct GNUNET_ATS_SchedulingHandle *sh,
466 const struct GNUNET_ATS_AddressRecord *ar)
468 struct GNUNET_MQ_Envelope *ev;
469 struct AddressAddMessage *m;
475 return; /* disconnected, skip for now */
476 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != ar->properties.scope);
/* The plugin name length includes the terminating NUL (hence + 1). */
477 namelen = strlen (ar->address->transport_name) + 1;
/* Extra payload size: address bytes plus the plugin name. */
478 msize = ar->address->address_length + namelen;
479 ev = GNUNET_MQ_msg_extra (m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_ADD);
480 m->peer = ar->address->peer;
/* Multi-byte integer fields are converted with htons()/htonl(). */
481 m->address_length = htons (ar->address->address_length);
482 m->address_local_info = htonl ((uint32_t) ar->address->local_info);
483 m->plugin_name_length = htons (namelen);
484 m->session_id = htonl (ar->slot);
/* Copied unchanged; the record stores these as GNUNET_ATS_PropertiesNBO. */
485 m->properties = ar->properties;
487 LOG (GNUNET_ERROR_TYPE_DEBUG,
488 "Adding address for peer `%s', plugin `%s', session %p slot %u\n",
489 GNUNET_i2s (&ar->address->peer),
490 ar->address->transport_name,
495 ar->address->address,
496 ar->address->address_length);
/* The plugin name is written at offset address_length of pm.
   NOTE(review): transport_name was already passed to strlen() above,
   so this NULL check comes too late to protect that call; confirm
   whether transport_name can ever be NULL. */
497 if (NULL != ar->address->transport_name)
498 GNUNET_memcpy (&pm[ar->address->address_length],
499 ar->address->transport_name,
501 GNUNET_MQ_send (sh->mq, ev);
506 * Re-establish the connection to the ATS service.
508 * @param sh handle to use to re-connect.
511 reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
/* Handler table for the two fixed-size message types accepted here:
   session release and address suggestion.  Presumably these dispatch to
   handle_ats_session_release() and handle_ats_address_suggestion();
   verify against the GNUNET_MQ_hd_fixed_size macro. */
513 struct GNUNET_MQ_MessageHandler handlers[] = {
514 GNUNET_MQ_hd_fixed_size (ats_session_release,
515 GNUNET_MESSAGE_TYPE_ATS_SESSION_RELEASE,
516 struct GNUNET_ATS_SessionReleaseMessage,
518 GNUNET_MQ_hd_fixed_size (ats_address_suggestion,
519 GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
520 struct AddressSuggestionMessage,
522 GNUNET_MQ_handler_end ()
524 struct GNUNET_MQ_Envelope *ev;
525 struct ClientStartMessage *init;
527 struct GNUNET_ATS_AddressRecord *ar;
/* We must not already hold a message queue when connecting. */
529 GNUNET_assert (NULL == sh->mq);
530 sh->mq = GNUNET_CLIENT_connect (sh->cfg,
538 force_reconnect (sh);
/* Send a START message whose flag is START_FLAG_SCHEDULING. */
541 ev = GNUNET_MQ_msg (init,
542 GNUNET_MESSAGE_TYPE_ATS_START);
543 init->start_flag = htonl (START_FLAG_SCHEDULING);
544 GNUNET_MQ_send (sh->mq, ev);
/* Walk the session array and announce the stored records to the
   service again via send_add_address_message(). */
547 for (i=0;i<sh->session_array_size;i++)
549 ar = sh->session_array[i];
552 send_add_address_message (sh, ar);
560 * Initialize the ATS subsystem.
562 * @param cfg configuration to use
563 * @param suggest_cb notification to call whenever the suggestion changed
564 * @param suggest_cb_cls closure for @a suggest_cb
565 * @return ats context
567 struct GNUNET_ATS_SchedulingHandle *
568 GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
569 GNUNET_ATS_AddressSuggestionCallback suggest_cb,
570 void *suggest_cb_cls)
572 struct GNUNET_ATS_SchedulingHandle *sh;
/* Allocate the handle and remember the client's callback and closure. */
574 sh = GNUNET_new (struct GNUNET_ATS_SchedulingHandle);
576 sh->suggest_cb = suggest_cb;
577 sh->suggest_cb_cls = suggest_cb_cls;
/* Size the session array. */
578 GNUNET_array_grow (sh->session_array,
579 sh->session_array_size,
587 * Client is done with ATS scheduling, release resources.
589 * @param sh handle to release
592 GNUNET_ATS_scheduling_done (struct GNUNET_ATS_SchedulingHandle *sh)
594 struct GNUNET_ATS_AddressRecord *ar;
/* Drop the message queue to the service. */
599 GNUNET_MQ_destroy (sh->mq);
/* Cancel a pending task, if any. */
602 if (NULL != sh->task)
604 GNUNET_SCHEDULER_cancel (sh->task);
/* Release every record still stored in the session array: free its
   copied address and clear the slot. */
607 for (i=0;i<sh->session_array_size;i++)
609 if (NULL != (ar = sh->session_array[i]))
611 GNUNET_HELLO_address_free (ar->address);
613 sh->session_array[i] = NULL;
/* Resize the session array itself. */
616 GNUNET_array_grow (sh->session_array,
617 sh->session_array_size,
624 * We have a new address ATS should know. Addresses have to be added
625 * with this function before they can be: updated, set in use and
629 * @param address the address
630 * @param session session handle, can be NULL
631 * @param prop performance data for the address
632 * @return handle to the address representation inside ATS, NULL
633 * on error (i.e. ATS knows this exact address already)
635 struct GNUNET_ATS_AddressRecord *
636 GNUNET_ATS_address_add (struct GNUNET_ATS_SchedulingHandle *sh,
637 const struct GNUNET_HELLO_Address *address,
638 struct GNUNET_ATS_Session *session,
639 const struct GNUNET_ATS_Properties *prop)
641 struct GNUNET_ATS_AddressRecord *ar;
648 /* we need a valid address */
652 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != prop->scope);
/* The plugin name length includes the terminating NUL (hence + 1). */
653 namelen = strlen (address->transport_name) + 1;
654 msize = address->address_length + namelen;
/* Reject addresses whose encoding would reach GNUNET_MAX_MESSAGE_SIZE.
   NOTE(review): the bound uses sizeof (struct AddressUpdateMessage),
   while send_add_address_message() builds a struct AddressAddMessage;
   confirm which header size is intended. */
655 if ((msize + sizeof (struct AddressUpdateMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
656 (address->address_length >= GNUNET_MAX_MESSAGE_SIZE) ||
657 (namelen >= GNUNET_MAX_MESSAGE_SIZE) )
659 /* address too large for us, this should not happen */
669 /* Already existing, nothing to do, but this should not happen */
/* Pick a free slot and create the record that will occupy it. */
673 s = find_empty_session_slot (sh);
674 ar = GNUNET_new (struct GNUNET_ATS_AddressRecord);
677 ar->session = session;
/* The record keeps its own copy of the address. */
678 ar->address = GNUNET_HELLO_address_copy (address);
679 GNUNET_ATS_properties_hton (&ar->properties,
/* Register the record in its slot and announce it to the service. */
681 sh->session_array[s] = ar;
682 send_add_address_message (sh, ar);
688 * An address was used to initiate a session.
690 * @param ar address record to update information for
691 * @param session session handle
694 GNUNET_ATS_address_add_session (struct GNUNET_ATS_AddressRecord *ar,
695 struct GNUNET_ATS_Session *session)
/* The record is expected to have no session yet; GNUNET_break only
   flags a violation, and the assignment below happens regardless. */
697 GNUNET_break (NULL == ar->session);
698 ar->session = session;
703 * A session was destroyed, disassociate it from the
704 * given address record. If this was an incoming
705 * address, destroy the address as well.
707 * @param ar address record to update information for
708 * @param session session handle
709 * @return #GNUNET_YES if the @a ar was destroyed because
710 * it was an incoming address,
711 * #GNUNET_NO if the @a ar was kept because we can
712 * use it still to establish a new session
715 GNUNET_ATS_address_del_session (struct GNUNET_ATS_AddressRecord *ar,
716 struct GNUNET_ATS_Session *session)
/* The caller must pass the session currently attached to the record. */
718 GNUNET_assert (session == ar->session);
/* An address carrying the INBOUND option is destroyed together with
   its session. */
720 if (GNUNET_HELLO_address_check_option (ar->address,
721 GNUNET_HELLO_ADDRESS_INFO_INBOUND))
723 GNUNET_ATS_address_destroy (ar);
731 * We have updated performance statistics for a given address. Note
732 * that this function can be called for addresses that are currently
733 * in use as well as addresses that are valid but not actively in use.
734 * Furthermore, the peer may not even be connected to us right now (in
735 * which case the call may be ignored or the information may be stored
736 * for later use). Update bandwidth assignments.
738 * @param ar address record to update information for
739 * @param prop performance data for the address
742 GNUNET_ATS_address_update (struct GNUNET_ATS_AddressRecord *ar,
743 const struct GNUNET_ATS_Properties *prop)
/* The scheduling handle is taken from the record itself. */
745 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
746 struct GNUNET_MQ_Envelope *ev;
747 struct AddressUpdateMessage *m;
749 LOG (GNUNET_ERROR_TYPE_DEBUG,
750 "Updating address for peer `%s', plugin `%s', session %p slot %u\n",
751 GNUNET_i2s (&ar->address->peer),
752 ar->address->transport_name,
755 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != prop->scope);
/* The stored properties are refreshed before the early return below,
   so the record is updated even when no message is sent. */
756 GNUNET_ATS_properties_hton (&ar->properties,
759 return; /* disconnected, skip for now */
/* Build the update message: slot number (via htonl), peer identity and
   the stored properties. */
760 ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
761 m->session_id = htonl (ar->slot);
762 m->peer = ar->address->peer;
763 m->properties = ar->properties;
764 GNUNET_MQ_send (sh->mq,
770 * An address got destroyed, stop using it as a valid address.
772 * @param ar address to destroy
775 GNUNET_ATS_address_destroy (struct GNUNET_ATS_AddressRecord *ar)
/* The scheduling handle is taken from the record itself. */
777 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
778 struct GNUNET_MQ_Envelope *ev;
779 struct AddressDestroyedMessage *m;
781 LOG (GNUNET_ERROR_TYPE_DEBUG,
782 "Deleting address for peer `%s', plugin `%s', slot %u session %p\n",
783 GNUNET_i2s (&ar->address->peer),
784 ar->address->transport_name,
/* The record is expected to have no session attached any more. */
787 GNUNET_break (NULL == ar->session);
/* Only mark the record as dying here; the address is freed and the
   slot cleared in release_session(). */
789 ar->in_destroy = GNUNET_YES;
/* Tell the service which slot and peer the destroyed address had. */
792 ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED);
793 m->session_id = htonl (ar->slot);
794 m->peer = ar->address->peer;
795 GNUNET_MQ_send (sh->mq, ev);
799 /* end of ats_api_scheduling.c */