removing definitively dead try_connect-related logic (#3675)
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - test test test
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_transport_service.h"
36 #include "transport.h"
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "transport-api",__VA_ARGS__)
39
40 /**
41  * If we could not send any payload to a peer for this amount of
42  * time, we print a warning.
43  */
44 #define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES
45
46 /**
47  * How large to start with for the hashmap of neighbours.
48  */
49 #define STARTING_NEIGHBOURS_SIZE 16
50
51 /**
52  * Handle for a message that should be transmitted to the service.
53  * Used for both control messages and normal messages.
54  */
55 struct GNUNET_TRANSPORT_TransmitHandle
56 {
57
58   /**
59    * We keep all requests in a DLL.
60    */
61   struct GNUNET_TRANSPORT_TransmitHandle *next;
62
63   /**
64    * We keep all requests in a DLL.
65    */
66   struct GNUNET_TRANSPORT_TransmitHandle *prev;
67
68   /**
69    * Neighbour for this handle, NULL for control messages.
70    */
71   struct Neighbour *neighbour;
72
73   /**
74    * Function to call when @e notify_size bytes are available
75    * for transmission.
76    */
77   GNUNET_TRANSPORT_TransmitReadyNotify notify;
78
79   /**
80    * Closure for @e notify.
81    */
82   void *notify_cls;
83
84   /**
85    * Time at which this request was originally scheduled.
86    */
87   struct GNUNET_TIME_Absolute request_start;
88
89   /**
90    * Timeout for this request, 0 for control messages.
91    */
92   struct GNUNET_TIME_Absolute timeout;
93
94   /**
95    * Task to trigger request timeout if the request is stalled due to
96    * congestion.
97    */
98   struct GNUNET_SCHEDULER_Task *timeout_task;
99
100   /**
101    * How many bytes is our notify callback waiting for?
102    */
103   size_t notify_size;
104
105 };
106
107
108 /**
109  * Entry in hash table of all of our current (connected) neighbours.
110  */
111 struct Neighbour
112 {
113   /**
114    * Overall transport handle.
115    */
116   struct GNUNET_TRANSPORT_Handle *h;
117
118   /**
119    * Active transmit handle or NULL.
120    */
121   struct GNUNET_TRANSPORT_TransmitHandle *th;
122
123   /**
124    * Identity of this neighbour.
125    */
126   struct GNUNET_PeerIdentity id;
127
128   /**
129    * Outbound bandwidh tracker.
130    */
131   struct GNUNET_BANDWIDTH_Tracker out_tracker;
132
133   /**
134    * Entry in our readyness heap (which is sorted by @e next_ready
135    * value).  NULL if there is no pending transmission request for
136    * this neighbour or if we're waiting for @e is_ready to become
137    * true AFTER the @e out_tracker suggested that this peer's quota
138    * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
139    * we should immediately go back into the heap).
140    */
141   struct GNUNET_CONTAINER_HeapNode *hn;
142
143   /**
144    * Last time when this peer received payload from us.
145    */
146   struct GNUNET_TIME_Absolute last_payload;
147
148   /**
149    * Task to trigger warnings if we do not get SEND_OK after a while.
150    */
151   struct GNUNET_SCHEDULER_Task *unready_warn_task;
152
153   /**
154    * Is this peer currently ready to receive a message?
155    */
156   int is_ready;
157
158   /**
159    * Sending consumed more bytes on wire than payload was announced
160    * This overhead is added to the delay of next sending operation
161    */
162   size_t traffic_overhead;
163 };
164
165
166 /**
167  * Linked list of functions to call whenever our HELLO is updated.
168  */
169 struct GNUNET_TRANSPORT_GetHelloHandle
170 {
171
172   /**
173    * This is a doubly linked list.
174    */
175   struct GNUNET_TRANSPORT_GetHelloHandle *next;
176
177   /**
178    * This is a doubly linked list.
179    */
180   struct GNUNET_TRANSPORT_GetHelloHandle *prev;
181
182   /**
183    * Transport handle.
184    */
185   struct GNUNET_TRANSPORT_Handle *handle;
186
187   /**
188    * Callback to call once we got our HELLO.
189    */
190   GNUNET_TRANSPORT_HelloUpdateCallback rec;
191
192   /**
193    * Task for calling the HelloUpdateCallback when we already have a HELLO
194    */
195   struct GNUNET_SCHEDULER_Task *notify_task;
196
197   /**
198    * Closure for @e rec.
199    */
200   void *rec_cls;
201
202 };
203
204
205 /**
206  * Entry in linked list for all offer-HELLO requests.
207  */
208 struct GNUNET_TRANSPORT_OfferHelloHandle
209 {
210   /**
211    * For the DLL.
212    */
213   struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
214
215   /**
216    * For the DLL.
217    */
218   struct GNUNET_TRANSPORT_OfferHelloHandle *next;
219
220   /**
221    * Transport service handle we use for transmission.
222    */
223   struct GNUNET_TRANSPORT_Handle *th;
224
225   /**
226    * Transmission handle for this request.
227    */
228   struct GNUNET_TRANSPORT_TransmitHandle *tth;
229
230   /**
231    * Function to call once we are done.
232    */
233   GNUNET_SCHEDULER_TaskCallback cont;
234
235   /**
236    * Closure for @e cont
237    */
238   void *cls;
239
240   /**
241    * The HELLO message to be transmitted.
242    */
243   struct GNUNET_MessageHeader *msg;
244 };
245
246
247 /**
248  * Handle for the transport service (includes all of the
249  * state for the transport service).
250  */
251 struct GNUNET_TRANSPORT_Handle
252 {
253
254   /**
255    * Closure for the callbacks.
256    */
257   void *cls;
258
259   /**
260    * Function to call for received data.
261    */
262   GNUNET_TRANSPORT_ReceiveCallback rec;
263
264   /**
265    * function to call on connect events
266    */
267   GNUNET_TRANSPORT_NotifyConnect nc_cb;
268
269   /**
270    * function to call on disconnect events
271    */
272   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
273
274   /**
275    * function to call on excess bandwidth events
276    */
277   GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
278
279   /**
280    * Head of DLL of control messages.
281    */
282   struct GNUNET_TRANSPORT_TransmitHandle *control_head;
283
284   /**
285    * Tail of DLL of control messages.
286    */
287   struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
288
289   /**
290    * The current HELLO message for this peer.  Updated
291    * whenever transports change their addresses.
292    */
293   struct GNUNET_MessageHeader *my_hello;
294
295   /**
296    * My client connection to the transport service.
297    */
298   struct GNUNET_CLIENT_Connection *client;
299
300   /**
301    * Handle to our registration with the client for notification.
302    */
303   struct GNUNET_CLIENT_TransmitHandle *cth;
304
305   /**
306    * Linked list of pending requests for our HELLO.
307    */
308   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;
309
310   /**
311    * Linked list of pending requests for our HELLO.
312    */
313   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;
314
315   /**
316    * Linked list of pending offer HELLO requests head
317    */
318   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;
319
320   /**
321    * Linked list of pending offer HELLO requests tail
322    */
323   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;
324
325   /**
326    * My configuration.
327    */
328   const struct GNUNET_CONFIGURATION_Handle *cfg;
329
330   /**
331    * Hash map of the current connected neighbours of this peer.
332    * Maps peer identities to `struct Neighbour` entries.
333    */
334   struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
335
336   /**
337    * Heap sorting peers with pending messages by the timestamps that
338    * specify when we could next send a message to the respective peer.
339    * Excludes control messages (which can always go out immediately).
340    * Maps time stamps to `struct Neighbour` entries.
341    */
342   struct GNUNET_CONTAINER_Heap *ready_heap;
343
344   /**
345    * Peer identity as assumed by this process, or all zeros.
346    */
347   struct GNUNET_PeerIdentity self;
348
349   /**
350    * ID of the task trying to reconnect to the service.
351    */
352   struct GNUNET_SCHEDULER_Task *reconnect_task;
353
354   /**
355    * ID of the task trying to trigger transmission for a peer while
356    * maintaining bandwidth quotas.  In use if there are no control
357    * messages and the smallest entry in the @e ready_heap has a time
358    * stamp in the future.
359    */
360   struct GNUNET_SCHEDULER_Task *quota_task;
361
362   /**
363    * Delay until we try to reconnect.
364    */
365   struct GNUNET_TIME_Relative reconnect_delay;
366
367   /**
368    * Should we check that @e self matches what the service thinks?
369    * (if #GNUNET_NO, then @e self is all zeros!).
370    */
371   int check_self;
372
373   /**
374    * Reconnect in progress
375    */
376   int reconnecting;
377 };
378
379
380 /**
381  * Schedule the task to send one message, either from the control
382  * list or the peer message queues  to the service.
383  *
384  * @param h transport service to schedule a transmission for
385  */
386 static void
387 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
388
389
390 /**
391  * Function that will schedule the job that will try
392  * to connect us again to the client.
393  *
394  * @param h transport service to reconnect
395  */
396 static void
397 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
398
399
400 /**
401  * A neighbour has not gotten a SEND_OK in a  while. Print a warning.
402  *
403  * @param cls the `struct Neighbour`
404  * @param tc scheduler context
405  */
406 static void
407 do_warn_unready (void *cls,
408                  const struct GNUNET_SCHEDULER_TaskContext *tc)
409 {
410   struct Neighbour *n = cls;
411   struct GNUNET_TIME_Relative delay;
412
413   delay = GNUNET_TIME_absolute_get_duration (n->last_payload);
414   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
415               "Lacking SEND_OK, no payload could be send to %s for %s\n",
416               GNUNET_i2s (&n->id),
417               GNUNET_STRINGS_relative_time_to_string (delay,
418                                                       GNUNET_YES));
419   n->unready_warn_task
420     = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
421                                     &do_warn_unready,
422                                     n);
423 }
424
425
426 /**
427  * Get the neighbour list entry for the given peer
428  *
429  * @param h our context
430  * @param peer peer to look up
431  * @return NULL if no such peer entry exists
432  */
433 static struct Neighbour *
434 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
435                 const struct GNUNET_PeerIdentity *peer)
436 {
437   return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
438                                             peer);
439 }
440
441
442 /**
443  * The outbound quota has changed in a way that may require
444  * us to reset the timeout.  Update the timeout.
445  *
446  * @param cls the `struct Neighbour` for which the timeout changed
447  */
448 static void
449 outbound_bw_tracker_update (void *cls)
450 {
451   struct Neighbour *n = cls;
452   struct GNUNET_TIME_Relative delay;
453
454   if (NULL == n->hn)
455     return;
456   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
457                                               n->th->notify_size + n->traffic_overhead);
458   LOG (GNUNET_ERROR_TYPE_DEBUG,
459        "New outbound delay %s us\n",
460        GNUNET_STRINGS_relative_time_to_string (delay,
461                                                GNUNET_NO));
462   GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
463       n->hn, delay.rel_value_us);
464   schedule_transmission (n->h);
465 }
466
467
468 /**
469  * Function called by the bandwidth tracker if we have excess
470  * bandwidth.
471  *
472  * @param cls the `struct Neighbour` that has excess bandwidth
473  */
474 static void
475 notify_excess_cb (void *cls)
476 {
477   struct Neighbour *n = cls;
478   struct GNUNET_TRANSPORT_Handle *h = n->h;
479
480   if (NULL != h->neb_cb)
481     h->neb_cb (h->cls,
482                &n->id);
483 }
484
485
486 /**
487  * Add neighbour to our list
488  *
489  * @return NULL if this API is currently disconnecting from the service
490  */
491 static struct Neighbour *
492 neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
493                const struct GNUNET_PeerIdentity *pid)
494 {
495   struct Neighbour *n;
496
497   LOG (GNUNET_ERROR_TYPE_DEBUG,
498        "Creating entry for neighbour `%s'.\n",
499        GNUNET_i2s (pid));
500   n = GNUNET_new (struct Neighbour);
501   n->id = *pid;
502   n->h = h;
503   n->is_ready = GNUNET_YES;
504   n->traffic_overhead = 0;
505   GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
506                                   &outbound_bw_tracker_update,
507                                   n,
508                                   GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
509                                   MAX_BANDWIDTH_CARRY_S,
510                                   &notify_excess_cb,
511                                   n);
512   GNUNET_assert (GNUNET_OK ==
513                  GNUNET_CONTAINER_multipeermap_put (h->neighbours,
514                                                     &n->id,
515                                                     n,
516                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
517   return n;
518 }
519
520
521 /**
522  * Iterator over hash map entries, for deleting state of a neighbour.
523  *
524  * @param cls the `struct GNUNET_TRANSPORT_Handle *`
525  * @param key peer identity
526  * @param value value in the hash map, the neighbour entry to delete
527  * @return #GNUNET_YES if we should continue to
528  *         iterate,
529  *         #GNUNET_NO if not.
530  */
531 static int
532 neighbour_delete (void *cls,
533                   const struct GNUNET_PeerIdentity *key,
534                   void *value)
535 {
536   struct GNUNET_TRANSPORT_Handle *handle = cls;
537   struct Neighbour *n = value;
538
539   LOG (GNUNET_ERROR_TYPE_DEBUG,
540        "Dropping entry for neighbour `%s'.\n",
541        GNUNET_i2s (key));
542   GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
543   if (NULL != handle->nd_cb)
544     handle->nd_cb (handle->cls,
545                    &n->id);
546   if (NULL != n->unready_warn_task)
547   {
548     GNUNET_SCHEDULER_cancel (n->unready_warn_task);
549     n->unready_warn_task = NULL;
550   }
551   GNUNET_assert (NULL == n->th);
552   GNUNET_assert (NULL == n->hn);
553   GNUNET_assert (GNUNET_YES ==
554                  GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
555                                                        key,
556                                                        n));
557   GNUNET_free (n);
558   return GNUNET_YES;
559 }
560
561
562 /**
563  * Function we use for handling incoming messages.
564  *
565  * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
566  * @param msg message received, NULL on timeout or fatal error
567  */
568 static void
569 demultiplexer (void *cls,
570                const struct GNUNET_MessageHeader *msg)
571 {
572   struct GNUNET_TRANSPORT_Handle *h = cls;
573   const struct DisconnectInfoMessage *dim;
574   const struct ConnectInfoMessage *cim;
575   const struct InboundMessage *im;
576   const struct GNUNET_MessageHeader *imm;
577   const struct SendOkMessage *okm;
578   const struct QuotaSetMessage *qm;
579   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
580   struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
581   struct Neighbour *n;
582   struct GNUNET_PeerIdentity me;
583   uint16_t size;
584   uint32_t bytes_msg;
585   uint32_t bytes_physical;
586
587   GNUNET_assert (NULL != h->client);
588   if (GNUNET_YES == h->reconnecting)
589   {
590     return;
591   }
592   if (NULL == msg)
593   {
594     LOG (GNUNET_ERROR_TYPE_DEBUG,
595          "Error receiving from transport service, disconnecting temporarily.\n");
596     h->reconnecting = GNUNET_YES;
597     disconnect_and_schedule_reconnect (h);
598     return;
599   }
600   GNUNET_CLIENT_receive (h->client,
601                          &demultiplexer,
602                          h,
603                          GNUNET_TIME_UNIT_FOREVER_REL);
604   size = ntohs (msg->size);
605   switch (ntohs (msg->type))
606   {
607   case GNUNET_MESSAGE_TYPE_HELLO:
608     if (GNUNET_OK !=
609         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
610                              &me))
611     {
612       GNUNET_break (0);
613       break;
614     }
615     LOG (GNUNET_ERROR_TYPE_DEBUG,
616          "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
617          (unsigned int) size,
618          GNUNET_i2s (&me));
619     GNUNET_free_non_null (h->my_hello);
620     h->my_hello = NULL;
621     if (size < sizeof (struct GNUNET_MessageHeader))
622     {
623       GNUNET_break (0);
624       break;
625     }
626     h->my_hello = GNUNET_copy_message (msg);
627     hwl = h->hwl_head;
628     while (NULL != hwl)
629     {
630       next_hwl = hwl->next;
631       hwl->rec (hwl->rec_cls,
632                 h->my_hello);
633       hwl = next_hwl;
634     }
635     break;
636   case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
637     if (size < sizeof (struct ConnectInfoMessage))
638     {
639       GNUNET_break (0);
640       h->reconnecting = GNUNET_YES;
641       disconnect_and_schedule_reconnect (h);
642       break;
643     }
644     cim = (const struct ConnectInfoMessage *) msg;
645     if (size !=
646         sizeof (struct ConnectInfoMessage))
647     {
648       GNUNET_break (0);
649       h->reconnecting = GNUNET_YES;
650       disconnect_and_schedule_reconnect (h);
651       break;
652     }
653     LOG (GNUNET_ERROR_TYPE_DEBUG,
654          "Receiving CONNECT message for `%s'.\n",
655          GNUNET_i2s (&cim->id));
656     n = neighbour_find (h, &cim->id);
657     if (NULL != n)
658     {
659       GNUNET_break (0);
660       h->reconnecting = GNUNET_YES;
661       disconnect_and_schedule_reconnect (h);
662       break;
663     }
664     n = neighbour_add (h,
665                        &cim->id);
666     LOG (GNUNET_ERROR_TYPE_DEBUG,
667          "Receiving CONNECT message for `%s' with quota %u\n",
668          GNUNET_i2s (&cim->id),
669          ntohl (cim->quota_out.value__));
670     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
671                                            cim->quota_out);
672     if (NULL != h->nc_cb)
673       h->nc_cb (h->cls,
674                 &n->id);
675     break;
676   case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
677     if (size != sizeof (struct DisconnectInfoMessage))
678     {
679       GNUNET_break (0);
680       h->reconnecting = GNUNET_YES;
681       disconnect_and_schedule_reconnect (h);
682       break;
683     }
684     dim = (const struct DisconnectInfoMessage *) msg;
685     GNUNET_break (ntohl (dim->reserved) == 0);
686     LOG (GNUNET_ERROR_TYPE_DEBUG,
687          "Receiving DISCONNECT message for `%s'.\n",
688          GNUNET_i2s (&dim->peer));
689     n = neighbour_find (h, &dim->peer);
690     if (NULL == n)
691     {
692       GNUNET_break (0);
693       h->reconnecting = GNUNET_YES;
694       disconnect_and_schedule_reconnect (h);
695       break;
696     }
697     neighbour_delete (h,
698                       &dim->peer,
699                       n);
700     break;
701   case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
702     if (size != sizeof (struct SendOkMessage))
703     {
704       GNUNET_break (0);
705       h->reconnecting = GNUNET_YES;
706       disconnect_and_schedule_reconnect (h);
707       break;
708     }
709     okm = (const struct SendOkMessage *) msg;
710     bytes_msg = ntohl (okm->bytes_msg);
711     bytes_physical = ntohl (okm->bytes_physical);
712     LOG (GNUNET_ERROR_TYPE_DEBUG,
713          "Receiving SEND_OK message, transmission to %s %s.\n",
714          GNUNET_i2s (&okm->peer),
715          ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
716
717     n = neighbour_find (h,
718                         &okm->peer);
719     if (NULL == n)
720     {
721       /* We should never get a 'SEND_OK' for a peer that we are not
722          connected to */
723       GNUNET_break (0);
724       h->reconnecting = GNUNET_YES;
725       disconnect_and_schedule_reconnect (h);
726       break;
727     }
728     if (bytes_physical > bytes_msg)
729     {
730       LOG (GNUNET_ERROR_TYPE_DEBUG,
731            "Overhead for %u byte message was %u\n",
732            bytes_msg,
733            bytes_physical - bytes_msg);
734       n->traffic_overhead += bytes_physical - bytes_msg;
735     }
736     GNUNET_break (GNUNET_NO == n->is_ready);
737     n->is_ready = GNUNET_YES;
738     if (NULL != n->unready_warn_task)
739     {
740       GNUNET_SCHEDULER_cancel (n->unready_warn_task);
741       n->unready_warn_task = NULL;
742     }
743     if ((NULL != n->th) && (NULL == n->hn))
744     {
745       GNUNET_assert (NULL != n->th->timeout_task);
746       GNUNET_SCHEDULER_cancel (n->th->timeout_task);
747       n->th->timeout_task = NULL;
748       /* we've been waiting for this (congestion, not quota,
749        * caused delayed transmission) */
750       n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
751                                             n,
752                                             0);
753     }
754     schedule_transmission (h);
755     break;
756   case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
757     if (size <
758         sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
759     {
760       GNUNET_break (0);
761       h->reconnecting = GNUNET_YES;
762       disconnect_and_schedule_reconnect (h);
763       break;
764     }
765     im = (const struct InboundMessage *) msg;
766     imm = (const struct GNUNET_MessageHeader *) &im[1];
767     if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
768     {
769       GNUNET_break (0);
770       h->reconnecting = GNUNET_YES;
771       disconnect_and_schedule_reconnect (h);
772       break;
773     }
774     LOG (GNUNET_ERROR_TYPE_DEBUG,
775          "Received message of type %u with %u bytes from `%s'.\n",
776          (unsigned int) ntohs (imm->type),
777          (unsigned int) ntohs (imm->size),
778          GNUNET_i2s (&im->peer));
779     n = neighbour_find (h, &im->peer);
780     if (NULL == n)
781     {
782       GNUNET_break (0);
783       h->reconnecting = GNUNET_YES;
784       disconnect_and_schedule_reconnect (h);
785       break;
786     }
787     if (NULL != h->rec)
788       h->rec (h->cls,
789               &im->peer,
790               imm);
791     break;
792   case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
793     if (size != sizeof (struct QuotaSetMessage))
794     {
795       GNUNET_break (0);
796       h->reconnecting = GNUNET_YES;
797       disconnect_and_schedule_reconnect (h);
798       break;
799     }
800     qm = (const struct QuotaSetMessage *) msg;
801     n = neighbour_find (h, &qm->peer);
802     if (NULL == n)
803     {
804       GNUNET_break (0);
805       h->reconnecting = GNUNET_YES;
806       disconnect_and_schedule_reconnect (h);
807       break;
808     }
809     LOG (GNUNET_ERROR_TYPE_DEBUG,
810          "Receiving SET_QUOTA message for `%s' with quota %u\n",
811          GNUNET_i2s (&qm->peer),
812          ntohl (qm->quota.value__));
813     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
814                                            qm->quota);
815     break;
816   default:
817     LOG (GNUNET_ERROR_TYPE_ERROR,
818          _("Received unexpected message of type %u in %s:%u\n"),
819          ntohs (msg->type),
820          __FILE__,
821          __LINE__);
822     GNUNET_break (0);
823     break;
824   }
825 }
826
827
828 /**
829  * A transmission request could not be satisfied because of
830  * network congestion.  Notify the initiator and clean up.
831  *
832  * @param cls the `struct GNUNET_TRANSPORT_TransmitHandle`
833  * @param tc scheduler context
834  */
835 static void
836 timeout_request_due_to_congestion (void *cls,
837                                    const struct GNUNET_SCHEDULER_TaskContext *tc)
838 {
839   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
840   struct Neighbour *n = th->neighbour;
841   struct GNUNET_TIME_Relative delay;
842
843   n->th->timeout_task = NULL;
844   delay = GNUNET_TIME_absolute_get_duration (th->request_start);
845   LOG (GNUNET_ERROR_TYPE_WARNING,
846        "Discarding %u bytes of payload message after %s delay due to congestion\n",
847        th->notify_size,
848        GNUNET_STRINGS_relative_time_to_string (delay,
849                                                GNUNET_YES));
850   GNUNET_assert (th == n->th);
851   GNUNET_assert (NULL == n->hn);
852   n->th = NULL;
853   th->notify (th->notify_cls,
854               0,
855               NULL);
856   GNUNET_free (th);
857 }
858
859
860 /**
861  * Transmit message(s) to service.
862  *
863  * @param cls handle to transport
864  * @param size number of bytes available in @a buf
865  * @param buf where to copy the message
866  * @return number of bytes copied to @a buf
867  */
868 static size_t
869 transport_notify_ready (void *cls,
870                         size_t size,
871                         void *buf)
872 {
873   struct GNUNET_TRANSPORT_Handle *h = cls;
874   struct GNUNET_TRANSPORT_TransmitHandle *th;
875   struct GNUNET_TIME_Relative delay;
876   struct Neighbour *n;
877   char *cbuf;
878   struct OutboundMessage obm;
879   size_t ret;
880   size_t nret;
881   size_t mret;
882
883   GNUNET_assert (NULL != h->client);
884   h->cth = NULL;
885   if (NULL == buf)
886   {
887     /* transmission failed */
888     disconnect_and_schedule_reconnect (h);
889     return 0;
890   }
891
892   cbuf = buf;
893   ret = 0;
894   /* first send control messages */
895   while ( (NULL != (th = h->control_head)) &&
896           (th->notify_size <= size) )
897   {
898     GNUNET_CONTAINER_DLL_remove (h->control_head,
899                                  h->control_tail,
900                                  th);
901     nret = th->notify (th->notify_cls,
902                        size,
903                        &cbuf[ret]);
904     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
905     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
906       LOG (GNUNET_ERROR_TYPE_WARNING,
907            "Added %u bytes of control message at %u after %s delay\n",
908            nret,
909            ret,
910            GNUNET_STRINGS_relative_time_to_string (delay,
911                                                    GNUNET_YES));
912     else
913       LOG (GNUNET_ERROR_TYPE_DEBUG,
914            "Added %u bytes of control message at %u after %s delay\n",
915            nret,
916            ret,
917            GNUNET_STRINGS_relative_time_to_string (delay,
918                                                    GNUNET_YES));
919     GNUNET_free (th);
920     ret += nret;
921     size -= nret;
922   }
923
924   /* then, if possible and no control messages pending, send data messages */
925   while ( (NULL == h->control_head) &&
926           (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
927   {
928     if (GNUNET_YES != n->is_ready)
929     {
930       /* peer not ready, wait for notification! */
931       GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
932       n->hn = NULL;
933       GNUNET_assert (NULL == n->th->timeout_task);
934       n->th->timeout_task
935         = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
936                                         (n->th->timeout),
937                                         &timeout_request_due_to_congestion,
938                                         n->th);
939       continue;
940     }
941     th = n->th;
942     if (th->notify_size + sizeof (struct OutboundMessage) > size)
943       break;                    /* does not fit */
944     if (GNUNET_BANDWIDTH_tracker_get_delay
945         (&n->out_tracker,
946          th->notify_size).rel_value_us > 0)
947       break;                    /* too early */
948     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
949     n->hn = NULL;
950     n->th = NULL;
951     GNUNET_assert (size >= sizeof (struct OutboundMessage));
952     mret = th->notify (th->notify_cls,
953                        size - sizeof (struct OutboundMessage),
954                        &cbuf[ret + sizeof (struct OutboundMessage)]);
955     GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
956     if (0 == mret)
957     {
958       GNUNET_free (th);
959       continue;
960     }
961     if (NULL != n->unready_warn_task)
962       n->unready_warn_task
963         = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
964                                         &do_warn_unready,
965                                         n);
966     n->last_payload = GNUNET_TIME_absolute_get ();
967     n->is_ready = GNUNET_NO;
968     GNUNET_assert (mret + sizeof (struct OutboundMessage) <
969                    GNUNET_SERVER_MAX_MESSAGE_SIZE);
970     obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
971     obm.header.size = htons (mret + sizeof (struct OutboundMessage));
972     obm.reserved = htonl (0);
973     obm.timeout =
974       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
975                                  (th->timeout));
976     obm.peer = n->id;
977     memcpy (&cbuf[ret],
978             &obm,
979             sizeof (struct OutboundMessage));
980     ret += (mret + sizeof (struct OutboundMessage));
981     size -= (mret + sizeof (struct OutboundMessage));
982     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
983                                       mret);
984     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
985     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
986       LOG (GNUNET_ERROR_TYPE_WARNING,
987            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
988            mret,
989            GNUNET_i2s (&n->id),
990            GNUNET_STRINGS_relative_time_to_string (delay,
991                                                    GNUNET_YES),
992            (unsigned int) n->out_tracker.available_bytes_per_s__);
993     else
994       LOG (GNUNET_ERROR_TYPE_DEBUG,
995            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
996            mret,
997            GNUNET_i2s (&n->id),
998            GNUNET_STRINGS_relative_time_to_string (delay,
999                                                    GNUNET_YES),
1000            (unsigned int) n->out_tracker.available_bytes_per_s__);
1001     GNUNET_free (th);
1002     break;
1003   }
1004   /* if there are more pending messages, try to schedule those */
1005   schedule_transmission (h);
1006   LOG (GNUNET_ERROR_TYPE_DEBUG,
1007        "Transmitting %u bytes to transport service\n",
1008        ret);
1009   return ret;
1010 }
1011
1012
1013 /**
1014  * Schedule the task to send one message, either from the control
1015  * list or the peer message queues  to the service.
1016  *
1017  * @param cls transport service to schedule a transmission for
1018  * @param tc scheduler context
1019  */
1020 static void
1021 schedule_transmission_task (void *cls,
1022                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1023 {
1024   struct GNUNET_TRANSPORT_Handle *h = cls;
1025   size_t size;
1026   struct GNUNET_TRANSPORT_TransmitHandle *th;
1027   struct Neighbour *n;
1028
1029   h->quota_task = NULL;
1030   GNUNET_assert (NULL != h->client);
1031   /* destroy all requests that have timed out */
1032   while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
1033           (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
1034   {
1035     /* notify client that the request could not be satisfied within
1036      * the given time constraints */
1037     th = n->th;
1038     n->th = NULL;
1039     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
1040     n->hn = NULL;
1041     LOG (GNUNET_ERROR_TYPE_DEBUG,
1042          "Signalling timeout for transmission to peer %s due to congestion\n",
1043          GNUNET_i2s (&n->id));
1044     GNUNET_assert (0 == th->notify (th->notify_cls,
1045                                     0,
1046                                     NULL));
1047     GNUNET_free (th);
1048   }
1049   if (NULL != h->cth)
1050     return;
1051   if (NULL != h->control_head)
1052   {
1053     size = h->control_head->notify_size;
1054   }
1055   else
1056   {
1057     n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
1058     if (NULL == n)
1059       return;                   /* no pending messages */
1060     size = n->th->notify_size + sizeof (struct OutboundMessage);
1061   }
1062   LOG (GNUNET_ERROR_TYPE_DEBUG,
1063        "Calling notify_transmit_ready\n");
1064   h->cth
1065     = GNUNET_CLIENT_notify_transmit_ready (h->client,
1066                                            size,
1067                                            GNUNET_TIME_UNIT_FOREVER_REL,
1068                                            GNUNET_NO,
1069                                            &transport_notify_ready,
1070                                            h);
1071   GNUNET_assert (NULL != h->cth);
1072 }
1073
1074
1075 /**
1076  * Schedule the task to send one message, either from the control
1077  * list or the peer message queues  to the service.
1078  *
1079  * @param h transport service to schedule a transmission for
1080  */
1081 static void
1082 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
1083 {
1084   struct GNUNET_TIME_Relative delay;
1085   struct Neighbour *n;
1086
1087   GNUNET_assert (NULL != h->client);
1088   if (NULL != h->quota_task)
1089   {
1090     GNUNET_SCHEDULER_cancel (h->quota_task);
1091     h->quota_task = NULL;
1092   }
1093   if (NULL != h->control_head)
1094     delay = GNUNET_TIME_UNIT_ZERO;
1095   else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
1096   {
1097     delay =
1098         GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1099                                             n->th->notify_size + n->traffic_overhead);
1100     n->traffic_overhead = 0;
1101   }
1102   else
1103   {
1104     LOG (GNUNET_ERROR_TYPE_DEBUG,
1105          "No work to be done, not scheduling transmission.\n");
1106     return;                     /* no work to be done */
1107   }
1108   LOG (GNUNET_ERROR_TYPE_DEBUG,
1109        "Scheduling next transmission to service in %s\n",
1110        GNUNET_STRINGS_relative_time_to_string (delay,
1111                                                GNUNET_YES));
1112   h->quota_task =
1113       GNUNET_SCHEDULER_add_delayed (delay,
1114                                     &schedule_transmission_task,
1115                                     h);
1116 }
1117
1118
1119 /**
1120  * Queue control request for transmission to the transport
1121  * service.
1122  *
1123  * @param h handle to the transport service
1124  * @param size number of bytes to be transmitted
1125  * @param notify function to call to get the content
1126  * @param notify_cls closure for @a notify
1127  * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
1128  */
1129 static struct GNUNET_TRANSPORT_TransmitHandle *
1130 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
1131                            size_t size,
1132                            GNUNET_TRANSPORT_TransmitReadyNotify notify,
1133                            void *notify_cls)
1134 {
1135   struct GNUNET_TRANSPORT_TransmitHandle *th;
1136
1137   LOG (GNUNET_ERROR_TYPE_DEBUG,
1138        "Control transmit of %u bytes requested\n",
1139        size);
1140   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1141   th->notify = notify;
1142   th->notify_cls = notify_cls;
1143   th->notify_size = size;
1144   th->request_start = GNUNET_TIME_absolute_get ();
1145   GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
1146                                     h->control_tail,
1147                                     th);
1148   schedule_transmission (h);
1149   return th;
1150 }
1151
1152
1153 /**
1154  * Transmit START message to service.
1155  *
1156  * @param cls unused
1157  * @param size number of bytes available in @a buf
1158  * @param buf where to copy the message
1159  * @return number of bytes copied to @a buf
1160  */
1161 static size_t
1162 send_start (void *cls,
1163             size_t size,
1164             void *buf)
1165 {
1166   struct GNUNET_TRANSPORT_Handle *h = cls;
1167   struct StartMessage s;
1168   uint32_t options;
1169
1170   if (NULL == buf)
1171   {
1172     /* Can only be shutdown, just give up */
1173     LOG (GNUNET_ERROR_TYPE_DEBUG,
1174          "Shutdown while trying to transmit START request.\n");
1175     return 0;
1176   }
1177   LOG (GNUNET_ERROR_TYPE_DEBUG,
1178        "Transmitting START request.\n");
1179   GNUNET_assert (size >= sizeof (struct StartMessage));
1180   s.header.size = htons (sizeof (struct StartMessage));
1181   s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
1182   options = 0;
1183   if (h->check_self)
1184     options |= 1;
1185   if (NULL != h->rec)
1186     options |= 2;
1187   s.options = htonl (options);
1188   s.self = h->self;
1189   memcpy (buf, &s, sizeof (struct StartMessage));
1190   GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
1191                          GNUNET_TIME_UNIT_FOREVER_REL);
1192   return sizeof (struct StartMessage);
1193 }
1194
1195
1196 /**
1197  * Try again to connect to transport service.
1198  *
1199  * @param cls the handle to the transport service
1200  * @param tc scheduler context
1201  */
1202 static void
1203 reconnect (void *cls,
1204            const struct GNUNET_SCHEDULER_TaskContext *tc)
1205 {
1206   struct GNUNET_TRANSPORT_Handle *h = cls;
1207
1208   h->reconnect_task = NULL;
1209   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1210   {
1211     /* shutdown, just give up */
1212     return;
1213   }
1214   LOG (GNUNET_ERROR_TYPE_DEBUG,
1215        "Connecting to transport service.\n");
1216   GNUNET_assert (NULL == h->client);
1217   GNUNET_assert (NULL == h->control_head);
1218   GNUNET_assert (NULL == h->control_tail);
1219   h->reconnecting = GNUNET_NO;
1220   h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
1221
1222   GNUNET_assert (NULL != h->client);
1223   schedule_control_transmit (h, sizeof (struct StartMessage),
1224                              &send_start, h);
1225 }
1226
1227
1228 /**
1229  * Function that will schedule the job that will try
1230  * to connect us again to the client.
1231  *
1232  * @param h transport service to reconnect
1233  */
1234 static void
1235 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1236 {
1237   struct GNUNET_TRANSPORT_TransmitHandle *th;
1238
1239   GNUNET_assert (NULL == h->reconnect_task);
1240   if (NULL != h->cth)
1241   {
1242     GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
1243     h->cth = NULL;
1244   }
1245   if (NULL != h->client)
1246   {
1247     GNUNET_CLIENT_disconnect (h->client);
1248     h->client = NULL;
1249 /*    LOG (GNUNET_ERROR_TYPE_ERROR,
1250          "Client disconnect done \n");*/
1251   }
1252   /* Forget about all neighbours that we used to be connected to */
1253   GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
1254                                          &neighbour_delete,
1255                                          h);
1256   if (NULL != h->quota_task)
1257   {
1258     GNUNET_SCHEDULER_cancel (h->quota_task);
1259     h->quota_task = NULL;
1260   }
1261   while ((NULL != (th = h->control_head)))
1262   {
1263     GNUNET_CONTAINER_DLL_remove (h->control_head,
1264                                  h->control_tail,
1265                                  th);
1266     th->notify (th->notify_cls,
1267                 0,
1268                 NULL);
1269     GNUNET_free (th);
1270   }
1271   LOG (GNUNET_ERROR_TYPE_DEBUG,
1272        "Scheduling task to reconnect to transport service in %s.\n",
1273        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
1274                                                GNUNET_YES));
1275   h->reconnect_task =
1276       GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
1277                                     &reconnect,
1278                                     h);
1279   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
1280 }
1281
1282
1283 /**
1284  * Cancel control request for transmission to the transport service.
1285  *
1286  * @param th handle to the transport service
1287  * @param tth transmit handle to cancel
1288  */
1289 static void
1290 cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
1291                          struct GNUNET_TRANSPORT_TransmitHandle *tth)
1292 {
1293   LOG (GNUNET_ERROR_TYPE_DEBUG,
1294        "Canceling transmit of contral transmission requested\n");
1295   GNUNET_CONTAINER_DLL_remove (th->control_head,
1296                                th->control_tail,
1297                                tth);
1298   GNUNET_free (tth);
1299 }
1300
1301
1302 /**
1303  * Send HELLO message to the service.
1304  *
1305  * @param cls the HELLO message to send
1306  * @param size number of bytes available in @a buf
1307  * @param buf where to copy the message
1308  * @return number of bytes copied to @a buf
1309  */
1310 static size_t
1311 send_hello (void *cls,
1312             size_t size,
1313             void *buf)
1314 {
1315   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
1316   struct GNUNET_MessageHeader *msg = ohh->msg;
1317   uint16_t ssize;
1318   struct GNUNET_SCHEDULER_TaskContext tc;
1319
1320   tc.read_ready = NULL;
1321   tc.write_ready = NULL;
1322   tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
1323   if (NULL == buf)
1324   {
1325     LOG (GNUNET_ERROR_TYPE_DEBUG,
1326          "Timeout while trying to transmit `%s' request.\n",
1327          "HELLO");
1328     if (NULL != ohh->cont)
1329       ohh->cont (ohh->cls,
1330                  &tc);
1331     GNUNET_free (msg);
1332     GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1333                                  ohh->th->oh_tail,
1334                                  ohh);
1335     GNUNET_free (ohh);
1336     return 0;
1337   }
1338   LOG (GNUNET_ERROR_TYPE_DEBUG,
1339        "Transmitting `%s' request.\n",
1340        "HELLO");
1341   ssize = ntohs (msg->size);
1342   GNUNET_assert (size >= ssize);
1343   memcpy (buf,
1344           msg,
1345           ssize);
1346   GNUNET_free (msg);
1347   tc.reason = GNUNET_SCHEDULER_REASON_READ_READY;
1348   if (NULL != ohh->cont)
1349     ohh->cont (ohh->cls,
1350                &tc);
1351   GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1352                                ohh->th->oh_tail,
1353                                ohh);
1354   GNUNET_free (ohh);
1355   return ssize;
1356 }
1357
1358
1359 /**
1360  * Send traffic metric message to the service.
1361  *
1362  * @param cls the message to send
1363  * @param size number of bytes available in @a buf
1364  * @param buf where to copy the message
1365  * @return number of bytes copied to @a buf
1366  */
1367 static size_t
1368 send_metric (void *cls,
1369              size_t size,
1370              void *buf)
1371 {
1372   struct TrafficMetricMessage *msg = cls;
1373   uint16_t ssize;
1374
1375   if (NULL == buf)
1376   {
1377     LOG (GNUNET_ERROR_TYPE_DEBUG,
1378          "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
1379     GNUNET_free (msg);
1380     return 0;
1381   }
1382   LOG (GNUNET_ERROR_TYPE_DEBUG,
1383        "Transmitting TRAFFIC_METRIC request.\n");
1384   ssize = ntohs (msg->header.size);
1385   GNUNET_assert (size >= ssize);
1386   memcpy (buf, msg, ssize);
1387   GNUNET_free (msg);
1388   return ssize;
1389 }
1390
1391
1392 /**
1393  * Set transport metrics for a peer and a direction
1394  *
1395  * @param handle transport handle
1396  * @param peer the peer to set the metric for
1397  * @param prop the performance metrics to set
1398  * @param delay_in inbound delay to introduce
1399  * @param delay_out outbound delay to introduce
1400  *
1401  * Note: Delay restrictions in receiving direction will be enforced
1402  * with one message delay.
1403  */
1404 void
1405 GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
1406                                      const struct GNUNET_PeerIdentity *peer,
1407                                      const struct GNUNET_ATS_Properties *prop,
1408                                      struct GNUNET_TIME_Relative delay_in,
1409                                      struct GNUNET_TIME_Relative delay_out)
1410 {
1411   struct TrafficMetricMessage *msg;
1412
1413   msg = GNUNET_new (struct TrafficMetricMessage);
1414   msg->header.size = htons (sizeof (struct TrafficMetricMessage));
1415   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
1416   msg->reserved = htonl (0);
1417   msg->peer = *peer;
1418   GNUNET_ATS_properties_hton (&msg->properties,
1419                               prop);
1420   msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
1421   msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
1422   schedule_control_transmit (handle,
1423                              sizeof (struct TrafficMetricMessage),
1424                              &send_metric,
1425                              msg);
1426 }
1427
1428
1429 /**
1430  * Offer the transport service the HELLO of another peer.  Note that
1431  * the transport service may just ignore this message if the HELLO is
1432  * malformed or useless due to our local configuration.
1433  *
1434  * @param handle connection to transport service
1435  * @param hello the hello message
1436  * @param cont continuation to call when HELLO has been sent,
1437  *      tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
1438  *      tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
1439  * @param cont_cls closure for @a cont
1440  * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
1441  *      in case of failure @a cont will not be called
1442  *
1443  */
1444 struct GNUNET_TRANSPORT_OfferHelloHandle *
1445 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1446                               const struct GNUNET_MessageHeader *hello,
1447                               GNUNET_SCHEDULER_TaskCallback cont,
1448                               void *cont_cls)
1449 {
1450   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
1451   struct GNUNET_MessageHeader *msg;
1452   struct GNUNET_PeerIdentity peer;
1453   uint16_t size;
1454
1455   if (NULL == handle->client)
1456     return NULL;
1457   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
1458   size = ntohs (hello->size);
1459   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
1460   if (GNUNET_OK !=
1461       GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
1462                            &peer))
1463   {
1464     GNUNET_break (0);
1465     return NULL;
1466   }
1467
1468   msg = GNUNET_malloc (size);
1469   memcpy (msg, hello, size);
1470   LOG (GNUNET_ERROR_TYPE_DEBUG,
1471        "Offering HELLO message of `%s' to transport for validation.\n",
1472        GNUNET_i2s (&peer));
1473
1474   ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
1475   ohh->th = handle;
1476   ohh->cont = cont;
1477   ohh->cls = cont_cls;
1478   ohh->msg = msg;
1479   ohh->tth = schedule_control_transmit (handle,
1480                                         size,
1481                                         &send_hello,
1482                                         ohh);
1483   GNUNET_CONTAINER_DLL_insert (handle->oh_head,
1484                                handle->oh_tail,
1485                                ohh);
1486   return ohh;
1487 }
1488
1489
1490 /**
1491  * Cancel the request to transport to offer the HELLO message
1492  *
1493  * @param ohh the handle for the operation to cancel
1494  */
1495 void
1496 GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
1497 {
1498   struct GNUNET_TRANSPORT_Handle *th = ohh->th;
1499
1500   cancel_control_transmit (ohh->th, ohh->tth);
1501   GNUNET_CONTAINER_DLL_remove (th->oh_head,
1502                                th->oh_tail,
1503                                ohh);
1504   GNUNET_free (ohh->msg);
1505   GNUNET_free (ohh);
1506 }
1507
1508
1509 /**
1510  * Checks if a given peer is connected to us
1511  *
1512  * @param handle connection to transport service
1513  * @param peer the peer to check
1514  * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
1515  */
1516 int
1517 GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
1518                                        const struct GNUNET_PeerIdentity *peer)
1519 {
1520   if (GNUNET_YES ==
1521       GNUNET_CONTAINER_multipeermap_contains (handle->neighbours,
1522                                               peer))
1523     return GNUNET_YES;
1524   return GNUNET_NO;
1525 }
1526
1527
1528 /**
1529  * Task to call the HelloUpdateCallback of the GetHelloHandle
1530  *
1531  * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
1532  * @param tc the scheduler task context
1533  */
1534 static void
1535 call_hello_update_cb_async (void *cls,
1536                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1537 {
1538   struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
1539
1540   GNUNET_assert (NULL != ghh->handle->my_hello);
1541   GNUNET_assert (NULL != ghh->notify_task);
1542   ghh->notify_task = NULL;
1543   ghh->rec (ghh->rec_cls,
1544             ghh->handle->my_hello);
1545 }
1546
1547
1548 /**
1549  * Obtain the HELLO message for this peer.  The callback given in this function
1550  * is never called synchronously.
1551  *
1552  * @param handle connection to transport service
1553  * @param rec function to call with the HELLO, sender will be our peer
1554  *            identity; message and sender will be NULL on timeout
1555  *            (handshake with transport service pending/failed).
1556  *             cost estimate will be 0.
1557  * @param rec_cls closure for @a rec
1558  * @return handle to cancel the operation
1559  */
1560 struct GNUNET_TRANSPORT_GetHelloHandle *
1561 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
1562                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
1563                             void *rec_cls)
1564 {
1565   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
1566
1567   hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
1568   hwl->rec = rec;
1569   hwl->rec_cls = rec_cls;
1570   hwl->handle = handle;
1571   GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
1572                                handle->hwl_tail,
1573                                hwl);
1574   if (NULL != handle->my_hello)
1575     hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
1576                                                  hwl);
1577   return hwl;
1578 }
1579
1580
1581 /**
1582  * Stop receiving updates about changes to our HELLO message.
1583  *
1584  * @param ghh handle to cancel
1585  */
1586 void
1587 GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
1588 {
1589   struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
1590
1591   if (NULL != ghh->notify_task)
1592     GNUNET_SCHEDULER_cancel (ghh->notify_task);
1593   GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
1594                                handle->hwl_tail,
1595                                ghh);
1596   GNUNET_free (ghh);
1597 }
1598
1599
1600 /**
1601  * Connect to the transport service.  Note that the connection may
1602  * complete (or fail) asynchronously.
1603  *
1604  * @param cfg configuration to use
1605  * @param self our own identity (API should check that it matches
1606  *             the identity found by transport), or NULL (no check)
1607  * @param cls closure for the callbacks
1608  * @param rec receive function to call
1609  * @param nc function to call on connect events
1610  * @param nd function to call on disconnect events
1611  * @return NULL on error
1612  */
1613 struct GNUNET_TRANSPORT_Handle *
1614 GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1615                           const struct GNUNET_PeerIdentity *self,
1616                           void *cls,
1617                           GNUNET_TRANSPORT_ReceiveCallback rec,
1618                           GNUNET_TRANSPORT_NotifyConnect nc,
1619                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1620 {
1621   return GNUNET_TRANSPORT_connect2 (cfg,
1622                                     self,
1623                                     cls,
1624                                     rec,
1625                                     nc,
1626                                     nd,
1627                                     NULL);
1628 }
1629
1630
1631 /**
1632  * Connect to the transport service.  Note that the connection may
1633  * complete (or fail) asynchronously.
1634  *
1635  * @param cfg configuration to use
1636  * @param self our own identity (API should check that it matches
1637  *             the identity found by transport), or NULL (no check)
1638  * @param cls closure for the callbacks
1639  * @param rec receive function to call
1640  * @param nc function to call on connect events
1641  * @param nd function to call on disconnect events
1642  * @param neb function to call if we have excess bandwidth to a peer
1643  * @return NULL on error
1644  */
1645 struct GNUNET_TRANSPORT_Handle *
1646 GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
1647                            const struct GNUNET_PeerIdentity *self,
1648                            void *cls,
1649                            GNUNET_TRANSPORT_ReceiveCallback rec,
1650                            GNUNET_TRANSPORT_NotifyConnect nc,
1651                            GNUNET_TRANSPORT_NotifyDisconnect nd,
1652                            GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
1653 {
1654   struct GNUNET_TRANSPORT_Handle *ret;
1655
1656   ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
1657   if (NULL != self)
1658   {
1659     ret->self = *self;
1660     ret->check_self = GNUNET_YES;
1661   }
1662   ret->cfg = cfg;
1663   ret->cls = cls;
1664   ret->rec = rec;
1665   ret->nc_cb = nc;
1666   ret->nd_cb = nd;
1667   ret->neb_cb = neb;
1668   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1669   LOG (GNUNET_ERROR_TYPE_DEBUG,
1670        "Connecting to transport service.\n");
1671   ret->client = GNUNET_CLIENT_connect ("transport",
1672                                        cfg);
1673   if (NULL == ret->client)
1674   {
1675     GNUNET_free (ret);
1676     return NULL;
1677   }
1678   ret->neighbours =
1679     GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
1680                                           GNUNET_YES);
1681   ret->ready_heap =
1682       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1683   schedule_control_transmit (ret,
1684                              sizeof (struct StartMessage),
1685                              &send_start,
1686                              ret);
1687   return ret;
1688 }
1689
1690
1691 /**
1692  * Disconnect from the transport service.
1693  *
1694  * @param handle handle to the service as returned from #GNUNET_TRANSPORT_connect()
1695  */
1696 void
1697 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1698 {
1699   LOG (GNUNET_ERROR_TYPE_DEBUG,
1700        "Transport disconnect called!\n");
1701   /* this disconnects all neighbours... */
1702   if (NULL == handle->reconnect_task)
1703     disconnect_and_schedule_reconnect (handle);
1704   /* and now we stop trying to connect again... */
1705   if (NULL != handle->reconnect_task)
1706   {
1707     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1708     handle->reconnect_task = NULL;
1709   }
1710   GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
1711   handle->neighbours = NULL;
1712   if (NULL != handle->quota_task)
1713   {
1714     GNUNET_SCHEDULER_cancel (handle->quota_task);
1715     handle->quota_task = NULL;
1716   }
1717   GNUNET_free_non_null (handle->my_hello);
1718   handle->my_hello = NULL;
1719   GNUNET_assert (NULL == handle->hwl_head);
1720   GNUNET_assert (NULL == handle->hwl_tail);
1721   GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
1722   handle->ready_heap = NULL;
1723   GNUNET_free (handle);
1724 }
1725
1726
1727 /**
1728  * Check if we could queue a message of the given size for
1729  * transmission.  The transport service will take both its
1730  * internal buffers and bandwidth limits imposed by the
1731  * other peer into consideration when answering this query.
1732  *
1733  * @param handle connection to transport service
1734  * @param target who should receive the message
1735  * @param size how big is the message we want to transmit?
1736  * @param timeout after how long should we give up (and call
1737  *        notify with buf NULL and size 0)?
1738  * @param notify function to call when we are ready to
1739  *        send such a message
1740  * @param notify_cls closure for @a notify
1741  * @return NULL if someone else is already waiting to be notified
1742  *         non-NULL if the notify callback was queued (can be used to cancel
1743  *         using #GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1744  */
1745 struct GNUNET_TRANSPORT_TransmitHandle *
1746 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1747                                         const struct GNUNET_PeerIdentity *target,
1748                                         size_t size,
1749                                         struct GNUNET_TIME_Relative timeout,
1750                                         GNUNET_TRANSPORT_TransmitReadyNotify notify,
1751                                         void *notify_cls)
1752 {
1753   struct Neighbour *n;
1754   struct GNUNET_TRANSPORT_TransmitHandle *th;
1755   struct GNUNET_TIME_Relative delay;
1756
1757   n = neighbour_find (handle, target);
1758   if (NULL == n)
1759   {
1760     /* only use this function
1761      * once a connection has been established */
1762     GNUNET_assert (0);
1763     return NULL;
1764   }
1765   if (NULL != n->th)
1766   {
1767     /* attempt to send two messages at the same time to the same peer */
1768     GNUNET_assert (0);
1769     return NULL;
1770   }
1771   GNUNET_assert (NULL == n->hn);
1772   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1773   th->neighbour = n;
1774   th->notify = notify;
1775   th->notify_cls = notify_cls;
1776   th->request_start = GNUNET_TIME_absolute_get ();
1777   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1778   th->notify_size = size;
1779   n->th = th;
1780   /* calculate when our transmission should be ready */
1781   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1782                                               size + n->traffic_overhead);
1783   n->traffic_overhead = 0;
1784   if (delay.rel_value_us > timeout.rel_value_us)
1785     delay.rel_value_us = 0;        /* notify immediately (with failure) */
1786   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1787     LOG (GNUNET_ERROR_TYPE_WARNING,
1788          "At bandwidth %u byte/s next transmission to %s in %s\n",
1789          (unsigned int) n->out_tracker.available_bytes_per_s__,
1790          GNUNET_i2s (target),
1791          GNUNET_STRINGS_relative_time_to_string (delay,
1792                                                  GNUNET_YES));
1793   else
1794     LOG (GNUNET_ERROR_TYPE_DEBUG,
1795          "At bandwidth %u byte/s next transmission to %s in %s\n",
1796          (unsigned int) n->out_tracker.available_bytes_per_s__,
1797          GNUNET_i2s (target),
1798          GNUNET_STRINGS_relative_time_to_string (delay,
1799                                                  GNUNET_YES));
1800   n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
1801                                         n,
1802                                         delay.rel_value_us);
1803   schedule_transmission (handle);
1804   return th;
1805 }
1806
1807
1808 /**
1809  * Cancel the specified transmission-ready notification.
1810  *
1811  * @param th handle returned from #GNUNET_TRANSPORT_notify_transmit_ready()
1812  */
1813 void
1814 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
1815 {
1816   struct Neighbour *n;
1817
1818   GNUNET_assert (NULL == th->next);
1819   GNUNET_assert (NULL == th->prev);
1820   n = th->neighbour;
1821   GNUNET_assert (th == n->th);
1822   n->th = NULL;
1823   if (NULL != n->hn)
1824   {
1825     GNUNET_CONTAINER_heap_remove_node (n->hn);
1826     n->hn = NULL;
1827   }
1828   else
1829   {
1830     GNUNET_assert (NULL != th->timeout_task);
1831     GNUNET_SCHEDULER_cancel (th->timeout_task);
1832     th->timeout_task = NULL;
1833   }
1834   GNUNET_free (th);
1835 }
1836
1837
1838 /* end of transport_api.c */