implementing #1747
[oweals/gnunet.git] / src / ats / ats_api_performance.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file ats/ats_api_performance.c
22  * @brief automatic transport selection and outbound bandwidth determination
23  * @author Christian Grothoff
24  * @author Matthias Wachs
25   */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "ats.h"
29
30
31 /**
32  * Message in linked list we should send to the ATS service.  The
33  * actual binary message follows this struct.
34  */
35 struct PendingMessage
36 {
37
38   /**
39    * Kept in a DLL.
40    */ 
41   struct PendingMessage *next;
42
43   /**
44    * Kept in a DLL.
45    */ 
46   struct PendingMessage *prev;
47
48   /**
49    * Size of the message.
50    */
51   size_t size;
52
53   /**
54    * Is this the 'ATS_START' message?
55    */ 
56   int is_init;
57 };
58
59
60 /**
61  * Linked list of pending reservations.
62  */
63 struct GNUNET_ATS_ReservationContext
64 {
65
66   /**
67    * Kept in a DLL.
68    */ 
69   struct GNUNET_ATS_ReservationContext *next;
70
71   /**
72    * Kept in a DLL.
73    */ 
74   struct GNUNET_ATS_ReservationContext *prev;
75
76   /**
77    * Target peer.
78    */
79   struct GNUNET_PeerIdentity peer;
80                             
81   /**
82    * Desired reservation
83    */
84   int32_t size;
85
86   /**
87    * Function to call on result.
88    */
89   GNUNET_ATS_ReservationCallback rcb;
90
91   /**
92    * Closure for 'rcb'
93    */
94   void *rcb_cls;
95
96   /**
97    * Do we need to undo this reservation if it succeeded?  Set to
98    * GNUNET_YES if a reservation is cancelled.  (at that point, 'info'
99    * is also set to NULL; however, info will ALSO be NULL for the
100    * reservation context that is created to undo the original request,
101    * so 'info' being NULL cannot be used to check if undo is
102    * required).
103    */
104   int undo;
105 };
106
107
108 /**
109  * ATS Handle to obtain and/or modify performance information.
110  */
111 struct GNUNET_ATS_PerformanceHandle
112 {
113  
114   /**
115    * Our configuration.
116    */
117   const struct GNUNET_CONFIGURATION_Handle *cfg;
118
119   /**
120    * Callback to invoke on performance changes.
121    */
122   GNUNET_ATS_PeerInformationCallback infocb;
123   
124   /**
125    * Closure for 'infocb'.
126    */
127   void *infocb_cls;
128
129   /**
130    * Connection to ATS service.
131    */
132   struct GNUNET_CLIENT_Connection *client;
133
134   /**
135    * Head of list of messages for the ATS service.
136    */
137   struct PendingMessage *pending_head;
138
139   /**
140    * Tail of list of messages for the ATS service
141    */
142   struct PendingMessage *pending_tail;
143
144   /**
145    * Head of linked list of pending reservation requests.
146    */
147   struct GNUNET_ATS_ReservationContext *reservation_head;
148
149   /**
150    * Tail of linked list of pending reservation requests.
151    */
152   struct GNUNET_ATS_ReservationContext *reservation_tail;
153
154   /**
155    * Current request for transmission to ATS.
156    */
157   struct GNUNET_CLIENT_TransmitHandle *th;
158
159   /**
160    * Task to trigger reconnect.
161    */ 
162   GNUNET_SCHEDULER_TaskIdentifier task;
163   
164 };
165
166
167 /**
168  * Re-establish the connection to the ATS service.
169  *
170  * @param sh handle to use to re-connect.
171  */
172 static void
173 reconnect (struct GNUNET_ATS_PerformanceHandle *ph);
174
175
176 /**
177  * Re-establish the connection to the ATS service.
178  *
179  * @param cls handle to use to re-connect.
180  * @param tc scheduler context
181  */
182 static void
183 reconnect_task (void *cls,
184                 const struct GNUNET_SCHEDULER_TaskContext *tc)
185 {
186   struct GNUNET_ATS_PerformanceHandle *ph = cls;
187
188   ph->task = GNUNET_SCHEDULER_NO_TASK;
189   reconnect (ph);
190 }
191
192
193 /**
194  * Transmit messages from the message queue to the service
195  * (if there are any, and if we are not already trying).
196  *
197  * @param sh handle to use
198  */
199 static void
200 do_transmit (struct GNUNET_ATS_PerformanceHandle *ph);
201
202
203 /**
204  * We can now transmit a message to ATS. Do it.
205  *
206  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
207  * @param size number of bytes we can transmit to ATS
208  * @param buf where to copy the messages
209  * @return number of bytes copied into buf
210  */
211 static size_t
212 transmit_message_to_ats (void *cls,
213                          size_t size,
214                          void *buf)
215 {
216   struct GNUNET_ATS_PerformanceHandle *ph = cls;
217   struct PendingMessage *p;
218   size_t ret;
219   char *cbuf;
220
221   ph->th = NULL;
222   ret = 0;
223   cbuf = buf;
224   while ( (NULL != (p = ph->pending_head)) &&
225           (p->size <= size) )
226   {
227     memcpy (&cbuf[ret], &p[1], p->size);    
228     ret += p->size;
229     size -= p->size;
230     GNUNET_CONTAINER_DLL_remove (ph->pending_head,
231                                  ph->pending_tail,
232                                  p);
233     GNUNET_free (p);
234   }
235   do_transmit (ph);
236   return ret;
237 }
238
239
240 /**
241  * Transmit messages from the message queue to the service
242  * (if there are any, and if we are not already trying).
243  *
244  * @param ph handle to use
245  */
246 static void
247 do_transmit (struct GNUNET_ATS_PerformanceHandle *ph)
248 {
249   struct PendingMessage *p;
250
251   if (NULL != ph->th)
252     return;
253   if (NULL == (p = ph->pending_head))
254     return;
255   if (NULL == ph->client)
256     return; /* currently reconnecting */
257   ph->th = GNUNET_CLIENT_notify_transmit_ready (ph->client,
258                                                 p->size,
259                                                 GNUNET_TIME_UNIT_FOREVER_REL,
260                                                 GNUNET_YES,
261                                                 &transmit_message_to_ats, ph);
262 }
263
264
265 /**
266  * We received a peer information message.  Validate and process it.
267  *
268  * @param ph our context with the callback
269  * @param msg the message
270  * @return GNUNET_OK if the message was well-formed
271  */
272 static int
273 process_pi_message (struct GNUNET_ATS_PerformanceHandle *ph,
274                     const struct GNUNET_MessageHeader *msg)
275 {
276   const struct PeerInformationMessage *pi;
277   const struct GNUNET_ATS_Information *atsi;
278   const char *address;
279   const char *plugin_name;
280   uint16_t address_length;
281   uint16_t plugin_name_length;
282   uint32_t ats_count;
283
284   if (ph->infocb == NULL)
285   {
286     GNUNET_break (0);
287     return GNUNET_SYSERR;
288   }    
289   if (ntohs (msg->size) < sizeof (struct PeerInformationMessage))
290   {
291     GNUNET_break (0);
292     return GNUNET_SYSERR;
293   }
294   pi = (const struct PeerInformationMessage*) msg;
295   ats_count = ntohl (pi->ats_count);
296   address_length = ntohs (pi->address_length);
297   plugin_name_length = ntohs (pi->plugin_name_length);
298   atsi = (const struct GNUNET_ATS_Information*) &pi[1];
299   address = (const char*) &atsi[ats_count];
300   plugin_name = &address[address_length];
301   if ( (address_length +
302         plugin_name_length +
303         ats_count * sizeof (struct GNUNET_ATS_Information) +
304         sizeof (struct PeerInformationMessage) != ntohs (msg->size))  ||
305        (ats_count > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_ATS_Information)) ||
306        (plugin_name[plugin_name_length - 1] != '\0') )
307   {
308     GNUNET_break (0);
309     return GNUNET_SYSERR;
310   }
311   ph->infocb (ph->infocb_cls,
312               &pi->peer,
313               plugin_name,
314               address, address_length,
315               pi->bandwidth_out,
316               pi->bandwidth_in,
317               atsi,
318               ats_count);
319   return GNUNET_OK;
320 }
321
322
323 /**
324  * We received a reservation result message.  Validate and process it.
325  *
326  * @param ph our context with the callback
327  * @param msg the message
328  * @return GNUNET_OK if the message was well-formed
329  */
330 static int
331 process_rr_message (struct GNUNET_ATS_PerformanceHandle *ph,
332                     const struct GNUNET_MessageHeader *msg)
333 {
334   const struct ReservationResultMessage *rr;
335   struct GNUNET_ATS_ReservationContext *rc;
336   int32_t amount;
337
338   if (ntohs (msg->size) < sizeof (struct ReservationResultMessage))
339   {
340     GNUNET_break (0);
341     return GNUNET_SYSERR;
342   }
343   rr = (const struct ReservationResultMessage*) msg;
344   amount = ntohl (rr->amount);
345   rc = ph->reservation_head;
346   if (0 != memcmp (&rr->peer,
347                    &rc->peer,
348                    sizeof (struct GNUNET_PeerIdentity)))
349   {
350     GNUNET_break (0);
351     return GNUNET_SYSERR;
352   }
353   GNUNET_CONTAINER_DLL_remove (ph->reservation_head,
354                                ph->reservation_tail,
355                                rc);
356   if ( (amount == 0) ||
357        (rc->rcb != NULL) )
358   {
359     /* tell client if not cancelled */
360     if (rc->rcb != NULL)
361       rc->rcb (rc->rcb_cls,
362                &rr->peer,
363                amount,
364               GNUNET_TIME_relative_ntoh (rr->res_delay));       
365     GNUNET_free (rc);
366     return GNUNET_OK;
367   }
368   /* amount non-zero, but client cancelled, consider undo! */
369   if (GNUNET_YES != rc->undo)
370   {
371     GNUNET_free (rc);
372     return GNUNET_OK; /* do not try to undo failed undos or negative amounts */
373   }
374   GNUNET_free (rc);
375   (void) GNUNET_ATS_reserve_bandwidth (ph, &rr->peer, -amount, NULL, NULL);
376   return GNUNET_OK;
377 }
378
379
380 /**
381  * Type of a function to call when we receive a message
382  * from the service.
383  *
384  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
385  * @param msg message received, NULL on timeout or fatal error
386  */
387 static void
388 process_ats_message (void *cls,
389                      const struct GNUNET_MessageHeader *msg)
390 {
391   struct GNUNET_ATS_PerformanceHandle *ph = cls;
392
393   if (NULL == msg) 
394     goto reconnect;
395   switch (ntohs (msg->type))
396   {
397   case GNUNET_MESSAGE_TYPE_ATS_PEER_INFORMATION:
398     if (GNUNET_OK != process_pi_message (ph, msg))
399       goto reconnect;
400     break;
401   case GNUNET_MESSAGE_TYPE_ATS_RESERVATION_RESULT:
402     if (GNUNET_OK != process_rr_message (ph, msg))
403       goto reconnect;    
404     break;
405   default:
406     GNUNET_break (0);
407     goto reconnect;
408     return;
409   }
410   GNUNET_CLIENT_receive (ph->client,
411                          &process_ats_message, ph,
412                          GNUNET_TIME_UNIT_FOREVER_REL);
413   return;
414  reconnect:
415   GNUNET_CLIENT_disconnect (ph->client, GNUNET_NO);
416   ph->client = NULL;
417   ph->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
418                                            &reconnect_task, ph);
419 }
420
421
422 /**
423  * Re-establish the connection to the ATS service.
424  *
425  * @param ph handle to use to re-connect.
426  */
427 static void
428 reconnect (struct GNUNET_ATS_PerformanceHandle *ph)
429 {
430   struct PendingMessage *p;
431   struct ClientStartMessage *init;
432
433   GNUNET_assert (NULL == ph->client);
434   ph->client = GNUNET_CLIENT_connect ("ats", ph->cfg);
435   GNUNET_assert (NULL != ph->client);
436   GNUNET_CLIENT_receive (ph->client,
437                          &process_ats_message, ph,
438                          GNUNET_TIME_UNIT_FOREVER_REL);
439   if ( (NULL == (p = ph->pending_head)) ||
440        (GNUNET_YES != p->is_init) )
441   {
442     p = GNUNET_malloc (sizeof (struct PendingMessage) +
443                        sizeof (struct ClientStartMessage));
444     p->size = sizeof (struct ClientStartMessage);
445     p->is_init = GNUNET_YES;
446     init = (struct ClientStartMessage *) &p[1];
447     init->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_START);
448     init->header.size = htons (sizeof (struct ClientStartMessage));
449     init->start_flag = htonl ((ph->infocb == NULL) 
450                               ? START_FLAG_PERFORMANCE_NO_PIC 
451                               : START_FLAG_PERFORMANCE_WITH_PIC);
452     GNUNET_CONTAINER_DLL_insert (ph->pending_head,
453                                  ph->pending_tail,
454                                  p);
455   }
456   do_transmit (ph);
457 }
458
459
460
461 /**
462  * Get handle to access performance API of the ATS subsystem.
463  *
464  * @param cfg configuration to use
465  * @param infocb function to call on allocation changes, can be NULL
466  * @param infocb_cls closure for infocb
467  * @return ats performance context
468  */
469 struct GNUNET_ATS_PerformanceHandle *
470 GNUNET_ATS_performance_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
471                              GNUNET_ATS_PeerInformationCallback infocb,
472                              void *infocb_cls)
473 {
474   struct GNUNET_ATS_PerformanceHandle *ph;
475
476   ph = GNUNET_malloc (sizeof (struct GNUNET_ATS_PerformanceHandle));
477   ph->cfg = cfg;
478   ph->infocb = infocb;
479   ph->infocb_cls = infocb_cls;
480   reconnect (ph);
481   return ph;
482 }
483
484
485 /**
486  * Client is done using the ATS performance subsystem, release resources.
487  *
488  * @param ph handle
489  */
490 void
491 GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph)
492 {
493   struct PendingMessage *p;
494   struct GNUNET_ATS_ReservationContext *rc;
495   
496   while (NULL != (p = ph->pending_head))
497   {
498     GNUNET_CONTAINER_DLL_remove (ph->pending_head,
499                                  ph->pending_tail,
500                                  p);
501     GNUNET_free (p);
502   }
503   while (NULL != (rc = ph->reservation_head))
504   {
505     GNUNET_CONTAINER_DLL_remove (ph->reservation_head,
506                                  ph->reservation_tail,
507                                  rc);
508     GNUNET_break (NULL == rc->rcb);
509     GNUNET_free (rc);
510   }  
511   if (GNUNET_SCHEDULER_NO_TASK != ph->task)
512   {
513     GNUNET_SCHEDULER_cancel (ph->task);
514     ph->task = GNUNET_SCHEDULER_NO_TASK;
515   }
516   if (NULL != ph->client)
517   {
518     GNUNET_CLIENT_disconnect (ph->client, GNUNET_NO);
519     ph->client = NULL;
520   }
521   GNUNET_free (ph);
522 }
523
524
525 /**
526  * Reserve inbound bandwidth from the given peer.  ATS will look at
527  * the current amount of traffic we receive from the peer and ensure
528  * that the peer could add 'amount' of data to its stream.
529  *
530  * @param ph performance handle
531  * @param peer identifies the peer
532  * @param amount reserve N bytes for receiving, negative
533  *                amounts can be used to undo a (recent) reservation;
534  * @param rcb function to call with the resulting reservation information
535  * @param rcb_cls closure for info
536  * @return NULL on error
537  * @deprecated will be replaced soon
538  */
539 struct GNUNET_ATS_ReservationContext *
540 GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph,
541                               const struct GNUNET_PeerIdentity *peer,
542                               int32_t amount, 
543                               GNUNET_ATS_ReservationCallback rcb, 
544                               void *rcb_cls)
545 {
546   struct GNUNET_ATS_ReservationContext *rc;
547   struct PendingMessage *p;
548   struct ReservationRequestMessage *m;
549
550   rc = GNUNET_malloc (sizeof (struct GNUNET_ATS_ReservationContext));
551   rc->size = amount;
552   rc->peer = *peer;
553   rc->rcb = rcb;
554   rc->rcb_cls = rcb_cls;
555   if ( (rcb != NULL) && (amount > 0) )
556     rc->undo = GNUNET_YES;
557   GNUNET_CONTAINER_DLL_insert_tail (ph->reservation_head,
558                                     ph->reservation_tail,
559                                     rc);
560   
561   p = GNUNET_malloc (sizeof (struct PendingMessage) + 
562                      sizeof (struct ReservationRequestMessage));
563   p->size = sizeof (struct ReservationRequestMessage);
564   p->is_init = GNUNET_NO;
565   m = (struct ReservationRequestMessage*) &p[1];
566   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_RESERVATION_REQUEST);
567   m->header.size = htons (sizeof (struct ReservationRequestMessage));
568   m->amount = htonl (amount);
569   m->peer = *peer;
570   GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head,
571                                     ph->pending_tail,
572                                     p);
573   do_transmit (ph);
574   return rc;
575 }
576
577
578 /**
579  * Cancel request for reserving bandwidth.
580  *
581  * @param rc context returned by the original GNUNET_ATS_reserve_bandwidth call
582  */
583 void
584 GNUNET_ATS_reserve_bandwidth_cancel (struct
585                                      GNUNET_ATS_ReservationContext *rc)
586 {
587   rc->rcb = NULL;
588 }
589
590
591 /**
592  * Change preferences for the given peer. Preference changes are forgotten if peers
593  * disconnect.
594  * 
595  * @param ph performance handle
596  * @param peer identifies the peer
597  * @param ... 0-terminated specification of the desired changes
598  */
599 void
600 GNUNET_ATS_change_preference (struct GNUNET_ATS_PerformanceHandle *ph,
601                               const struct GNUNET_PeerIdentity *peer,
602                               ...)
603 {
604   struct PendingMessage *p;
605   struct ChangePreferenceMessage *m;
606   size_t msize;
607   uint32_t count;
608   struct PreferenceInformation *pi;
609   va_list ap;
610   enum GNUNET_ATS_PreferenceKind kind;
611
612   count = 0;
613   va_start (ap, peer);
614   while (GNUNET_ATS_PREFERENCE_END != (kind = va_arg (ap, enum GNUNET_ATS_PreferenceKind)))
615   {
616     switch (kind)
617     {
618     case GNUNET_ATS_PREFERENCE_BANDWIDTH:
619       count++;
620       (void) va_arg (ap, double);
621       break;
622     case GNUNET_ATS_PREFERENCE_LATENCY:
623       count++;
624       (void) va_arg (ap, double);
625       break;
626     default:
627       GNUNET_assert (0);      
628     }
629   }
630   va_end (ap);
631   msize = count * sizeof (struct PreferenceInformation) +
632     sizeof (struct ChangePreferenceMessage);
633   p = GNUNET_malloc (sizeof (struct PendingMessage) + 
634                      msize);
635   p->size = msize;
636   p->is_init = GNUNET_NO;
637   m = (struct ChangePreferenceMessage*) &p[1];
638   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_PREFERENCE_CHANGE);
639   m->header.size = htons (msize);
640   m->num_preferences = htonl (count);
641   m->peer = *peer;
642   pi = (struct PreferenceInformation*) &m[1];
643   count = 0;
644   va_start (ap, peer);
645   while (GNUNET_ATS_PREFERENCE_END != (kind = va_arg (ap, enum GNUNET_ATS_PreferenceKind)))
646   {
647     pi[count].preference_kind = htonl (kind);
648     switch (kind)
649     {
650     case GNUNET_ATS_PREFERENCE_BANDWIDTH:
651       pi[count].preference_value = (float) va_arg (ap, double);
652       count++;
653       break;
654     case GNUNET_ATS_PREFERENCE_LATENCY:
655       pi[count].preference_value = (float) va_arg (ap, double);
656       count++;
657       break;
658     default:
659       GNUNET_assert (0);      
660     }
661   }
662   va_end (ap);
663   GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head,
664                                     ph->pending_tail,
665                                     p);
666   do_transmit (ph);
667 }
668
669 /* end of ats_api_performance.c */
670