2 This file is part of GNUnet.
3 Copyright (C) 2010-2015, 2018 GNUnet e.V.
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
19 * @file ats/ats_api2_transport.c
20 * @brief address suggestions and bandwidth allocation
21 * @author Christian Grothoff
22 * @author Matthias Wachs
25 #include "gnunet_ats_transport_service.h"
28 #define LOG(kind,...) GNUNET_log_from(kind, "ats-transport-api", __VA_ARGS__)
32 * Information we track per session, incoming or outgoing. It also
33 * doesn't matter if we have a session, any session that ATS is
34 * allowed to suggest right now should be tracked.
36 struct GNUNET_ATS_SessionRecord
40 * Transport handle this session record belongs to.
42 struct GNUNET_ATS_TransportHandle *ath;
50 * Session handle, NULL if inbound-only (also implies we cannot
51 * actually control inbound traffic via transport!). So if
52 * @e session is NULL, the @e properties are informative for
53 * ATS (connection exists, utilization) but ATS cannot directly
54 * influence it (and should thus not call the
55 * #GNUNET_ATS_AllocationCallback for this @e session, which is
56 * obvious as NULL is not a meaningful session to allocation
59 struct GNUNET_ATS_Session *session;
62 * Identity of the peer reached at @e address.
64 struct GNUNET_PeerIdentity pid;
67 * Performance data about the @e session.
69 struct GNUNET_ATS_Properties properties;
72 * Unique ID to identify this session at this @a pid in IPC
81 * Handle to the ATS subsystem for bandwidth/transport information.
83 struct GNUNET_ATS_TransportHandle
89 const struct GNUNET_CONFIGURATION_Handle *cfg;
92 * Callback to invoke on suggestions.
94 GNUNET_ATS_SuggestionCallback suggest_cb;
97 * Closure for @e suggest_cb.
102 * Callback to invoke on allocations.
104 GNUNET_ATS_AllocationCallback alloc_cb;
107 * Closure for @e alloc_cb.
112 * Message queue for sending requests to the ATS service.
114 struct GNUNET_MQ_Handle *mq;
117 * Task to trigger reconnect.
119 struct GNUNET_SCHEDULER_Task *task;
122 * Hash map mapping PIDs to session records.
124 struct GNUNET_CONTAINER_MultiPeerMap *records;
127 * Reconnect backoff delay.
129 struct GNUNET_TIME_Relative backoff;
135 * Re-establish the connection to the ATS service.
137 * @param ath handle to use to re-connect.
140 reconnect (struct GNUNET_ATS_TransportHandle *ath);
144 * Re-establish the connection to the ATS service.
146 * @param cls handle to use to re-connect.
149 reconnect_task (void *cls)
151 struct GNUNET_ATS_TransportHandle *ath = cls;
159 * Disconnect from ATS and then reconnect.
161 * @param ath our handle
164 force_reconnect (struct GNUNET_ATS_TransportHandle *ath)
168 GNUNET_MQ_destroy (ath->mq);
171 /* FIXME: do we tell transport service about disconnect events? CON:
172 initially ATS will have a really screwed picture of the world and
173 the rapid change would be bad. PRO: if we don't, ATS and
174 transport may disagree about the allocation for a while...
175 For now: lazy: do nothing. */
176 ath->backoff = GNUNET_TIME_STD_BACKOFF (ath->backoff);
177 ath->task = GNUNET_SCHEDULER_add_delayed (ath->backoff,
184 * Check format of address suggestion message from the service.
186 * @param cls the `struct GNUNET_ATS_TransportHandle`
187 * @param m message received
190 check_ats_address_suggestion (void *cls,
191 const struct AddressSuggestionMessage *m)
194 GNUNET_MQ_check_zero_termination (m);
195 return GNUNET_SYSERR;
200 * We received an address suggestion message from the service.
202 * @param cls the `struct GNUNET_ATS_TransportHandle`
203 * @param m message received
206 handle_ats_address_suggestion (void *cls,
207 const struct AddressSuggestionMessage *m)
209 struct GNUNET_ATS_TransportHandle *ath = cls;
210 const char *address = (const char *) &m[1];
212 ath->suggest_cb (ath->suggest_cb_cls,
219 * Closure for #match_session_cb.
229 * Where to store the result.
231 struct GNUNET_ATS_SessionRecord *sr;
236 * Finds matching session record.
238 * @param cls a `struct FindContext`
239 * @param pid peer identity (unused)
240 * @param value a `struct GNUNET_ATS_SessionRecord`
241 * @return #GNUNET_NO if match found, #GNUNET_YES to continue searching
244 match_session_cb (void *cls,
245 const struct GNUNET_PeerIdentity *pid,
248 struct FindContext *fc = cls;
249 struct GNUNET_ATS_SessionRecord *sr = value;
252 if (fc->session_id == sr->slot)
263 * Find session record for peer @a pid and session @a session_id
265 * @param ath transport handle to search
266 * @param session_id session ID to match
267 * @param pid peer to search under
268 * @return NULL if no such record exists
270 static struct GNUNET_ATS_SessionRecord *
271 find_session (struct GNUNET_ATS_TransportHandle *ath,
273 const struct GNUNET_PeerIdentity *pid)
275 struct FindContext fc = {
276 .session_id = session_id,
279 GNUNET_CONTAINER_multipeermap_get_multiple (ath->records,
288 * We received a session allocation message from the service.
290 * @param cls the `struct GNUNET_ATS_TransportHandle`
291 * @param m message received
294 handle_ats_session_allocation (void *cls,
295 const struct SessionAllocationMessage *m)
297 struct GNUNET_ATS_TransportHandle *ath = cls;
298 struct GNUNET_ATS_SessionRecord *ar;
301 session_id = ntohl (m->session_id);
302 ar = find_session (ath,
307 /* this can (rarely) happen if ATS changes a session's allocation
308 just when the transport service deleted it */
309 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
310 "Allocation ignored, session unknown\n");
313 ath->backoff = GNUNET_TIME_UNIT_ZERO;
314 LOG (GNUNET_ERROR_TYPE_DEBUG,
315 "ATS allocates bandwidth for peer `%s' using address %s\n",
316 GNUNET_i2s (&ar->pid),
318 ath->alloc_cb (ath->alloc_cb_cls,
326 * We encountered an error handling the MQ to the ATS service.
329 * @param cls the `struct GNUNET_ATS_TransportHandle`
330 * @param error details about the error
333 error_handler (void *cls,
334 enum GNUNET_MQ_Error error)
336 struct GNUNET_ATS_TransportHandle *ath = cls;
338 LOG (GNUNET_ERROR_TYPE_DEBUG,
339 "ATS connection died (code %d), reconnecting\n",
341 force_reconnect (ath);
346 * Generate and transmit the `struct SessionAddMessage` for the given
349 * @param ar the session to inform the ATS service about
352 send_add_session_message (const struct GNUNET_ATS_SessionRecord *ar)
354 struct GNUNET_ATS_TransportHandle *ath = ar->ath;
355 struct GNUNET_MQ_Envelope *ev;
356 struct SessionAddMessage *m;
360 return; /* disconnected, skip for now */
361 alen = strlen (ar->address) + 1;
362 ev = GNUNET_MQ_msg_extra (m,
364 (NULL == ar->session)
365 ? GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD_INBOUND_ONLY
366 : GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD);
368 m->session_id = htonl (ar->slot);
369 // FIXME: convert endianness here!
370 // m->properties = ar->properties;
371 GNUNET_memcpy (&m[1],
375 LOG (GNUNET_ERROR_TYPE_DEBUG,
376 "Adding address `%s' for peer `%s'\n",
378 GNUNET_i2s (&ar->pid));
379 GNUNET_MQ_send (ath->mq,
385 * Send ATS information about the session record.
387 * @param cls our `struct GNUNET_ATS_TransportHandle *`, unused
389 * @param value the `struct GNUNET_ATS_SessionRecord *` to add
393 send_add_session_cb (void *cls,
394 const struct GNUNET_PeerIdentity *pid,
397 struct GNUNET_ATS_SessionRecord *ar = value;
401 send_add_session_message (ar);
407 * Re-establish the connection to the ATS service.
409 * @param ath handle to use to re-connect.
412 reconnect (struct GNUNET_ATS_TransportHandle *ath)
414 struct GNUNET_MQ_MessageHandler handlers[] = {
415 GNUNET_MQ_hd_var_size (ats_address_suggestion,
416 GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
417 struct AddressSuggestionMessage,
419 GNUNET_MQ_hd_fixed_size (ats_session_allocation,
420 GNUNET_MESSAGE_TYPE_ATS_SESSION_ALLOCATION,
421 struct SessionAllocationMessage,
423 GNUNET_MQ_handler_end ()
425 struct GNUNET_MQ_Envelope *ev;
426 struct GNUNET_MessageHeader *init;
428 GNUNET_assert (NULL == ath->mq);
429 ath->mq = GNUNET_CLIENT_connect (ath->cfg,
437 force_reconnect (ath);
440 ev = GNUNET_MQ_msg (init,
441 GNUNET_MESSAGE_TYPE_ATS_START);
442 GNUNET_MQ_send (ath->mq,
446 GNUNET_CONTAINER_multipeermap_iterate (ath->records,
447 &send_add_session_cb,
453 * Initialize the ATS subsystem.
455 * @param cfg configuration to use
456 * @param alloc_cb notification to call whenever the allocation changed
457 * @param alloc_cb_cls closure for @a alloc_cb
458 * @param suggest_cb notification to call whenever a suggestion is made
459 * @param suggest_cb_cls closure for @a suggest_cb
460 * @return ats context
462 struct GNUNET_ATS_TransportHandle *
463 GNUNET_ATS_transport_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
464 GNUNET_ATS_AllocationCallback alloc_cb,
466 GNUNET_ATS_SuggestionCallback suggest_cb,
467 void *suggest_cb_cls)
469 struct GNUNET_ATS_TransportHandle *ath;
471 ath = GNUNET_new (struct GNUNET_ATS_TransportHandle);
473 ath->suggest_cb = suggest_cb;
474 ath->suggest_cb_cls = suggest_cb_cls;
475 ath->alloc_cb = alloc_cb;
476 ath->alloc_cb_cls = alloc_cb_cls;
477 ath->records = GNUNET_CONTAINER_multipeermap_create (128,
485 * Release memory associated with the session record.
489 * @param value a `struct GNUNET_ATS_SessionRecord`
493 free_record (void *cls,
494 const struct GNUNET_PeerIdentity *pid,
497 struct GNUNET_ATS_SessionRecord *ar = value;
507 * Client is done with ATS transport, release resources.
509 * @param ath handle to release
512 GNUNET_ATS_transport_done (struct GNUNET_ATS_TransportHandle *ath)
516 GNUNET_MQ_destroy (ath->mq);
519 if (NULL != ath->task)
521 GNUNET_SCHEDULER_cancel (ath->task);
524 GNUNET_CONTAINER_multipeermap_iterate (ath->records,
527 GNUNET_CONTAINER_multipeermap_destroy (ath->records);
533 * We have a new session ATS should know about. Sessions have to be added
534 * with this function before they can be: updated, set in use and
538 * @param pid peer we connected to
539 * @param address the address (human readable version)
540 * @param session transport-internal handle for the session/queue, NULL if
541 * the session is inbound-only
542 * @param prop performance data for the session
543 * @return handle to the session representation inside ATS, NULL
544 * on error (i.e. ATS knows this exact session already)
546 struct GNUNET_ATS_SessionRecord *
547 GNUNET_ATS_session_add (struct GNUNET_ATS_TransportHandle *ath,
548 const struct GNUNET_PeerIdentity *pid,
550 struct GNUNET_ATS_Session *session,
551 const struct GNUNET_ATS_Properties *prop)
553 struct GNUNET_ATS_SessionRecord *ar;
559 /* we need a valid address */
563 alen = strlen (address) + 1;
564 if ( (alen + sizeof (struct SessionAddMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
565 (alen >= GNUNET_MAX_MESSAGE_SIZE) )
567 /* address too large for us, this should not happen */
572 /* Spin 's' until we find an unused session ID for this pid */
573 for (s = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
575 NULL != find_session (ath,
580 alen = strlen (address) + 1;
581 ar = GNUNET_malloc (sizeof (struct GNUNET_ATS_SessionRecord) + alen);
583 ar->slot = 42; // FIXME: make up unique number!
584 ar->session = session;
585 ar->address = (const char *) &ar[1];
587 ar->properties = *prop;
591 (void) GNUNET_CONTAINER_multipeermap_put (ath->records,
594 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
595 send_add_session_message (ar);
601 * We have updated performance statistics for a given session. Note
602 * that this function can be called for sessions that are currently
603 * in use as well as sessions that are valid but not actively in use.
604 * Furthermore, the peer may not even be connected to us right now (in
605 * which case the call may be ignored or the information may be stored
606 * for later use). Update bandwidth assignments.
608 * @param ar session record to update information for
609 * @param prop performance data for the session
612 GNUNET_ATS_session_update (struct GNUNET_ATS_SessionRecord *ar,
613 const struct GNUNET_ATS_Properties *prop)
615 struct GNUNET_ATS_TransportHandle *ath = ar->ath;
616 struct GNUNET_MQ_Envelope *ev;
617 struct SessionUpdateMessage *m;
619 LOG (GNUNET_ERROR_TYPE_DEBUG,
620 "Updating address `%s' for peer `%s'\n",
622 GNUNET_i2s (&ar->pid));
623 ar->properties = *prop;
625 return; /* disconnected, skip for now */
626 ev = GNUNET_MQ_msg (m,
627 GNUNET_MESSAGE_TYPE_ATS_SESSION_UPDATE);
628 m->session_id = htonl (ar->slot);
630 // FIXME: convert endianness here!
631 // m->properties = ar->properties;
632 GNUNET_MQ_send (ath->mq,
638 * A session was destroyed, ATS should now schedule and
639 * allocate under the assumption that this @a ar is no
642 * @param ar session record to drop
645 GNUNET_ATS_session_del (struct GNUNET_ATS_SessionRecord *ar)
647 struct GNUNET_ATS_TransportHandle *ath = ar->ath;
648 struct GNUNET_MQ_Envelope *ev;
649 struct SessionDelMessage *m;
651 LOG (GNUNET_ERROR_TYPE_DEBUG,
652 "Deleting address `%s' for peer `%s'\n",
654 GNUNET_i2s (&ar->pid));
657 ev = GNUNET_MQ_msg (m,
658 GNUNET_MESSAGE_TYPE_ATS_SESSION_DEL);
659 m->session_id = htonl (ar->slot);
661 GNUNET_MQ_send (ath->mq,
666 /* end of ats_api2_transport.c */