fix
[oweals/gnunet.git] / src / ats / ats_api_scheduling.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file ats/ats_api_scheduling.c
22  * @brief automatic transport selection and outbound bandwidth determination
23  * @author Christian Grothoff
24  * @author Matthias Wachs
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "ats.h"
29
30
31 /**
32  * Message in linked list we should send to the ATS service.  The
33  * actual binary message follows this struct.
34  */
35 struct PendingMessage
36 {
37
38   /**
39    * Kept in a DLL.
40    */ 
41   struct PendingMessage *next;
42
43   /**
44    * Kept in a DLL.
45    */ 
46   struct PendingMessage *prev;
47
48   /**
49    * Size of the message.
50    */
51   size_t size;
52
53   /**
54    * Is this the 'ATS_START' message?
55    */ 
56   int is_init;
57 };
58
59
60 /**
61  * Handle to the ATS subsystem for bandwidth/transport scheduling information.
62  */
63 struct GNUNET_ATS_SchedulingHandle
64 {
65   
66   /**
67    * Our configuration.
68    */
69   const struct GNUNET_CONFIGURATION_Handle *cfg;
70
71   /**
72    * Callback to invoke on suggestions.
73    */
74   GNUNET_ATS_AddressSuggestionCallback suggest_cb;
75   
76   /**
77    * Closure for 'suggest_cb'.
78    */
79   void *suggest_cb_cls;
80
81   /**
82    * Connection to ATS service.
83    */
84   struct GNUNET_CLIENT_Connection *client;
85
86   /**
87    * Head of list of messages for the ATS service.
88    */
89   struct PendingMessage *pending_head;
90
91   /**
92    * Tail of list of messages for the ATS service
93    */
94   struct PendingMessage *pending_tail;
95
96   /**
97    * Current request for transmission to ATS.
98    */
99   struct GNUNET_CLIENT_TransmitHandle *th;
100
101   /**
102    * Array of session objects (we need to translate them to numbers and back
103    * for the protocol; the offset in the array is the session number on the
104    * network).  Index 0 is always NULL and reserved to represent the NULL pointer.
105    * Unused entries are also NULL.
106    */
107   struct Session **session_array;
108   
109   /**
110    * Size of the session array.
111    */
112   unsigned int session_array_size;
113
114 };
115
116
117 /**
118  * Re-establish the connection to the ATS service.
119  *
120  * @param sh handle to use to re-connect.
121  */
122 static void
123 reconnect (struct GNUNET_ATS_SchedulingHandle *sh);
124
125
126 /**
127  * Transmit messages from the message queue to the service
128  * (if there are any, and if we are not already trying).
129  *
130  * @param sh handle to use
131  */
132 static void
133 do_transmit (struct GNUNET_ATS_SchedulingHandle *sh);
134
135
136 /**
137  * We can now transmit a message to ATS. Do it.
138  *
139  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
140  * @param size number of bytes we can transmit to ATS
141  * @param buf where to copy the messages
142  * @return number of bytes copied into buf
143  */
144 static size_t
145 transmit_message_to_ats (void *cls,
146                          size_t size,
147                          void *buf)
148 {
149   struct GNUNET_ATS_SchedulingHandle *sh = cls;
150   struct PendingMessage *p;
151   size_t ret;
152   char *cbuf;
153
154   sh->th = NULL;
155   ret = 0;
156   cbuf = buf;
157   while ( (NULL != (p = sh->pending_head)) &&
158           (p->size <= size) )
159   {
160     memcpy (&cbuf[ret], &p[1], p->size);    
161     ret += p->size;
162     size -= p->size;
163     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
164                                  sh->pending_tail,
165                                  p);
166     GNUNET_free (p);
167   }
168   do_transmit (sh);
169   return ret;
170 }
171
172
173 /**
174  * Transmit messages from the message queue to the service
175  * (if there are any, and if we are not already trying).
176  *
177  * @param sh handle to use
178  */
179 static void
180 do_transmit (struct GNUNET_ATS_SchedulingHandle *sh)
181 {
182   struct PendingMessage *p;
183
184   if (NULL != sh->th)
185     return;
186   if (NULL == (p = sh->pending_head))
187     return;
188   sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client,
189                                                 p->size,
190                                                 GNUNET_TIME_UNIT_FOREVER_REL,
191                                                 GNUNET_YES,
192                                                 &transmit_message_to_ats, sh);
193 }
194
195
196 /**
197  * Find the session object corresponding to the given session ID.
198  *
199  * @param sh our handle
200  * @param session_id current session ID
201  * @return the session object (or NULL)
202  */
203 static struct Session*
204 find_session (struct GNUNET_ATS_SchedulingHandle *sh,
205               uint32_t session_id)
206 {
207   if (session_id >= sh->session_array_size)
208   {
209     GNUNET_break (0);
210     return NULL;
211   }
212   return sh->session_array[session_id];
213 }
214
215
216 /**
217  * Get the ID for the given session object.  If we do not have an ID for
218  * the given session object, allocate one.
219  *
220  * @param sh our handle
221  * @param session session object
222  * @return the session id
223  */
224 static uint32_t 
225 get_session_id (struct GNUNET_ATS_SchedulingHandle *sh,
226                 struct Session *session)
227 {
228   unsigned int i;
229   unsigned int f;
230   
231   f = 0;
232   for (i=1;i<sh->session_array_size;i++)
233   {
234     if (session == sh->session_array[i])
235       return i;
236     if ( (f == 0) &&
237          (sh->session_array[i] == NULL) )
238       f = i;
239   }
240   if (f == 0)
241   {
242     f = sh->session_array_size;
243     GNUNET_array_grow (sh->session_array,
244                        sh->session_array_size,
245                        sh->session_array_size * 2);
246   }
247   sh->session_array[f] = session;
248   return f;
249 }
250
251
252 /**
253  * Remove the session of the given session ID from the session
254  * table (it is no longer valid).
255  *
256  * @param sh our handle
257  * @param session_id identifies session that is no longer valid
258  */
259 static void
260 remove_session (struct GNUNET_ATS_SchedulingHandle *sh,
261                 uint32_t session_id)
262 {
263   GNUNET_assert (session_id < sh->session_array_size);
264   sh->session_array[session_id] = NULL;
265 }
266
267
268 /**
269  * Type of a function to call when we receive a message
270  * from the service.
271  *
272  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
273  * @param msg message received, NULL on timeout or fatal error
274  */
275 static void
276 process_ats_message (void *cls,
277                      const struct GNUNET_MessageHeader *msg)
278 {
279   struct GNUNET_ATS_SchedulingHandle *sh = cls;
280   const struct AddressSuggestionMessage *m;
281   const struct GNUNET_TRANSPORT_ATS_Information *atsi;
282   const char *address;
283   const char *plugin_name;
284   uint16_t address_length;
285   uint16_t plugin_name_length;
286   uint32_t ats_count;
287
288   if (NULL == msg) 
289   {
290     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
291     sh->client = NULL;
292     reconnect (sh);
293     return;
294   }
295   if ( (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION) ||
296        (ntohs (msg->size) <= sizeof (struct AddressSuggestionMessage)) )
297   {
298     GNUNET_break (0);
299     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
300     sh->client = NULL;
301     reconnect (sh);
302     return;
303   }
304   m = (const struct AddressSuggestionMessage*) msg;
305   ats_count = ntohl (m->ats_count);
306   address_length = ntohs (m->address_length);
307   atsi = (const struct GNUNET_TRANSPORT_ATS_Information*) &m[1];
308   address = (const char*) &atsi[ats_count];
309   plugin_name = &address[address_length];
310   plugin_name_length = ntohs (m->plugin_name_length);
311   if ( (address_length +
312         plugin_name_length +
313         ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) +
314         sizeof (struct AddressSuggestionMessage) != ntohs (msg->size))  ||
315        (ats_count > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
316        (plugin_name[plugin_name_length - 1] != '\0') )
317   {
318     GNUNET_break (0);
319     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
320     sh->client = NULL;
321     reconnect (sh);
322     return;
323   }
324   sh->suggest_cb (sh->suggest_cb_cls,
325                   &m->peer,
326                   plugin_name,
327                   address, address_length,
328                   find_session (sh, ntohl (m->session_id)),
329                   m->bandwidth_out,
330                   m->bandwidth_in,
331                   atsi,
332                   ats_count);
333   GNUNET_CLIENT_receive (sh->client,
334                          &process_ats_message, sh,
335                          GNUNET_TIME_UNIT_FOREVER_REL);
336 }
337
338
339 /**
340  * Re-establish the connection to the ATS service.
341  *
342  * @param sh handle to use to re-connect.
343  */
344 static void
345 reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
346 {
347   struct PendingMessage *p;
348   struct ClientStartMessage *init;
349
350   GNUNET_assert (NULL == sh->client);
351   sh->client = GNUNET_CLIENT_connect ("ats", sh->cfg);
352   GNUNET_assert (NULL != sh->client);
353   GNUNET_CLIENT_receive (sh->client,
354                          &process_ats_message, sh,
355                          GNUNET_TIME_UNIT_FOREVER_REL);
356   if ( (NULL == (p = sh->pending_head)) ||
357        (GNUNET_YES != p->is_init) )
358   {
359     p = GNUNET_malloc (sizeof (struct PendingMessage) +
360                        sizeof (struct ClientStartMessage));
361     p->size = sizeof (struct ClientStartMessage);
362     p->is_init = GNUNET_YES;
363     init = (struct ClientStartMessage *) &p[1];
364     init->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_START);
365     init->header.size = htons (sizeof (struct ClientStartMessage));
366     init->start_flag = htonl (START_FLAG_SCHEDULING);
367     GNUNET_CONTAINER_DLL_insert (sh->pending_head,
368                                  sh->pending_tail,
369                                  p);
370   }
371   do_transmit (sh);
372 }
373
374
375 /**
376  * Initialize the ATS subsystem.
377  *
378  * @param cfg configuration to use
379  * @param suggest_cb notification to call whenever the suggestation changed
380  * @param suggest_cb_cls closure for 'suggest_cb'
381  * @return ats context
382  */
383 struct GNUNET_ATS_SchedulingHandle *
384 GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
385                             GNUNET_ATS_AddressSuggestionCallback suggest_cb,
386                             void *suggest_cb_cls)
387 {
388   struct GNUNET_ATS_SchedulingHandle *sh;
389
390   sh = GNUNET_malloc (sizeof (struct GNUNET_ATS_SchedulingHandle));
391   sh->cfg = cfg;
392   sh->suggest_cb = suggest_cb;
393   sh->suggest_cb_cls = suggest_cb_cls;
394   GNUNET_array_grow (sh->session_array,
395                      sh->session_array_size,
396                      4);
397   reconnect (sh);
398   return sh;
399 }
400
401
402 /**
403  * Client is done with ATS scheduling, release resources.
404  *
405  * @param sh handle to release
406  */
407 void
408 GNUNET_ATS_scheduling_done (struct GNUNET_ATS_SchedulingHandle *sh)
409 {
410   struct PendingMessage *p;
411
412   while (NULL != (p = sh->pending_head))
413   {
414     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
415                                  sh->pending_tail,
416                                  p);
417     GNUNET_free (p);
418   }
419   GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
420   GNUNET_array_grow (sh->session_array,
421                      sh->session_array_size,
422                      0);
423   GNUNET_free (sh);
424 }
425
426
427 /**
428  * We would like to establish a new connection with a peer.  ATS
429  * should suggest a good address to begin with.
430  *
431  * @param sh handle
432  * @param peer identity of the peer we need an address for
433  */
434 void
435 GNUNET_ATS_suggest_address (struct GNUNET_ATS_SchedulingHandle *sh,
436                             const struct GNUNET_PeerIdentity *peer)
437 {
438   struct PendingMessage *p;
439   struct RequestAddressMessage *m;
440
441   p = GNUNET_malloc (sizeof (struct PendingMessage) +
442                      sizeof (struct RequestAddressMessage));
443   p->size = sizeof (struct RequestAddressMessage);
444   p->is_init = GNUNET_NO;
445   m = (struct RequestAddressMessage*) &p[1];
446   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_REQUEST_ADDRESS);
447   m->header.size = htons (sizeof (struct RequestAddressMessage));
448   m->reserved = htonl (0);
449   m->peer = *peer;
450   GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head,
451                                     sh->pending_tail,
452                                     p);
453   do_transmit (sh);
454 }
455
456
457 /**
458  * We have updated performance statistics for a given address.  Note
459  * that this function can be called for addresses that are currently
460  * in use as well as addresses that are valid but not actively in use.
461  * Furthermore, the peer may not even be connected to us right now (in
462  * which case the call may be ignored or the information may be stored
463  * for later use).  Update bandwidth assignments.
464  *
465  * @param sh handle
466  * @param peer identity of the new peer
467  * @param plugin_name name of the transport plugin
468  * @param plugin_addr address  (if available)
469  * @param plugin_addr_len number of bytes in plugin_addr
470  * @param session session handle (if available)
471  * @param ats performance data for the address
472  * @param ats_count number of performance records in 'ats'
473  */
474 void
475 GNUNET_ATS_address_update (struct GNUNET_ATS_SchedulingHandle *sh,
476                            const struct GNUNET_PeerIdentity *peer,
477                            const char *plugin_name,
478                            const void *plugin_addr, size_t plugin_addr_len,
479                            struct Session *session,
480                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
481                            uint32_t ats_count)
482 {
483   struct PendingMessage *p;
484   struct AddressUpdateMessage *m;
485   struct GNUNET_TRANSPORT_ATS_Information *am;
486   char *pm;
487   size_t namelen;
488   size_t msize;
489
490   namelen = (plugin_name == NULL) ? 0 : strlen (plugin_name) + 1;                                               
491   msize = sizeof (struct AddressUpdateMessage) + plugin_addr_len + 
492     ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) + namelen;
493   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
494        (plugin_addr_len  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
495        (namelen  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
496        (ats_count >= GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_TRANSPORT_ATS_Information)) )
497   {
498     GNUNET_break (0);
499     return;
500   }
501   p = GNUNET_malloc (sizeof (struct PendingMessage) +  msize);
502   p->size = msize;
503   p->is_init = GNUNET_NO;
504   m = (struct AddressUpdateMessage*) &p[1];
505   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
506   m->header.size = htons (msize);
507   m->ats_count = htonl (ats_count);
508   m->peer = *peer;
509   m->address_length = htons (plugin_addr_len);
510   m->plugin_name_length = htons (namelen);
511   m->session_id = htonl (get_session_id (sh, session));
512   am = (struct GNUNET_TRANSPORT_ATS_Information*) &m[1];
513   memcpy (am, ats, ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
514   pm = (char *) &am[ats_count];
515   memcpy (pm, plugin_addr, plugin_addr_len);
516   memcpy (&pm[plugin_addr_len], plugin_name, namelen);
517   GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head,
518                                     sh->pending_tail,
519                                     p);
520   do_transmit (sh);
521 }
522
523
524 /**
525  * A session got destroyed, stop including it as a valid address.
526  *
527  * @param sh handle
528  * @param peer identity of the peer
529  * @param plugin_name name of the transport plugin
530  * @param plugin_addr address  (if available)
531  * @param plugin_addr_len number of bytes in plugin_addr
532  * @param session session handle that is no longer valid
533  */
534 void
535 GNUNET_ATS_address_destroyed (struct GNUNET_ATS_SchedulingHandle *sh,
536                               const struct GNUNET_PeerIdentity *peer,
537                               const char *plugin_name,
538                               const void *plugin_addr, 
539                               size_t plugin_addr_len,
540                               struct Session *session)
541 {
542   struct PendingMessage *p;
543   struct AddressDestroyedMessage *m;
544   char *pm;
545   size_t namelen;
546   size_t msize;
547   uint32_t session_id;
548
549   namelen = (plugin_name == NULL) ? 0 : strlen (plugin_name) + 1;                                               
550   msize = sizeof (struct AddressUpdateMessage) + plugin_addr_len + 
551     namelen;
552   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
553        (plugin_addr_len  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
554        (namelen  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
555   {
556     GNUNET_break (0);
557     return;
558   }
559   p = GNUNET_malloc (sizeof (struct PendingMessage) +  msize);
560   p->size = msize;
561   p->is_init = GNUNET_NO;
562   m = (struct AddressDestroyedMessage*) &p[1];
563   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED);
564   m->header.size = htons (msize);
565   m->reserved = htonl (0);
566   m->peer = *peer;
567   m->address_length = htons (plugin_addr_len);
568   m->plugin_name_length = htons (namelen);
569   m->session_id = htonl (session_id = get_session_id (sh, session));
570   pm = (char *) &m[1];
571   memcpy (pm, plugin_addr, plugin_addr_len);
572   memcpy (&pm[plugin_addr_len], plugin_name, namelen);
573   GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head,
574                                     sh->pending_tail,
575                                     p);
576   do_transmit (sh);
577   remove_session (sh, session_id);
578 }
579
580 /* end of ats_api_scheduling.c */