draft ATS API for TNG
[oweals/gnunet.git] / src / ats / ats_api2_transport.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2010-2015, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file ats/ats_api2_transport.c
20  * @brief address suggestions and bandwidth allocation
21  * @author Christian Grothoff
22  * @author Matthias Wachs
23  */
24 #include "platform.h"
25 #include "gnunet_ats_transport_service.h"
26 #include "ats2.h"
27
28 #define LOG(kind,...) GNUNET_log_from(kind, "ats-transport-api", __VA_ARGS__)
29
30
31 /**
32  * Information we track per session, incoming or outgoing.  It also
33  * doesn't matter if we have a session, any session that ATS is
34  * allowed to suggest right now should be tracked.
35  */
36 struct GNUNET_ATS_SessionRecord
37 {
38
39   /**
40    * Transport handle this session record belongs to.
41    */
42   struct GNUNET_ATS_TransportHandle *ath;
43
44   /**
45    * Address data.
46    */
47   const char *address;
48
49   /**
50    * Session handle, NULL if inbound-only (also implies we cannot
51    * actually control inbound traffic via transport!).  So if
52    * @e session is NULL, the @e properties are informative for
53    * ATS (connection exists, utilization) but ATS cannot directly
54    * influence it (and should thus not call the
55    * #GNUNET_ATS_AllocationCallback for this @e session, which is
56    * obvious as NULL is not a meaningful session to allocation
57    * resources to).
58    */
59   struct GNUNET_ATS_Session *session;
60
61   /**
62    * Identity of the peer reached at @e address.
63    */
64   struct GNUNET_PeerIdentity pid;
65
66   /**
67    * Performance data about the @e session.
68    */
69   struct GNUNET_ATS_Properties properties;
70
71   /**
72    * Unique ID to identify this session at this @a pid in IPC
73    * messages.
74    */
75   uint32_t slot;
76
77 };
78
79
80 /**
81  * Handle to the ATS subsystem for bandwidth/transport transport information.
82  */
83 struct GNUNET_ATS_TransportHandle
84 {
85
86   /**
87    * Our configuration.
88    */
89   const struct GNUNET_CONFIGURATION_Handle *cfg;
90
91   /**
92    * Callback to invoke on suggestions.
93    */
94   GNUNET_ATS_SuggestionCallback suggest_cb;
95
96   /**
97    * Closure for @e suggest_cb.
98    */
99   void *suggest_cb_cls;
100
101   /**
102    * Callback to invoke on allocations.
103    */
104   GNUNET_ATS_AllocationCallback alloc_cb;
105
106   /**
107    * Closure for @e alloc_cb.
108    */
109   void *alloc_cb_cls;
110
111   /**
112    * Message queue for sending requests to the ATS service.
113    */
114   struct GNUNET_MQ_Handle *mq;
115
116   /**
117    * Task to trigger reconnect.
118    */
119   struct GNUNET_SCHEDULER_Task *task;
120
121   /**
122    * Hash map mapping PIDs to session records.
123    */
124   struct GNUNET_CONTAINER_MultiPeerMap *records;
125
126   /**
127    * Reconnect backoff delay.
128    */
129   struct GNUNET_TIME_Relative backoff;
130
131 };
132
133
134 /**
135  * Re-establish the connection to the ATS service.
136  *
137  * @param sh handle to use to re-connect.
138  */
139 static void
140 reconnect (struct GNUNET_ATS_TransportHandle *ath);
141
142
143 /**
144  * Re-establish the connection to the ATS service.
145  *
146  * @param cls handle to use to re-connect.
147  */
148 static void
149 reconnect_task (void *cls)
150 {
151   struct GNUNET_ATS_TransportHandle *ath = cls;
152
153   ath->task = NULL;
154   reconnect (ath);
155 }
156
157
158 /**
159  * Disconnect from ATS and then reconnect.
160  *
161  * @param ath our handle
162  */
163 static void
164 force_reconnect (struct GNUNET_ATS_TransportHandle *ath)
165 {
166   if (NULL != ath->mq)
167   {
168     GNUNET_MQ_destroy (ath->mq);
169     ath->mq = NULL;
170   }
171   /* FIXME: do we tell transport service about disconnect events? CON:
172      initially ATS will have a really screwed picture of the world and
173      the rapid change would be bad.  PRO: if we don't, ATS and
174      transport may disagree about the allocation for a while...
175      For now: lazy: do nothing. */
176   ath->backoff = GNUNET_TIME_STD_BACKOFF (ath->backoff);
177   ath->task = GNUNET_SCHEDULER_add_delayed (ath->backoff,
178                                            &reconnect_task,
179                                            ath);
180 }
181
182
183 /**
184  * Check format of address suggestion message from the service.
185  *
186  * @param cls the `struct GNUNET_ATS_TransportHandle`
187  * @param m message received
188  */
189 static int
190 check_ats_address_suggestion (void *cls,
191                               const struct AddressSuggestionMessage *m)
192 {
193   // FIXME: check 0-termination!
194   // FIXME: MQ API should really have a macro for this!
195   return GNUNET_SYSERR;
196 }
197
198
199 /**
200  * We received an address suggestion message from the service.
201  *
202  * @param cls the `struct GNUNET_ATS_TransportHandle`
203  * @param m message received
204  */
205 static void
206 handle_ats_address_suggestion (void *cls,
207                                const struct AddressSuggestionMessage *m)
208 {
209   struct GNUNET_ATS_TransportHandle *ath = cls;
210   const char *address = (const char *) &m[1];
211
212   ath->suggest_cb (ath->suggest_cb_cls,
213                   &m->peer,
214                   address);
215 }
216
217
218 /**
219  * Closure for #match_session_cb.
220  */
221 struct FindContext
222 {
223   /**
224    * Key to look for.
225    */
226   uint32_t session_id;
227
228   /**
229    * Where to store the result.
230    */
231   struct GNUNET_ATS_SessionRecord *sr;
232 };
233
234
235 /**
236  * Finds matching session record.
237  *
238  * @param cls a `struct FindContext`
239  * @param pid peer identity (unused)
240  * @param value a `struct GNUNET_ATS_SessionRecord`
241  * @return #GNUNET_NO if match found, #GNUNET_YES to continue searching
242  */
243 static int
244 match_session_cb (void *cls,
245                   const struct GNUNET_PeerIdentity *pid,
246                   void *value)
247 {
248   struct FindContext *fc = cls;
249   struct GNUNET_ATS_SessionRecord *sr = value;
250
251   (void) pid;
252   if (fc->session_id == sr->slot)
253     {
254       fc->sr = sr;
255       return GNUNET_NO;
256     }
257   return GNUNET_YES;
258 }
259
260
261
262 /**
263  * Find session record for peer @a pid and session @a session_id
264  *
265  * @param ath transport handle to search
266  * @param session_id session ID to match
267  * @param pid peer to search under
268  * @return NULL if no such record exists
269  */
270 static struct GNUNET_ATS_SessionRecord *
271 find_session (struct GNUNET_ATS_TransportHandle *ath,
272               uint32_t session_id,
273               const struct GNUNET_PeerIdentity *pid)
274 {
275   struct FindContext fc = {
276     .session_id = session_id,
277     .sr = NULL
278   };
279   GNUNET_CONTAINER_multipeermap_get_multiple (ath->records,
280                                               pid,
281                                               &match_session_cb,
282                                               &fc);
283   return fc.sr;
284 }
285
286
287 /**
288  * We received a session allocation message from the service.
289  *
290  * @param cls the `struct GNUNET_ATS_TransportHandle`
291  * @param m message received
292  */
293 static void
294 handle_ats_session_allocation (void *cls,
295                                const struct SessionAllocationMessage *m)
296 {
297   struct GNUNET_ATS_TransportHandle *ath = cls;
298   struct GNUNET_ATS_SessionRecord *ar;
299   uint32_t session_id;
300
301   session_id = ntohl (m->session_id);
302   ar = find_session (ath,
303                      session_id,
304                      &m->peer);
305   if (NULL == ar)
306   {
307     /* this can (rarely) happen if ATS changes an sessiones allocation
308        just when the transport service deleted it */
309     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
310                 "Allocation ignored, session unknown\n");
311     return;
312   }
313   ath->backoff = GNUNET_TIME_UNIT_ZERO;
314   LOG (GNUNET_ERROR_TYPE_DEBUG,
315        "ATS allocates bandwidth for peer `%s' using address %s\n",
316        GNUNET_i2s (&ar->pid),
317        ar->address);
318   ath->alloc_cb (ath->alloc_cb_cls,
319                  ar->session,
320                  m->bandwidth_out,
321                  m->bandwidth_in);
322 }
323
324
325 /**
326  * We encountered an error handling the MQ to the ATS service.
327  * Reconnect.
328  *
329  * @param cls the `struct GNUNET_ATS_TransportHandle`
330  * @param error details about the error
331  */
332 static void
333 error_handler (void *cls,
334                enum GNUNET_MQ_Error error)
335 {
336   struct GNUNET_ATS_TransportHandle *ath = cls;
337
338   LOG (GNUNET_ERROR_TYPE_DEBUG,
339        "ATS connection died (code %d), reconnecting\n",
340        (int) error);
341   force_reconnect (ath);
342 }
343
344
345 /**
346  * Generate and transmit the `struct SessionAddMessage` for the given
347  * session record.
348  *
349  * @param ar the session to inform the ATS service about
350  */
351 static void
352 send_add_session_message (const struct GNUNET_ATS_SessionRecord *ar)
353 {
354   struct GNUNET_ATS_TransportHandle *ath = ar->ath;
355   struct GNUNET_MQ_Envelope *ev;
356   struct SessionAddMessage *m;
357   size_t alen;
358
359   if (NULL == ath->mq)
360     return; /* disconnected, skip for now */
361   alen = strlen (ar->address) + 1;
362   ev = GNUNET_MQ_msg_extra (m,
363                             alen,
364                             (NULL == ar->session)
365                             ? GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD_INBOUND_ONLY
366                             : GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD);
367   m->peer = ar->pid;
368   m->session_id = htonl (ar->slot);
369   // FIXME: convert endianess here!
370   // m->properties = ar->properties;
371   GNUNET_memcpy (&m[1],
372                  ar->address,
373                  alen);
374
375   LOG (GNUNET_ERROR_TYPE_DEBUG,
376        "Adding address `%s' for peer `%s'\n",
377        ar->address,
378        GNUNET_i2s (&ar->pid));
379   GNUNET_MQ_send (ath->mq,
380                   ev);
381 }
382
383
384 /**
385  * Send ATS information about the session record.
386  *
387  * @param cls our `struct GNUNET_ATS_TransportHandle *`, unused
388  * @param pid unused
389  * @param value the `struct GNUNET_ATS_SessionRecord *` to add
390  * @return #GNUNET_OK
391  */
392 static int
393 send_add_session_cb (void *cls,
394                      const struct GNUNET_PeerIdentity *pid,
395                      void *value)
396 {
397   struct GNUNET_ATS_SessionRecord *ar = value;
398
399   (void) cls;
400   (void) pid;
401   send_add_session_message (ar);
402   return GNUNET_OK;
403 }
404
405
406 /**
407  * Re-establish the connection to the ATS service.
408  *
409  * @param ath handle to use to re-connect.
410  */
411 static void
412 reconnect (struct GNUNET_ATS_TransportHandle *ath)
413 {
414   struct GNUNET_MQ_MessageHandler handlers[] = {
415     GNUNET_MQ_hd_var_size (ats_address_suggestion,
416                            GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
417                            struct AddressSuggestionMessage,
418                            ath),
419     GNUNET_MQ_hd_fixed_size (ats_session_allocation,
420                              GNUNET_MESSAGE_TYPE_ATS_SESSION_ALLOCATION,
421                              struct SessionAllocationMessage,
422                              ath),
423     GNUNET_MQ_handler_end ()
424   };
425   struct GNUNET_MQ_Envelope *ev;
426   struct GNUNET_MessageHeader *init;
427
428   GNUNET_assert (NULL == ath->mq);
429   ath->mq = GNUNET_CLIENT_connect (ath->cfg,
430                                   "ats",
431                                   handlers,
432                                   &error_handler,
433                                   ath);
434   if (NULL == ath->mq)
435   {
436     GNUNET_break (0);
437     force_reconnect (ath);
438     return;
439   }
440   ev = GNUNET_MQ_msg (init,
441                       GNUNET_MESSAGE_TYPE_ATS_START);
442   GNUNET_MQ_send (ath->mq,
443                   ev);
444   if (NULL == ath->mq)
445     return;
446   GNUNET_CONTAINER_multipeermap_iterate (ath->records,
447                                          &send_add_session_cb,
448                                          ath);
449 }
450
451
452 /**
453  * Initialize the ATS subsystem.
454  *
455  * @param cfg configuration to use
456  * @param alloc_cb notification to call whenever the allocation changed
457  * @param alloc_cb_cls closure for @a alloc_cb
458  * @param suggest_cb notification to call whenever the suggestation is made
459  * @param suggest_cb_cls closure for @a suggest_cb
460  * @return ats context
461  */
462 struct GNUNET_ATS_TransportHandle *
463 GNUNET_ATS_transport_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
464                            GNUNET_ATS_AllocationCallback alloc_cb,
465                            void *alloc_cb_cls,
466                            GNUNET_ATS_SuggestionCallback suggest_cb,
467                            void *suggest_cb_cls)
468 {
469   struct GNUNET_ATS_TransportHandle *ath;
470
471   ath = GNUNET_new (struct GNUNET_ATS_TransportHandle);
472   ath->cfg = cfg;
473   ath->suggest_cb = suggest_cb;
474   ath->suggest_cb_cls = suggest_cb_cls;
475   ath->alloc_cb = alloc_cb;
476   ath->alloc_cb_cls = alloc_cb_cls;
477   ath->records = GNUNET_CONTAINER_multipeermap_create (128,
478                                                       GNUNET_YES);
479   reconnect (ath);
480   return ath;
481 }
482
483
484 /**
485  * Release memory associated with the session record.
486  *
487  * @param cls NULL
488  * @param pid unused
489  * @param value a `struct GNUNET_ATS_SessionRecord`
490  * @return #GNUNET_OK
491  */
492 static int
493 free_record (void *cls,
494              const struct GNUNET_PeerIdentity *pid,
495              void *value)
496 {
497   struct GNUNET_ATS_SessionRecord *ar = value;
498
499   (void) cls;
500   (void) pid;
501   GNUNET_free (ar);
502   return GNUNET_OK;
503 }
504
505
506 /**
507  * Client is done with ATS transport, release resources.
508  *
509  * @param ath handle to release
510  */
511 void
512 GNUNET_ATS_transport_done (struct GNUNET_ATS_TransportHandle *ath)
513 {
514   if (NULL != ath->mq)
515   {
516     GNUNET_MQ_destroy (ath->mq);
517     ath->mq = NULL;
518   }
519   if (NULL != ath->task)
520   {
521     GNUNET_SCHEDULER_cancel (ath->task);
522     ath->task = NULL;
523   }
524   GNUNET_CONTAINER_multipeermap_iterate (ath->records,
525                                          &free_record,
526                                          NULL);
527   GNUNET_CONTAINER_multipeermap_destroy (ath->records);
528   GNUNET_free (ath);
529 }
530
531
532 /**
533  * We have a new session ATS should know. Sessiones have to be added
534  * with this function before they can be: updated, set in use and
535  * destroyed.
536  *
537  * @param ath handle
538  * @param pid peer we connected to
539  * @param address the address (human readable version)
540  * @param session transport-internal handle for the session/queue, NULL if
541  *        the session is inbound-only
542  * @param prop performance data for the session
543  * @return handle to the session representation inside ATS, NULL
544  *         on error (i.e. ATS knows this exact session already)
545  */
546 struct GNUNET_ATS_SessionRecord *
547 GNUNET_ATS_session_add (struct GNUNET_ATS_TransportHandle *ath,
548                         const struct GNUNET_PeerIdentity *pid,
549                         const char *address,
550                         struct GNUNET_ATS_Session *session,
551                         const struct GNUNET_ATS_Properties *prop)
552 {
553   struct GNUNET_ATS_SessionRecord *ar;
554   uint32_t s;
555   size_t alen;
556
557   if (NULL == address)
558   {
559     /* we need a valid address */
560     GNUNET_break (0);
561     return NULL;
562   }
563   alen = strlen (address) + 1;
564   if ( (alen + sizeof (struct SessionAddMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
565        (alen >= GNUNET_MAX_MESSAGE_SIZE) )
566   {
567     /* address too large for us, this should not happen */
568     GNUNET_break (0);
569     return NULL;
570   }
571
572   /* Spin 's' until we find an unused session ID for this pid */
573   for (s = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
574                                      UINT32_MAX);
575        NULL != find_session (ath,
576                              s,
577                              pid);
578        s++) ;
579
580   alen = strlen (address) + 1;
581   ar = GNUNET_malloc (sizeof (struct GNUNET_ATS_SessionRecord) + alen);
582   ar->ath = ath;
583   ar->slot = 42; // FIXME: make up unique number!
584   ar->session = session;
585   ar->address = (const char *) &ar[1];
586   ar->pid = *pid;
587   ar->properties = *prop;
588   memcpy (&ar[1],
589           address,
590           alen);
591   (void) GNUNET_CONTAINER_multipeermap_put (ath->records,
592                                             &ar->pid,
593                                             ar,
594                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
595   send_add_session_message (ar);
596   return ar;
597 }
598
599
600 /**
601  * We have updated performance statistics for a given session.  Note
602  * that this function can be called for sessiones that are currently
603  * in use as well as sessiones that are valid but not actively in use.
604  * Furthermore, the peer may not even be connected to us right now (in
605  * which case the call may be ignored or the information may be stored
606  * for later use).  Update bandwidth assignments.
607  *
608  * @param ar session record to update information for
609  * @param prop performance data for the session
610  */
611 void
612 GNUNET_ATS_session_update (struct GNUNET_ATS_SessionRecord *ar,
613                            const struct GNUNET_ATS_Properties *prop)
614 {
615   struct GNUNET_ATS_TransportHandle *ath = ar->ath;
616   struct GNUNET_MQ_Envelope *ev;
617   struct SessionUpdateMessage *m;
618
619   LOG (GNUNET_ERROR_TYPE_DEBUG,
620        "Updating address `%s' for peer `%s'\n",
621        ar->address,
622        GNUNET_i2s (&ar->pid));
623   ar->properties = *prop;
624   if (NULL == ath->mq)
625     return; /* disconnected, skip for now */
626   ev = GNUNET_MQ_msg (m,
627                       GNUNET_MESSAGE_TYPE_ATS_SESSION_UPDATE);
628   m->session_id = htonl (ar->slot);
629   m->peer = ar->pid;
630   // FIXME: convert endianess here!
631   // m->properties = ar->properties;
632   GNUNET_MQ_send (ath->mq,
633                   ev);
634 }
635
636
637 /**
638  * A session was destroyed, ATS should now schedule and
639  * allocate under the assumption that this @a ar is no
640  * longer in use.
641  *
642  * @param ar session record to drop
643  */
644 void
645 GNUNET_ATS_session_del (struct GNUNET_ATS_SessionRecord *ar)
646 {
647   struct GNUNET_ATS_TransportHandle *ath = ar->ath;
648   struct GNUNET_MQ_Envelope *ev;
649   struct SessionDelMessage *m;
650
651   LOG (GNUNET_ERROR_TYPE_DEBUG,
652        "Deleting address `%s' for peer `%s'\n",
653        ar->address,
654        GNUNET_i2s (&ar->pid));
655   if (NULL == ath->mq)
656     return;
657   ev = GNUNET_MQ_msg (m,
658                       GNUNET_MESSAGE_TYPE_ATS_SESSION_DEL);
659   m->session_id = htonl (ar->slot);
660   m->peer = ar->pid;
661   GNUNET_MQ_send (ath->mq,
662                   ev);
663 }
664
665
666 /* end of ats_api2_transport.c */