never pass addresses for ourselves to ATS
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - test test test
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_transport_service.h"
36 #include "transport.h"
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "transport-api",__VA_ARGS__)
39
40 /**
41  * If we could not send any payload to a peer for this amount of
42  * time, we print a warning.
43  */
44 #define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES
45
46 /**
47  * How large to start with for the hashmap of neighbours.
48  */
49 #define STARTING_NEIGHBOURS_SIZE 16
50
51 /**
52  * Handle for a message that should be transmitted to the service.
53  * Used for both control messages and normal messages.
54  */
55 struct GNUNET_TRANSPORT_TransmitHandle
56 {
57
58   /**
59    * We keep all requests in a DLL.
60    */
61   struct GNUNET_TRANSPORT_TransmitHandle *next;
62
63   /**
64    * We keep all requests in a DLL.
65    */
66   struct GNUNET_TRANSPORT_TransmitHandle *prev;
67
68   /**
69    * Neighbour for this handle, NULL for control messages.
70    */
71   struct Neighbour *neighbour;
72
73   /**
74    * Function to call when @e notify_size bytes are available
75    * for transmission.
76    */
77   GNUNET_TRANSPORT_TransmitReadyNotify notify;
78
79   /**
80    * Closure for @e notify.
81    */
82   void *notify_cls;
83
84   /**
85    * Time at which this request was originally scheduled.
86    */
87   struct GNUNET_TIME_Absolute request_start;
88
89   /**
90    * Timeout for this request, 0 for control messages.
91    */
92   struct GNUNET_TIME_Absolute timeout;
93
94   /**
95    * Task to trigger request timeout if the request is stalled due to
96    * congestion.
97    */
98   struct GNUNET_SCHEDULER_Task *timeout_task;
99
100   /**
101    * How many bytes is our notify callback waiting for?
102    */
103   size_t notify_size;
104
105 };
106
107
108 /**
109  * Entry in hash table of all of our current (connected) neighbours.
110  */
111 struct Neighbour
112 {
113   /**
114    * Overall transport handle.
115    */
116   struct GNUNET_TRANSPORT_Handle *h;
117
118   /**
119    * Active transmit handle or NULL.
120    */
121   struct GNUNET_TRANSPORT_TransmitHandle *th;
122
123   /**
124    * Identity of this neighbour.
125    */
126   struct GNUNET_PeerIdentity id;
127
128   /**
129    * Outbound bandwidh tracker.
130    */
131   struct GNUNET_BANDWIDTH_Tracker out_tracker;
132
133   /**
134    * Entry in our readyness heap (which is sorted by @e next_ready
135    * value).  NULL if there is no pending transmission request for
136    * this neighbour or if we're waiting for @e is_ready to become
137    * true AFTER the @e out_tracker suggested that this peer's quota
138    * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
139    * we should immediately go back into the heap).
140    */
141   struct GNUNET_CONTAINER_HeapNode *hn;
142
143   /**
144    * Last time when this peer received payload from us.
145    */
146   struct GNUNET_TIME_Absolute last_payload;
147
148   /**
149    * Task to trigger warnings if we do not get SEND_OK after a while.
150    */
151   struct GNUNET_SCHEDULER_Task *unready_warn_task;
152
153   /**
154    * Is this peer currently ready to receive a message?
155    */
156   int is_ready;
157
158   /**
159    * Sending consumed more bytes on wire than payload was announced
160    * This overhead is added to the delay of next sending operation
161    */
162   size_t traffic_overhead;
163 };
164
165
166 /**
167  * Linked list of functions to call whenever our HELLO is updated.
168  */
169 struct GNUNET_TRANSPORT_GetHelloHandle
170 {
171
172   /**
173    * This is a doubly linked list.
174    */
175   struct GNUNET_TRANSPORT_GetHelloHandle *next;
176
177   /**
178    * This is a doubly linked list.
179    */
180   struct GNUNET_TRANSPORT_GetHelloHandle *prev;
181
182   /**
183    * Transport handle.
184    */
185   struct GNUNET_TRANSPORT_Handle *handle;
186
187   /**
188    * Callback to call once we got our HELLO.
189    */
190   GNUNET_TRANSPORT_HelloUpdateCallback rec;
191
192   /**
193    * Task for calling the HelloUpdateCallback when we already have a HELLO
194    */
195   struct GNUNET_SCHEDULER_Task *notify_task;
196
197   /**
198    * Closure for @e rec.
199    */
200   void *rec_cls;
201
202 };
203
204
205 /**
206  * Entry in linked list for a try-connect request.
207  */
208 struct GNUNET_TRANSPORT_TryConnectHandle
209 {
210   /**
211    * For the DLL.
212    */
213   struct GNUNET_TRANSPORT_TryConnectHandle *prev;
214
215   /**
216    * For the DLL.
217    */
218   struct GNUNET_TRANSPORT_TryConnectHandle *next;
219
220   /**
221    * Peer we should try to connect to.
222    */
223   struct GNUNET_PeerIdentity pid;
224
225   /**
226    * Transport service handle this request is part of.
227    */
228   struct GNUNET_TRANSPORT_Handle *th;
229
230   /**
231    * Message transmission request to communicate to service.
232    */
233   struct GNUNET_TRANSPORT_TransmitHandle *tth;
234
235   /**
236    * Function to call upon completion (of request transmission).
237    */
238   GNUNET_TRANSPORT_TryConnectCallback cb;
239
240   /**
241    * Closure for @e cb.
242    */
243   void *cb_cls;
244
245 };
246
247
248 /**
249  * Entry in linked list for all offer-HELLO requests.
250  */
251 struct GNUNET_TRANSPORT_OfferHelloHandle
252 {
253   /**
254    * For the DLL.
255    */
256   struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
257
258   /**
259    * For the DLL.
260    */
261   struct GNUNET_TRANSPORT_OfferHelloHandle *next;
262
263   /**
264    * Transport service handle we use for transmission.
265    */
266   struct GNUNET_TRANSPORT_Handle *th;
267
268   /**
269    * Transmission handle for this request.
270    */
271   struct GNUNET_TRANSPORT_TransmitHandle *tth;
272
273   /**
274    * Function to call once we are done.
275    */
276   GNUNET_SCHEDULER_TaskCallback cont;
277
278   /**
279    * Closure for @e cont
280    */
281   void *cls;
282
283   /**
284    * The HELLO message to be transmitted.
285    */
286   struct GNUNET_MessageHeader *msg;
287 };
288
289
290 /**
291  * Handle for the transport service (includes all of the
292  * state for the transport service).
293  */
294 struct GNUNET_TRANSPORT_Handle
295 {
296
297   /**
298    * Closure for the callbacks.
299    */
300   void *cls;
301
302   /**
303    * Function to call for received data.
304    */
305   GNUNET_TRANSPORT_ReceiveCallback rec;
306
307   /**
308    * function to call on connect events
309    */
310   GNUNET_TRANSPORT_NotifyConnect nc_cb;
311
312   /**
313    * function to call on disconnect events
314    */
315   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
316
317   /**
318    * function to call on excess bandwidth events
319    */
320   GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
321
322   /**
323    * Head of DLL of control messages.
324    */
325   struct GNUNET_TRANSPORT_TransmitHandle *control_head;
326
327   /**
328    * Tail of DLL of control messages.
329    */
330   struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
331
332   /**
333    * The current HELLO message for this peer.  Updated
334    * whenever transports change their addresses.
335    */
336   struct GNUNET_MessageHeader *my_hello;
337
338   /**
339    * My client connection to the transport service.
340    */
341   struct GNUNET_CLIENT_Connection *client;
342
343   /**
344    * Handle to our registration with the client for notification.
345    */
346   struct GNUNET_CLIENT_TransmitHandle *cth;
347
348   /**
349    * Linked list of pending requests for our HELLO.
350    */
351   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;
352
353   /**
354    * Linked list of pending requests for our HELLO.
355    */
356   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;
357
358   /**
359    * Linked list of pending try connect requests head
360    */
361   struct GNUNET_TRANSPORT_TryConnectHandle *tc_head;
362
363   /**
364    * Linked list of pending try connect requests tail
365    */
366   struct GNUNET_TRANSPORT_TryConnectHandle *tc_tail;
367
368   /**
369    * Linked list of pending offer HELLO requests head
370    */
371   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;
372
373   /**
374    * Linked list of pending offer HELLO requests tail
375    */
376   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;
377
378   /**
379    * My configuration.
380    */
381   const struct GNUNET_CONFIGURATION_Handle *cfg;
382
383   /**
384    * Hash map of the current connected neighbours of this peer.
385    * Maps peer identities to `struct Neighbour` entries.
386    */
387   struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
388
389   /**
390    * Heap sorting peers with pending messages by the timestamps that
391    * specify when we could next send a message to the respective peer.
392    * Excludes control messages (which can always go out immediately).
393    * Maps time stamps to `struct Neighbour` entries.
394    */
395   struct GNUNET_CONTAINER_Heap *ready_heap;
396
397   /**
398    * Peer identity as assumed by this process, or all zeros.
399    */
400   struct GNUNET_PeerIdentity self;
401
402   /**
403    * ID of the task trying to reconnect to the service.
404    */
405   struct GNUNET_SCHEDULER_Task *reconnect_task;
406
407   /**
408    * ID of the task trying to trigger transmission for a peer while
409    * maintaining bandwidth quotas.  In use if there are no control
410    * messages and the smallest entry in the @e ready_heap has a time
411    * stamp in the future.
412    */
413   struct GNUNET_SCHEDULER_Task *quota_task;
414
415   /**
416    * Delay until we try to reconnect.
417    */
418   struct GNUNET_TIME_Relative reconnect_delay;
419
420   /**
421    * Should we check that @e self matches what the service thinks?
422    * (if #GNUNET_NO, then @e self is all zeros!).
423    */
424   int check_self;
425
426   /**
427    * Reconnect in progress
428    */
429   int reconnecting;
430 };
431
432
433 /**
434  * Schedule the task to send one message, either from the control
435  * list or the peer message queues  to the service.
436  *
437  * @param h transport service to schedule a transmission for
438  */
439 static void
440 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
441
442
443 /**
444  * Function that will schedule the job that will try
445  * to connect us again to the client.
446  *
447  * @param h transport service to reconnect
448  */
449 static void
450 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
451
452
453 /**
454  * A neighbour has not gotten a SEND_OK in a  while. Print a warning.
455  *
456  * @param cls the `struct Neighbour`
457  * @param tc scheduler context
458  */
459 static void
460 do_warn_unready (void *cls,
461                  const struct GNUNET_SCHEDULER_TaskContext *tc)
462 {
463   struct Neighbour *n = cls;
464   struct GNUNET_TIME_Relative delay;
465
466   delay = GNUNET_TIME_absolute_get_duration (n->last_payload);
467   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
468               "Lacking SEND_OK, no payload could be send to %s for %s\n",
469               GNUNET_i2s (&n->id),
470               GNUNET_STRINGS_relative_time_to_string (delay,
471                                                       GNUNET_YES));
472   n->unready_warn_task
473     = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
474                                     &do_warn_unready,
475                                     n);
476 }
477
478
479 /**
480  * Get the neighbour list entry for the given peer
481  *
482  * @param h our context
483  * @param peer peer to look up
484  * @return NULL if no such peer entry exists
485  */
486 static struct Neighbour *
487 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
488                 const struct GNUNET_PeerIdentity *peer)
489 {
490   return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
491                                             peer);
492 }
493
494
495 /**
496  * The outbound quota has changed in a way that may require
497  * us to reset the timeout.  Update the timeout.
498  *
499  * @param cls the `struct Neighbour` for which the timeout changed
500  */
501 static void
502 outbound_bw_tracker_update (void *cls)
503 {
504   struct Neighbour *n = cls;
505   struct GNUNET_TIME_Relative delay;
506
507   if (NULL == n->hn)
508     return;
509   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
510                                               n->th->notify_size + n->traffic_overhead);
511   LOG (GNUNET_ERROR_TYPE_DEBUG,
512        "New outbound delay %s us\n",
513        GNUNET_STRINGS_relative_time_to_string (delay,
514                                                GNUNET_NO));
515   GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
516       n->hn, delay.rel_value_us);
517   schedule_transmission (n->h);
518 }
519
520
521 /**
522  * Function called by the bandwidth tracker if we have excess
523  * bandwidth.
524  *
525  * @param cls the `struct Neighbour` that has excess bandwidth
526  */
527 static void
528 notify_excess_cb (void *cls)
529 {
530   struct Neighbour *n = cls;
531   struct GNUNET_TRANSPORT_Handle *h = n->h;
532
533   if (NULL != h->neb_cb)
534     h->neb_cb (h->cls,
535                &n->id);
536 }
537
538
539 /**
540  * Add neighbour to our list
541  *
542  * @return NULL if this API is currently disconnecting from the service
543  */
544 static struct Neighbour *
545 neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
546                const struct GNUNET_PeerIdentity *pid)
547 {
548   struct Neighbour *n;
549
550   LOG (GNUNET_ERROR_TYPE_DEBUG,
551        "Creating entry for neighbour `%s'.\n",
552        GNUNET_i2s (pid));
553   n = GNUNET_new (struct Neighbour);
554   n->id = *pid;
555   n->h = h;
556   n->is_ready = GNUNET_YES;
557   n->traffic_overhead = 0;
558   GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
559                                   &outbound_bw_tracker_update,
560                                   n,
561                                   GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
562                                   MAX_BANDWIDTH_CARRY_S,
563                                   &notify_excess_cb,
564                                   n);
565   GNUNET_assert (GNUNET_OK ==
566                  GNUNET_CONTAINER_multipeermap_put (h->neighbours,
567                                                     &n->id,
568                                                     n,
569                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
570   return n;
571 }
572
573
574 /**
575  * Iterator over hash map entries, for deleting state of a neighbour.
576  *
577  * @param cls the `struct GNUNET_TRANSPORT_Handle *`
578  * @param key peer identity
579  * @param value value in the hash map, the neighbour entry to delete
580  * @return #GNUNET_YES if we should continue to
581  *         iterate,
582  *         #GNUNET_NO if not.
583  */
584 static int
585 neighbour_delete (void *cls,
586                   const struct GNUNET_PeerIdentity *key,
587                   void *value)
588 {
589   struct GNUNET_TRANSPORT_Handle *handle = cls;
590   struct Neighbour *n = value;
591
592   LOG (GNUNET_ERROR_TYPE_DEBUG,
593        "Dropping entry for neighbour `%s'.\n",
594        GNUNET_i2s (key));
595   GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
596   if (NULL != handle->nd_cb)
597     handle->nd_cb (handle->cls,
598                    &n->id);
599   if (NULL != n->unready_warn_task)
600   {
601     GNUNET_SCHEDULER_cancel (n->unready_warn_task);
602     n->unready_warn_task = NULL;
603   }
604   GNUNET_assert (NULL == n->th);
605   GNUNET_assert (NULL == n->hn);
606   GNUNET_assert (GNUNET_YES ==
607                  GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
608                                                        key,
609                                                        n));
610   GNUNET_free (n);
611   return GNUNET_YES;
612 }
613
614
615 /**
616  * Function we use for handling incoming messages.
617  *
618  * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
619  * @param msg message received, NULL on timeout or fatal error
620  */
621 static void
622 demultiplexer (void *cls,
623                const struct GNUNET_MessageHeader *msg)
624 {
625   struct GNUNET_TRANSPORT_Handle *h = cls;
626   const struct DisconnectInfoMessage *dim;
627   const struct ConnectInfoMessage *cim;
628   const struct InboundMessage *im;
629   const struct GNUNET_MessageHeader *imm;
630   const struct SendOkMessage *okm;
631   const struct QuotaSetMessage *qm;
632   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
633   struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
634   struct Neighbour *n;
635   struct GNUNET_PeerIdentity me;
636   uint16_t size;
637   uint32_t bytes_msg;
638   uint32_t bytes_physical;
639
640   GNUNET_assert (NULL != h->client);
641   if (GNUNET_YES == h->reconnecting)
642   {
643     return;
644   }
645   if (NULL == msg)
646   {
647     LOG (GNUNET_ERROR_TYPE_DEBUG,
648          "Error receiving from transport service, disconnecting temporarily.\n");
649     h->reconnecting = GNUNET_YES;
650     disconnect_and_schedule_reconnect (h);
651     return;
652   }
653   GNUNET_CLIENT_receive (h->client,
654                          &demultiplexer,
655                          h,
656                          GNUNET_TIME_UNIT_FOREVER_REL);
657   size = ntohs (msg->size);
658   switch (ntohs (msg->type))
659   {
660   case GNUNET_MESSAGE_TYPE_HELLO:
661     if (GNUNET_OK !=
662         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
663                              &me))
664     {
665       GNUNET_break (0);
666       break;
667     }
668     LOG (GNUNET_ERROR_TYPE_DEBUG,
669          "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
670          (unsigned int) size,
671          GNUNET_i2s (&me));
672     GNUNET_free_non_null (h->my_hello);
673     h->my_hello = NULL;
674     if (size < sizeof (struct GNUNET_MessageHeader))
675     {
676       GNUNET_break (0);
677       break;
678     }
679     h->my_hello = GNUNET_copy_message (msg);
680     hwl = h->hwl_head;
681     while (NULL != hwl)
682     {
683       next_hwl = hwl->next;
684       hwl->rec (hwl->rec_cls,
685                 h->my_hello);
686       hwl = next_hwl;
687     }
688     break;
689   case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
690     if (size < sizeof (struct ConnectInfoMessage))
691     {
692       GNUNET_break (0);
693       h->reconnecting = GNUNET_YES;
694       disconnect_and_schedule_reconnect (h);
695       break;
696     }
697     cim = (const struct ConnectInfoMessage *) msg;
698     if (size !=
699         sizeof (struct ConnectInfoMessage))
700     {
701       GNUNET_break (0);
702       h->reconnecting = GNUNET_YES;
703       disconnect_and_schedule_reconnect (h);
704       break;
705     }
706     LOG (GNUNET_ERROR_TYPE_DEBUG,
707          "Receiving CONNECT message for `%s'.\n",
708          GNUNET_i2s (&cim->id));
709     n = neighbour_find (h, &cim->id);
710     if (NULL != n)
711     {
712       GNUNET_break (0);
713       h->reconnecting = GNUNET_YES;
714       disconnect_and_schedule_reconnect (h);
715       break;
716     }
717     n = neighbour_add (h,
718                        &cim->id);
719     LOG (GNUNET_ERROR_TYPE_DEBUG,
720          "Receiving CONNECT message for `%s' with quota %u\n",
721          GNUNET_i2s (&cim->id),
722          ntohl (cim->quota_out.value__));
723     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
724                                            cim->quota_out);
725     if (NULL != h->nc_cb)
726       h->nc_cb (h->cls,
727                 &n->id);
728     break;
729   case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
730     if (size != sizeof (struct DisconnectInfoMessage))
731     {
732       GNUNET_break (0);
733       h->reconnecting = GNUNET_YES;
734       disconnect_and_schedule_reconnect (h);
735       break;
736     }
737     dim = (const struct DisconnectInfoMessage *) msg;
738     GNUNET_break (ntohl (dim->reserved) == 0);
739     LOG (GNUNET_ERROR_TYPE_DEBUG,
740          "Receiving DISCONNECT message for `%s'.\n",
741          GNUNET_i2s (&dim->peer));
742     n = neighbour_find (h, &dim->peer);
743     if (NULL == n)
744     {
745       GNUNET_break (0);
746       h->reconnecting = GNUNET_YES;
747       disconnect_and_schedule_reconnect (h);
748       break;
749     }
750     neighbour_delete (h,
751                       &dim->peer,
752                       n);
753     break;
754   case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
755     if (size != sizeof (struct SendOkMessage))
756     {
757       GNUNET_break (0);
758       h->reconnecting = GNUNET_YES;
759       disconnect_and_schedule_reconnect (h);
760       break;
761     }
762     okm = (const struct SendOkMessage *) msg;
763     bytes_msg = ntohl (okm->bytes_msg);
764     bytes_physical = ntohl (okm->bytes_physical);
765     LOG (GNUNET_ERROR_TYPE_DEBUG,
766          "Receiving SEND_OK message, transmission %s.\n",
767          ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
768
769     n = neighbour_find (h,
770                         &okm->peer);
771     if (NULL == n)
772     {
773       /* We should never get a 'SEND_OK' for a peer that we are not
774          connected to */
775       GNUNET_break (0);
776       h->reconnecting = GNUNET_YES;
777       disconnect_and_schedule_reconnect (h);
778       break;
779     }
780     if (bytes_physical >= bytes_msg)
781     {
782       LOG (GNUNET_ERROR_TYPE_DEBUG,
783            "Overhead for %u byte message: %u\n",
784            bytes_msg,
785            bytes_physical - bytes_msg);
786       n->traffic_overhead += bytes_physical - bytes_msg;
787     }
788     GNUNET_break (GNUNET_NO == n->is_ready);
789     n->is_ready = GNUNET_YES;
790     if (NULL != n->unready_warn_task)
791     {
792       GNUNET_SCHEDULER_cancel (n->unready_warn_task);
793       n->unready_warn_task = NULL;
794     }
795     if ((NULL != n->th) && (NULL == n->hn))
796     {
797       GNUNET_assert (NULL != n->th->timeout_task);
798       GNUNET_SCHEDULER_cancel (n->th->timeout_task);
799       n->th->timeout_task = NULL;
800       /* we've been waiting for this (congestion, not quota,
801        * caused delayed transmission) */
802       n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, n, 0);
803       schedule_transmission (h);
804     }
805     break;
806   case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
807     if (size <
808         sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
809     {
810       GNUNET_break (0);
811       h->reconnecting = GNUNET_YES;
812       disconnect_and_schedule_reconnect (h);
813       break;
814     }
815     im = (const struct InboundMessage *) msg;
816     imm = (const struct GNUNET_MessageHeader *) &im[1];
817     if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
818     {
819       GNUNET_break (0);
820       h->reconnecting = GNUNET_YES;
821       disconnect_and_schedule_reconnect (h);
822       break;
823     }
824     LOG (GNUNET_ERROR_TYPE_DEBUG,
825          "Received message of type %u from `%s'.\n",
826          ntohs (imm->type), GNUNET_i2s (&im->peer));
827     n = neighbour_find (h, &im->peer);
828     if (NULL == n)
829     {
830       GNUNET_break (0);
831       h->reconnecting = GNUNET_YES;
832       disconnect_and_schedule_reconnect (h);
833       break;
834     }
835     if (NULL != h->rec)
836       h->rec (h->cls,
837               &im->peer,
838               imm);
839     break;
840   case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
841     if (size != sizeof (struct QuotaSetMessage))
842     {
843       GNUNET_break (0);
844       h->reconnecting = GNUNET_YES;
845       disconnect_and_schedule_reconnect (h);
846       break;
847     }
848     qm = (const struct QuotaSetMessage *) msg;
849     n = neighbour_find (h, &qm->peer);
850     if (NULL == n)
851     {
852       GNUNET_break (0);
853       h->reconnecting = GNUNET_YES;
854       disconnect_and_schedule_reconnect (h);
855       break;
856     }
857     LOG (GNUNET_ERROR_TYPE_DEBUG,
858          "Receiving SET_QUOTA message for `%s' with quota %u\n",
859          GNUNET_i2s (&qm->peer),
860          ntohl (qm->quota.value__));
861     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
862                                            qm->quota);
863     break;
864   default:
865     LOG (GNUNET_ERROR_TYPE_ERROR,
866          _("Received unexpected message of type %u in %s:%u\n"),
867          ntohs (msg->type),
868          __FILE__,
869          __LINE__);
870     GNUNET_break (0);
871     break;
872   }
873 }
874
875
876 /**
877  * A transmission request could not be satisfied because of
878  * network congestion.  Notify the initiator and clean up.
879  *
880  * @param cls the `struct GNUNET_TRANSPORT_TransmitHandle`
881  * @param tc scheduler context
882  */
883 static void
884 timeout_request_due_to_congestion (void *cls,
885                                    const struct GNUNET_SCHEDULER_TaskContext *tc)
886 {
887   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
888   struct Neighbour *n = th->neighbour;
889   struct GNUNET_TIME_Relative delay;
890
891   n->th->timeout_task = NULL;
892   delay = GNUNET_TIME_absolute_get_duration (th->request_start);
893   LOG (GNUNET_ERROR_TYPE_WARNING,
894        "Discarding %u bytes of payload message after %s delay due to congestion\n",
895        th->notify_size,
896        GNUNET_STRINGS_relative_time_to_string (delay,
897                                                GNUNET_YES));
898   GNUNET_assert (th == n->th);
899   GNUNET_assert (NULL == n->hn);
900   n->th = NULL;
901   th->notify (th->notify_cls,
902               0,
903               NULL);
904   GNUNET_free (th);
905 }
906
907
908 /**
909  * Transmit message(s) to service.
910  *
911  * @param cls handle to transport
912  * @param size number of bytes available in @a buf
913  * @param buf where to copy the message
914  * @return number of bytes copied to @a buf
915  */
916 static size_t
917 transport_notify_ready (void *cls,
918                         size_t size,
919                         void *buf)
920 {
921   struct GNUNET_TRANSPORT_Handle *h = cls;
922   struct GNUNET_TRANSPORT_TransmitHandle *th;
923   struct GNUNET_TIME_Relative delay;
924   struct Neighbour *n;
925   char *cbuf;
926   struct OutboundMessage obm;
927   size_t ret;
928   size_t nret;
929   size_t mret;
930
931   GNUNET_assert (NULL != h->client);
932   h->cth = NULL;
933   if (NULL == buf)
934   {
935     /* transmission failed */
936     disconnect_and_schedule_reconnect (h);
937     return 0;
938   }
939
940   cbuf = buf;
941   ret = 0;
942   /* first send control messages */
943   while ( (NULL != (th = h->control_head)) &&
944           (th->notify_size <= size) )
945   {
946     GNUNET_CONTAINER_DLL_remove (h->control_head,
947                                  h->control_tail,
948                                  th);
949     nret = th->notify (th->notify_cls,
950                        size,
951                        &cbuf[ret]);
952     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
953     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
954       LOG (GNUNET_ERROR_TYPE_WARNING,
955            "Added %u bytes of control message at %u after %s delay\n",
956            nret,
957            ret,
958            GNUNET_STRINGS_relative_time_to_string (delay,
959                                                    GNUNET_YES));
960     else
961       LOG (GNUNET_ERROR_TYPE_DEBUG,
962            "Added %u bytes of control message at %u after %s delay\n",
963            nret,
964            ret,
965            GNUNET_STRINGS_relative_time_to_string (delay,
966                                                    GNUNET_YES));
967     GNUNET_free (th);
968     ret += nret;
969     size -= nret;
970   }
971
972   /* then, if possible and no control messages pending, send data messages */
973   while ( (NULL == h->control_head) &&
974           (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
975   {
976     if (GNUNET_YES != n->is_ready)
977     {
978       /* peer not ready, wait for notification! */
979       GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
980       n->hn = NULL;
981       GNUNET_assert (NULL == n->th->timeout_task);
982       n->th->timeout_task
983         = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
984                                         (n->th->timeout),
985                                         &timeout_request_due_to_congestion,
986                                         n->th);
987       continue;
988     }
989     th = n->th;
990     if (th->notify_size + sizeof (struct OutboundMessage) > size)
991       break;                    /* does not fit */
992     if (GNUNET_BANDWIDTH_tracker_get_delay
993         (&n->out_tracker,
994          th->notify_size).rel_value_us > 0)
995       break;                    /* too early */
996     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
997     n->hn = NULL;
998     n->th = NULL;
999     GNUNET_assert (size >= sizeof (struct OutboundMessage));
1000     mret = th->notify (th->notify_cls,
1001                        size - sizeof (struct OutboundMessage),
1002                        &cbuf[ret + sizeof (struct OutboundMessage)]);
1003     GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
1004     if (0 == mret)
1005     {
1006       GNUNET_free (th);
1007       continue;
1008     }
1009     if (NULL != n->unready_warn_task)
1010       n->unready_warn_task
1011         = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
1012                                         &do_warn_unready,
1013                                         n);
1014     n->last_payload = GNUNET_TIME_absolute_get ();
1015     n->is_ready = GNUNET_NO;
1016     GNUNET_assert (mret + sizeof (struct OutboundMessage) <
1017                    GNUNET_SERVER_MAX_MESSAGE_SIZE);
1018     obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
1019     obm.header.size = htons (mret + sizeof (struct OutboundMessage));
1020     obm.reserved = htonl (0);
1021     obm.timeout =
1022       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1023                                  (th->timeout));
1024     obm.peer = n->id;
1025     memcpy (&cbuf[ret],
1026             &obm,
1027             sizeof (struct OutboundMessage));
1028     ret += (mret + sizeof (struct OutboundMessage));
1029     size -= (mret + sizeof (struct OutboundMessage));
1030     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
1031                                       mret);
1032     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
1033     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1034       LOG (GNUNET_ERROR_TYPE_WARNING,
1035            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
1036            mret,
1037            GNUNET_i2s (&n->id),
1038            GNUNET_STRINGS_relative_time_to_string (delay,
1039                                                    GNUNET_YES),
1040            (unsigned int) n->out_tracker.available_bytes_per_s__);
1041     else
1042       LOG (GNUNET_ERROR_TYPE_DEBUG,
1043            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
1044            mret,
1045            GNUNET_i2s (&n->id),
1046            GNUNET_STRINGS_relative_time_to_string (delay,
1047                                                    GNUNET_YES),
1048            (unsigned int) n->out_tracker.available_bytes_per_s__);
1049     GNUNET_free (th);
1050     break;
1051   }
1052   /* if there are more pending messages, try to schedule those */
1053   schedule_transmission (h);
1054   LOG (GNUNET_ERROR_TYPE_DEBUG,
1055        "Transmitting %u bytes to transport service\n",
1056        ret);
1057   return ret;
1058 }
1059
1060
1061 /**
1062  * Schedule the task to send one message, either from the control
1063  * list or the peer message queues  to the service.
1064  *
1065  * @param cls transport service to schedule a transmission for
1066  * @param tc scheduler context
1067  */
1068 static void
1069 schedule_transmission_task (void *cls,
1070                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1071 {
1072   struct GNUNET_TRANSPORT_Handle *h = cls;
1073   size_t size;
1074   struct GNUNET_TRANSPORT_TransmitHandle *th;
1075   struct Neighbour *n;
1076
1077   h->quota_task = NULL;
1078   GNUNET_assert (NULL != h->client);
1079   /* destroy all requests that have timed out */
1080   while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
1081           (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
1082   {
1083     /* notify client that the request could not be satisfied within
1084      * the given time constraints */
1085     th = n->th;
1086     n->th = NULL;
1087     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
1088     n->hn = NULL;
1089     LOG (GNUNET_ERROR_TYPE_DEBUG,
1090          "Signalling timeout for transmission to peer %s due to congestion\n",
1091          GNUNET_i2s (&n->id));
1092     GNUNET_assert (0 == th->notify (th->notify_cls,
1093                                     0,
1094                                     NULL));
1095     GNUNET_free (th);
1096   }
1097   if (NULL != h->cth)
1098     return;
1099   if (NULL != h->control_head)
1100   {
1101     size = h->control_head->notify_size;
1102   }
1103   else
1104   {
1105     n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
1106     if (NULL == n)
1107       return;                   /* no pending messages */
1108     size = n->th->notify_size + sizeof (struct OutboundMessage);
1109   }
1110   LOG (GNUNET_ERROR_TYPE_DEBUG,
1111        "Calling notify_transmit_ready\n");
1112   h->cth
1113     = GNUNET_CLIENT_notify_transmit_ready (h->client,
1114                                            size,
1115                                            GNUNET_TIME_UNIT_FOREVER_REL,
1116                                            GNUNET_NO,
1117                                            &transport_notify_ready,
1118                                            h);
1119   GNUNET_assert (NULL != h->cth);
1120 }
1121
1122
1123 /**
1124  * Schedule the task to send one message, either from the control
1125  * list or the peer message queues  to the service.
1126  *
1127  * @param h transport service to schedule a transmission for
1128  */
1129 static void
1130 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
1131 {
1132   struct GNUNET_TIME_Relative delay;
1133   struct Neighbour *n;
1134
1135   GNUNET_assert (NULL != h->client);
1136   if (NULL != h->quota_task)
1137   {
1138     GNUNET_SCHEDULER_cancel (h->quota_task);
1139     h->quota_task = NULL;
1140   }
1141   if (NULL != h->control_head)
1142     delay = GNUNET_TIME_UNIT_ZERO;
1143   else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
1144   {
1145     delay =
1146         GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1147                                             n->th->notify_size + n->traffic_overhead);
1148     n->traffic_overhead = 0;
1149   }
1150   else
1151     return;                     /* no work to be done */
1152   LOG (GNUNET_ERROR_TYPE_DEBUG,
1153        "Scheduling next transmission to service in %s\n",
1154        GNUNET_STRINGS_relative_time_to_string (delay,
1155                                                GNUNET_YES));
1156   h->quota_task =
1157       GNUNET_SCHEDULER_add_delayed (delay,
1158                                     &schedule_transmission_task,
1159                                     h);
1160 }
1161
1162
1163 /**
1164  * Queue control request for transmission to the transport
1165  * service.
1166  *
1167  * @param h handle to the transport service
1168  * @param size number of bytes to be transmitted
1169  * @param notify function to call to get the content
1170  * @param notify_cls closure for @a notify
1171  * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
1172  */
1173 static struct GNUNET_TRANSPORT_TransmitHandle *
1174 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
1175                            size_t size,
1176                            GNUNET_TRANSPORT_TransmitReadyNotify notify,
1177                            void *notify_cls)
1178 {
1179   struct GNUNET_TRANSPORT_TransmitHandle *th;
1180
1181   LOG (GNUNET_ERROR_TYPE_DEBUG,
1182        "Control transmit of %u bytes requested\n",
1183        size);
1184   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1185   th->notify = notify;
1186   th->notify_cls = notify_cls;
1187   th->notify_size = size;
1188   th->request_start = GNUNET_TIME_absolute_get ();
1189   GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
1190                                     h->control_tail,
1191                                     th);
1192   schedule_transmission (h);
1193   return th;
1194 }
1195
1196
1197 /**
1198  * Transmit START message to service.
1199  *
1200  * @param cls unused
1201  * @param size number of bytes available in @a buf
1202  * @param buf where to copy the message
1203  * @return number of bytes copied to @a buf
1204  */
1205 static size_t
1206 send_start (void *cls,
1207             size_t size,
1208             void *buf)
1209 {
1210   struct GNUNET_TRANSPORT_Handle *h = cls;
1211   struct StartMessage s;
1212   uint32_t options;
1213
1214   if (NULL == buf)
1215   {
1216     /* Can only be shutdown, just give up */
1217     LOG (GNUNET_ERROR_TYPE_DEBUG,
1218          "Shutdown while trying to transmit START request.\n");
1219     return 0;
1220   }
1221   LOG (GNUNET_ERROR_TYPE_DEBUG,
1222        "Transmitting START request.\n");
1223   GNUNET_assert (size >= sizeof (struct StartMessage));
1224   s.header.size = htons (sizeof (struct StartMessage));
1225   s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
1226   options = 0;
1227   if (h->check_self)
1228     options |= 1;
1229   if (NULL != h->rec)
1230     options |= 2;
1231   s.options = htonl (options);
1232   s.self = h->self;
1233   memcpy (buf, &s, sizeof (struct StartMessage));
1234   GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
1235                          GNUNET_TIME_UNIT_FOREVER_REL);
1236   return sizeof (struct StartMessage);
1237 }
1238
1239
1240 /**
1241  * Try again to connect to transport service.
1242  *
1243  * @param cls the handle to the transport service
1244  * @param tc scheduler context
1245  */
1246 static void
1247 reconnect (void *cls,
1248            const struct GNUNET_SCHEDULER_TaskContext *tc)
1249 {
1250   struct GNUNET_TRANSPORT_Handle *h = cls;
1251
1252   h->reconnect_task = NULL;
1253   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1254   {
1255     /* shutdown, just give up */
1256     return;
1257   }
1258   LOG (GNUNET_ERROR_TYPE_DEBUG,
1259        "Connecting to transport service.\n");
1260   GNUNET_assert (NULL == h->client);
1261   GNUNET_assert (NULL == h->control_head);
1262   GNUNET_assert (NULL == h->control_tail);
1263   h->reconnecting = GNUNET_NO;
1264   h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
1265
1266   GNUNET_assert (NULL != h->client);
1267   schedule_control_transmit (h, sizeof (struct StartMessage),
1268                              &send_start, h);
1269 }
1270
1271
1272 /**
1273  * Function that will schedule the job that will try
1274  * to connect us again to the client.
1275  *
1276  * @param h transport service to reconnect
1277  */
1278 static void
1279 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1280 {
1281   struct GNUNET_TRANSPORT_TransmitHandle *th;
1282
1283   GNUNET_assert (NULL == h->reconnect_task);
1284   if (NULL != h->cth)
1285   {
1286     GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
1287     h->cth = NULL;
1288   }
1289   if (NULL != h->client)
1290   {
1291     GNUNET_CLIENT_disconnect (h->client);
1292     h->client = NULL;
1293 /*    LOG (GNUNET_ERROR_TYPE_ERROR,
1294          "Client disconnect done \n");*/
1295   }
1296   /* Forget about all neighbours that we used to be connected to */
1297   GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
1298                                          &neighbour_delete,
1299                                          h);
1300   if (NULL != h->quota_task)
1301   {
1302     GNUNET_SCHEDULER_cancel (h->quota_task);
1303     h->quota_task = NULL;
1304   }
1305   while ((NULL != (th = h->control_head)))
1306   {
1307     GNUNET_CONTAINER_DLL_remove (h->control_head,
1308                                  h->control_tail,
1309                                  th);
1310     th->notify (th->notify_cls,
1311                 0,
1312                 NULL);
1313     GNUNET_free (th);
1314   }
1315   LOG (GNUNET_ERROR_TYPE_DEBUG,
1316        "Scheduling task to reconnect to transport service in %s.\n",
1317        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
1318                                                GNUNET_YES));
1319   h->reconnect_task =
1320       GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
1321                                     &reconnect,
1322                                     h);
1323   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
1324 }
1325
1326
1327 /**
1328  * Cancel control request for transmission to the transport service.
1329  *
1330  * @param th handle to the transport service
1331  * @param tth transmit handle to cancel
1332  */
1333 static void
1334 cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
1335                          struct GNUNET_TRANSPORT_TransmitHandle *tth)
1336 {
1337   LOG (GNUNET_ERROR_TYPE_DEBUG,
1338        "Canceling transmit of contral transmission requested\n");
1339   GNUNET_CONTAINER_DLL_remove (th->control_head,
1340                                th->control_tail,
1341                                tth);
1342   GNUNET_free (tth);
1343 }
1344
1345
1346 /**
1347  * Send #GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT message to the
1348  * service.
1349  *
1350  * @param cls the `struct GNUNET_TRANSPORT_TryConnectHandle`
1351  * @param size number of bytes available in @a buf
1352  * @param buf where to copy the message
1353  * @return number of bytes copied to @a buf
1354  */
1355 static size_t
1356 send_try_connect (void *cls,
1357                   size_t size,
1358                   void *buf)
1359 {
1360   struct GNUNET_TRANSPORT_TryConnectHandle *tch = cls;
1361   struct TransportRequestConnectMessage msg;
1362
1363   tch->tth = NULL;
1364   if (NULL == buf)
1365   {
1366     LOG (GNUNET_ERROR_TYPE_DEBUG,
1367          "Discarding  `%s' request to `%s' due to error in transport service connection.\n",
1368          "REQUEST_CONNECT",
1369          GNUNET_i2s (&tch->pid));
1370     if (NULL != tch->cb)
1371       tch->cb (tch->cb_cls,
1372                GNUNET_SYSERR);
1373     GNUNET_TRANSPORT_try_connect_cancel (tch);
1374     return 0;
1375   }
1376   LOG (GNUNET_ERROR_TYPE_DEBUG,
1377        "Transmitting `%s' request with respect to `%s'.\n",
1378        "REQUEST_CONNECT",
1379        GNUNET_i2s (&tch->pid));
1380   GNUNET_assert (size >= sizeof (struct TransportRequestConnectMessage));
1381   msg.header.size = htons (sizeof (struct TransportRequestConnectMessage));
1382   msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT);
1383   msg.reserved = htonl (0);
1384   msg.peer = tch->pid;
1385   memcpy (buf, &msg, sizeof (msg));
1386   if (NULL != tch->cb)
1387     tch->cb (tch->cb_cls, GNUNET_OK);
1388   GNUNET_TRANSPORT_try_connect_cancel (tch);
1389   return sizeof (struct TransportRequestConnectMessage);
1390 }
1391
1392
1393 /**
1394  * Ask the transport service to establish a connection to
1395  * the given peer.
1396  *
1397  * @param handle connection to transport service
1398  * @param target who we should try to connect to
1399  * @param cb callback to be called when request was transmitted to transport
1400  *         service
1401  * @param cb_cls closure for the callback
1402  * @return a `struct GNUNET_TRANSPORT_TryConnectHandle` handle or
1403  *         NULL on failure (cb will not be called)
1404  */
1405 struct GNUNET_TRANSPORT_TryConnectHandle *
1406 GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle,
1407                               const struct GNUNET_PeerIdentity *target,
1408                               GNUNET_TRANSPORT_TryConnectCallback cb,
1409                               void *cb_cls)
1410 {
1411   struct GNUNET_TRANSPORT_TryConnectHandle *tch;
1412
1413   if (NULL == handle->client)
1414     return NULL;
1415   tch = GNUNET_new (struct GNUNET_TRANSPORT_TryConnectHandle);
1416   tch->th = handle;
1417   tch->pid = *target;
1418   tch->cb = cb;
1419   tch->cb_cls = cb_cls;
1420   tch->tth = schedule_control_transmit (handle,
1421                                         sizeof (struct TransportRequestConnectMessage),
1422                                         &send_try_connect, tch);
1423   GNUNET_CONTAINER_DLL_insert (handle->tc_head,
1424                                handle->tc_tail,
1425                                tch);
1426   return tch;
1427 }
1428
1429
1430 /**
1431  * Cancel the request to transport to try a connect
1432  * Callback will not be called
1433  *
1434  * @param tch the handle to cancel
1435  */
1436 void
1437 GNUNET_TRANSPORT_try_connect_cancel (struct GNUNET_TRANSPORT_TryConnectHandle *tch)
1438 {
1439   struct GNUNET_TRANSPORT_Handle *th;
1440
1441   th = tch->th;
1442   if (NULL != tch->tth)
1443     cancel_control_transmit (th, tch->tth);
1444   GNUNET_CONTAINER_DLL_remove (th->tc_head,
1445                                th->tc_tail,
1446                                tch);
1447   GNUNET_free (tch);
1448 }
1449
1450
1451 /**
1452  * Send HELLO message to the service.
1453  *
1454  * @param cls the HELLO message to send
1455  * @param size number of bytes available in @a buf
1456  * @param buf where to copy the message
1457  * @return number of bytes copied to @a buf
1458  */
1459 static size_t
1460 send_hello (void *cls,
1461             size_t size,
1462             void *buf)
1463 {
1464   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
1465   struct GNUNET_MessageHeader *msg = ohh->msg;
1466   uint16_t ssize;
1467   struct GNUNET_SCHEDULER_TaskContext tc;
1468
1469   tc.read_ready = NULL;
1470   tc.write_ready = NULL;
1471   tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
1472   if (NULL == buf)
1473   {
1474     LOG (GNUNET_ERROR_TYPE_DEBUG,
1475          "Timeout while trying to transmit `%s' request.\n",
1476          "HELLO");
1477     if (NULL != ohh->cont)
1478       ohh->cont (ohh->cls,
1479                  &tc);
1480     GNUNET_free (msg);
1481     GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1482                                  ohh->th->oh_tail,
1483                                  ohh);
1484     GNUNET_free (ohh);
1485     return 0;
1486   }
1487   LOG (GNUNET_ERROR_TYPE_DEBUG,
1488        "Transmitting `%s' request.\n",
1489        "HELLO");
1490   ssize = ntohs (msg->size);
1491   GNUNET_assert (size >= ssize);
1492   memcpy (buf,
1493           msg,
1494           ssize);
1495   GNUNET_free (msg);
1496   tc.reason = GNUNET_SCHEDULER_REASON_READ_READY;
1497   if (NULL != ohh->cont)
1498     ohh->cont (ohh->cls,
1499                &tc);
1500   GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1501                                ohh->th->oh_tail,
1502                                ohh);
1503   GNUNET_free (ohh);
1504   return ssize;
1505 }
1506
1507
1508 /**
1509  * Send traffic metric message to the service.
1510  *
1511  * @param cls the message to send
1512  * @param size number of bytes available in @a buf
1513  * @param buf where to copy the message
1514  * @return number of bytes copied to @a buf
1515  */
1516 static size_t
1517 send_metric (void *cls,
1518              size_t size,
1519              void *buf)
1520 {
1521   struct TrafficMetricMessage *msg = cls;
1522   uint16_t ssize;
1523
1524   if (NULL == buf)
1525   {
1526     LOG (GNUNET_ERROR_TYPE_DEBUG,
1527          "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
1528     GNUNET_free (msg);
1529     return 0;
1530   }
1531   LOG (GNUNET_ERROR_TYPE_DEBUG,
1532        "Transmitting TRAFFIC_METRIC request.\n");
1533   ssize = ntohs (msg->header.size);
1534   GNUNET_assert (size >= ssize);
1535   memcpy (buf, msg, ssize);
1536   GNUNET_free (msg);
1537   return ssize;
1538 }
1539
1540
1541 /**
1542  * Set transport metrics for a peer and a direction
1543  *
1544  * @param handle transport handle
1545  * @param peer the peer to set the metric for
1546  * @param prop the performance metrics to set
1547  * @param delay_in inbound delay to introduce
1548  * @param delay_out outbound delay to introduce
1549  *
1550  * Note: Delay restrictions in receiving direction will be enforced
1551  * with one message delay.
1552  */
1553 void
1554 GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
1555                                      const struct GNUNET_PeerIdentity *peer,
1556                                      const struct GNUNET_ATS_Properties *prop,
1557                                      struct GNUNET_TIME_Relative delay_in,
1558                                      struct GNUNET_TIME_Relative delay_out)
1559 {
1560   struct TrafficMetricMessage *msg;
1561
1562   msg = GNUNET_new (struct TrafficMetricMessage);
1563   msg->header.size = htons (sizeof (struct TrafficMetricMessage));
1564   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
1565   msg->reserved = htonl (0);
1566   msg->peer = *peer;
1567   GNUNET_ATS_properties_hton (&msg->properties,
1568                               prop);
1569   msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
1570   msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
1571   schedule_control_transmit (handle,
1572                              sizeof (struct TrafficMetricMessage),
1573                              &send_metric,
1574                              msg);
1575 }
1576
1577
1578 /**
1579  * Offer the transport service the HELLO of another peer.  Note that
1580  * the transport service may just ignore this message if the HELLO is
1581  * malformed or useless due to our local configuration.
1582  *
1583  * @param handle connection to transport service
1584  * @param hello the hello message
1585  * @param cont continuation to call when HELLO has been sent,
1586  *      tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
1587  *      tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
1588  * @param cont_cls closure for @a cont
1589  * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
1590  *      in case of failure @a cont will not be called
1591  *
1592  */
1593 struct GNUNET_TRANSPORT_OfferHelloHandle *
1594 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1595                               const struct GNUNET_MessageHeader *hello,
1596                               GNUNET_SCHEDULER_TaskCallback cont,
1597                               void *cont_cls)
1598 {
1599   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
1600   struct GNUNET_MessageHeader *msg;
1601   struct GNUNET_PeerIdentity peer;
1602   uint16_t size;
1603
1604   if (NULL == handle->client)
1605     return NULL;
1606   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
1607   size = ntohs (hello->size);
1608   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
1609   if (GNUNET_OK !=
1610       GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
1611                            &peer))
1612   {
1613     GNUNET_break (0);
1614     return NULL;
1615   }
1616
1617   msg = GNUNET_malloc (size);
1618   memcpy (msg, hello, size);
1619   LOG (GNUNET_ERROR_TYPE_DEBUG,
1620        "Offering HELLO message of `%s' to transport for validation.\n",
1621        GNUNET_i2s (&peer));
1622
1623   ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
1624   ohh->th = handle;
1625   ohh->cont = cont;
1626   ohh->cls = cont_cls;
1627   ohh->msg = msg;
1628   ohh->tth = schedule_control_transmit (handle,
1629                                         size,
1630                                         &send_hello,
1631                                         ohh);
1632   GNUNET_CONTAINER_DLL_insert (handle->oh_head,
1633                                handle->oh_tail,
1634                                ohh);
1635   return ohh;
1636 }
1637
1638
1639 /**
1640  * Cancel the request to transport to offer the HELLO message
1641  *
1642  * @param ohh the handle for the operation to cancel
1643  */
1644 void
1645 GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
1646 {
1647   struct GNUNET_TRANSPORT_Handle *th = ohh->th;
1648
1649   cancel_control_transmit (ohh->th, ohh->tth);
1650   GNUNET_CONTAINER_DLL_remove (th->oh_head,
1651                                th->oh_tail,
1652                                ohh);
1653   GNUNET_free (ohh->msg);
1654   GNUNET_free (ohh);
1655 }
1656
1657
1658 /**
1659  * Checks if a given peer is connected to us
1660  *
1661  * @param handle connection to transport service
1662  * @param peer the peer to check
1663  * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
1664  */
1665 int
1666 GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
1667                                        const struct GNUNET_PeerIdentity *peer)
1668 {
1669   if (GNUNET_YES ==
1670       GNUNET_CONTAINER_multipeermap_contains (handle->neighbours,
1671                                               peer))
1672     return GNUNET_YES;
1673   return GNUNET_NO;
1674 }
1675
1676
1677 /**
1678  * Task to call the HelloUpdateCallback of the GetHelloHandle
1679  *
1680  * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
1681  * @param tc the scheduler task context
1682  */
1683 static void
1684 call_hello_update_cb_async (void *cls,
1685                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1686 {
1687   struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
1688
1689   GNUNET_assert (NULL != ghh->handle->my_hello);
1690   GNUNET_assert (NULL != ghh->notify_task);
1691   ghh->notify_task = NULL;
1692   ghh->rec (ghh->rec_cls,
1693             ghh->handle->my_hello);
1694 }
1695
1696
1697 /**
1698  * Obtain the HELLO message for this peer.  The callback given in this function
1699  * is never called synchronously.
1700  *
1701  * @param handle connection to transport service
1702  * @param rec function to call with the HELLO, sender will be our peer
1703  *            identity; message and sender will be NULL on timeout
1704  *            (handshake with transport service pending/failed).
1705  *             cost estimate will be 0.
1706  * @param rec_cls closure for @a rec
1707  * @return handle to cancel the operation
1708  */
1709 struct GNUNET_TRANSPORT_GetHelloHandle *
1710 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
1711                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
1712                             void *rec_cls)
1713 {
1714   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
1715
1716   hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
1717   hwl->rec = rec;
1718   hwl->rec_cls = rec_cls;
1719   hwl->handle = handle;
1720   GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
1721                                handle->hwl_tail,
1722                                hwl);
1723   if (NULL != handle->my_hello)
1724     hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
1725                                                  hwl);
1726   return hwl;
1727 }
1728
1729
1730 /**
1731  * Stop receiving updates about changes to our HELLO message.
1732  *
1733  * @param ghh handle to cancel
1734  */
1735 void
1736 GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
1737 {
1738   struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
1739
1740   if (NULL != ghh->notify_task)
1741     GNUNET_SCHEDULER_cancel (ghh->notify_task);
1742   GNUNET_CONTAINER_DLL_remove (handle->hwl_head, handle->hwl_tail, ghh);
1743   GNUNET_free (ghh);
1744 }
1745
1746
1747 /**
1748  * Connect to the transport service.  Note that the connection may
1749  * complete (or fail) asynchronously.
1750  *
1751  * @param cfg configuration to use
1752  * @param self our own identity (API should check that it matches
1753  *             the identity found by transport), or NULL (no check)
1754  * @param cls closure for the callbacks
1755  * @param rec receive function to call
1756  * @param nc function to call on connect events
1757  * @param nd function to call on disconnect events
1758  * @return NULL on error
1759  */
1760 struct GNUNET_TRANSPORT_Handle *
1761 GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1762                           const struct GNUNET_PeerIdentity *self,
1763                           void *cls,
1764                           GNUNET_TRANSPORT_ReceiveCallback rec,
1765                           GNUNET_TRANSPORT_NotifyConnect nc,
1766                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1767 {
1768   return GNUNET_TRANSPORT_connect2 (cfg,
1769                                     self,
1770                                     cls,
1771                                     rec,
1772                                     nc,
1773                                     nd,
1774                                     NULL);
1775 }
1776
1777
1778 /**
1779  * Connect to the transport service.  Note that the connection may
1780  * complete (or fail) asynchronously.
1781  *
1782  * @param cfg configuration to use
1783  * @param self our own identity (API should check that it matches
1784  *             the identity found by transport), or NULL (no check)
1785  * @param cls closure for the callbacks
1786  * @param rec receive function to call
1787  * @param nc function to call on connect events
1788  * @param nd function to call on disconnect events
1789  * @param neb function to call if we have excess bandwidth to a peer
1790  * @return NULL on error
1791  */
1792 struct GNUNET_TRANSPORT_Handle *
1793 GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
1794                            const struct GNUNET_PeerIdentity *self,
1795                            void *cls,
1796                            GNUNET_TRANSPORT_ReceiveCallback rec,
1797                            GNUNET_TRANSPORT_NotifyConnect nc,
1798                            GNUNET_TRANSPORT_NotifyDisconnect nd,
1799                            GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
1800 {
1801   struct GNUNET_TRANSPORT_Handle *ret;
1802
1803   ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
1804   if (NULL != self)
1805   {
1806     ret->self = *self;
1807     ret->check_self = GNUNET_YES;
1808   }
1809   ret->cfg = cfg;
1810   ret->cls = cls;
1811   ret->rec = rec;
1812   ret->nc_cb = nc;
1813   ret->nd_cb = nd;
1814   ret->neb_cb = neb;
1815   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1816   LOG (GNUNET_ERROR_TYPE_DEBUG,
1817        "Connecting to transport service.\n");
1818   ret->client = GNUNET_CLIENT_connect ("transport",
1819                                        cfg);
1820   if (NULL == ret->client)
1821   {
1822     GNUNET_free (ret);
1823     return NULL;
1824   }
1825   ret->neighbours =
1826     GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
1827                                           GNUNET_YES);
1828   ret->ready_heap =
1829       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1830   schedule_control_transmit (ret,
1831                              sizeof (struct StartMessage),
1832                              &send_start,
1833                              ret);
1834   return ret;
1835 }
1836
1837
1838 /**
1839  * Disconnect from the transport service.
1840  *
1841  * @param handle handle to the service as returned from #GNUNET_TRANSPORT_connect()
1842  */
1843 void
1844 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1845 {
1846   LOG (GNUNET_ERROR_TYPE_DEBUG,
1847        "Transport disconnect called!\n");
1848   /* this disconnects all neighbours... */
1849   if (NULL == handle->reconnect_task)
1850     disconnect_and_schedule_reconnect (handle);
1851   /* and now we stop trying to connect again... */
1852   if (NULL != handle->reconnect_task)
1853   {
1854     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1855     handle->reconnect_task = NULL;
1856   }
1857   GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
1858   handle->neighbours = NULL;
1859   if (NULL != handle->quota_task)
1860   {
1861     GNUNET_SCHEDULER_cancel (handle->quota_task);
1862     handle->quota_task = NULL;
1863   }
1864   GNUNET_free_non_null (handle->my_hello);
1865   handle->my_hello = NULL;
1866   GNUNET_assert (NULL == handle->tc_head);
1867   GNUNET_assert (NULL == handle->tc_tail);
1868   GNUNET_assert (NULL == handle->hwl_head);
1869   GNUNET_assert (NULL == handle->hwl_tail);
1870   GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
1871   handle->ready_heap = NULL;
1872   GNUNET_free (handle);
1873 }
1874
1875
1876 /**
1877  * Check if we could queue a message of the given size for
1878  * transmission.  The transport service will take both its
1879  * internal buffers and bandwidth limits imposed by the
1880  * other peer into consideration when answering this query.
1881  *
1882  * @param handle connection to transport service
1883  * @param target who should receive the message
1884  * @param size how big is the message we want to transmit?
1885  * @param timeout after how long should we give up (and call
1886  *        notify with buf NULL and size 0)?
1887  * @param notify function to call when we are ready to
1888  *        send such a message
1889  * @param notify_cls closure for @a notify
1890  * @return NULL if someone else is already waiting to be notified
1891  *         non-NULL if the notify callback was queued (can be used to cancel
1892  *         using #GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1893  */
1894 struct GNUNET_TRANSPORT_TransmitHandle *
1895 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1896                                         const struct GNUNET_PeerIdentity *target,
1897                                         size_t size,
1898                                         struct GNUNET_TIME_Relative timeout,
1899                                         GNUNET_TRANSPORT_TransmitReadyNotify notify,
1900                                         void *notify_cls)
1901 {
1902   struct Neighbour *n;
1903   struct GNUNET_TRANSPORT_TransmitHandle *th;
1904   struct GNUNET_TIME_Relative delay;
1905
1906   n = neighbour_find (handle, target);
1907   if (NULL == n)
1908   {
1909     /* use GNUNET_TRANSPORT_try_connect first, only use this function
1910      * once a connection has been established */
1911     GNUNET_assert (0);
1912     return NULL;
1913   }
1914   if (NULL != n->th)
1915   {
1916     /* attempt to send two messages at the same time to the same peer */
1917     GNUNET_assert (0);
1918     return NULL;
1919   }
1920   GNUNET_assert (NULL == n->hn);
1921   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1922   th->neighbour = n;
1923   th->notify = notify;
1924   th->notify_cls = notify_cls;
1925   th->request_start = GNUNET_TIME_absolute_get ();
1926   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1927   th->notify_size = size;
1928   n->th = th;
1929   /* calculate when our transmission should be ready */
1930   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1931                                               size + n->traffic_overhead);
1932   n->traffic_overhead = 0;
1933   if (delay.rel_value_us > timeout.rel_value_us)
1934     delay.rel_value_us = 0;        /* notify immediately (with failure) */
1935   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1936     LOG (GNUNET_ERROR_TYPE_WARNING,
1937          "At bandwidth %u byte/s next transmission to %s in %s\n",
1938          (unsigned int) n->out_tracker.available_bytes_per_s__,
1939          GNUNET_i2s (target),
1940          GNUNET_STRINGS_relative_time_to_string (delay,
1941                                                  GNUNET_YES));
1942   else
1943     LOG (GNUNET_ERROR_TYPE_DEBUG,
1944          "At bandwidth %u byte/s next transmission to %s in %s\n",
1945          (unsigned int) n->out_tracker.available_bytes_per_s__,
1946          GNUNET_i2s (target),
1947          GNUNET_STRINGS_relative_time_to_string (delay,
1948                                                  GNUNET_YES));
1949   n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
1950                                         n,
1951                                         delay.rel_value_us);
1952   schedule_transmission (handle);
1953   return th;
1954 }
1955
1956
1957 /**
1958  * Cancel the specified transmission-ready notification.
1959  *
1960  * @param th handle returned from #GNUNET_TRANSPORT_notify_transmit_ready()
1961  */
1962 void
1963 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
1964 {
1965   struct Neighbour *n;
1966
1967   GNUNET_assert (NULL == th->next);
1968   GNUNET_assert (NULL == th->prev);
1969   n = th->neighbour;
1970   GNUNET_assert (th == n->th);
1971   n->th = NULL;
1972   if (NULL != n->hn)
1973   {
1974     GNUNET_CONTAINER_heap_remove_node (n->hn);
1975     n->hn = NULL;
1976   }
1977   else
1978   {
1979     GNUNET_assert (NULL != th->timeout_task);
1980     GNUNET_SCHEDULER_cancel (th->timeout_task);
1981     th->timeout_task = NULL;
1982   }
1983   GNUNET_free (th);
1984 }
1985
1986
1987 /* end of transport_api.c */