2 This file is part of GNUnet.
3 Copyright (C) 2010-2015 GNUnet e.V.
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18 Boston, MA 02110-1301, USA.
21 * @file ats/ats_api_scheduling.c
22 * @brief automatic transport selection and outbound bandwidth determination
23 * @author Christian Grothoff
24 * @author Matthias Wachs
27 * - we could avoid a linear scan over the
28 * active addresses in some cases, so if
29 * there is need, we can still optimize here
30 * - we might want to split off the logic to
31 * determine LAN vs. WAN, as it has nothing
32 * to do with accessing the ATS service.
35 #include "gnunet_ats_service.h"
39 * How frequently do we scan the interfaces for changes to the addresses?
41 #define INTERFACE_PROCESSING_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
43 #define LOG(kind,...) GNUNET_log_from(kind, "ats-scheduling-api", __VA_ARGS__)
46 * Session ID we use if there is no session / slot.
52 * Information we track per address, incoming or outgoing. It also
53 * doesn't matter if we have a session, any address that ATS is
54 * allowed to suggest right now should be tracked.
56 struct GNUNET_ATS_AddressRecord
60 * Scheduling handle this address record belongs to.
62 struct GNUNET_ATS_SchedulingHandle *sh;
67 struct GNUNET_HELLO_Address *address;
70 * Session handle. NULL if we have an address but no
71 * active session for this address.
73 struct GNUNET_ATS_Session *session;
76 * Performance data about the address.
78 struct GNUNET_ATS_PropertiesNBO properties;
81 * Which slot (index) in the session array does
82 * this record correspond to?
83 * FIXME: a linear search on this is really crappy!
84 * Maybe switch to a 64-bit global counter and be
85 * done with it? Or does that then cause too much
86 * trouble on the ATS-service side?
91 * We're about to destroy this address record, just ATS does
92 * not know this yet. Once ATS confirms its destruction,
100 * Handle to the ATS subsystem for bandwidth/transport scheduling information.
102 struct GNUNET_ATS_SchedulingHandle
108 const struct GNUNET_CONFIGURATION_Handle *cfg;
111 * Callback to invoke on suggestions.
113 GNUNET_ATS_AddressSuggestionCallback suggest_cb;
116 * Closure for @e suggest_cb.
118 void *suggest_cb_cls;
121 * Connection to ATS service.
123 struct GNUNET_CLIENT_Connection *client;
126 * Message queue for sending requests to the ATS service.
128 struct GNUNET_MQ_Handle *mq;
131 * Array of session objects (we need to translate them to numbers and back
132 * for the protocol; the offset in the array is the session number on the
133 * network). Index 0 is always NULL and reserved to represent the NULL pointer.
134 * Unused entries are also NULL.
136 struct GNUNET_ATS_AddressRecord **session_array;
139 * Task to trigger reconnect.
141 struct GNUNET_SCHEDULER_Task *task;
144 * Reconnect backoff delay.
146 struct GNUNET_TIME_Relative backoff;
149 * Size of the @e session_array.
151 unsigned int session_array_size;
157 * Re-establish the connection to the ATS service.
159 * @param sh handle to use to re-connect.
162 reconnect (struct GNUNET_ATS_SchedulingHandle *sh);
166 * Re-establish the connection to the ATS service.
168 * @param cls handle to use to re-connect.
169 * @param tc scheduler context
172 reconnect_task (void *cls,
173 const struct GNUNET_SCHEDULER_TaskContext *tc)
175 struct GNUNET_ATS_SchedulingHandle *sh = cls;
183 * Disconnect from ATS and then reconnect.
185 * @param sh our handle
188 force_reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
192 GNUNET_MQ_destroy (sh->mq);
195 if (NULL != sh->client)
197 GNUNET_CLIENT_disconnect (sh->client);
200 sh->suggest_cb (sh->suggest_cb_cls,
202 GNUNET_BANDWIDTH_ZERO,
203 GNUNET_BANDWIDTH_ZERO);
204 sh->backoff = GNUNET_TIME_STD_BACKOFF (sh->backoff);
205 sh->task = GNUNET_SCHEDULER_add_delayed (sh->backoff,
212 * Find the session object corresponding to the given session ID.
214 * @param sh our handle
215 * @param session_id current session ID
216 * @param peer peer the session belongs to
217 * @return the session object (or NULL)
219 static struct GNUNET_ATS_AddressRecord *
220 find_session (struct GNUNET_ATS_SchedulingHandle *sh,
222 const struct GNUNET_PeerIdentity *peer)
224 struct GNUNET_ATS_AddressRecord *ar;
226 if (session_id >= sh->session_array_size)
233 ar = sh->session_array[session_id];
239 if (NULL == ar->address)
241 /* address was destroyed in the meantime, this can happen
242 as we communicate asynchronously with the ATS service. */
245 if (0 != memcmp (peer,
247 sizeof (struct GNUNET_PeerIdentity)))
257 * Get an available session ID.
259 * @param sh our handle
260 * @return an unused slot, but never NOT_FOUND (0)
263 find_empty_session_slot (struct GNUNET_ATS_SchedulingHandle *sh)
269 while ( ( (NOT_FOUND == off) ||
270 (NULL != sh->session_array[off % sh->session_array_size]) ) &&
271 (i < sh->session_array_size) )
276 if ( (NOT_FOUND != off % sh->session_array_size) &&
277 (NULL == sh->session_array[off % sh->session_array_size]) )
279 i = sh->session_array_size;
280 GNUNET_array_grow (sh->session_array,
281 sh->session_array_size,
282 sh->session_array_size * 2);
288 * Get the ID for the given session object.
290 * @param sh our handle
291 * @param session session object
292 * @param address the address we are looking for
293 * @return the session id or NOT_FOUND for error
296 find_session_id (struct GNUNET_ATS_SchedulingHandle *sh,
297 struct GNUNET_ATS_Session *session,
298 const struct GNUNET_HELLO_Address *address)
307 for (i = 1; i < sh->session_array_size; i++)
308 if ( (NULL != sh->session_array[i]) &&
309 (GNUNET_NO == sh->session_array[i]->in_destroy) &&
310 ( (session == sh->session_array[i]->session) ||
311 (NULL == sh->session_array[i]->session) ) &&
312 (0 == memcmp (&address->peer,
313 &sh->session_array[i]->address->peer,
314 sizeof (struct GNUNET_PeerIdentity))) &&
315 (0 == GNUNET_HELLO_address_cmp (address,
316 sh->session_array[i]->address)) )
323 * Release the session slot from the session table (ATS service is
324 * also done using it).
326 * @param sh our handle
327 * @param session_id identifies session that is no longer valid
330 release_session (struct GNUNET_ATS_SchedulingHandle *sh,
333 struct GNUNET_ATS_AddressRecord *ar;
335 if (NOT_FOUND == session_id)
337 if (session_id >= sh->session_array_size)
340 force_reconnect (sh);
343 /* this slot should have been removed from remove_session before */
344 ar = sh->session_array[session_id];
345 if (NULL != ar->session)
348 force_reconnect (sh);
351 GNUNET_HELLO_address_free (ar->address);
353 sh->session_array[session_id] = NULL;
358 * Type of a function to call when we receive a session release
359 * message from the service.
361 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
362 * @param msg message received, NULL on timeout or fatal error
365 process_ats_session_release_message (void *cls,
366 const struct GNUNET_MessageHeader *msg)
368 struct GNUNET_ATS_SchedulingHandle *sh = cls;
369 const struct GNUNET_ATS_SessionReleaseMessage *srm;
371 srm = (const struct GNUNET_ATS_SessionReleaseMessage *) msg;
372 /* Note: peer field in srm not necessary right now,
373 but might be good to have in the future */
375 ntohl (srm->session_id));
380 * Type of a function to call when we receive a address suggestion
381 * message from the service.
383 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
384 * @param msg message received, NULL on timeout or fatal error
387 process_ats_address_suggestion_message (void *cls,
388 const struct GNUNET_MessageHeader *msg)
390 struct GNUNET_ATS_SchedulingHandle *sh = cls;
391 const struct AddressSuggestionMessage *m;
392 struct GNUNET_ATS_AddressRecord *ar;
395 m = (const struct AddressSuggestionMessage *) msg;
396 session_id = ntohl (m->session_id);
400 force_reconnect (sh);
403 ar = find_session (sh,
409 force_reconnect (sh);
412 if (NULL == sh->suggest_cb)
414 if (GNUNET_YES == ar->in_destroy)
416 /* ignore suggestion, as this address is dying, unless BW is 0,
417 in that case signal 'disconnect' via BW 0 */
418 if ( (0 == ntohl (m->bandwidth_out.value__)) &&
419 (0 == ntohl (m->bandwidth_in.value__)) )
421 LOG (GNUNET_ERROR_TYPE_DEBUG,
422 "ATS suggests disconnect from peer `%s' with BW %u/%u\n",
423 GNUNET_i2s (&ar->address->peer),
424 (unsigned int) ntohl (m->bandwidth_out.value__),
425 (unsigned int) ntohl (m->bandwidth_in.value__));
426 sh->suggest_cb (sh->suggest_cb_cls,
435 if ( (NULL == ar->session) &&
436 (GNUNET_HELLO_address_check_option (ar->address,
437 GNUNET_HELLO_ADDRESS_INFO_INBOUND)) )
442 sh->backoff = GNUNET_TIME_UNIT_ZERO;
443 LOG (GNUNET_ERROR_TYPE_DEBUG,
444 "ATS suggests address slot %u for peer `%s' using plugin %s\n",
446 GNUNET_i2s (&ar->address->peer),
447 ar->address->transport_name);
448 sh->suggest_cb (sh->suggest_cb_cls,
458 * We encountered an error handling the MQ to the
459 * ATS service. Reconnect.
461 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
462 * @param error details about the error
465 error_handler (void *cls,
466 enum GNUNET_MQ_Error error)
468 struct GNUNET_ATS_SchedulingHandle *sh = cls;
470 LOG (GNUNET_ERROR_TYPE_ERROR,
471 "ATS connection died (code %d), reconnecting\n",
473 force_reconnect (sh);
478 * Generate and transmit the `struct AddressAddMessage` for the given
481 * @param sh the scheduling handle to use for transmission
482 * @param ar the address to inform the ATS service about
485 send_add_address_message (struct GNUNET_ATS_SchedulingHandle *sh,
486 const struct GNUNET_ATS_AddressRecord *ar)
488 struct GNUNET_MQ_Envelope *ev;
489 struct AddressAddMessage *m;
495 return; /* disconnected, skip for now */
496 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != ar->properties.scope);
497 namelen = strlen (ar->address->transport_name) + 1;
498 msize = ar->address->address_length + namelen;
499 ev = GNUNET_MQ_msg_extra (m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_ADD);
500 m->peer = ar->address->peer;
501 m->address_length = htons (ar->address->address_length);
502 m->address_local_info = htonl ((uint32_t) ar->address->local_info);
503 m->plugin_name_length = htons (namelen);
504 m->session_id = htonl (ar->slot);
505 m->properties = ar->properties;
507 LOG (GNUNET_ERROR_TYPE_DEBUG,
508 "Adding address for peer `%s', plugin `%s', session %p slot %u\n",
509 GNUNET_i2s (&ar->address->peer),
510 ar->address->transport_name,
515 ar->address->address,
516 ar->address->address_length);
517 if (NULL != ar->address->transport_name)
518 memcpy (&pm[ar->address->address_length],
519 ar->address->transport_name,
521 GNUNET_MQ_send (sh->mq, ev);
526 * Re-establish the connection to the ATS service.
528 * @param sh handle to use to re-connect.
531 reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
533 static const struct GNUNET_MQ_MessageHandler handlers[] =
534 { { &process_ats_session_release_message,
535 GNUNET_MESSAGE_TYPE_ATS_SESSION_RELEASE,
536 sizeof (struct GNUNET_ATS_SessionReleaseMessage) },
537 { &process_ats_address_suggestion_message,
538 GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
539 sizeof (struct AddressSuggestionMessage) },
541 struct GNUNET_MQ_Envelope *ev;
542 struct ClientStartMessage *init;
544 struct GNUNET_ATS_AddressRecord *ar;
546 GNUNET_assert (NULL == sh->client);
547 sh->client = GNUNET_CLIENT_connect ("ats",
549 if (NULL == sh->client)
552 force_reconnect (sh);
555 sh->mq = GNUNET_MQ_queue_for_connection_client (sh->client,
559 ev = GNUNET_MQ_msg (init,
560 GNUNET_MESSAGE_TYPE_ATS_START);
561 init->start_flag = htonl (START_FLAG_SCHEDULING);
562 GNUNET_MQ_send (sh->mq, ev);
565 for (i=0;i<sh->session_array_size;i++)
567 ar = sh->session_array[i];
570 send_add_address_message (sh, ar);
578 * Initialize the ATS subsystem.
580 * @param cfg configuration to use
581 * @param suggest_cb notification to call whenever the suggestation changed
582 * @param suggest_cb_cls closure for @a suggest_cb
583 * @return ats context
585 struct GNUNET_ATS_SchedulingHandle *
586 GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
587 GNUNET_ATS_AddressSuggestionCallback suggest_cb,
588 void *suggest_cb_cls)
590 struct GNUNET_ATS_SchedulingHandle *sh;
592 sh = GNUNET_new (struct GNUNET_ATS_SchedulingHandle);
594 sh->suggest_cb = suggest_cb;
595 sh->suggest_cb_cls = suggest_cb_cls;
596 GNUNET_array_grow (sh->session_array,
597 sh->session_array_size,
605 * Client is done with ATS scheduling, release resources.
607 * @param sh handle to release
610 GNUNET_ATS_scheduling_done (struct GNUNET_ATS_SchedulingHandle *sh)
612 struct GNUNET_ATS_AddressRecord *ar;
617 GNUNET_MQ_destroy (sh->mq);
620 if (NULL != sh->client)
622 GNUNET_CLIENT_disconnect (sh->client);
625 if (NULL != sh->task)
627 GNUNET_SCHEDULER_cancel (sh->task);
630 for (i=0;i<sh->session_array_size;i++)
632 if (NULL != (ar = sh->session_array[i]))
634 GNUNET_HELLO_address_free (ar->address);
636 sh->session_array[i] = NULL;
639 GNUNET_array_grow (sh->session_array,
640 sh->session_array_size,
647 * We have a new address ATS should know. Addresses have to be added
648 * with this function before they can be: updated, set in use and
652 * @param address the address
653 * @param session session handle, can be NULL
654 * @param prop performance data for the address
655 * @return handle to the address representation inside ATS, NULL
656 * on error (i.e. ATS knows this exact address already)
658 struct GNUNET_ATS_AddressRecord *
659 GNUNET_ATS_address_add (struct GNUNET_ATS_SchedulingHandle *sh,
660 const struct GNUNET_HELLO_Address *address,
661 struct GNUNET_ATS_Session *session,
662 const struct GNUNET_ATS_Properties *prop)
664 struct GNUNET_ATS_AddressRecord *ar;
671 /* we need a valid address */
675 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != prop->scope);
676 namelen = strlen (address->transport_name) + 1;
677 msize = address->address_length + namelen;
678 if ((msize + sizeof (struct AddressUpdateMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
679 (address->address_length >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
680 (namelen >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
682 /* address too large for us, this should not happen */
692 /* Already existing, nothing todo, but this should not happen */
696 s = find_empty_session_slot (sh);
697 ar = GNUNET_new (struct GNUNET_ATS_AddressRecord);
700 ar->session = session;
701 ar->address = GNUNET_HELLO_address_copy (address);
702 GNUNET_ATS_properties_hton (&ar->properties,
704 sh->session_array[s] = ar;
705 send_add_address_message (sh, ar);
711 * An address was used to initiate a session.
713 * @param ar address record to update information for
714 * @param session session handle
717 GNUNET_ATS_address_add_session (struct GNUNET_ATS_AddressRecord *ar,
718 struct GNUNET_ATS_Session *session)
720 GNUNET_break (NULL == ar->session);
721 ar->session = session;
726 * A session was destroyed, disassociate it from the
727 * given address record. If this was an incoming
728 * addess, destroy the address as well.
730 * @param ar address record to update information for
731 * @param session session handle
732 * @return #GNUNET_YES if the @a ar was destroyed because
733 * it was an incoming address,
734 * #GNUNET_NO if the @ar was kept because we can
735 * use it still to establish a new session
738 GNUNET_ATS_address_del_session (struct GNUNET_ATS_AddressRecord *ar,
739 struct GNUNET_ATS_Session *session)
741 GNUNET_assert (session == ar->session);
743 if (GNUNET_HELLO_address_check_option (ar->address,
744 GNUNET_HELLO_ADDRESS_INFO_INBOUND))
746 GNUNET_ATS_address_destroy (ar);
754 * We have updated performance statistics for a given address. Note
755 * that this function can be called for addresses that are currently
756 * in use as well as addresses that are valid but not actively in use.
757 * Furthermore, the peer may not even be connected to us right now (in
758 * which case the call may be ignored or the information may be stored
759 * for later use). Update bandwidth assignments.
761 * @param ar address record to update information for
762 * @param prop performance data for the address
765 GNUNET_ATS_address_update (struct GNUNET_ATS_AddressRecord *ar,
766 const struct GNUNET_ATS_Properties *prop)
768 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
769 struct GNUNET_MQ_Envelope *ev;
770 struct AddressUpdateMessage *m;
772 LOG (GNUNET_ERROR_TYPE_DEBUG,
773 "Updating address for peer `%s', plugin `%s', session %p slot %u\n",
774 GNUNET_i2s (&ar->address->peer),
775 ar->address->transport_name,
778 GNUNET_break (GNUNET_ATS_NET_UNSPECIFIED != prop->scope);
779 GNUNET_ATS_properties_hton (&ar->properties,
782 return; /* disconnected, skip for now */
783 ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
784 m->session_id = htonl (ar->slot);
785 m->peer = ar->address->peer;
786 m->properties = ar->properties;
787 GNUNET_MQ_send (sh->mq,
793 * An address got destroyed, stop using it as a valid address.
795 * @param ar address to destroy
798 GNUNET_ATS_address_destroy (struct GNUNET_ATS_AddressRecord *ar)
800 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
801 struct GNUNET_MQ_Envelope *ev;
802 struct AddressDestroyedMessage *m;
804 LOG (GNUNET_ERROR_TYPE_DEBUG,
805 "Deleting address for peer `%s', plugin `%s', slot %u session %p\n",
806 GNUNET_i2s (&ar->address->peer),
807 ar->address->transport_name,
810 GNUNET_break (NULL == ar->session);
812 ar->in_destroy = GNUNET_YES;
815 ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED);
816 m->session_id = htonl (ar->slot);
817 m->peer = ar->address->peer;
818 GNUNET_MQ_send (sh->mq, ev);
822 /* end of ats_api_scheduling.c */