extra checks
[oweals/gnunet.git] / src / ats / ats_api_scheduling.c
1 /*
2      This file is part of GNUnet.
3      (C) 2010,2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file ats/ats_api_scheduling.c
22  * @brief automatic transport selection and outbound bandwidth determination
23  * @author Christian Grothoff
24  * @author Matthias Wachs
25  */
26 #include "platform.h"
27 #include "gnunet_ats_service.h"
28 #include "ats.h"
29
30
31 /**
32  * Message in linked list we should send to the ATS service.  The
33  * actual binary message follows this struct.
34  */
35 struct PendingMessage
36 {
37
38   /**
39    * Kept in a DLL.
40    */ 
41   struct PendingMessage *next;
42
43   /**
44    * Kept in a DLL.
45    */ 
46   struct PendingMessage *prev;
47
48   /**
49    * Size of the message.
50    */
51   size_t size;
52
53   /**
54    * Is this the 'ATS_START' message?
55    */ 
56   int is_init;
57 };
58
59
60 /**
61  * Information we track per session.
62  */
63 struct SessionRecord
64 {
65   /**
66    * Identity of the peer (just needed for error checking).
67    */
68   struct GNUNET_PeerIdentity peer;
69
70   /**
71    * Session handle.
72    */
73   struct Session *session;
74 };
75
76
77 /**
78  * Handle to the ATS subsystem for bandwidth/transport scheduling information.
79  */
80 struct GNUNET_ATS_SchedulingHandle
81 {
82   
83   /**
84    * Our configuration.
85    */
86   const struct GNUNET_CONFIGURATION_Handle *cfg;
87
88   /**
89    * Callback to invoke on suggestions.
90    */
91   GNUNET_ATS_AddressSuggestionCallback suggest_cb;
92   
93   /**
94    * Closure for 'suggest_cb'.
95    */
96   void *suggest_cb_cls;
97
98   /**
99    * Connection to ATS service.
100    */
101   struct GNUNET_CLIENT_Connection *client;
102
103   /**
104    * Head of list of messages for the ATS service.
105    */
106   struct PendingMessage *pending_head;
107
108   /**
109    * Tail of list of messages for the ATS service
110    */
111   struct PendingMessage *pending_tail;
112
113   /**
114    * Current request for transmission to ATS.
115    */
116   struct GNUNET_CLIENT_TransmitHandle *th;
117
118   /**
119    * Array of session objects (we need to translate them to numbers and back
120    * for the protocol; the offset in the array is the session number on the
121    * network).  Index 0 is always NULL and reserved to represent the NULL pointer.
122    * Unused entries are also NULL.
123    */
124   struct SessionRecord *session_array;
125
126   /**
127    * Task to trigger reconnect.
128    */ 
129   GNUNET_SCHEDULER_TaskIdentifier task;
130   
131   /**
132    * Size of the session array.
133    */
134   unsigned int session_array_size;
135
136 };
137
138
139 /**
140  * Re-establish the connection to the ATS service.
141  *
142  * @param sh handle to use to re-connect.
143  */
144 static void
145 reconnect (struct GNUNET_ATS_SchedulingHandle *sh);
146
147
148
149 /**
150  * Re-establish the connection to the ATS service.
151  *
152  * @param cls handle to use to re-connect.
153  * @param tc scheduler context
154  */
155 static void
156 reconnect_task (void *cls,
157                 const struct GNUNET_SCHEDULER_TaskContext *tc)
158 {
159   struct GNUNET_ATS_SchedulingHandle *sh = cls;
160
161   sh->task = GNUNET_SCHEDULER_NO_TASK;
162   reconnect (sh);
163 }
164
165
166 /**
167  * Transmit messages from the message queue to the service
168  * (if there are any, and if we are not already trying).
169  *
170  * @param sh handle to use
171  */
172 static void
173 do_transmit (struct GNUNET_ATS_SchedulingHandle *sh);
174
175
176 /**
177  * Type of a function to call when we receive a message
178  * from the service.
179  *
180  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
181  * @param msg message received, NULL on timeout or fatal error
182  */
183 static void
184 process_ats_message (void *cls,
185                      const struct GNUNET_MessageHeader *msg);
186
187
188 /**
189  * We can now transmit a message to ATS. Do it.
190  *
191  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
192  * @param size number of bytes we can transmit to ATS
193  * @param buf where to copy the messages
194  * @return number of bytes copied into buf
195  */
196 static size_t
197 transmit_message_to_ats (void *cls,
198                          size_t size,
199                          void *buf)
200 {
201   struct GNUNET_ATS_SchedulingHandle *sh = cls;
202   struct PendingMessage *p;
203   size_t ret;
204   char *cbuf;
205
206   sh->th = NULL;
207   ret = 0;
208   cbuf = buf;
209   while ( (NULL != (p = sh->pending_head)) &&
210           (p->size <= size) )
211   {
212     memcpy (&cbuf[ret], &p[1], p->size);    
213     ret += p->size;
214     size -= p->size;
215     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
216                                  sh->pending_tail,
217                                  p);
218     if (GNUNET_YES == p->is_init)
219       GNUNET_CLIENT_receive (sh->client,
220                              &process_ats_message, sh,
221                              GNUNET_TIME_UNIT_FOREVER_REL);
222     GNUNET_free (p);
223   }
224   do_transmit (sh);
225   return ret;
226 }
227
228
229 /**
230  * Transmit messages from the message queue to the service
231  * (if there are any, and if we are not already trying).
232  *
233  * @param sh handle to use
234  */
235 static void
236 do_transmit (struct GNUNET_ATS_SchedulingHandle *sh)
237 {
238   struct PendingMessage *p;
239
240   if (NULL != sh->th)
241     return;
242   if (NULL == (p = sh->pending_head))
243     return;
244   if (NULL == sh->client)
245     return; /* currently reconnecting */
246   sh->th = GNUNET_CLIENT_notify_transmit_ready (sh->client,
247                                                 p->size,
248                                                 GNUNET_TIME_UNIT_FOREVER_REL,
249                                                 GNUNET_YES,
250                                                 &transmit_message_to_ats, sh);
251 }
252
253
254 /**
255  * Find the session object corresponding to the given session ID.
256  *
257  * @param sh our handle
258  * @param session_id current session ID
259  * @param peer peer the session belongs to
260  * @return the session object (or NULL)
261  */
262 static struct Session*
263 find_session (struct GNUNET_ATS_SchedulingHandle *sh,
264               uint32_t session_id,
265               const struct GNUNET_PeerIdentity *peer)
266 {
267   if (session_id >= sh->session_array_size)
268   {
269     GNUNET_break (0);
270     return NULL;
271   }
272   GNUNET_assert (0 == memcmp (peer,
273                               &sh->session_array[session_id].peer,
274                               sizeof (struct GNUNET_PeerIdentity)));
275   return sh->session_array[session_id].session;
276 }
277
278
279 /**
280  * Get the ID for the given session object.  If we do not have an ID for
281  * the given session object, allocate one.
282  *
283  * @param sh our handle
284  * @param session session object
285  * @param peer peer the session belongs to
286  * @return the session id
287  */
288 static uint32_t 
289 get_session_id (struct GNUNET_ATS_SchedulingHandle *sh,
290                 struct Session *session,
291                 const struct GNUNET_PeerIdentity *peer)
292 {
293   unsigned int i;
294   unsigned int f;
295   
296   f = 0;
297   for (i=1;i<sh->session_array_size;i++)
298   {
299     if (session == sh->session_array[i].session)
300     {
301       GNUNET_assert (0 == memcmp (peer,
302                                   &sh->session_array[i].peer,
303                                   sizeof (struct GNUNET_PeerIdentity)));
304       return i;
305     }
306     if ( (f == 0) &&
307          (sh->session_array[i].session == NULL) )
308       f = i;
309   }
310   if (f == 0)
311   {    
312     f = sh->session_array_size;
313     GNUNET_array_grow (sh->session_array,
314                        sh->session_array_size,
315                        sh->session_array_size * 2);
316   }
317   GNUNET_assert (f > 0);
318   sh->session_array[f].session = session;
319   sh->session_array[f].peer = *peer;
320   return f;
321 }
322
323
324 /**
325  * Remove the session of the given session ID from the session
326  * table (it is no longer valid).
327  *
328  * @param sh our handle
329  * @param session_id identifies session that is no longer valid
330  * @param peer peer the session belongs to
331  */
332 static void
333 remove_session (struct GNUNET_ATS_SchedulingHandle *sh,
334                 uint32_t session_id,
335                 const struct GNUNET_PeerIdentity *peer)
336 {
337   GNUNET_assert (session_id < sh->session_array_size);
338   GNUNET_assert (0 == memcmp (peer,
339                               &sh->session_array[session_id].peer,
340                               sizeof (struct GNUNET_PeerIdentity)));
341   sh->session_array[session_id].session = NULL;
342 }
343
344
345 /**
346  * Type of a function to call when we receive a message
347  * from the service.
348  *
349  * @param cls the 'struct GNUNET_ATS_SchedulingHandle'
350  * @param msg message received, NULL on timeout or fatal error
351  */
352 static void
353 process_ats_message (void *cls,
354                      const struct GNUNET_MessageHeader *msg)
355 {
356   struct GNUNET_ATS_SchedulingHandle *sh = cls;
357   const struct AddressSuggestionMessage *m;
358   const struct GNUNET_ATS_Information *atsi;
359   const char *address;
360   const char *plugin_name;
361   uint16_t address_length;
362   uint16_t plugin_name_length;
363   uint32_t ats_count;
364
365   if (NULL == msg) 
366   {
367     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
368     sh->client = NULL;
369     sh->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
370                                              &reconnect_task, sh);
371     return;
372   }
373   if ( (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_ATS_ADDRESS_SUGGESTION) ||
374        (ntohs (msg->size) <= sizeof (struct AddressSuggestionMessage)) )
375   {
376     GNUNET_break (0);
377     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
378     sh->client = NULL;
379     sh->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
380                                              &reconnect_task, sh);
381     return;
382   }
383   m = (const struct AddressSuggestionMessage*) msg;
384   ats_count = ntohl (m->ats_count);
385   address_length = ntohs (m->address_length);
386   atsi = (const struct GNUNET_ATS_Information*) &m[1];
387   address = (const char*) &atsi[ats_count];
388   plugin_name = &address[address_length];
389   plugin_name_length = ntohs (m->plugin_name_length);
390   if ( (address_length +
391         plugin_name_length +
392         ats_count * sizeof (struct GNUNET_ATS_Information) +
393         sizeof (struct AddressSuggestionMessage) != ntohs (msg->size))  ||
394        (ats_count > GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_ATS_Information)) ||
395        (plugin_name[plugin_name_length - 1] != '\0') )
396   {
397     GNUNET_break (0);
398     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
399     sh->client = NULL;
400     sh->task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
401                                              &reconnect_task, sh);
402     return;
403   }
404   sh->suggest_cb (sh->suggest_cb_cls,
405                   &m->peer,
406                   plugin_name,
407                   address, address_length,
408                   find_session (sh, ntohl (m->session_id), &m->peer),
409                   m->bandwidth_out,
410                   m->bandwidth_in,
411                   atsi,
412                   ats_count);
413   GNUNET_CLIENT_receive (sh->client,
414                          &process_ats_message, sh,
415                          GNUNET_TIME_UNIT_FOREVER_REL);
416 }
417
418
419 /**
420  * Re-establish the connection to the ATS service.
421  *
422  * @param sh handle to use to re-connect.
423  */
424 static void
425 reconnect (struct GNUNET_ATS_SchedulingHandle *sh)
426 {
427   struct PendingMessage *p;
428   struct ClientStartMessage *init;
429
430   GNUNET_assert (NULL == sh->client);
431   sh->client = GNUNET_CLIENT_connect ("ats", sh->cfg);
432   GNUNET_assert (NULL != sh->client);
433   if ( (NULL == (p = sh->pending_head)) ||
434        (GNUNET_YES != p->is_init) )
435   {
436     p = GNUNET_malloc (sizeof (struct PendingMessage) +
437                        sizeof (struct ClientStartMessage));
438     p->size = sizeof (struct ClientStartMessage);
439     p->is_init = GNUNET_YES;
440     init = (struct ClientStartMessage *) &p[1];
441     init->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_START);
442     init->header.size = htons (sizeof (struct ClientStartMessage));
443     init->start_flag = htonl (START_FLAG_SCHEDULING);
444     GNUNET_CONTAINER_DLL_insert (sh->pending_head,
445                                  sh->pending_tail,
446                                  p);
447   }
448   do_transmit (sh);
449 }
450
451
452 /**
453  * Initialize the ATS subsystem.
454  *
455  * @param cfg configuration to use
456  * @param suggest_cb notification to call whenever the suggestation changed
457  * @param suggest_cb_cls closure for 'suggest_cb'
458  * @return ats context
459  */
460 struct GNUNET_ATS_SchedulingHandle *
461 GNUNET_ATS_scheduling_init (const struct GNUNET_CONFIGURATION_Handle *cfg,
462                             GNUNET_ATS_AddressSuggestionCallback suggest_cb,
463                             void *suggest_cb_cls)
464 {
465   struct GNUNET_ATS_SchedulingHandle *sh;
466
467   sh = GNUNET_malloc (sizeof (struct GNUNET_ATS_SchedulingHandle));
468   sh->cfg = cfg;
469   sh->suggest_cb = suggest_cb;
470   sh->suggest_cb_cls = suggest_cb_cls;
471   GNUNET_array_grow (sh->session_array,
472                      sh->session_array_size,
473                      4);
474   reconnect (sh);
475   return sh;
476 }
477
478
479 /**
480  * Client is done with ATS scheduling, release resources.
481  *
482  * @param sh handle to release
483  */
484 void
485 GNUNET_ATS_scheduling_done (struct GNUNET_ATS_SchedulingHandle *sh)
486 {
487   struct PendingMessage *p;
488
489   while (NULL != (p = sh->pending_head))
490   {
491     GNUNET_CONTAINER_DLL_remove (sh->pending_head,
492                                  sh->pending_tail,
493                                  p);
494     GNUNET_free (p);
495   }
496   if (NULL != sh->client)
497   {
498     GNUNET_CLIENT_disconnect (sh->client, GNUNET_NO);
499     sh->client = NULL;
500   }
501   if (GNUNET_SCHEDULER_NO_TASK != sh->task)
502   {
503     GNUNET_SCHEDULER_cancel (sh->task);
504     sh->task = GNUNET_SCHEDULER_NO_TASK;
505   }
506   GNUNET_array_grow (sh->session_array,
507                      sh->session_array_size,
508                      0);
509   GNUNET_free (sh);
510 }
511
512
513 /**
514  * We would like to establish a new connection with a peer.  ATS
515  * should suggest a good address to begin with.
516  *
517  * @param sh handle
518  * @param peer identity of the peer we need an address for
519  */
520 void
521 GNUNET_ATS_suggest_address (struct GNUNET_ATS_SchedulingHandle *sh,
522                             const struct GNUNET_PeerIdentity *peer)
523 {
524   struct PendingMessage *p;
525   struct RequestAddressMessage *m;
526
527   p = GNUNET_malloc (sizeof (struct PendingMessage) +
528                      sizeof (struct RequestAddressMessage));
529   p->size = sizeof (struct RequestAddressMessage);
530   p->is_init = GNUNET_NO;
531   m = (struct RequestAddressMessage*) &p[1];
532   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_REQUEST_ADDRESS);
533   m->header.size = htons (sizeof (struct RequestAddressMessage));
534   m->reserved = htonl (0);
535   m->peer = *peer;
536   GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head,
537                                     sh->pending_tail,
538                                     p);
539   do_transmit (sh);
540 }
541
542
543 /**
544  * We have updated performance statistics for a given address.  Note
545  * that this function can be called for addresses that are currently
546  * in use as well as addresses that are valid but not actively in use.
547  * Furthermore, the peer may not even be connected to us right now (in
548  * which case the call may be ignored or the information may be stored
549  * for later use).  Update bandwidth assignments.
550  *
551  * @param sh handle
552  * @param peer identity of the new peer
553  * @param plugin_name name of the transport plugin
554  * @param plugin_addr address  (if available)
555  * @param plugin_addr_len number of bytes in plugin_addr
556  * @param session session handle (if available)
557  * @param ats performance data for the address
558  * @param ats_count number of performance records in 'ats'
559  */
560 void
561 GNUNET_ATS_address_update (struct GNUNET_ATS_SchedulingHandle *sh,
562                            const struct GNUNET_PeerIdentity *peer,
563                            const char *plugin_name,
564                            const void *plugin_addr, size_t plugin_addr_len,
565                            struct Session *session,
566                            const struct GNUNET_ATS_Information *ats,
567                            uint32_t ats_count)
568 {
569   struct PendingMessage *p;
570   struct AddressUpdateMessage *m;
571   struct GNUNET_ATS_Information *am;
572   char *pm;
573   size_t namelen;
574   size_t msize;
575
576   namelen = (plugin_name == NULL) ? 0 : strlen (plugin_name) + 1;                                               
577   msize = sizeof (struct AddressUpdateMessage) + plugin_addr_len + 
578     ats_count * sizeof (struct GNUNET_ATS_Information) + namelen;
579   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
580        (plugin_addr_len  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
581        (namelen  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
582        (ats_count >= GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_ATS_Information)) )
583   {
584     GNUNET_break (0);
585     return;
586   }
587   p = GNUNET_malloc (sizeof (struct PendingMessage) +  msize);
588   p->size = msize;
589   p->is_init = GNUNET_NO;
590   m = (struct AddressUpdateMessage*) &p[1];
591   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_UPDATE);
592   m->header.size = htons (msize);
593   m->ats_count = htonl (ats_count);
594   m->peer = *peer;
595   m->address_length = htons (plugin_addr_len);
596   m->plugin_name_length = htons (namelen);
597   m->session_id = htonl (get_session_id (sh, session, peer));
598   am = (struct GNUNET_ATS_Information*) &m[1];
599   memcpy (am, ats, ats_count * sizeof (struct GNUNET_ATS_Information));
600   pm = (char *) &am[ats_count];
601   memcpy (pm, plugin_addr, plugin_addr_len);
602   memcpy (&pm[plugin_addr_len], plugin_name, namelen);
603   GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head,
604                                     sh->pending_tail,
605                                     p);
606   do_transmit (sh);
607 }
608
609
610 /**
611  * A session got destroyed, stop including it as a valid address.
612  *
613  * @param sh handle
614  * @param peer identity of the peer
615  * @param plugin_name name of the transport plugin
616  * @param plugin_addr address  (if available)
617  * @param plugin_addr_len number of bytes in plugin_addr
618  * @param session session handle that is no longer valid
619  */
620 void
621 GNUNET_ATS_address_destroyed (struct GNUNET_ATS_SchedulingHandle *sh,
622                               const struct GNUNET_PeerIdentity *peer,
623                               const char *plugin_name,
624                               const void *plugin_addr, 
625                               size_t plugin_addr_len,
626                               struct Session *session)
627 {
628   struct PendingMessage *p;
629   struct AddressDestroyedMessage *m;
630   char *pm;
631   size_t namelen;
632   size_t msize;
633   uint32_t session_id;
634
635   namelen = (plugin_name == NULL) ? 0 : strlen (plugin_name) + 1;                                               
636   msize = sizeof (struct AddressDestroyedMessage) + plugin_addr_len + 
637     namelen;
638   if ( (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
639        (plugin_addr_len  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
640        (namelen  >= GNUNET_SERVER_MAX_MESSAGE_SIZE) )
641   {
642     GNUNET_break (0);
643     return;
644   }
645   p = GNUNET_malloc (sizeof (struct PendingMessage) +  msize);
646   p->size = msize;
647   p->is_init = GNUNET_NO;
648   m = (struct AddressDestroyedMessage*) &p[1];
649   m->header.type = htons (GNUNET_MESSAGE_TYPE_ATS_ADDRESS_DESTROYED);
650   m->header.size = htons (msize);
651   m->reserved = htonl (0);
652   m->peer = *peer;
653   m->address_length = htons (plugin_addr_len);
654   m->plugin_name_length = htons (namelen);
655   m->session_id = htonl (session_id = get_session_id (sh, session, peer));
656   pm = (char *) &m[1];
657   memcpy (pm, plugin_addr, plugin_addr_len);
658   memcpy (&pm[plugin_addr_len], plugin_name, namelen);
659   GNUNET_CONTAINER_DLL_insert_tail (sh->pending_head,
660                                     sh->pending_tail,
661                                     p);
662   do_transmit (sh);
663   remove_session (sh, session_id, peer);
664 }
665
666 /* end of ats_api_scheduling.c */