fix stack overflow
[oweals/gnunet.git] / src / ats / ats_api_scheduling.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file ats/ats_api_scheduling.c
22  * @brief automatic transport selection and outbound bandwidth determination
23  * @author Christian Grothoff
24  * @author Matthias Wachs
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "ats.h"
29
30
31 /**
32  * Message in linked list we should send to the ATS service.  The
33  * actual binary message follows this struct.
34  */
35 struct PendingMessage
36 {
37
38   /**
39    * Kept in a DLL.
40    */ 
41   struct PendingMessage *next;
42
43   /**
44    * Kept in a DLL.
45    */ 
46   struct PendingMessage *prev;
47
48   /**
49    * Size of the message.
50    */
51   size_t size;
52
53   /**
54    * Is this the 'ATS_START' message?
55    */ 
56   int is_init;
57 };
58
59
60 /**
61  * Handle to the ATS subsystem for bandwidth/transport scheduling information.
62  */
63 struct GNUNET_ATS_SchedulingHandle
64 {
65   
66   /**
67    * Our configuration.
68    */
69   const struct GNUNET_CONFIGURATION_Handle *cfg;
70
71   /**
72    * Callback to invoke on suggestions.
73    */
74   GNUNET_ATS_AddressSuggestionCallback suggest_cb;
75   
76   /**
77    * Closure for 'suggest_cb'.
78    */
79   void *suggest_cb_cls;
80
81   /**
82    * Connection to ATS service.
83    */
84   struct GNUNET_CLIENT_Connection *client;
85
86   /**
87    * Head of list of messages for the ATS service.
88    */
89   struct PendingMessage *pending_head;
90
91   /**
92    * Tail of list of messages for the ATS service
93    */
94   struct PendingMessage *pending_tail;
95
96   /**
97    * Current request for transmission to ATS.
98    */
99   struct GNUNET_CLIENT_TransmitHandle *th;
100
101   /**
102    * Array of session objects (we need to translate them to numbers and back
103    * for the protocol; the offset in the array is the session number on the
104    * network).  Index 0 is always NULL and reserved to represent the NULL pointer.
105    * Unused entries are also NULL.
106    */
107   struct Session **session_array;
108
109   /**
110    * Task to trigger reconnect.
111    */ 
112   GNUNET_SCHEDULER_TaskIdentifier task;
113   
114   /**
115    * Size of the session array.
116    */
117   unsigned int session_array_size;
118
119 };
120
121
122 /**
123  * Re-establish the connection to the ATS service.
124  *
125  * @param sh handle to use to re-connect.
126  */
127 static void
128 reconnect (struct GNUNET_ATS_SchedulingHandle *sh);
129
130
131
132 /**
133  * Re-establish the connection to the ATS service.
134  *
135  * @param cls handle to use to re-connect.
136  * @param tc scheduler context
137  */
138 static void
139 reconnect_task (void *cls,
140                 const struct GNUNET_SCHEDULER_TaskContext *tc)
141 {
142   struct GNUNET_ATS_SchedulingHandle *sh = cls;
143
144   sh->task = GNUNET_SCHEDULER_NO_TASK;
145   reconnect (sh);
146 }
147
148
149 /**
150  * Transmit messages from the message queue to the service
151  * (if there are any, and if we are not already trying).
152  *
153  * @param sh handle to use
154  */
155 static void
156 do_transmit (struct GNUNET_ATS_SchedulingHandle *sh);
157
158
159 /**
160  * We can now transmit a message to ATS. Do it.
161  *
162  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
163  * @param size number of bytes we can transmit to ATS
164  * @param buf where to copy the messages
165  * @return number of bytes copied into buf
166  */
167 static size_t
168 transmit_message_to_ats (void *cls,
169                          size_t size,
170                          void *buf)
171 {
172   struct GNUNET_ATS_SchedulingHandle *sh = cls;
173   struct PendingMessage *p;
174   size_t ret;
175   char *cbuf;
176
177   sh->th = NULL;
178   ret = 0;
179   cbuf = buf;
180   while ( (NULL != (p = sh->pending_head)) &&
181           (p->size <= size) )
182   {
183     memcpy (&cbuf[ret], &p[1], p->size);    
184     ret += p->size;
185     size -= p->size;
186     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
187                                  sh->pending_tail,
188                                  p);
189     GNUNET_free (p);
190   }
191   do_transmit (sh);
192   return ret;
193 }
194
195
196 /**
197  * Transmit messages from the message queue to the service
198  * (if there are any, and if we are not already trying).
199  *
200  * @param sh handle to use
201  */
202 static void
203 do_transmit (struct GNUNET_ATS_SchedulingHandle *sh)
204 {
205   struct PendingMessage *p;
206
207   if (NULL != sh->th)
208     return;
209   if (NULL == (p = sh->pending_head))
210     return;
211   sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client,
212                                                 p->size,
213                                                 GNUNET_TIME_UNIT_FOREVER_REL,
214                                                 GNUNET_YES,
215                                                 &transmit_message_to_ats, sh);
216 }
217
218
219 /**
220  * Find the session object corresponding to the given session ID.
221  *
222  * @param sh our handle
223  * @param session_id current session ID
224  * @return the session object (or NULL)
225  */
226 static struct Session*
227 find_session (struct GNUNET_ATS_SchedulingHandle *sh,
228               uint32_t session_id)
229 {
230   if (session_id >= sh->session_array_size)
231   {
232     GNUNET_break (0);
233     return NULL;
234   }
235   return sh->session_array[session_id];
236 }
237
238
239 /**
240  * Get the ID for the given session object.  If we do not have an ID for
241  * the given session object, allocate one.
242  *
243  * @param sh our handle
244  * @param session session object
245  * @return the session id
246  */
247 static uint32_t 
248 get_session_id (struct GNUNET_ATS_SchedulingHandle *sh,
249                 struct Session *session)
250 {
251   unsigned int i;
252   unsigned int f;
253   
254   f = 0;
255   for (i=1;i<sh->session_array_size;i++)
256   {
257     if (session == sh->session_array[i])
258       return i;
259     if ( (f == 0) &&
260          (sh->session_array[i] == NULL) )
261       f = i;
262   }
263   if (f == 0)
264   {
265     f = sh->session_array_size;
266     GNUNET_array_grow (sh->session_array,
267                        sh->session_array_size,
268                        sh->session_array_size * 2);
269   }
270   sh->session_array[f] = session;
271   return f;
272 }
273
274
275 /**
276  * Remove the session of the given session ID from the session
277  * table (it is no longer valid).
278  *
279  * @param sh our handle
280  * @param session_id identifies session that is no longer valid
281  */
282 static void
283 remove_session (struct GNUNET_ATS_SchedulingHandle *sh,
284                 uint32_t session_id)
285 {
286   GNUNET_assert (session_id < sh->session_array_size);
287   sh->session_array[session_id] = NULL;
288 }
289
290
291 /**
292  * Type of a function to call when we receive a message
293  * from the service.
294  *
295  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
296  * @param msg message received, NULL on timeout or fatal error
297  */
298 static void
299 process_ats_message (void *cls,
300                      const struct GNUNET_MessageHeader *msg)
301 {
302   struct GNUNET_ATS_SchedulingHandle *sh = cls;
303   const struct AddressSuggestionMessage *m;
304   const struct GNUNET_TRANSPORT_ATS_Information *atsi;
305   const char *address;
306   const char *plugin_name;
307   uint16_t address_length;
308   uint16_t plugin_name_length;
309   uint32_t ats_count;
310
311   if (NULL == msg) 
312   {
313     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
314     sh->client = NULL;
315     sh->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
316                                              &reconnect_task, sh);
317     return;
318   }
319   if ( (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION) ||
320        (ntohs (msg->size) <= sizeof (struct AddressSuggestionMessage)) )
321   {
322     GNUNET_break (0);
323     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
324     sh->client = NULL;
325     sh->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
326                                              &reconnect_task, sh);
327     return;
328   }
329   m = (const struct AddressSuggestionMessage*) msg;
330   ats_count = ntohl (m->ats_count);
331   address_length = ntohs (m->address_length);
332   atsi = (const struct GNUNET_TRANSPORT_ATS_Information*) &m[1];
333   address = (const char*) &atsi[ats_count];
334   plugin_name = &address[address_length];
335   plugin_name_length = ntohs (m->plugin_name_length);
336   if ( (address_length +
337         plugin_name_length +
338         ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) +
339         sizeof (struct AddressSuggestionMessage) != ntohs (msg->size))  ||
340        (ats_count > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_TRANSPORT_ATS_Information)) ||
341        (plugin_name[plugin_name_length - 1] != '\0') )
342   {
343     GNUNET_break (0);
344     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
345     sh->client = NULL;
346     sh->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
347                                              &reconnect_task, sh);
348     return;
349   }
350   sh->suggest_cb (sh->suggest_cb_cls,
351                   &m->peer,
352                   plugin_name,
353                   address, address_length,
354                   find_session (sh, ntohl (m->session_id)),
355                   m->bandwidth_out,
356                   m->bandwidth_in,
357                   atsi,
358                   ats_count);
359   GNUNET_CLIENT_receive (sh->client,
360                          &process_ats_message, sh,
361                          GNUNET_TIME_UNIT_FOREVER_REL);
362 }
363
364
365 /**
366  * Re-establish the connection to the ATS service.
367  *
368  * @param sh handle to use to re-connect.
369  */
370 static void
371 reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
372 {
373   struct PendingMessage *p;
374   struct ClientStartMessage *init;
375
376   GNUNET_assert (NULL == sh->client);
377   sh->client = GNUNET_CLIENT_connect ("ats", sh->cfg);
378   GNUNET_assert (NULL != sh->client);
379   GNUNET_CLIENT_receive (sh->client,
380                          &process_ats_message, sh,
381                          GNUNET_TIME_UNIT_FOREVER_REL);
382   if ( (NULL == (p = sh->pending_head)) ||
383        (GNUNET_YES != p->is_init) )
384   {
385     p = GNUNET_malloc (sizeof (struct PendingMessage) +
386                        sizeof (struct ClientStartMessage));
387     p->size = sizeof (struct ClientStartMessage);
388     p->is_init = GNUNET_YES;
389     init = (struct ClientStartMessage *) &p[1];
390     init->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_START);
391     init->header.size = htons (sizeof (struct ClientStartMessage));
392     init->start_flag = htonl (START_FLAG_SCHEDULING);
393     GNUNET_CONTAINER_DLL_insert (sh->pending_head,
394                                  sh->pending_tail,
395                                  p);
396   }
397   do_transmit (sh);
398 }
399
400
401 /**
402  * Initialize the ATS subsystem.
403  *
404  * @param cfg configuration to use
405  * @param suggest_cb notification to call whenever the suggestation changed
406  * @param suggest_cb_cls closure for 'suggest_cb'
407  * @return ats context
408  */
409 struct GNUNET_ATS_SchedulingHandle *
410 GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
411                             GNUNET_ATS_AddressSuggestionCallback suggest_cb,
412                             void *suggest_cb_cls)
413 {
414   struct GNUNET_ATS_SchedulingHandle *sh;
415
416   sh = GNUNET_malloc (sizeof (struct GNUNET_ATS_SchedulingHandle));
417   sh->cfg = cfg;
418   sh->suggest_cb = suggest_cb;
419   sh->suggest_cb_cls = suggest_cb_cls;
420   GNUNET_array_grow (sh->session_array,
421                      sh->session_array_size,
422                      4);
423   reconnect (sh);
424   return sh;
425 }
426
427
428 /**
429  * Client is done with ATS scheduling, release resources.
430  *
431  * @param sh handle to release
432  */
433 void
434 GNUNET_ATS_scheduling_done (struct GNUNET_ATS_SchedulingHandle *sh)
435 {
436   struct PendingMessage *p;
437
438   while (NULL != (p = sh->pending_head))
439   {
440     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
441                                  sh->pending_tail,
442                                  p);
443     GNUNET_free (p);
444   }
445   if (NULL != sh->client)
446   {
447     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
448     sh->client = NULL;
449   }
450   if (GNUNET_SCHEDULER_NO_TASK != sh->task)
451   {
452     GNUNET_SCHEDULER_cancel (sh->task);
453     sh->task = GNUNET_SCHEDULER_NO_TASK;
454   }
455   GNUNET_array_grow (sh->session_array,
456                      sh->session_array_size,
457                      0);
458   GNUNET_free (sh);
459 }
460
461
462 /**
463  * We would like to establish a new connection with a peer.  ATS
464  * should suggest a good address to begin with.
465  *
466  * @param sh handle
467  * @param peer identity of the peer we need an address for
468  */
469 void
470 GNUNET_ATS_suggest_address (struct GNUNET_ATS_SchedulingHandle *sh,
471                             const struct GNUNET_PeerIdentity *peer)
472 {
473   struct PendingMessage *p;
474   struct RequestAddressMessage *m;
475
476   p = GNUNET_malloc (sizeof (struct PendingMessage) +
477                      sizeof (struct RequestAddressMessage));
478   p->size = sizeof (struct RequestAddressMessage);
479   p->is_init = GNUNET_NO;
480   m = (struct RequestAddressMessage*) &p[1];
481   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_REQUEST_ADDRESS);
482   m->header.size = htons (sizeof (struct RequestAddressMessage));
483   m->reserved = htonl (0);
484   m->peer = *peer;
485   GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head,
486                                     sh->pending_tail,
487                                     p);
488   do_transmit (sh);
489 }
490
491
492 /**
493  * We have updated performance statistics for a given address.  Note
494  * that this function can be called for addresses that are currently
495  * in use as well as addresses that are valid but not actively in use.
496  * Furthermore, the peer may not even be connected to us right now (in
497  * which case the call may be ignored or the information may be stored
498  * for later use).  Update bandwidth assignments.
499  *
500  * @param sh handle
501  * @param peer identity of the new peer
502  * @param plugin_name name of the transport plugin
503  * @param plugin_addr address  (if available)
504  * @param plugin_addr_len number of bytes in plugin_addr
505  * @param session session handle (if available)
506  * @param ats performance data for the address
507  * @param ats_count number of performance records in 'ats'
508  */
509 void
510 GNUNET_ATS_address_update (struct GNUNET_ATS_SchedulingHandle *sh,
511                            const struct GNUNET_PeerIdentity *peer,
512                            const char *plugin_name,
513                            const void *plugin_addr, size_t plugin_addr_len,
514                            struct Session *session,
515                            const struct GNUNET_TRANSPORT_ATS_Information *ats,
516                            uint32_t ats_count)
517 {
518   struct PendingMessage *p;
519   struct AddressUpdateMessage *m;
520   struct GNUNET_TRANSPORT_ATS_Information *am;
521   char *pm;
522   size_t namelen;
523   size_t msize;
524
525   namelen = (plugin_name == NULL) ? 0 : strlen (plugin_name) + 1;                                               
526   msize = sizeof (struct AddressUpdateMessage) + plugin_addr_len + 
527     ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) + namelen;
528   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
529        (plugin_addr_len  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
530        (namelen  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
531        (ats_count >= GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_TRANSPORT_ATS_Information)) )
532   {
533     GNUNET_break (0);
534     return;
535   }
536   p = GNUNET_malloc (sizeof (struct PendingMessage) +  msize);
537   p->size = msize;
538   p->is_init = GNUNET_NO;
539   m = (struct AddressUpdateMessage*) &p[1];
540   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
541   m->header.size = htons (msize);
542   m->ats_count = htonl (ats_count);
543   m->peer = *peer;
544   m->address_length = htons (plugin_addr_len);
545   m->plugin_name_length = htons (namelen);
546   m->session_id = htonl (get_session_id (sh, session));
547   am = (struct GNUNET_TRANSPORT_ATS_Information*) &m[1];
548   memcpy (am, ats, ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information));
549   pm = (char *) &am[ats_count];
550   memcpy (pm, plugin_addr, plugin_addr_len);
551   memcpy (&pm[plugin_addr_len], plugin_name, namelen);
552   GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head,
553                                     sh->pending_tail,
554                                     p);
555   do_transmit (sh);
556 }
557
558
559 /**
560  * A session got destroyed, stop including it as a valid address.
561  *
562  * @param sh handle
563  * @param peer identity of the peer
564  * @param plugin_name name of the transport plugin
565  * @param plugin_addr address  (if available)
566  * @param plugin_addr_len number of bytes in plugin_addr
567  * @param session session handle that is no longer valid
568  */
569 void
570 GNUNET_ATS_address_destroyed (struct GNUNET_ATS_SchedulingHandle *sh,
571                               const struct GNUNET_PeerIdentity *peer,
572                               const char *plugin_name,
573                               const void *plugin_addr, 
574                               size_t plugin_addr_len,
575                               struct Session *session)
576 {
577   struct PendingMessage *p;
578   struct AddressDestroyedMessage *m;
579   char *pm;
580   size_t namelen;
581   size_t msize;
582   uint32_t session_id;
583
584   namelen = (plugin_name == NULL) ? 0 : strlen (plugin_name) + 1;                                               
585   msize = sizeof (struct AddressUpdateMessage) + plugin_addr_len + 
586     namelen;
587   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
588        (plugin_addr_len  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
589        (namelen  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
590   {
591     GNUNET_break (0);
592     return;
593   }
594   p = GNUNET_malloc (sizeof (struct PendingMessage) +  msize);
595   p->size = msize;
596   p->is_init = GNUNET_NO;
597   m = (struct AddressDestroyedMessage*) &p[1];
598   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED);
599   m->header.size = htons (msize);
600   m->reserved = htonl (0);
601   m->peer = *peer;
602   m->address_length = htons (plugin_addr_len);
603   m->plugin_name_length = htons (namelen);
604   m->session_id = htonl (session_id = get_session_id (sh, session));
605   pm = (char *) &m[1];
606   memcpy (pm, plugin_addr, plugin_addr_len);
607   memcpy (&pm[plugin_addr_len], plugin_name, namelen);
608   GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head,
609                                     sh->pending_tail,
610                                     p);
611   do_transmit (sh);
612   remove_session (sh, session_id);
613 }
614
615 /* end of ats_api_scheduling.c */