ATS test: Give return value a meaning
[oweals/gnunet.git] / src / ats / ats_api2_transport.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2010-2015, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18      SPDX-License-Identifier: AGPL3.0-or-later
19 */
20 /**
21  * @file ats/ats_api2_transport.c
22  * @brief address suggestions and bandwidth allocation
23  * @author Christian Grothoff
24  * @author Matthias Wachs
25  */
26 #include "platform.h"
27 #include "gnunet_ats_transport_service.h"
28 #include "ats2.h"
29
30 #define LOG(kind,...) GNUNET_log_from(kind, "ats-transport-api", __VA_ARGS__)
31
32
33 /**
34  * Information we track per session, incoming or outgoing.  It also
35  * doesn't matter if we have a session, any session that ATS is
36  * allowed to suggest right now should be tracked.
37  */
38 struct GNUNET_ATS_SessionRecord
39 {
40
41   /**
42    * Transport handle this session record belongs to.
43    */
44   struct GNUNET_ATS_TransportHandle *ath;
45
46   /**
47    * Address data.
48    */
49   const char *address;
50
51   /**
52    * Session handle, NULL if inbound-only (also implies we cannot
53    * actually control inbound traffic via transport!).  So if
54    * @e session is NULL, the @e properties are informative for
55    * ATS (connection exists, utilization) but ATS cannot directly
56    * influence it (and should thus not call the
57    * #GNUNET_ATS_AllocationCallback for this @e session, which is
58    * obvious as NULL is not a meaningful session to allocation
59    * resources to).
60    */
61   struct GNUNET_ATS_Session *session;
62
63   /**
64    * Identity of the peer reached at @e address.
65    */
66   struct GNUNET_PeerIdentity pid;
67
68   /**
69    * Performance data about the @e session.
70    */
71   struct GNUNET_ATS_Properties properties;
72
73   /**
74    * Unique ID to identify this session at this @a pid in IPC
75    * messages.
76    */
77   uint32_t slot;
78
79 };
80
81
82 /**
83  * Handle to the ATS subsystem for bandwidth/transport transport information.
84  */
85 struct GNUNET_ATS_TransportHandle
86 {
87
88   /**
89    * Our configuration.
90    */
91   const struct GNUNET_CONFIGURATION_Handle *cfg;
92
93   /**
94    * Callback to invoke on suggestions.
95    */
96   GNUNET_ATS_SuggestionCallback suggest_cb;
97
98   /**
99    * Closure for @e suggest_cb.
100    */
101   void *suggest_cb_cls;
102
103   /**
104    * Callback to invoke on allocations.
105    */
106   GNUNET_ATS_AllocationCallback alloc_cb;
107
108   /**
109    * Closure for @e alloc_cb.
110    */
111   void *alloc_cb_cls;
112
113   /**
114    * Message queue for sending requests to the ATS service.
115    */
116   struct GNUNET_MQ_Handle *mq;
117
118   /**
119    * Task to trigger reconnect.
120    */
121   struct GNUNET_SCHEDULER_Task *task;
122
123   /**
124    * Hash map mapping PIDs to session records.
125    */
126   struct GNUNET_CONTAINER_MultiPeerMap *records;
127
128   /**
129    * Reconnect backoff delay.
130    */
131   struct GNUNET_TIME_Relative backoff;
132
133 };
134
135
136
137 /**
138  * Convert ATS properties from host to network byte order.
139  *
140  * @param nbo[OUT] value written
141  * @param hbo value read
142  */
143 static void
144 properties_hton (struct PropertiesNBO *nbo,
145                  const struct GNUNET_ATS_Properties *hbo)
146 {
147   nbo->delay = GNUNET_TIME_relative_hton (hbo->delay);
148   nbo->goodput_out = htonl (hbo->goodput_out);
149   nbo->goodput_in = htonl (hbo->goodput_in);
150   nbo->utilization_out = htonl (hbo->utilization_out);
151   nbo->utilization_in = htonl (hbo->utilization_in);
152   nbo->distance = htonl (hbo->distance);
153   nbo->mtu = htonl (hbo->mtu);
154   nbo->nt = htonl ((uint32_t) hbo->nt);
155   nbo->cc = htonl ((uint32_t) hbo->cc);
156 }
157
158
159 /**
160  * Re-establish the connection to the ATS service.
161  *
162  * @param sh handle to use to re-connect.
163  */
164 static void
165 reconnect (struct GNUNET_ATS_TransportHandle *ath);
166
167
168 /**
169  * Re-establish the connection to the ATS service.
170  *
171  * @param cls handle to use to re-connect.
172  */
173 static void
174 reconnect_task (void *cls)
175 {
176   struct GNUNET_ATS_TransportHandle *ath = cls;
177
178   ath->task = NULL;
179   reconnect (ath);
180 }
181
182
183 /**
184  * Disconnect from ATS and then reconnect.
185  *
186  * @param ath our handle
187  */
188 static void
189 force_reconnect (struct GNUNET_ATS_TransportHandle *ath)
190 {
191   if (NULL != ath->mq)
192   {
193     GNUNET_MQ_destroy (ath->mq);
194     ath->mq = NULL;
195   }
196   /* FIXME: do we tell transport service about disconnect events? CON:
197      initially ATS will have a really screwed picture of the world and
198      the rapid change would be bad.  PRO: if we don't, ATS and
199      transport may disagree about the allocation for a while...
200      For now: lazy: do nothing. */
201   ath->backoff = GNUNET_TIME_STD_BACKOFF (ath->backoff);
202   ath->task = GNUNET_SCHEDULER_add_delayed (ath->backoff,
203                                            &reconnect_task,
204                                            ath);
205 }
206
207
208 /**
209  * Check format of address suggestion message from the service.
210  *
211  * @param cls the `struct GNUNET_ATS_TransportHandle`
212  * @param m message received
213  */
214 static int
215 check_ats_address_suggestion (void *cls,
216                               const struct AddressSuggestionMessage *m)
217 {
218   (void) cls;
219   GNUNET_MQ_check_zero_termination (m);
220   return GNUNET_SYSERR;
221 }
222
223
224 /**
225  * We received an address suggestion message from the service.
226  *
227  * @param cls the `struct GNUNET_ATS_TransportHandle`
228  * @param m message received
229  */
230 static void
231 handle_ats_address_suggestion (void *cls,
232                                const struct AddressSuggestionMessage *m)
233 {
234   struct GNUNET_ATS_TransportHandle *ath = cls;
235   const char *address = (const char *) &m[1];
236
237   ath->suggest_cb (ath->suggest_cb_cls,
238                   &m->peer,
239                   address);
240 }
241
242
243 /**
244  * Closure for #match_session_cb.
245  */
246 struct FindContext
247 {
248   /**
249    * Key to look for.
250    */
251   uint32_t session_id;
252
253   /**
254    * Where to store the result.
255    */
256   struct GNUNET_ATS_SessionRecord *sr;
257 };
258
259
260 /**
261  * Finds matching session record.
262  *
263  * @param cls a `struct FindContext`
264  * @param pid peer identity (unused)
265  * @param value a `struct GNUNET_ATS_SessionRecord`
266  * @return #GNUNET_NO if match found, #GNUNET_YES to continue searching
267  */
268 static int
269 match_session_cb (void *cls,
270                   const struct GNUNET_PeerIdentity *pid,
271                   void *value)
272 {
273   struct FindContext *fc = cls;
274   struct GNUNET_ATS_SessionRecord *sr = value;
275
276   (void) pid;
277   if (fc->session_id == sr->slot)
278   {
279     fc->sr = sr;
280     return GNUNET_NO;
281   }
282   return GNUNET_YES;
283 }
284
285
286
287 /**
288  * Find session record for peer @a pid and session @a session_id
289  *
290  * @param ath transport handle to search
291  * @param session_id session ID to match
292  * @param pid peer to search under
293  * @return NULL if no such record exists
294  */
295 static struct GNUNET_ATS_SessionRecord *
296 find_session (struct GNUNET_ATS_TransportHandle *ath,
297               uint32_t session_id,
298               const struct GNUNET_PeerIdentity *pid)
299 {
300   struct FindContext fc = {
301     .session_id = session_id,
302     .sr = NULL
303   };
304   GNUNET_CONTAINER_multipeermap_get_multiple (ath->records,
305                                               pid,
306                                               &match_session_cb,
307                                               &fc);
308   return fc.sr;
309 }
310
311
312 /**
313  * We received a session allocation message from the service.
314  *
315  * @param cls the `struct GNUNET_ATS_TransportHandle`
316  * @param m message received
317  */
318 static void
319 handle_ats_session_allocation (void *cls,
320                                const struct SessionAllocationMessage *m)
321 {
322   struct GNUNET_ATS_TransportHandle *ath = cls;
323   struct GNUNET_ATS_SessionRecord *ar;
324   uint32_t session_id;
325
326   session_id = ntohl (m->session_id);
327   ar = find_session (ath,
328                      session_id,
329                      &m->peer);
330   if (NULL == ar)
331   {
332     /* this can (rarely) happen if ATS changes an sessiones allocation
333        just when the transport service deleted it */
334     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
335                 "Allocation ignored, session unknown\n");
336     return;
337   }
338   ath->backoff = GNUNET_TIME_UNIT_ZERO;
339   LOG (GNUNET_ERROR_TYPE_DEBUG,
340        "ATS allocates bandwidth for peer `%s' using address %s\n",
341        GNUNET_i2s (&ar->pid),
342        ar->address);
343   ath->alloc_cb (ath->alloc_cb_cls,
344                  ar->session,
345                  m->bandwidth_out,
346                  m->bandwidth_in);
347 }
348
349
350 /**
351  * We encountered an error handling the MQ to the ATS service.
352  * Reconnect.
353  *
354  * @param cls the `struct GNUNET_ATS_TransportHandle`
355  * @param error details about the error
356  */
357 static void
358 error_handler (void *cls,
359                enum GNUNET_MQ_Error error)
360 {
361   struct GNUNET_ATS_TransportHandle *ath = cls;
362
363   LOG (GNUNET_ERROR_TYPE_DEBUG,
364        "ATS connection died (code %d), reconnecting\n",
365        (int) error);
366   force_reconnect (ath);
367 }
368
369
370 /**
371  * Generate and transmit the `struct SessionAddMessage` for the given
372  * session record.
373  *
374  * @param ar the session to inform the ATS service about
375  */
376 static void
377 send_add_session_message (const struct GNUNET_ATS_SessionRecord *ar)
378 {
379   struct GNUNET_ATS_TransportHandle *ath = ar->ath;
380   struct GNUNET_MQ_Envelope *ev;
381   struct SessionAddMessage *m;
382   size_t alen;
383
384   if (NULL == ath->mq)
385     return; /* disconnected, skip for now */
386   alen = strlen (ar->address) + 1;
387   ev = GNUNET_MQ_msg_extra (m,
388                             alen,
389                             (NULL == ar->session)
390                             ? GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD_INBOUND_ONLY
391                             : GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD);
392   m->peer = ar->pid;
393   m->session_id = htonl (ar->slot);
394   properties_hton (&m->properties,
395                    &ar->properties);
396   GNUNET_memcpy (&m[1],
397                  ar->address,
398                  alen);
399
400   LOG (GNUNET_ERROR_TYPE_DEBUG,
401        "Adding address `%s' for peer `%s'\n",
402        ar->address,
403        GNUNET_i2s (&ar->pid));
404   GNUNET_MQ_send (ath->mq,
405                   ev);
406 }
407
408
409 /**
410  * Send ATS information about the session record.
411  *
412  * @param cls our `struct GNUNET_ATS_TransportHandle *`, unused
413  * @param pid unused
414  * @param value the `struct GNUNET_ATS_SessionRecord *` to add
415  * @return #GNUNET_OK
416  */
417 static int
418 send_add_session_cb (void *cls,
419                      const struct GNUNET_PeerIdentity *pid,
420                      void *value)
421 {
422   struct GNUNET_ATS_SessionRecord *ar = value;
423
424   (void) cls;
425   (void) pid;
426   send_add_session_message (ar);
427   return GNUNET_OK;
428 }
429
430
431 /**
432  * Re-establish the connection to the ATS service.
433  *
434  * @param ath handle to use to re-connect.
435  */
436 static void
437 reconnect (struct GNUNET_ATS_TransportHandle *ath)
438 {
439   struct GNUNET_MQ_MessageHandler handlers[] = {
440     GNUNET_MQ_hd_var_size (ats_address_suggestion,
441                            GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
442                            struct AddressSuggestionMessage,
443                            ath),
444     GNUNET_MQ_hd_fixed_size (ats_session_allocation,
445                              GNUNET_MESSAGE_TYPE_ATS_SESSION_ALLOCATION,
446                              struct SessionAllocationMessage,
447                              ath),
448     GNUNET_MQ_handler_end ()
449   };
450   struct GNUNET_MQ_Envelope *ev;
451   struct GNUNET_MessageHeader *init;
452
453   GNUNET_assert (NULL == ath->mq);
454   ath->mq = GNUNET_CLIENT_connect (ath->cfg,
455                                   "ats",
456                                   handlers,
457                                   &error_handler,
458                                   ath);
459   if (NULL == ath->mq)
460   {
461     GNUNET_break (0);
462     force_reconnect (ath);
463     return;
464   }
465   ev = GNUNET_MQ_msg (init,
466                       GNUNET_MESSAGE_TYPE_ATS_START);
467   GNUNET_MQ_send (ath->mq,
468                   ev);
469   if (NULL == ath->mq)
470     return;
471   GNUNET_CONTAINER_multipeermap_iterate (ath->records,
472                                          &send_add_session_cb,
473                                          ath);
474 }
475
476
477 /**
478  * Initialize the ATS subsystem.
479  *
480  * @param cfg configuration to use
481  * @param alloc_cb notification to call whenever the allocation changed
482  * @param alloc_cb_cls closure for @a alloc_cb
483  * @param suggest_cb notification to call whenever the suggestation is made
484  * @param suggest_cb_cls closure for @a suggest_cb
485  * @return ats context
486  */
487 struct GNUNET_ATS_TransportHandle *
488 GNUNET_ATS_transport_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
489                            GNUNET_ATS_AllocationCallback alloc_cb,
490                            void *alloc_cb_cls,
491                            GNUNET_ATS_SuggestionCallback suggest_cb,
492                            void *suggest_cb_cls)
493 {
494   struct GNUNET_ATS_TransportHandle *ath;
495
496   ath = GNUNET_new (struct GNUNET_ATS_TransportHandle);
497   ath->cfg = cfg;
498   ath->suggest_cb = suggest_cb;
499   ath->suggest_cb_cls = suggest_cb_cls;
500   ath->alloc_cb = alloc_cb;
501   ath->alloc_cb_cls = alloc_cb_cls;
502   ath->records = GNUNET_CONTAINER_multipeermap_create (128,
503                                                       GNUNET_YES);
504   reconnect (ath);
505   return ath;
506 }
507
508
509 /**
510  * Release memory associated with the session record.
511  *
512  * @param cls NULL
513  * @param pid unused
514  * @param value a `struct GNUNET_ATS_SessionRecord`
515  * @return #GNUNET_OK
516  */
517 static int
518 free_record (void *cls,
519              const struct GNUNET_PeerIdentity *pid,
520              void *value)
521 {
522   struct GNUNET_ATS_SessionRecord *ar = value;
523
524   (void) cls;
525   (void) pid;
526   GNUNET_free (ar);
527   return GNUNET_OK;
528 }
529
530
531 /**
532  * Client is done with ATS transport, release resources.
533  *
534  * @param ath handle to release
535  */
536 void
537 GNUNET_ATS_transport_done (struct GNUNET_ATS_TransportHandle *ath)
538 {
539   if (NULL != ath->mq)
540   {
541     GNUNET_MQ_destroy (ath->mq);
542     ath->mq = NULL;
543   }
544   if (NULL != ath->task)
545   {
546     GNUNET_SCHEDULER_cancel (ath->task);
547     ath->task = NULL;
548   }
549   GNUNET_CONTAINER_multipeermap_iterate (ath->records,
550                                          &free_record,
551                                          NULL);
552   GNUNET_CONTAINER_multipeermap_destroy (ath->records);
553   GNUNET_free (ath);
554 }
555
556
557 /**
558  * We have a new session ATS should know. Sessiones have to be added
559  * with this function before they can be: updated, set in use and
560  * destroyed.
561  *
562  * @param ath handle
563  * @param pid peer we connected to
564  * @param address the address (human readable version)
565  * @param session transport-internal handle for the session/queue, NULL if
566  *        the session is inbound-only
567  * @param prop performance data for the session
568  * @return handle to the session representation inside ATS, NULL
569  *         on error (i.e. ATS knows this exact session already)
570  */
571 struct GNUNET_ATS_SessionRecord *
572 GNUNET_ATS_session_add (struct GNUNET_ATS_TransportHandle *ath,
573                         const struct GNUNET_PeerIdentity *pid,
574                         const char *address,
575                         struct GNUNET_ATS_Session *session,
576                         const struct GNUNET_ATS_Properties *prop)
577 {
578   struct GNUNET_ATS_SessionRecord *ar;
579   uint32_t s;
580   size_t alen;
581
582   if (NULL == address)
583   {
584     /* we need a valid address */
585     GNUNET_break (0);
586     return NULL;
587   }
588   alen = strlen (address) + 1;
589   if ( (alen + sizeof (struct SessionAddMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
590        (alen >= GNUNET_MAX_MESSAGE_SIZE) )
591   {
592     /* address too large for us, this should not happen */
593     GNUNET_break (0);
594     return NULL;
595   }
596
597   /* Spin 's' until we find an unused session ID for this pid */
598   for (s = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
599                                      UINT32_MAX);
600        NULL != find_session (ath,
601                              s,
602                              pid);
603        s++) ;
604
605   alen = strlen (address) + 1;
606   ar = GNUNET_malloc (sizeof (struct GNUNET_ATS_SessionRecord) + alen);
607   ar->ath = ath;
608   ar->slot = s;
609   ar->session = session;
610   ar->address = (const char *) &ar[1];
611   ar->pid = *pid;
612   ar->properties = *prop;
613   memcpy (&ar[1],
614           address,
615           alen);
616   (void) GNUNET_CONTAINER_multipeermap_put (ath->records,
617                                             &ar->pid,
618                                             ar,
619                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
620   send_add_session_message (ar);
621   return ar;
622 }
623
624
625 /**
626  * We have updated performance statistics for a given session.  Note
627  * that this function can be called for sessiones that are currently
628  * in use as well as sessiones that are valid but not actively in use.
629  * Furthermore, the peer may not even be connected to us right now (in
630  * which case the call may be ignored or the information may be stored
631  * for later use).  Update bandwidth assignments.
632  *
633  * @param ar session record to update information for
634  * @param prop performance data for the session
635  */
636 void
637 GNUNET_ATS_session_update (struct GNUNET_ATS_SessionRecord *ar,
638                            const struct GNUNET_ATS_Properties *prop)
639 {
640   struct GNUNET_ATS_TransportHandle *ath = ar->ath;
641   struct GNUNET_MQ_Envelope *ev;
642   struct SessionUpdateMessage *m;
643
644   LOG (GNUNET_ERROR_TYPE_DEBUG,
645        "Updating address `%s' for peer `%s'\n",
646        ar->address,
647        GNUNET_i2s (&ar->pid));
648   ar->properties = *prop;
649   if (NULL == ath->mq)
650     return; /* disconnected, skip for now */
651   ev = GNUNET_MQ_msg (m,
652                       GNUNET_MESSAGE_TYPE_ATS_SESSION_UPDATE);
653   m->session_id = htonl (ar->slot);
654   m->peer = ar->pid;
655   properties_hton (&m->properties,
656                    &ar->properties);
657   GNUNET_MQ_send (ath->mq,
658                   ev);
659 }
660
661
662 /**
663  * A session was destroyed, ATS should now schedule and
664  * allocate under the assumption that this @a ar is no
665  * longer in use.
666  *
667  * @param ar session record to drop
668  */
669 void
670 GNUNET_ATS_session_del (struct GNUNET_ATS_SessionRecord *ar)
671 {
672   struct GNUNET_ATS_TransportHandle *ath = ar->ath;
673   struct GNUNET_MQ_Envelope *ev;
674   struct SessionDelMessage *m;
675
676   LOG (GNUNET_ERROR_TYPE_DEBUG,
677        "Deleting address `%s' for peer `%s'\n",
678        ar->address,
679        GNUNET_i2s (&ar->pid));
680   if (NULL == ath->mq)
681     return;
682   ev = GNUNET_MQ_msg (m,
683                       GNUNET_MESSAGE_TYPE_ATS_SESSION_DEL);
684   m->session_id = htonl (ar->slot);
685   m->peer = ar->pid;
686   GNUNET_MQ_send (ath->mq,
687                   ev);
688 }
689
690
691 /* end of ats_api2_transport.c */