fix
[oweals/gnunet.git] / src / ats / ats_api_performance.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file ats/ats_api_performance.c
22  * @brief automatic transport selection and outbound bandwidth determination
23  * @author Christian Grothoff
24  * @author Matthias Wachs
25   */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "ats.h"
29
30
31 /**
32  * Message in linked list we should send to the ATS service.  The
33  * actual binary message follows this struct.
34  */
35 struct PendingMessage
36 {
37
38   /**
39    * Kept in a DLL.
40    */ 
41   struct PendingMessage *next;
42
43   /**
44    * Kept in a DLL.
45    */ 
46   struct PendingMessage *prev;
47
48   /**
49    * Size of the message.
50    */
51   size_t size;
52
53   /**
54    * Is this the 'ATS_START' message?
55    */ 
56   int is_init;
57 };
58
59
60 /**
61  * Linked list of pending reservations.
62  */
63 struct GNUNET_ATS_ReservationContext
64 {
65
66   /**
67    * Kept in a DLL.
68    */ 
69   struct GNUNET_ATS_ReservationContext *next;
70
71   /**
72    * Kept in a DLL.
73    */ 
74   struct GNUNET_ATS_ReservationContext *prev;
75
76   /**
77    * Target peer.
78    */
79   struct GNUNET_PeerIdentity peer;
80                             
81   /**
82    * Desired reservation
83    */
84   int32_t size;
85
86   /**
87    * Function to call on result.
88    */
89   GNUNET_ATS_ReservationCallback rcb;
90
91   /**
92    * Closure for 'rcb'
93    */
94   void *rcb_cls;
95
96   /**
97    * Do we need to undo this reservation if it succeeded?  Set to
98    * GNUNET_YES if a reservation is cancelled.  (at that point, 'info'
99    * is also set to NULL; however, info will ALSO be NULL for the
100    * reservation context that is created to undo the original request,
101    * so 'info' being NULL cannot be used to check if undo is
102    * required).
103    */
104   int undo;
105 };
106
107
108 /**
109  * ATS Handle to obtain and/or modify performance information.
110  */
111 struct GNUNET_ATS_PerformanceHandle
112 {
113  
114   /**
115    * Our configuration.
116    */
117   const struct GNUNET_CONFIGURATION_Handle *cfg;
118
119   /**
120    * Callback to invoke on performance changes.
121    */
122   GNUNET_ATS_PeerInformationCallback infocb;
123   
124   /**
125    * Closure for 'infocb'.
126    */
127   void *infocb_cls;
128
129   /**
130    * Connection to ATS service.
131    */
132   struct GNUNET_CLIENT_Connection *client;
133
134   /**
135    * Head of list of messages for the ATS service.
136    */
137   struct PendingMessage *pending_head;
138
139   /**
140    * Tail of list of messages for the ATS service
141    */
142   struct PendingMessage *pending_tail;
143
144   /**
145    * Head of linked list of pending reservation requests.
146    */
147   struct GNUNET_ATS_ReservationContext *reservation_head;
148
149   /**
150    * Tail of linked list of pending reservation requests.
151    */
152   struct GNUNET_ATS_ReservationContext *reservation_tail;
153
154   /**
155    * Current request for transmission to ATS.
156    */
157   struct GNUNET_CLIENT_TransmitHandle *th;
158
159 };
160
161
162 /**
163  * Re-establish the connection to the ATS service.
164  *
165  * @param sh handle to use to re-connect.
166  */
167 static void
168 reconnect (struct GNUNET_ATS_PerformanceHandle *ph);
169
170
171 /**
172  * Transmit messages from the message queue to the service
173  * (if there are any, and if we are not already trying).
174  *
175  * @param sh handle to use
176  */
177 static void
178 do_transmit (struct GNUNET_ATS_PerformanceHandle *ph);
179
180
181 /**
182  * We can now transmit a message to ATS. Do it.
183  *
184  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
185  * @param size number of bytes we can transmit to ATS
186  * @param buf where to copy the messages
187  * @return number of bytes copied into buf
188  */
189 static size_t
190 transmit_message_to_ats (void *cls,
191                          size_t size,
192                          void *buf)
193 {
194   struct GNUNET_ATS_PerformanceHandle *ph = cls;
195   struct PendingMessage *p;
196   size_t ret;
197   char *cbuf;
198
199   ph->th = NULL;
200   ret = 0;
201   cbuf = buf;
202   while ( (NULL != (p = ph->pending_head)) &&
203           (p->size <= size) )
204   {
205     memcpy (&cbuf[ret], &p[1], p->size);    
206     ret += p->size;
207     size -= p->size;
208     GNUNET_CONTAINER_DLL_remove (ph->pending_head,
209                                  ph->pending_tail,
210                                  p);
211     GNUNET_free (p);
212   }
213   do_transmit (ph);
214   return ret;
215 }
216
217
218 /**
219  * Transmit messages from the message queue to the service
220  * (if there are any, and if we are not already trying).
221  *
222  * @param ph handle to use
223  */
224 static void
225 do_transmit (struct GNUNET_ATS_PerformanceHandle *ph)
226 {
227   struct PendingMessage *p;
228
229   if (NULL != ph->th)
230     return;
231   if (NULL == (p = ph->pending_head))
232     return;
233   ph->th = GNUNET_CLIENT_notify_transmit_ready (ph->client,
234                                                 p->size,
235                                                 GNUNET_TIME_UNIT_FOREVER_REL,
236                                                 GNUNET_YES,
237                                                 &transmit_message_to_ats, ph);
238 }
239
240
241 /**
242  * We received a peer information message.  Validate and process it.
243  *
244  * @param ph our context with the callback
245  * @param msg the message
246  * @return GNUNET_OK if the message was well-formed
247  */
248 static int
249 process_pi_message (struct GNUNET_ATS_PerformanceHandle *ph,
250                     const struct GNUNET_MessageHeader *msg)
251 {
252   const struct PeerInformationMessage *pi;
253   const struct GNUNET_ATS_Information *atsi;
254   const char *address;
255   const char *plugin_name;
256   uint16_t address_length;
257   uint16_t plugin_name_length;
258   uint32_t ats_count;
259
260   if (ph->infocb == NULL)
261   {
262     GNUNET_break (0);
263     return GNUNET_SYSERR;
264   }    
265   if (ntohs (msg->size) < sizeof (struct PeerInformationMessage))
266   {
267     GNUNET_break (0);
268     return GNUNET_SYSERR;
269   }
270   pi = (const struct PeerInformationMessage*) msg;
271   ats_count = ntohl (pi->ats_count);
272   address_length = ntohs (pi->address_length);
273   plugin_name_length = ntohs (pi->plugin_name_length);
274   atsi = (const struct GNUNET_ATS_Information*) &pi[1];
275   address = (const char*) &atsi[ats_count];
276   plugin_name = &address[address_length];
277   if ( (address_length +
278         plugin_name_length +
279         ats_count * sizeof (struct GNUNET_ATS_Information) +
280         sizeof (struct PeerInformationMessage) != ntohs (msg->size))  ||
281        (ats_count > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_ATS_Information)) ||
282        (plugin_name[plugin_name_length - 1] != '\0') )
283   {
284     GNUNET_break (0);
285     return GNUNET_SYSERR;
286   }
287   ph->infocb (ph->infocb_cls,
288               &pi->peer,
289               plugin_name,
290               address, address_length,
291               pi->bandwidth_out,
292               pi->bandwidth_in,
293               atsi,
294               ats_count);
295   return GNUNET_OK;
296 }
297
298
299 /**
300  * We received a reservation result message.  Validate and process it.
301  *
302  * @param ph our context with the callback
303  * @param msg the message
304  * @return GNUNET_OK if the message was well-formed
305  */
306 static int
307 process_rr_message (struct GNUNET_ATS_PerformanceHandle *ph,
308                     const struct GNUNET_MessageHeader *msg)
309 {
310   const struct ReservationResultMessage *rr;
311   struct GNUNET_ATS_ReservationContext *rc;
312   int32_t amount;
313
314   if (ph->infocb == NULL)
315   {
316     GNUNET_break (0);
317     return GNUNET_SYSERR;
318   }    
319   if (ntohs (msg->size) < sizeof (struct ReservationResultMessage))
320   {
321     GNUNET_break (0);
322     return GNUNET_SYSERR;
323   }
324   rr = (const struct ReservationResultMessage*) msg;
325   amount = ntohl (rr->amount);
326   rc = ph->reservation_head;
327   if (0 != memcmp (&rr->peer,
328                    &rc->peer,
329                    sizeof (struct GNUNET_PeerIdentity)))
330   {
331     GNUNET_break (0);
332     return GNUNET_SYSERR;
333   }
334   GNUNET_CONTAINER_DLL_remove (ph->reservation_head,
335                                ph->reservation_tail,
336                                rc);
337   if ( (amount == 0) ||
338        (rc->rcb != NULL) )
339   {
340     /* tell client if not cancelled */
341     if (rc->rcb != NULL)
342       rc->rcb (rc->rcb_cls,
343                &rr->peer,
344                amount,
345               GNUNET_TIME_relative_ntoh (rr->res_delay));       
346     GNUNET_free (rc);
347     return GNUNET_OK;
348   }
349   /* amount non-zero, but client cancelled, consider undo! */
350   if (GNUNET_YES != rc->undo)
351   {
352     GNUNET_free (rc);
353     return GNUNET_OK; /* do not try to undo failed undos or negative amounts */
354   }
355   GNUNET_free (rc);
356   (void) GNUNET_ATS_reserve_bandwidth (ph, &rr->peer, -amount, NULL, NULL);
357   return GNUNET_OK;
358 }
359
360
361 /**
362  * Type of a function to call when we receive a message
363  * from the service.
364  *
365  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
366  * @param msg message received, NULL on timeout or fatal error
367  */
368 static void
369 process_ats_message (void *cls,
370                      const struct GNUNET_MessageHeader *msg)
371 {
372   struct GNUNET_ATS_PerformanceHandle *ph = cls;
373
374   if (NULL == msg) 
375   {
376     GNUNET_CLIENT_disconnect (ph->client, GNUNET_NO);
377     ph->client = NULL;
378     reconnect (ph);
379     return;
380   }
381   switch (ntohs (msg->type))
382   {
383   case GNUNET_MESSAGE_TYPE_ATS_PEER_INFORMATION:
384     if (GNUNET_OK != process_pi_message (ph, msg))
385     {
386       GNUNET_CLIENT_disconnect (ph->client, GNUNET_NO);
387       ph->client = NULL;
388       reconnect (ph);
389       return;
390     }
391     break;
392   case GNUNET_MESSAGE_TYPE_ATS_RESERVATION_RESULT:
393     if (GNUNET_OK != process_rr_message (ph, msg))
394     {
395       GNUNET_CLIENT_disconnect (ph->client, GNUNET_NO);
396       ph->client = NULL;
397       reconnect (ph);
398       return;
399     }
400     break;
401   default:
402     GNUNET_break (0);
403     GNUNET_CLIENT_disconnect (ph->client, GNUNET_NO);
404     ph->client = NULL;
405     reconnect (ph);
406     return;
407   }
408   GNUNET_CLIENT_receive (ph->client,
409                          &process_ats_message, ph,
410                          GNUNET_TIME_UNIT_FOREVER_REL);
411 }
412
413
414 /**
415  * Re-establish the connection to the ATS service.
416  *
417  * @param ph handle to use to re-connect.
418  */
419 static void
420 reconnect (struct GNUNET_ATS_PerformanceHandle *ph)
421 {
422   struct PendingMessage *p;
423   struct ClientStartMessage *init;
424
425   GNUNET_assert (NULL == ph->client);
426   ph->client = GNUNET_CLIENT_connect ("ats", ph->cfg);
427   GNUNET_assert (NULL != ph->client);
428   GNUNET_CLIENT_receive (ph->client,
429                          &process_ats_message, ph,
430                          GNUNET_TIME_UNIT_FOREVER_REL);
431   if ( (NULL == (p = ph->pending_head)) ||
432        (GNUNET_YES != p->is_init) )
433   {
434     p = GNUNET_malloc (sizeof (struct PendingMessage) +
435                        sizeof (struct ClientStartMessage));
436     p->size = sizeof (struct ClientStartMessage);
437     p->is_init = GNUNET_YES;
438     init = (struct ClientStartMessage *) &p[1];
439     init->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_START);
440     init->header.size = htons (sizeof (struct ClientStartMessage));
441     init->start_flag = htonl ((ph->infocb == NULL) 
442                               ? START_FLAG_PERFORMANCE_NO_PIC 
443                               : START_FLAG_PERFORMANCE_WITH_PIC);
444     GNUNET_CONTAINER_DLL_insert (ph->pending_head,
445                                  ph->pending_tail,
446                                  p);
447   }
448   do_transmit (ph);
449 }
450
451
452
453 /**
454  * Get handle to access performance API of the ATS subsystem.
455  *
456  * @param cfg configuration to use
457  * @param infocb function to call on allocation changes, can be NULL
458  * @param infocb_cls closure for infocb
459  * @return ats performance context
460  */
461 struct GNUNET_ATS_PerformanceHandle *
462 GNUNET_ATS_performance_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
463                              GNUNET_ATS_PeerInformationCallback infocb,
464                              void *infocb_cls)
465 {
466   struct GNUNET_ATS_PerformanceHandle *ph;
467
468   ph = GNUNET_malloc (sizeof (struct GNUNET_ATS_PerformanceHandle));
469   ph->cfg = cfg;
470   ph->infocb = infocb;
471   ph->infocb_cls = infocb_cls;
472   reconnect (ph);
473   return ph;
474 }
475
476
477 /**
478  * Client is done using the ATS performance subsystem, release resources.
479  *
480  * @param ph handle
481  */
482 void
483 GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph)
484 {
485   struct PendingMessage *p;
486   struct GNUNET_ATS_ReservationContext *rc;
487   
488   while (NULL != (p = ph->pending_head))
489   {
490     GNUNET_CONTAINER_DLL_remove (ph->pending_head,
491                                  ph->pending_tail,
492                                  p);
493     GNUNET_free (p);
494   }
495   while (NULL != (rc = ph->reservation_head))
496   {
497     GNUNET_CONTAINER_DLL_remove (ph->reservation_head,
498                                  ph->reservation_tail,
499                                  rc);
500     GNUNET_break (NULL == rc->rcb);
501     GNUNET_free (p);
502   }  
503   GNUNET_CLIENT_disconnect (ph->client, GNUNET_NO);
504   GNUNET_free (ph);
505 }
506
507
508 /**
509  * Reserve inbound bandwidth from the given peer.  ATS will look at
510  * the current amount of traffic we receive from the peer and ensure
511  * that the peer could add 'amount' of data to its stream.
512  *
513  * @param ph performance handle
514  * @param peer identifies the peer
515  * @param amount reserve N bytes for receiving, negative
516  *                amounts can be used to undo a (recent) reservation;
517  * @param rcb function to call with the resulting reservation information
518  * @param rcb_cls closure for info
519  * @return NULL on error
520  * @deprecated will be replaced soon
521  */
522 struct GNUNET_ATS_ReservationContext *
523 GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph,
524                               const struct GNUNET_PeerIdentity *peer,
525                               int32_t amount, 
526                               GNUNET_ATS_ReservationCallback rcb, 
527                               void *rcb_cls)
528 {
529   struct GNUNET_ATS_ReservationContext *rc;
530   struct PendingMessage *p;
531   struct ReservationRequestMessage *m;
532
533   rc = GNUNET_malloc (sizeof (struct GNUNET_ATS_ReservationContext));
534   rc->size = amount;
535   rc->peer = *peer;
536   rc->rcb = rcb;
537   rc->rcb_cls = rcb_cls;
538   if ( (rcb != NULL) && (amount > 0) )
539     rc->undo = GNUNET_YES;
540   GNUNET_CONTAINER_DLL_insert_tail (ph->reservation_head,
541                                     ph->reservation_tail,
542                                     rc);
543   
544   p = GNUNET_malloc (sizeof (struct PendingMessage) + 
545                      sizeof (struct ReservationRequestMessage));
546   p->size = sizeof (struct ReservationRequestMessage);
547   p->is_init = GNUNET_NO;
548   m = (struct ReservationRequestMessage*) &p[1];
549   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
550   m->header.size = htons (sizeof (struct ReservationRequestMessage));
551   m->amount = htonl (amount);
552   m->peer = *peer;
553   GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head,
554                                     ph->pending_tail,
555                                     p);
556   return rc;
557 }
558
559
560 /**
561  * Cancel request for reserving bandwidth.
562  *
563  * @param rc context returned by the original GNUNET_ATS_reserve_bandwidth call
564  */
565 void
566 GNUNET_ATS_reserve_bandwidth_cancel (struct
567                                      GNUNET_ATS_ReservationContext *rc)
568 {
569   rc->rcb = NULL;
570 }
571
572
573 /**
574  * Change preferences for the given peer. Preference changes are forgotten if peers
575  * disconnect.
576  * 
577  * @param ph performance handle
578  * @param peer identifies the peer
579  * @param ... 0-terminated specification of the desired changes
580  */
581 void
582 GNUNET_ATS_change_preference (struct GNUNET_ATS_PerformanceHandle *ph,
583                               const struct GNUNET_PeerIdentity *peer,
584                               ...)
585 {
586   struct PendingMessage *p;
587   struct ChangePreferenceMessage *m;
588   size_t msize;
589   uint32_t count;
590   struct PreferenceInformation *pi;
591   va_list ap;
592   enum GNUNET_ATS_PreferenceKind kind;
593
594   count = 0;
595   va_start (ap, peer);
596   while (GNUNET_ATS_PREFERENCE_END != (kind = va_arg (ap, enum GNUNET_ATS_PreferenceKind)))
597   {
598     switch (kind)
599     {
600     case GNUNET_ATS_PREFERENCE_BANDWIDTH:
601       count++;
602       (void) va_arg (ap, double);
603       break;
604     case GNUNET_ATS_PREFERENCE_LATENCY:
605       count++;
606       (void) va_arg (ap, double);
607       break;
608     default:
609       GNUNET_assert (0);      
610     }
611   }
612   va_end (ap);
613   msize = count * sizeof (struct PreferenceInformation) +
614     sizeof (struct ChangePreferenceMessage);
615   p = GNUNET_malloc (sizeof (struct PendingMessage) + 
616                      msize);
617   p->size = msize;
618   p->is_init = GNUNET_NO;
619   m = (struct ChangePreferenceMessage*) &p[1];
620   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
621   m->header.size = htons (msize);
622   m->num_preferences = htonl (count);
623   m->peer = *peer;
624   pi = (struct PreferenceInformation*) &m[1];
625   count = 0;
626   va_start (ap, peer);
627   while (GNUNET_ATS_PREFERENCE_END != (kind = va_arg (ap, enum GNUNET_ATS_PreferenceKind)))
628   {
629     pi[count].preference_kind = htonl (kind);
630     switch (kind)
631     {
632     case GNUNET_ATS_PREFERENCE_BANDWIDTH:
633       pi[count].preference_value = (float) va_arg (ap, double);
634       count++;
635       break;
636     case GNUNET_ATS_PREFERENCE_LATENCY:
637       pi[count].preference_value = (float) va_arg (ap, double);
638       count++;
639       break;
640     default:
641       GNUNET_assert (0);      
642     }
643   }
644   va_end (ap);
645   GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head,
646                                     ph->pending_tail,
647                                     p);
648 }
649
650 /* end of ats_api_performance.c */
651