finish (?) libgnunetatstransport for now
[oweals/gnunet.git] / src / ats / ats_api2_transport.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2010-2015, 2018 GNUnet e.V.
4
5      GNUnet is free software: you can redistribute it and/or modify it
6      under the terms of the GNU Affero General Public License as published
7      by the Free Software Foundation, either version 3 of the License,
8      or (at your option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      Affero General Public License for more details.
14
15      You should have received a copy of the GNU Affero General Public License
16      along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18 /**
19  * @file ats/ats_api2_transport.c
20  * @brief address suggestions and bandwidth allocation
21  * @author Christian Grothoff
22  * @author Matthias Wachs
23  */
24 #include "platform.h"
25 #include "gnunet_ats_transport_service.h"
26 #include "ats2.h"
27
28 #define LOG(kind,...) GNUNET_log_from(kind, "ats-transport-api", __VA_ARGS__)
29
30
31 /**
32  * Information we track per session, incoming or outgoing.  It also
33  * doesn't matter if we have a session, any session that ATS is
34  * allowed to suggest right now should be tracked.
35  */
36 struct GNUNET_ATS_SessionRecord
37 {
38
39   /**
40    * Transport handle this session record belongs to.
41    */
42   struct GNUNET_ATS_TransportHandle *ath;
43
44   /**
45    * Address data.
46    */
47   const char *address;
48
49   /**
50    * Session handle, NULL if inbound-only (also implies we cannot
51    * actually control inbound traffic via transport!).  So if
52    * @e session is NULL, the @e properties are informative for
53    * ATS (connection exists, utilization) but ATS cannot directly
54    * influence it (and should thus not call the
55    * #GNUNET_ATS_AllocationCallback for this @e session, which is
56    * obvious as NULL is not a meaningful session to allocation
57    * resources to).
58    */
59   struct GNUNET_ATS_Session *session;
60
61   /**
62    * Identity of the peer reached at @e address.
63    */
64   struct GNUNET_PeerIdentity pid;
65
66   /**
67    * Performance data about the @e session.
68    */
69   struct GNUNET_ATS_Properties properties;
70
71   /**
72    * Unique ID to identify this session at this @a pid in IPC
73    * messages.
74    */
75   uint32_t slot;
76
77 };
78
79
80 /**
81  * Handle to the ATS subsystem for bandwidth/transport transport information.
82  */
83 struct GNUNET_ATS_TransportHandle
84 {
85
86   /**
87    * Our configuration.
88    */
89   const struct GNUNET_CONFIGURATION_Handle *cfg;
90
91   /**
92    * Callback to invoke on suggestions.
93    */
94   GNUNET_ATS_SuggestionCallback suggest_cb;
95
96   /**
97    * Closure for @e suggest_cb.
98    */
99   void *suggest_cb_cls;
100
101   /**
102    * Callback to invoke on allocations.
103    */
104   GNUNET_ATS_AllocationCallback alloc_cb;
105
106   /**
107    * Closure for @e alloc_cb.
108    */
109   void *alloc_cb_cls;
110
111   /**
112    * Message queue for sending requests to the ATS service.
113    */
114   struct GNUNET_MQ_Handle *mq;
115
116   /**
117    * Task to trigger reconnect.
118    */
119   struct GNUNET_SCHEDULER_Task *task;
120
121   /**
122    * Hash map mapping PIDs to session records.
123    */
124   struct GNUNET_CONTAINER_MultiPeerMap *records;
125
126   /**
127    * Reconnect backoff delay.
128    */
129   struct GNUNET_TIME_Relative backoff;
130
131 };
132
133
134
135 /**
136  * Convert ATS properties from host to network byte order.
137  *
138  * @param nbo[OUT] value written
139  * @param hbo value read
140  */
141 static void
142 properties_hton (struct PropertiesNBO *nbo,
143                  const struct GNUNET_ATS_Properties *hbo)
144 {
145   nbo->delay = GNUNET_TIME_relative_hton (hbo->delay);
146   nbo->goodput_out = htonl (hbo->goodput_out);
147   nbo->goodput_in = htonl (hbo->goodput_in);
148   nbo->utilization_out = htonl (hbo->utilization_out);
149   nbo->utilization_in = htonl (hbo->utilization_in);
150   nbo->distance = htonl (hbo->distance);
151   nbo->mtu = htonl (hbo->mtu);
152   nbo->nt = htonl ((uint32_t) hbo->nt);
153   nbo->cc = htonl ((uint32_t) hbo->cc);
154 }
155
156
157 /**
158  * Re-establish the connection to the ATS service.
159  *
160  * @param sh handle to use to re-connect.
161  */
162 static void
163 reconnect (struct GNUNET_ATS_TransportHandle *ath);
164
165
166 /**
167  * Re-establish the connection to the ATS service.
168  *
169  * @param cls handle to use to re-connect.
170  */
171 static void
172 reconnect_task (void *cls)
173 {
174   struct GNUNET_ATS_TransportHandle *ath = cls;
175
176   ath->task = NULL;
177   reconnect (ath);
178 }
179
180
181 /**
182  * Disconnect from ATS and then reconnect.
183  *
184  * @param ath our handle
185  */
186 static void
187 force_reconnect (struct GNUNET_ATS_TransportHandle *ath)
188 {
189   if (NULL != ath->mq)
190   {
191     GNUNET_MQ_destroy (ath->mq);
192     ath->mq = NULL;
193   }
194   /* FIXME: do we tell transport service about disconnect events? CON:
195      initially ATS will have a really screwed picture of the world and
196      the rapid change would be bad.  PRO: if we don't, ATS and
197      transport may disagree about the allocation for a while...
198      For now: lazy: do nothing. */
199   ath->backoff = GNUNET_TIME_STD_BACKOFF (ath->backoff);
200   ath->task = GNUNET_SCHEDULER_add_delayed (ath->backoff,
201                                            &reconnect_task,
202                                            ath);
203 }
204
205
206 /**
207  * Check format of address suggestion message from the service.
208  *
209  * @param cls the `struct GNUNET_ATS_TransportHandle`
210  * @param m message received
211  */
212 static int
213 check_ats_address_suggestion (void *cls,
214                               const struct AddressSuggestionMessage *m)
215 {
216   (void) cls;
217   GNUNET_MQ_check_zero_termination (m);
218   return GNUNET_SYSERR;
219 }
220
221
222 /**
223  * We received an address suggestion message from the service.
224  *
225  * @param cls the `struct GNUNET_ATS_TransportHandle`
226  * @param m message received
227  */
228 static void
229 handle_ats_address_suggestion (void *cls,
230                                const struct AddressSuggestionMessage *m)
231 {
232   struct GNUNET_ATS_TransportHandle *ath = cls;
233   const char *address = (const char *) &m[1];
234
235   ath->suggest_cb (ath->suggest_cb_cls,
236                   &m->peer,
237                   address);
238 }
239
240
241 /**
242  * Closure for #match_session_cb.
243  */
244 struct FindContext
245 {
246   /**
247    * Key to look for.
248    */
249   uint32_t session_id;
250
251   /**
252    * Where to store the result.
253    */
254   struct GNUNET_ATS_SessionRecord *sr;
255 };
256
257
258 /**
259  * Finds matching session record.
260  *
261  * @param cls a `struct FindContext`
262  * @param pid peer identity (unused)
263  * @param value a `struct GNUNET_ATS_SessionRecord`
264  * @return #GNUNET_NO if match found, #GNUNET_YES to continue searching
265  */
266 static int
267 match_session_cb (void *cls,
268                   const struct GNUNET_PeerIdentity *pid,
269                   void *value)
270 {
271   struct FindContext *fc = cls;
272   struct GNUNET_ATS_SessionRecord *sr = value;
273
274   (void) pid;
275   if (fc->session_id == sr->slot)
276   {
277     fc->sr = sr;
278     return GNUNET_NO;
279   }
280   return GNUNET_YES;
281 }
282
283
284
285 /**
286  * Find session record for peer @a pid and session @a session_id
287  *
288  * @param ath transport handle to search
289  * @param session_id session ID to match
290  * @param pid peer to search under
291  * @return NULL if no such record exists
292  */
293 static struct GNUNET_ATS_SessionRecord *
294 find_session (struct GNUNET_ATS_TransportHandle *ath,
295               uint32_t session_id,
296               const struct GNUNET_PeerIdentity *pid)
297 {
298   struct FindContext fc = {
299     .session_id = session_id,
300     .sr = NULL
301   };
302   GNUNET_CONTAINER_multipeermap_get_multiple (ath->records,
303                                               pid,
304                                               &match_session_cb,
305                                               &fc);
306   return fc.sr;
307 }
308
309
310 /**
311  * We received a session allocation message from the service.
312  *
313  * @param cls the `struct GNUNET_ATS_TransportHandle`
314  * @param m message received
315  */
316 static void
317 handle_ats_session_allocation (void *cls,
318                                const struct SessionAllocationMessage *m)
319 {
320   struct GNUNET_ATS_TransportHandle *ath = cls;
321   struct GNUNET_ATS_SessionRecord *ar;
322   uint32_t session_id;
323
324   session_id = ntohl (m->session_id);
325   ar = find_session (ath,
326                      session_id,
327                      &m->peer);
328   if (NULL == ar)
329   {
330     /* this can (rarely) happen if ATS changes an sessiones allocation
331        just when the transport service deleted it */
332     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
333                 "Allocation ignored, session unknown\n");
334     return;
335   }
336   ath->backoff = GNUNET_TIME_UNIT_ZERO;
337   LOG (GNUNET_ERROR_TYPE_DEBUG,
338        "ATS allocates bandwidth for peer `%s' using address %s\n",
339        GNUNET_i2s (&ar->pid),
340        ar->address);
341   ath->alloc_cb (ath->alloc_cb_cls,
342                  ar->session,
343                  m->bandwidth_out,
344                  m->bandwidth_in);
345 }
346
347
348 /**
349  * We encountered an error handling the MQ to the ATS service.
350  * Reconnect.
351  *
352  * @param cls the `struct GNUNET_ATS_TransportHandle`
353  * @param error details about the error
354  */
355 static void
356 error_handler (void *cls,
357                enum GNUNET_MQ_Error error)
358 {
359   struct GNUNET_ATS_TransportHandle *ath = cls;
360
361   LOG (GNUNET_ERROR_TYPE_DEBUG,
362        "ATS connection died (code %d), reconnecting\n",
363        (int) error);
364   force_reconnect (ath);
365 }
366
367
368 /**
369  * Generate and transmit the `struct SessionAddMessage` for the given
370  * session record.
371  *
372  * @param ar the session to inform the ATS service about
373  */
374 static void
375 send_add_session_message (const struct GNUNET_ATS_SessionRecord *ar)
376 {
377   struct GNUNET_ATS_TransportHandle *ath = ar->ath;
378   struct GNUNET_MQ_Envelope *ev;
379   struct SessionAddMessage *m;
380   size_t alen;
381
382   if (NULL == ath->mq)
383     return; /* disconnected, skip for now */
384   alen = strlen (ar->address) + 1;
385   ev = GNUNET_MQ_msg_extra (m,
386                             alen,
387                             (NULL == ar->session)
388                             ? GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD_INBOUND_ONLY
389                             : GNUNET_MESSAGE_TYPE_ATS_SESSION_ADD);
390   m->peer = ar->pid;
391   m->session_id = htonl (ar->slot);
392   properties_hton (&m->properties,
393                    &ar->properties);
394   GNUNET_memcpy (&m[1],
395                  ar->address,
396                  alen);
397
398   LOG (GNUNET_ERROR_TYPE_DEBUG,
399        "Adding address `%s' for peer `%s'\n",
400        ar->address,
401        GNUNET_i2s (&ar->pid));
402   GNUNET_MQ_send (ath->mq,
403                   ev);
404 }
405
406
407 /**
408  * Send ATS information about the session record.
409  *
410  * @param cls our `struct GNUNET_ATS_TransportHandle *`, unused
411  * @param pid unused
412  * @param value the `struct GNUNET_ATS_SessionRecord *` to add
413  * @return #GNUNET_OK
414  */
415 static int
416 send_add_session_cb (void *cls,
417                      const struct GNUNET_PeerIdentity *pid,
418                      void *value)
419 {
420   struct GNUNET_ATS_SessionRecord *ar = value;
421
422   (void) cls;
423   (void) pid;
424   send_add_session_message (ar);
425   return GNUNET_OK;
426 }
427
428
429 /**
430  * Re-establish the connection to the ATS service.
431  *
432  * @param ath handle to use to re-connect.
433  */
434 static void
435 reconnect (struct GNUNET_ATS_TransportHandle *ath)
436 {
437   struct GNUNET_MQ_MessageHandler handlers[] = {
438     GNUNET_MQ_hd_var_size (ats_address_suggestion,
439                            GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION,
440                            struct AddressSuggestionMessage,
441                            ath),
442     GNUNET_MQ_hd_fixed_size (ats_session_allocation,
443                              GNUNET_MESSAGE_TYPE_ATS_SESSION_ALLOCATION,
444                              struct SessionAllocationMessage,
445                              ath),
446     GNUNET_MQ_handler_end ()
447   };
448   struct GNUNET_MQ_Envelope *ev;
449   struct GNUNET_MessageHeader *init;
450
451   GNUNET_assert (NULL == ath->mq);
452   ath->mq = GNUNET_CLIENT_connect (ath->cfg,
453                                   "ats",
454                                   handlers,
455                                   &error_handler,
456                                   ath);
457   if (NULL == ath->mq)
458   {
459     GNUNET_break (0);
460     force_reconnect (ath);
461     return;
462   }
463   ev = GNUNET_MQ_msg (init,
464                       GNUNET_MESSAGE_TYPE_ATS_START);
465   GNUNET_MQ_send (ath->mq,
466                   ev);
467   if (NULL == ath->mq)
468     return;
469   GNUNET_CONTAINER_multipeermap_iterate (ath->records,
470                                          &send_add_session_cb,
471                                          ath);
472 }
473
474
475 /**
476  * Initialize the ATS subsystem.
477  *
478  * @param cfg configuration to use
479  * @param alloc_cb notification to call whenever the allocation changed
480  * @param alloc_cb_cls closure for @a alloc_cb
481  * @param suggest_cb notification to call whenever the suggestation is made
482  * @param suggest_cb_cls closure for @a suggest_cb
483  * @return ats context
484  */
485 struct GNUNET_ATS_TransportHandle *
486 GNUNET_ATS_transport_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
487                            GNUNET_ATS_AllocationCallback alloc_cb,
488                            void *alloc_cb_cls,
489                            GNUNET_ATS_SuggestionCallback suggest_cb,
490                            void *suggest_cb_cls)
491 {
492   struct GNUNET_ATS_TransportHandle *ath;
493
494   ath = GNUNET_new (struct GNUNET_ATS_TransportHandle);
495   ath->cfg = cfg;
496   ath->suggest_cb = suggest_cb;
497   ath->suggest_cb_cls = suggest_cb_cls;
498   ath->alloc_cb = alloc_cb;
499   ath->alloc_cb_cls = alloc_cb_cls;
500   ath->records = GNUNET_CONTAINER_multipeermap_create (128,
501                                                       GNUNET_YES);
502   reconnect (ath);
503   return ath;
504 }
505
506
507 /**
508  * Release memory associated with the session record.
509  *
510  * @param cls NULL
511  * @param pid unused
512  * @param value a `struct GNUNET_ATS_SessionRecord`
513  * @return #GNUNET_OK
514  */
515 static int
516 free_record (void *cls,
517              const struct GNUNET_PeerIdentity *pid,
518              void *value)
519 {
520   struct GNUNET_ATS_SessionRecord *ar = value;
521
522   (void) cls;
523   (void) pid;
524   GNUNET_free (ar);
525   return GNUNET_OK;
526 }
527
528
529 /**
530  * Client is done with ATS transport, release resources.
531  *
532  * @param ath handle to release
533  */
534 void
535 GNUNET_ATS_transport_done (struct GNUNET_ATS_TransportHandle *ath)
536 {
537   if (NULL != ath->mq)
538   {
539     GNUNET_MQ_destroy (ath->mq);
540     ath->mq = NULL;
541   }
542   if (NULL != ath->task)
543   {
544     GNUNET_SCHEDULER_cancel (ath->task);
545     ath->task = NULL;
546   }
547   GNUNET_CONTAINER_multipeermap_iterate (ath->records,
548                                          &free_record,
549                                          NULL);
550   GNUNET_CONTAINER_multipeermap_destroy (ath->records);
551   GNUNET_free (ath);
552 }
553
554
555 /**
556  * We have a new session ATS should know. Sessiones have to be added
557  * with this function before they can be: updated, set in use and
558  * destroyed.
559  *
560  * @param ath handle
561  * @param pid peer we connected to
562  * @param address the address (human readable version)
563  * @param session transport-internal handle for the session/queue, NULL if
564  *        the session is inbound-only
565  * @param prop performance data for the session
566  * @return handle to the session representation inside ATS, NULL
567  *         on error (i.e. ATS knows this exact session already)
568  */
569 struct GNUNET_ATS_SessionRecord *
570 GNUNET_ATS_session_add (struct GNUNET_ATS_TransportHandle *ath,
571                         const struct GNUNET_PeerIdentity *pid,
572                         const char *address,
573                         struct GNUNET_ATS_Session *session,
574                         const struct GNUNET_ATS_Properties *prop)
575 {
576   struct GNUNET_ATS_SessionRecord *ar;
577   uint32_t s;
578   size_t alen;
579
580   if (NULL == address)
581   {
582     /* we need a valid address */
583     GNUNET_break (0);
584     return NULL;
585   }
586   alen = strlen (address) + 1;
587   if ( (alen + sizeof (struct SessionAddMessage) >= GNUNET_MAX_MESSAGE_SIZE) ||
588        (alen >= GNUNET_MAX_MESSAGE_SIZE) )
589   {
590     /* address too large for us, this should not happen */
591     GNUNET_break (0);
592     return NULL;
593   }
594
595   /* Spin 's' until we find an unused session ID for this pid */
596   for (s = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
597                                      UINT32_MAX);
598        NULL != find_session (ath,
599                              s,
600                              pid);
601        s++) ;
602
603   alen = strlen (address) + 1;
604   ar = GNUNET_malloc (sizeof (struct GNUNET_ATS_SessionRecord) + alen);
605   ar->ath = ath;
606   ar->slot = s;
607   ar->session = session;
608   ar->address = (const char *) &ar[1];
609   ar->pid = *pid;
610   ar->properties = *prop;
611   memcpy (&ar[1],
612           address,
613           alen);
614   (void) GNUNET_CONTAINER_multipeermap_put (ath->records,
615                                             &ar->pid,
616                                             ar,
617                                             GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
618   send_add_session_message (ar);
619   return ar;
620 }
621
622
623 /**
624  * We have updated performance statistics for a given session.  Note
625  * that this function can be called for sessiones that are currently
626  * in use as well as sessiones that are valid but not actively in use.
627  * Furthermore, the peer may not even be connected to us right now (in
628  * which case the call may be ignored or the information may be stored
629  * for later use).  Update bandwidth assignments.
630  *
631  * @param ar session record to update information for
632  * @param prop performance data for the session
633  */
634 void
635 GNUNET_ATS_session_update (struct GNUNET_ATS_SessionRecord *ar,
636                            const struct GNUNET_ATS_Properties *prop)
637 {
638   struct GNUNET_ATS_TransportHandle *ath = ar->ath;
639   struct GNUNET_MQ_Envelope *ev;
640   struct SessionUpdateMessage *m;
641
642   LOG (GNUNET_ERROR_TYPE_DEBUG,
643        "Updating address `%s' for peer `%s'\n",
644        ar->address,
645        GNUNET_i2s (&ar->pid));
646   ar->properties = *prop;
647   if (NULL == ath->mq)
648     return; /* disconnected, skip for now */
649   ev = GNUNET_MQ_msg (m,
650                       GNUNET_MESSAGE_TYPE_ATS_SESSION_UPDATE);
651   m->session_id = htonl (ar->slot);
652   m->peer = ar->pid;
653   properties_hton (&m->properties,
654                    &ar->properties);
655   GNUNET_MQ_send (ath->mq,
656                   ev);
657 }
658
659
660 /**
661  * A session was destroyed, ATS should now schedule and
662  * allocate under the assumption that this @a ar is no
663  * longer in use.
664  *
665  * @param ar session record to drop
666  */
667 void
668 GNUNET_ATS_session_del (struct GNUNET_ATS_SessionRecord *ar)
669 {
670   struct GNUNET_ATS_TransportHandle *ath = ar->ath;
671   struct GNUNET_MQ_Envelope *ev;
672   struct SessionDelMessage *m;
673
674   LOG (GNUNET_ERROR_TYPE_DEBUG,
675        "Deleting address `%s' for peer `%s'\n",
676        ar->address,
677        GNUNET_i2s (&ar->pid));
678   if (NULL == ath->mq)
679     return;
680   ev = GNUNET_MQ_msg (m,
681                       GNUNET_MESSAGE_TYPE_ATS_SESSION_DEL);
682   m->session_id = htonl (ar->slot);
683   m->peer = ar->pid;
684   GNUNET_MQ_send (ath->mq,
685                   ev);
686 }
687
688
689 /* end of ats_api2_transport.c */