af977cb668a6c678fd469b32df2524b229ae6049
[oweals/gnunet.git] / src / ats / ats_api_performance.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file ats/ats_api_performance.c
22  * @brief automatic transport selection and outbound bandwidth determination
23  * @author Christian Grothoff
24  * @author Matthias Wachs
25   */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "ats.h"
29
30
31 /**
32  * Message in linked list we should send to the ATS service.  The
33  * actual binary message follows this struct.
34  */
35 struct PendingMessage
36 {
37
38   /**
39    * Kept in a DLL.
40    */ 
41   struct PendingMessage *next;
42
43   /**
44    * Kept in a DLL.
45    */ 
46   struct PendingMessage *prev;
47
48   /**
49    * Size of the message.
50    */
51   size_t size;
52
53   /**
54    * Is this the 'ATS_START' message?
55    */ 
56   int is_init;
57 };
58
59
60 /**
61  * Linked list of pending reservations.
62  */
63 struct GNUNET_ATS_ReservationContext
64 {
65
66   /**
67    * Kept in a DLL.
68    */ 
69   struct GNUNET_ATS_ReservationContext *next;
70
71   /**
72    * Kept in a DLL.
73    */ 
74   struct GNUNET_ATS_ReservationContext *prev;
75
76   /**
77    * Target peer.
78    */
79   struct GNUNET_PeerIdentity peer;
80                             
81   /**
82    * Desired reservation
83    */
84   int32_t size;
85
86   /**
87    * Function to call on result.
88    */
89   GNUNET_ATS_ReservationCallback rcb;
90
91   /**
92    * Closure for 'rcb'
93    */
94   void *rcb_cls;
95
96   /**
97    * Do we need to undo this reservation if it succeeded?  Set to
98    * GNUNET_YES if a reservation is cancelled.  (at that point, 'info'
99    * is also set to NULL; however, info will ALSO be NULL for the
100    * reservation context that is created to undo the original request,
101    * so 'info' being NULL cannot be used to check if undo is
102    * required).
103    */
104   int undo;
105 };
106
107
108 /**
109  * ATS Handle to obtain and/or modify performance information.
110  */
111 struct GNUNET_ATS_PerformanceHandle
112 {
113  
114   /**
115    * Our configuration.
116    */
117   const struct GNUNET_CONFIGURATION_Handle *cfg;
118
119   /**
120    * Callback to invoke on performance changes.
121    */
122   GNUNET_ATS_PeerInformationCallback infocb;
123   
124   /**
125    * Closure for 'infocb'.
126    */
127   void *infocb_cls;
128
129   /**
130    * Connection to ATS service.
131    */
132   struct GNUNET_CLIENT_Connection *client;
133
134   /**
135    * Head of list of messages for the ATS service.
136    */
137   struct PendingMessage *pending_head;
138
139   /**
140    * Tail of list of messages for the ATS service
141    */
142   struct PendingMessage *pending_tail;
143
144   /**
145    * Head of linked list of pending reservation requests.
146    */
147   struct GNUNET_ATS_ReservationContext *reservation_head;
148
149   /**
150    * Tail of linked list of pending reservation requests.
151    */
152   struct GNUNET_ATS_ReservationContext *reservation_tail;
153
154   /**
155    * Current request for transmission to ATS.
156    */
157   struct GNUNET_CLIENT_TransmitHandle *th;
158
159   /**
160    * Task to trigger reconnect.
161    */ 
162   GNUNET_SCHEDULER_TaskIdentifier task;
163   
164 };
165
166
167 /**
168  * Re-establish the connection to the ATS service.
169  *
170  * @param sh handle to use to re-connect.
171  */
172 static void
173 reconnect (struct GNUNET_ATS_PerformanceHandle *ph);
174
175
176 /**
177  * Re-establish the connection to the ATS service.
178  *
179  * @param cls handle to use to re-connect.
180  * @param tc scheduler context
181  */
182 static void
183 reconnect_task (void *cls,
184                 const struct GNUNET_SCHEDULER_TaskContext *tc)
185 {
186   struct GNUNET_ATS_PerformanceHandle *ph = cls;
187
188   ph->task = GNUNET_SCHEDULER_NO_TASK;
189   reconnect (ph);
190 }
191
192
193 /**
194  * Transmit messages from the message queue to the service
195  * (if there are any, and if we are not already trying).
196  *
197  * @param sh handle to use
198  */
199 static void
200 do_transmit (struct GNUNET_ATS_PerformanceHandle *ph);
201
202
203 /**
204  * We can now transmit a message to ATS. Do it.
205  *
206  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
207  * @param size number of bytes we can transmit to ATS
208  * @param buf where to copy the messages
209  * @return number of bytes copied into buf
210  */
211 static size_t
212 transmit_message_to_ats (void *cls,
213                          size_t size,
214                          void *buf)
215 {
216   struct GNUNET_ATS_PerformanceHandle *ph = cls;
217   struct PendingMessage *p;
218   size_t ret;
219   char *cbuf;
220
221   ph->th = NULL;
222   ret = 0;
223   cbuf = buf;
224   while ( (NULL != (p = ph->pending_head)) &&
225           (p->size <= size) )
226   {
227     memcpy (&cbuf[ret], &p[1], p->size);    
228     ret += p->size;
229     size -= p->size;
230     GNUNET_CONTAINER_DLL_remove (ph->pending_head,
231                                  ph->pending_tail,
232                                  p);
233     GNUNET_free (p);
234   }
235   do_transmit (ph);
236   return ret;
237 }
238
239
240 /**
241  * Transmit messages from the message queue to the service
242  * (if there are any, and if we are not already trying).
243  *
244  * @param ph handle to use
245  */
246 static void
247 do_transmit (struct GNUNET_ATS_PerformanceHandle *ph)
248 {
249   struct PendingMessage *p;
250
251   if (NULL != ph->th)
252     return;
253   if (NULL == (p = ph->pending_head))
254     return;
255   ph->th = GNUNET_CLIENT_notify_transmit_ready (ph->client,
256                                                 p->size,
257                                                 GNUNET_TIME_UNIT_FOREVER_REL,
258                                                 GNUNET_YES,
259                                                 &transmit_message_to_ats, ph);
260 }
261
262
263 /**
264  * We received a peer information message.  Validate and process it.
265  *
266  * @param ph our context with the callback
267  * @param msg the message
268  * @return GNUNET_OK if the message was well-formed
269  */
270 static int
271 process_pi_message (struct GNUNET_ATS_PerformanceHandle *ph,
272                     const struct GNUNET_MessageHeader *msg)
273 {
274   const struct PeerInformationMessage *pi;
275   const struct GNUNET_ATS_Information *atsi;
276   const char *address;
277   const char *plugin_name;
278   uint16_t address_length;
279   uint16_t plugin_name_length;
280   uint32_t ats_count;
281
282   if (ph->infocb == NULL)
283   {
284     GNUNET_break (0);
285     return GNUNET_SYSERR;
286   }    
287   if (ntohs (msg->size) < sizeof (struct PeerInformationMessage))
288   {
289     GNUNET_break (0);
290     return GNUNET_SYSERR;
291   }
292   pi = (const struct PeerInformationMessage*) msg;
293   ats_count = ntohl (pi->ats_count);
294   address_length = ntohs (pi->address_length);
295   plugin_name_length = ntohs (pi->plugin_name_length);
296   atsi = (const struct GNUNET_ATS_Information*) &pi[1];
297   address = (const char*) &atsi[ats_count];
298   plugin_name = &address[address_length];
299   if ( (address_length +
300         plugin_name_length +
301         ats_count * sizeof (struct GNUNET_ATS_Information) +
302         sizeof (struct PeerInformationMessage) != ntohs (msg->size))  ||
303        (ats_count > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_ATS_Information)) ||
304        (plugin_name[plugin_name_length - 1] != '\0') )
305   {
306     GNUNET_break (0);
307     return GNUNET_SYSERR;
308   }
309   ph->infocb (ph->infocb_cls,
310               &pi->peer,
311               plugin_name,
312               address, address_length,
313               pi->bandwidth_out,
314               pi->bandwidth_in,
315               atsi,
316               ats_count);
317   return GNUNET_OK;
318 }
319
320
321 /**
322  * We received a reservation result message.  Validate and process it.
323  *
324  * @param ph our context with the callback
325  * @param msg the message
326  * @return GNUNET_OK if the message was well-formed
327  */
328 static int
329 process_rr_message (struct GNUNET_ATS_PerformanceHandle *ph,
330                     const struct GNUNET_MessageHeader *msg)
331 {
332   const struct ReservationResultMessage *rr;
333   struct GNUNET_ATS_ReservationContext *rc;
334   int32_t amount;
335
336   if (ph->infocb == NULL)
337   {
338     GNUNET_break (0);
339     return GNUNET_SYSERR;
340   }    
341   if (ntohs (msg->size) < sizeof (struct ReservationResultMessage))
342   {
343     GNUNET_break (0);
344     return GNUNET_SYSERR;
345   }
346   rr = (const struct ReservationResultMessage*) msg;
347   amount = ntohl (rr->amount);
348   rc = ph->reservation_head;
349   if (0 != memcmp (&rr->peer,
350                    &rc->peer,
351                    sizeof (struct GNUNET_PeerIdentity)))
352   {
353     GNUNET_break (0);
354     return GNUNET_SYSERR;
355   }
356   GNUNET_CONTAINER_DLL_remove (ph->reservation_head,
357                                ph->reservation_tail,
358                                rc);
359   if ( (amount == 0) ||
360        (rc->rcb != NULL) )
361   {
362     /* tell client if not cancelled */
363     if (rc->rcb != NULL)
364       rc->rcb (rc->rcb_cls,
365                &rr->peer,
366                amount,
367               GNUNET_TIME_relative_ntoh (rr->res_delay));       
368     GNUNET_free (rc);
369     return GNUNET_OK;
370   }
371   /* amount non-zero, but client cancelled, consider undo! */
372   if (GNUNET_YES != rc->undo)
373   {
374     GNUNET_free (rc);
375     return GNUNET_OK; /* do not try to undo failed undos or negative amounts */
376   }
377   GNUNET_free (rc);
378   (void) GNUNET_ATS_reserve_bandwidth (ph, &rr->peer, -amount, NULL, NULL);
379   return GNUNET_OK;
380 }
381
382
383 /**
384  * Type of a function to call when we receive a message
385  * from the service.
386  *
387  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
388  * @param msg message received, NULL on timeout or fatal error
389  */
390 static void
391 process_ats_message (void *cls,
392                      const struct GNUNET_MessageHeader *msg)
393 {
394   struct GNUNET_ATS_PerformanceHandle *ph = cls;
395
396   if (NULL == msg) 
397     goto reconnect;
398   switch (ntohs (msg->type))
399   {
400   case GNUNET_MESSAGE_TYPE_ATS_PEER_INFORMATION:
401     if (GNUNET_OK != process_pi_message (ph, msg))
402       goto reconnect;
403     break;
404   case GNUNET_MESSAGE_TYPE_ATS_RESERVATION_RESULT:
405     if (GNUNET_OK != process_rr_message (ph, msg))
406       goto reconnect;    
407     break;
408   default:
409     GNUNET_break (0);
410     goto reconnect;
411     return;
412   }
413   GNUNET_CLIENT_receive (ph->client,
414                          &process_ats_message, ph,
415                          GNUNET_TIME_UNIT_FOREVER_REL);
416   return;
417  reconnect:
418   GNUNET_CLIENT_disconnect (ph->client, GNUNET_NO);
419   ph->client = NULL;
420   ph->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
421                                            &reconnect_task, ph);
422 }
423
424
425 /**
426  * Re-establish the connection to the ATS service.
427  *
428  * @param ph handle to use to re-connect.
429  */
430 static void
431 reconnect (struct GNUNET_ATS_PerformanceHandle *ph)
432 {
433   struct PendingMessage *p;
434   struct ClientStartMessage *init;
435
436   GNUNET_assert (NULL == ph->client);
437   ph->client = GNUNET_CLIENT_connect ("ats", ph->cfg);
438   GNUNET_assert (NULL != ph->client);
439   GNUNET_CLIENT_receive (ph->client,
440                          &process_ats_message, ph,
441                          GNUNET_TIME_UNIT_FOREVER_REL);
442   if ( (NULL == (p = ph->pending_head)) ||
443        (GNUNET_YES != p->is_init) )
444   {
445     p = GNUNET_malloc (sizeof (struct PendingMessage) +
446                        sizeof (struct ClientStartMessage));
447     p->size = sizeof (struct ClientStartMessage);
448     p->is_init = GNUNET_YES;
449     init = (struct ClientStartMessage *) &p[1];
450     init->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_START);
451     init->header.size = htons (sizeof (struct ClientStartMessage));
452     init->start_flag = htonl ((ph->infocb == NULL) 
453                               ? START_FLAG_PERFORMANCE_NO_PIC 
454                               : START_FLAG_PERFORMANCE_WITH_PIC);
455     GNUNET_CONTAINER_DLL_insert (ph->pending_head,
456                                  ph->pending_tail,
457                                  p);
458   }
459   do_transmit (ph);
460 }
461
462
463
464 /**
465  * Get handle to access performance API of the ATS subsystem.
466  *
467  * @param cfg configuration to use
468  * @param infocb function to call on allocation changes, can be NULL
469  * @param infocb_cls closure for infocb
470  * @return ats performance context
471  */
472 struct GNUNET_ATS_PerformanceHandle *
473 GNUNET_ATS_performance_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
474                              GNUNET_ATS_PeerInformationCallback infocb,
475                              void *infocb_cls)
476 {
477   struct GNUNET_ATS_PerformanceHandle *ph;
478
479   ph = GNUNET_malloc (sizeof (struct GNUNET_ATS_PerformanceHandle));
480   ph->cfg = cfg;
481   ph->infocb = infocb;
482   ph->infocb_cls = infocb_cls;
483   reconnect (ph);
484   return ph;
485 }
486
487
488 /**
489  * Client is done using the ATS performance subsystem, release resources.
490  *
491  * @param ph handle
492  */
493 void
494 GNUNET_ATS_performance_done (struct GNUNET_ATS_PerformanceHandle *ph)
495 {
496   struct PendingMessage *p;
497   struct GNUNET_ATS_ReservationContext *rc;
498   
499   while (NULL != (p = ph->pending_head))
500   {
501     GNUNET_CONTAINER_DLL_remove (ph->pending_head,
502                                  ph->pending_tail,
503                                  p);
504     GNUNET_free (p);
505   }
506   while (NULL != (rc = ph->reservation_head))
507   {
508     GNUNET_CONTAINER_DLL_remove (ph->reservation_head,
509                                  ph->reservation_tail,
510                                  rc);
511     GNUNET_break (NULL == rc->rcb);
512     GNUNET_free (rc);
513   }  
514   if (GNUNET_SCHEDULER_NO_TASK != ph->task)
515   {
516     GNUNET_SCHEDULER_cancel (ph->task);
517     ph->task = GNUNET_SCHEDULER_NO_TASK;
518   }
519   if (NULL != ph->client)
520   {
521     GNUNET_CLIENT_disconnect (ph->client, GNUNET_NO);
522     ph->client = NULL;
523   }
524   GNUNET_free (ph);
525 }
526
527
528 /**
529  * Reserve inbound bandwidth from the given peer.  ATS will look at
530  * the current amount of traffic we receive from the peer and ensure
531  * that the peer could add 'amount' of data to its stream.
532  *
533  * @param ph performance handle
534  * @param peer identifies the peer
535  * @param amount reserve N bytes for receiving, negative
536  *                amounts can be used to undo a (recent) reservation;
537  * @param rcb function to call with the resulting reservation information
538  * @param rcb_cls closure for info
539  * @return NULL on error
540  * @deprecated will be replaced soon
541  */
542 struct GNUNET_ATS_ReservationContext *
543 GNUNET_ATS_reserve_bandwidth (struct GNUNET_ATS_PerformanceHandle *ph,
544                               const struct GNUNET_PeerIdentity *peer,
545                               int32_t amount, 
546                               GNUNET_ATS_ReservationCallback rcb, 
547                               void *rcb_cls)
548 {
549   struct GNUNET_ATS_ReservationContext *rc;
550   struct PendingMessage *p;
551   struct ReservationRequestMessage *m;
552
553   rc = GNUNET_malloc (sizeof (struct GNUNET_ATS_ReservationContext));
554   rc->size = amount;
555   rc->peer = *peer;
556   rc->rcb = rcb;
557   rc->rcb_cls = rcb_cls;
558   if ( (rcb != NULL) && (amount > 0) )
559     rc->undo = GNUNET_YES;
560   GNUNET_CONTAINER_DLL_insert_tail (ph->reservation_head,
561                                     ph->reservation_tail,
562                                     rc);
563   
564   p = GNUNET_malloc (sizeof (struct PendingMessage) + 
565                      sizeof (struct ReservationRequestMessage));
566   p->size = sizeof (struct ReservationRequestMessage);
567   p->is_init = GNUNET_NO;
568   m = (struct ReservationRequestMessage*) &p[1];
569   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
570   m->header.size = htons (sizeof (struct ReservationRequestMessage));
571   m->amount = htonl (amount);
572   m->peer = *peer;
573   GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head,
574                                     ph->pending_tail,
575                                     p);
576   return rc;
577 }
578
579
580 /**
581  * Cancel request for reserving bandwidth.
582  *
583  * @param rc context returned by the original GNUNET_ATS_reserve_bandwidth call
584  */
585 void
586 GNUNET_ATS_reserve_bandwidth_cancel (struct
587                                      GNUNET_ATS_ReservationContext *rc)
588 {
589   rc->rcb = NULL;
590 }
591
592
593 /**
594  * Change preferences for the given peer. Preference changes are forgotten if peers
595  * disconnect.
596  * 
597  * @param ph performance handle
598  * @param peer identifies the peer
599  * @param ... 0-terminated specification of the desired changes
600  */
601 void
602 GNUNET_ATS_change_preference (struct GNUNET_ATS_PerformanceHandle *ph,
603                               const struct GNUNET_PeerIdentity *peer,
604                               ...)
605 {
606   struct PendingMessage *p;
607   struct ChangePreferenceMessage *m;
608   size_t msize;
609   uint32_t count;
610   struct PreferenceInformation *pi;
611   va_list ap;
612   enum GNUNET_ATS_PreferenceKind kind;
613
614   count = 0;
615   va_start (ap, peer);
616   while (GNUNET_ATS_PREFERENCE_END != (kind = va_arg (ap, enum GNUNET_ATS_PreferenceKind)))
617   {
618     switch (kind)
619     {
620     case GNUNET_ATS_PREFERENCE_BANDWIDTH:
621       count++;
622       (void) va_arg (ap, double);
623       break;
624     case GNUNET_ATS_PREFERENCE_LATENCY:
625       count++;
626       (void) va_arg (ap, double);
627       break;
628     default:
629       GNUNET_assert (0);      
630     }
631   }
632   va_end (ap);
633   msize = count * sizeof (struct PreferenceInformation) +
634     sizeof (struct ChangePreferenceMessage);
635   p = GNUNET_malloc (sizeof (struct PendingMessage) + 
636                      msize);
637   p->size = msize;
638   p->is_init = GNUNET_NO;
639   m = (struct ChangePreferenceMessage*) &p[1];
640   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
641   m->header.size = htons (msize);
642   m->num_preferences = htonl (count);
643   m->peer = *peer;
644   pi = (struct PreferenceInformation*) &m[1];
645   count = 0;
646   va_start (ap, peer);
647   while (GNUNET_ATS_PREFERENCE_END != (kind = va_arg (ap, enum GNUNET_ATS_PreferenceKind)))
648   {
649     pi[count].preference_kind = htonl (kind);
650     switch (kind)
651     {
652     case GNUNET_ATS_PREFERENCE_BANDWIDTH:
653       pi[count].preference_value = (float) va_arg (ap, double);
654       count++;
655       break;
656     case GNUNET_ATS_PREFERENCE_LATENCY:
657       pi[count].preference_value = (float) va_arg (ap, double);
658       count++;
659       break;
660     default:
661       GNUNET_assert (0);      
662     }
663   }
664   va_end (ap);
665   GNUNET_CONTAINER_DLL_insert_tail (ph->pending_head,
666                                     ph->pending_tail,
667                                     p);
668 }
669
670 /* end of ats_api_performance.c */
671