2 This file is part of GNUnet.
3 (C) 2010-2015 Christian Grothoff (and other contributing authors)
5 GNUnet is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published
7 by the Free Software Foundation; either version 3, or (at your
8 option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with GNUnet; see the file COPYING. If not, write to the
17 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18 Boston, MA 02111-1307, USA.
21 * @file ats/ats_api_scheduling.c
22 * @brief automatic transport selection and outbound bandwidth determination
23 * @author Christian Grothoff
24 * @author Matthias Wachs
27 * - we could avoid a linear scan over the
28 * active addresses in some cases, so if
29 * there is need, we can still optimize here
30 * - we might want to split off the logic to
31 * determine LAN vs. WAN, as it has nothing
32 * to do with accessing the ATS service.
35 #include "gnunet_ats_service.h"
39 * How frequently do we scan the interfaces for changes to the addresses?
41 #define INTERFACE_PROCESSING_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
45 * Session ID we use if there is no session / slot.
51 * Information we track per address, incoming or outgoing. It also
52 * doesn't matter if we have a session, any address that ATS is
53 * allowed to suggest right now should be tracked.
55 struct GNUNET_ATS_AddressRecord
59 * Scheduling handle this address record belongs to.
61 struct GNUNET_ATS_SchedulingHandle *sh;
66 struct GNUNET_HELLO_Address *address;
69 * Session handle. NULL if we have an address but no
70 * active session for this address.
72 struct Session *session;
75 * Array with performance data about the address.
77 struct GNUNET_ATS_Information *ats;
80 * Number of entries in @e ats.
85 * Which slot (index) in the session array does
86 * this record correspond to? FIXME:
87 * FIXME: a linear search on this is really crappy!
88 * Maybe switch to a 64-bit global counter and be
89 * done with it? Or does that then cause too much
90 * trouble on the ATS-service side?
95 * We're about to destroy this address record, just ATS does
96 * not know this yet. Once ATS confirms its destruction,
104 * Handle to the ATS subsystem for bandwidth/transport scheduling information.
106 struct GNUNET_ATS_SchedulingHandle
112 const struct GNUNET_CONFIGURATION_Handle *cfg;
115 * Callback to invoke on suggestions.
117 GNUNET_ATS_AddressSuggestionCallback suggest_cb;
120 * Closure for @e suggest_cb.
122 void *suggest_cb_cls;
125 * Connection to ATS service.
127 struct GNUNET_CLIENT_Connection *client;
130 * Message queue for sending requests to the ATS service.
132 struct GNUNET_MQ_Handle *mq;
135 * Array of session objects (we need to translate them to numbers and back
136 * for the protocol; the offset in the array is the session number on the
137 * network). Index 0 is always NULL and reserved to represent the NULL pointer.
138 * Unused entries are also NULL.
140 struct GNUNET_ATS_AddressRecord **session_array;
143 * Task to trigger reconnect.
145 struct GNUNET_SCHEDULER_Task *task;
148 * Size of the @e session_array.
150 unsigned int session_array_size;
156 * Re-establish the connection to the ATS service.
158 * @param sh handle to use to re-connect.
161 reconnect (struct GNUNET_ATS_SchedulingHandle *sh);
165 * Re-establish the connection to the ATS service.
167 * @param cls handle to use to re-connect.
168 * @param tc scheduler context
171 reconnect_task (void *cls,
172 const struct GNUNET_SCHEDULER_TaskContext *tc)
174 struct GNUNET_ATS_SchedulingHandle *sh = cls;
182 * Disconnect from ATS and then reconnect.
184 * @param sh our handle
187 force_reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
191 GNUNET_MQ_destroy (sh->mq);
194 if (NULL != sh->client)
196 GNUNET_CLIENT_disconnect (sh->client);
199 sh->suggest_cb (sh->suggest_cb_cls,
201 GNUNET_BANDWIDTH_ZERO,
202 GNUNET_BANDWIDTH_ZERO);
203 sh->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
210 * Find the session object corresponding to the given session ID.
212 * @param sh our handle
213 * @param session_id current session ID
214 * @param peer peer the session belongs to
215 * @return the session object (or NULL)
217 static struct GNUNET_ATS_AddressRecord *
218 find_session (struct GNUNET_ATS_SchedulingHandle *sh,
220 const struct GNUNET_PeerIdentity *peer)
222 struct GNUNET_ATS_AddressRecord *ar;
224 if (session_id >= sh->session_array_size)
231 ar = sh->session_array[session_id];
237 if (NULL == ar->address)
239 /* address was destroyed in the meantime, this can happen
240 as we communicate asynchronously with the ATS service. */
243 if (0 != memcmp (peer,
245 sizeof (struct GNUNET_PeerIdentity)))
248 force_reconnect (sh);
256 * Get an available session ID.
258 * @param sh our handle
259 * @return an unused slot, but never NOT_FOUND (0)
262 find_empty_session_slot (struct GNUNET_ATS_SchedulingHandle *sh)
268 while ( ( (NOT_FOUND == off) ||
269 (NULL != sh->session_array[off % sh->session_array_size]) ) &&
270 (i < sh->session_array_size) )
275 if ( (NOT_FOUND != off % sh->session_array_size) &&
276 (NULL == sh->session_array[off % sh->session_array_size]) )
278 i = sh->session_array_size;
279 GNUNET_array_grow (sh->session_array,
280 sh->session_array_size,
281 sh->session_array_size * 2);
287 * Get the ID for the given session object.
289 * @param sh our handle
290 * @param session session object
291 * @param address the address we are looking for
292 * @return the session id or NOT_FOUND for error
295 find_session_id (struct GNUNET_ATS_SchedulingHandle *sh,
296 struct Session *session,
297 const struct GNUNET_HELLO_Address *address)
306 for (i = 1; i < sh->session_array_size; i++)
307 if ( (NULL != sh->session_array[i]) &&
308 ( (session == sh->session_array[i]->session) ||
309 (NULL == sh->session_array[i]->session) ) &&
310 (0 == GNUNET_HELLO_address_cmp (address,
311 sh->session_array[i]->address)) )
318 * Release the session slot from the session table (ATS service is
319 * also done using it).
321 * @param sh our handle
322 * @param session_id identifies session that is no longer valid
325 release_session (struct GNUNET_ATS_SchedulingHandle *sh,
328 struct GNUNET_ATS_AddressRecord *ar;
330 if (NOT_FOUND == session_id)
332 if (session_id >= sh->session_array_size)
335 force_reconnect (sh);
338 /* this slot should have been removed from remove_session before */
339 ar = sh->session_array[session_id];
340 if (NULL != ar->session)
343 force_reconnect (sh);
346 GNUNET_HELLO_address_free (ar->address);
348 sh->session_array[session_id] = NULL;
353 * Type of a function to call when we receive a session release
354 * message from the service.
356 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
357 * @param msg message received, NULL on timeout or fatal error
360 process_ats_session_release_message (void *cls,
361 const struct GNUNET_MessageHeader *msg)
363 struct GNUNET_ATS_SchedulingHandle *sh = cls;
364 const struct SessionReleaseMessage *srm;
366 srm = (const struct SessionReleaseMessage *) msg;
367 /* Note: peer field in srm not necessary right now,
368 but might be good to have in the future */
370 ntohl (srm->session_id));
375 * Type of a function to call when we receive a address suggestion
376 * message from the service.
378 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
379 * @param msg message received, NULL on timeout or fatal error
382 process_ats_address_suggestion_message (void *cls,
383 const struct GNUNET_MessageHeader *msg)
385 struct GNUNET_ATS_SchedulingHandle *sh = cls;
386 const struct AddressSuggestionMessage *m;
387 struct GNUNET_ATS_AddressRecord *ar;
390 m = (const struct AddressSuggestionMessage *) msg;
391 session_id = ntohl (m->session_id);
395 force_reconnect (sh);
398 ar = find_session (sh, session_id, &m->peer);
402 force_reconnect (sh);
405 if (NULL == sh->suggest_cb)
407 if (GNUNET_YES == ar->in_destroy)
409 /* ignore suggestion, as this address is dying, unless BW is 0,
410 in that case signal 'disconnect' via BW 0 */
411 if ( (0 == ntohl (m->bandwidth_out.value__)) &&
412 (0 == ntohl (m->bandwidth_in.value__)) )
414 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
415 "ATS suggests disconnect from peer `%s' with BW %u/%u\n",
416 GNUNET_i2s (&ar->address->peer),
417 (unsigned int) ntohl (m->bandwidth_out.value__),
418 (unsigned int) ntohl (m->bandwidth_in.value__));
419 sh->suggest_cb (sh->suggest_cb_cls,
428 if ( (NULL == ar->session) &&
429 (GNUNET_HELLO_address_check_option (ar->address,
430 GNUNET_HELLO_ADDRESS_INFO_INBOUND)) )
435 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
436 "ATS suggests address slot %u for peer `%s' using plugin %s\n",
438 GNUNET_i2s (&ar->address->peer),
439 ar->address->transport_name);
440 sh->suggest_cb (sh->suggest_cb_cls,
450 * We encountered an error handling the MQ to the
451 * ATS service. Reconnect.
453 * @param cls the `struct GNUNET_ATS_SchedulingHandle`
454 * @param error details about the error
457 error_handler (void *cls,
458 enum GNUNET_MQ_Error error)
460 struct GNUNET_ATS_SchedulingHandle *sh = cls;
462 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
463 "ATS connection died (code %d), reconnecting\n",
465 force_reconnect (sh);
470 * Generate and transmit the `struct AddressAddMessage` for the given
473 * @param sh the scheduling handle to use for transmission
474 * @param ar the address to inform the ATS service about
477 send_add_address_message (struct GNUNET_ATS_SchedulingHandle *sh,
478 const struct GNUNET_ATS_AddressRecord *ar)
480 struct GNUNET_MQ_Envelope *ev;
481 struct AddressAddMessage *m;
482 struct GNUNET_ATS_Information *am;
488 return; /* disconnected, skip for now */
489 namelen = (NULL == ar->address->transport_name)
491 : strlen (ar->address->transport_name) + 1;
492 msize = ar->address->address_length +
493 ar->ats_count * sizeof (struct GNUNET_ATS_Information) + namelen;
495 ev = GNUNET_MQ_msg_extra (m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_ADD);
496 m->ats_count = htonl (ar->ats_count);
497 m->peer = ar->address->peer;
498 m->address_length = htons (ar->address->address_length);
499 m->address_local_info = htonl ((uint32_t) ar->address->local_info);
500 m->plugin_name_length = htons (namelen);
501 m->session_id = htonl (ar->slot);
503 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
504 "Adding address for peer `%s', plugin `%s', session %p slot %u\n",
505 GNUNET_i2s (&ar->address->peer),
506 ar->address->transport_name,
509 am = (struct GNUNET_ATS_Information *) &m[1];
512 ar->ats_count * sizeof (struct GNUNET_ATS_Information));
513 pm = (char *) &am[ar->ats_count];
515 ar->address->address,
516 ar->address->address_length);
517 if (NULL != ar->address->transport_name)
518 memcpy (&pm[ar->address->address_length],
519 ar->address->transport_name,
521 GNUNET_MQ_send (sh->mq, ev);
526 * Re-establish the connection to the ATS service.
528 * @param sh handle to use to re-connect.
531 reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
533 static const struct GNUNET_MQ_MessageHandler handlers[] =
534 { { &process_ats_session_release_message,
535 GNUNET_MESSAGE_TYPE_ATS_SESSION_RELEASE,
536 sizeof (struct SessionReleaseMessage) },
537 { &process_ats_address_suggestion_message,
538 GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
539 sizeof (struct AddressSuggestionMessage) },
541 struct GNUNET_MQ_Envelope *ev;
542 struct ClientStartMessage *init;
544 struct GNUNET_ATS_AddressRecord *ar;
546 GNUNET_assert (NULL == sh->client);
547 sh->client = GNUNET_CLIENT_connect ("ats", sh->cfg);
548 if (NULL == sh->client)
550 force_reconnect (sh);
553 sh->mq = GNUNET_MQ_queue_for_connection_client (sh->client,
557 ev = GNUNET_MQ_msg (init,
558 GNUNET_MESSAGE_TYPE_ATS_START);
559 init->start_flag = htonl (START_FLAG_SCHEDULING);
560 GNUNET_MQ_send (sh->mq, ev);
563 for (i=0;i<sh->session_array_size;i++)
565 ar = sh->session_array[i];
568 send_add_address_message (sh, ar);
576 * Initialize the ATS subsystem.
578 * @param cfg configuration to use
579 * @param suggest_cb notification to call whenever the suggestation changed
580 * @param suggest_cb_cls closure for @a suggest_cb
581 * @return ats context
583 struct GNUNET_ATS_SchedulingHandle *
584 GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
585 GNUNET_ATS_AddressSuggestionCallback suggest_cb,
586 void *suggest_cb_cls)
588 struct GNUNET_ATS_SchedulingHandle *sh;
590 sh = GNUNET_new (struct GNUNET_ATS_SchedulingHandle);
592 sh->suggest_cb = suggest_cb;
593 sh->suggest_cb_cls = suggest_cb_cls;
594 GNUNET_array_grow (sh->session_array,
595 sh->session_array_size,
603 * Client is done with ATS scheduling, release resources.
605 * @param sh handle to release
608 GNUNET_ATS_scheduling_done (struct GNUNET_ATS_SchedulingHandle *sh)
612 GNUNET_MQ_destroy (sh->mq);
615 if (NULL != sh->client)
617 GNUNET_CLIENT_disconnect (sh->client);
620 if (NULL != sh->task)
622 GNUNET_SCHEDULER_cancel (sh->task);
625 GNUNET_array_grow (sh->session_array,
626 sh->session_array_size,
633 * Test if a address and a session is known to ATS
635 * @param sh the scheduling handle
636 * @param address the address
637 * @param session the session
638 * @return #GNUNET_YES or #GNUNET_NO
641 GNUNET_ATS_session_known (struct GNUNET_ATS_SchedulingHandle *sh,
642 const struct GNUNET_HELLO_Address *address,
643 struct Session *session)
647 if (NOT_FOUND != find_session_id (sh,
650 return GNUNET_YES; /* Exists */
656 * We have a new address ATS should know. Addresses have to be added
657 * with this function before they can be: updated, set in use and
661 * @param address the address
662 * @param session session handle, can be NULL
663 * @param ats performance data for the address
664 * @param ats_count number of performance records in @a ats
665 * @return handle to the address representation inside ATS, NULL
666 * on error (i.e. ATS knows this exact address already)
668 struct GNUNET_ATS_AddressRecord *
669 GNUNET_ATS_address_add (struct GNUNET_ATS_SchedulingHandle *sh,
670 const struct GNUNET_HELLO_Address *address,
671 struct Session *session,
672 const struct GNUNET_ATS_Information *ats,
675 struct GNUNET_ATS_AddressRecord *ar;
682 /* we need a valid address */
686 namelen = (NULL == address->transport_name)
688 : strlen (address->transport_name) + 1;
689 msize = address->address_length +
690 ats_count * sizeof (struct GNUNET_ATS_Information) + namelen;
691 if ((msize + sizeof (struct AddressUpdateMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
692 (address->address_length >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
693 (namelen >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
695 GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_ATS_Information)))
697 /* address too large for us, this should not happen */
702 if (NOT_FOUND != find_session_id (sh, session, address))
704 /* Already existing, nothing todo, but this should not happen */
708 s = find_empty_session_slot (sh);
709 ar = GNUNET_new (struct GNUNET_ATS_AddressRecord);
712 ar->session = session;
713 ar->address = GNUNET_HELLO_address_copy (address);
714 GNUNET_array_grow (ar->ats,
719 ats_count * sizeof (struct GNUNET_ATS_Information));
720 sh->session_array[s] = ar;
721 send_add_address_message (sh, ar);
727 * An address was used to initiate a session.
729 * @param ar address record to update information for
730 * @param session session handle
733 GNUNET_ATS_address_add_session (struct GNUNET_ATS_AddressRecord *ar,
734 struct Session *session)
736 GNUNET_break (NULL == ar->session);
737 ar->session = session;
742 * A session was destroyed, disassociate it from the
743 * given address record. If this was an incoming
744 * addess, destroy the address as well.
746 * @param ar address record to update information for
747 * @param session session handle
748 * @return #GNUNET_YES if the @a ar was destroyed because
749 * it was an incoming address,
750 * #GNUNET_NO if the @ar was kept because we can
751 * use it still to establish a new session
754 GNUNET_ATS_address_del_session (struct GNUNET_ATS_AddressRecord *ar,
755 struct Session *session)
757 GNUNET_break (session == ar->session);
759 if (GNUNET_HELLO_address_check_option (ar->address,
760 GNUNET_HELLO_ADDRESS_INFO_INBOUND))
762 GNUNET_ATS_address_destroy (ar);
770 * We have updated performance statistics for a given address. Note
771 * that this function can be called for addresses that are currently
772 * in use as well as addresses that are valid but not actively in use.
773 * Furthermore, the peer may not even be connected to us right now (in
774 * which case the call may be ignored or the information may be stored
775 * for later use). Update bandwidth assignments.
777 * @param ar address record to update information for
778 * @param ats performance data for the address
779 * @param ats_count number of performance records in @a ats
782 GNUNET_ATS_address_update (struct GNUNET_ATS_AddressRecord *ar,
783 const struct GNUNET_ATS_Information *ats,
786 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
787 struct GNUNET_MQ_Envelope *ev;
788 struct AddressUpdateMessage *m;
789 struct GNUNET_ATS_Information *am;
792 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
793 "Updating address for peer `%s', plugin `%s', session %p slot %u\n",
794 GNUNET_i2s (&ar->address->peer),
795 ar->address->transport_name,
798 GNUNET_array_grow (ar->ats,
803 ats_count * sizeof (struct GNUNET_ATS_Information));
806 return; /* disconnected, skip for now */
807 msize = ar->ats_count * sizeof (struct GNUNET_ATS_Information);
808 ev = GNUNET_MQ_msg_extra (m, msize, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
809 m->ats_count = htonl (ar->ats_count);
810 m->peer = ar->address->peer;
811 m->session_id = htonl (ar->slot);
812 am = (struct GNUNET_ATS_Information *) &m[1];
815 ar->ats_count * sizeof (struct GNUNET_ATS_Information));
816 GNUNET_MQ_send (sh->mq, ev);
822 * An address got destroyed, stop using it as a valid address.
824 * @param ar address to destroy
827 GNUNET_ATS_address_destroy (struct GNUNET_ATS_AddressRecord *ar)
829 struct GNUNET_ATS_SchedulingHandle *sh = ar->sh;
830 struct GNUNET_MQ_Envelope *ev;
831 struct AddressDestroyedMessage *m;
833 GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
834 "Deleting address for peer `%s', plugin `%s', slot %u session %p\n",
835 GNUNET_i2s (&ar->address->peer),
836 ar->address->transport_name,
839 GNUNET_break (NULL == ar->session);
841 ar->in_destroy = GNUNET_YES;
842 GNUNET_array_grow (ar->ats,
847 ev = GNUNET_MQ_msg (m, GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED);
848 m->session_id = htonl (ar->slot);
849 m->peer = ar->address->peer;
850 GNUNET_MQ_send (sh->mq, ev);
854 /* end of ats_api_scheduling.c */