-remove deprecated try_disconnect API
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - test test test
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_transport_service.h"
36 #include "transport.h"
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "transport-api",__VA_ARGS__)
39
40 /**
41  * If we could not send any payload to a peer for this amount of
42  * time, we print a warning.
43  */
44 #define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES
45
46 /**
47  * How large to start with for the hashmap of neighbours.
48  */
49 #define STARTING_NEIGHBOURS_SIZE 16
50
51 /**
52  * Handle for a message that should be transmitted to the service.
53  * Used for both control messages and normal messages.
54  */
55 struct GNUNET_TRANSPORT_TransmitHandle
56 {
57
58   /**
59    * We keep all requests in a DLL.
60    */
61   struct GNUNET_TRANSPORT_TransmitHandle *next;
62
63   /**
64    * We keep all requests in a DLL.
65    */
66   struct GNUNET_TRANSPORT_TransmitHandle *prev;
67
68   /**
69    * Neighbour for this handle, NULL for control messages.
70    */
71   struct Neighbour *neighbour;
72
73   /**
74    * Function to call when @e notify_size bytes are available
75    * for transmission.
76    */
77   GNUNET_TRANSPORT_TransmitReadyNotify notify;
78
79   /**
80    * Closure for @e notify.
81    */
82   void *notify_cls;
83
84   /**
85    * Time at which this request was originally scheduled.
86    */
87   struct GNUNET_TIME_Absolute request_start;
88
89   /**
90    * Timeout for this request, 0 for control messages.
91    */
92   struct GNUNET_TIME_Absolute timeout;
93
94   /**
95    * Task to trigger request timeout if the request is stalled due to
96    * congestion.
97    */
98   struct GNUNET_SCHEDULER_Task *timeout_task;
99
100   /**
101    * How many bytes is our notify callback waiting for?
102    */
103   size_t notify_size;
104
105 };
106
107
108 /**
109  * Entry in hash table of all of our current (connected) neighbours.
110  */
111 struct Neighbour
112 {
113   /**
114    * Overall transport handle.
115    */
116   struct GNUNET_TRANSPORT_Handle *h;
117
118   /**
119    * Active transmit handle or NULL.
120    */
121   struct GNUNET_TRANSPORT_TransmitHandle *th;
122
123   /**
124    * Identity of this neighbour.
125    */
126   struct GNUNET_PeerIdentity id;
127
128   /**
129    * Outbound bandwidh tracker.
130    */
131   struct GNUNET_BANDWIDTH_Tracker out_tracker;
132
133   /**
134    * Entry in our readyness heap (which is sorted by @e next_ready
135    * value).  NULL if there is no pending transmission request for
136    * this neighbour or if we're waiting for @e is_ready to become
137    * true AFTER the @e out_tracker suggested that this peer's quota
138    * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
139    * we should immediately go back into the heap).
140    */
141   struct GNUNET_CONTAINER_HeapNode *hn;
142
143   /**
144    * Last time when this peer received payload from us.
145    */
146   struct GNUNET_TIME_Absolute last_payload;
147
148   /**
149    * Task to trigger warnings if we do not get SEND_OK after a while.
150    */
151   struct GNUNET_SCHEDULER_Task *unready_warn_task;
152
153   /**
154    * Is this peer currently ready to receive a message?
155    */
156   int is_ready;
157
158   /**
159    * Sending consumed more bytes on wire than payload was announced
160    * This overhead is added to the delay of next sending operation
161    */
162   size_t traffic_overhead;
163 };
164
165
166 /**
167  * Linked list of functions to call whenever our HELLO is updated.
168  */
169 struct GNUNET_TRANSPORT_GetHelloHandle
170 {
171
172   /**
173    * This is a doubly linked list.
174    */
175   struct GNUNET_TRANSPORT_GetHelloHandle *next;
176
177   /**
178    * This is a doubly linked list.
179    */
180   struct GNUNET_TRANSPORT_GetHelloHandle *prev;
181
182   /**
183    * Transport handle.
184    */
185   struct GNUNET_TRANSPORT_Handle *handle;
186
187   /**
188    * Callback to call once we got our HELLO.
189    */
190   GNUNET_TRANSPORT_HelloUpdateCallback rec;
191
192   /**
193    * Task for calling the HelloUpdateCallback when we already have a HELLO
194    */
195   struct GNUNET_SCHEDULER_Task *notify_task;
196
197   /**
198    * Closure for @e rec.
199    */
200   void *rec_cls;
201
202 };
203
204
205 /**
206  * Entry in linked list for a try-connect request.
207  */
208 struct GNUNET_TRANSPORT_TryConnectHandle
209 {
210   /**
211    * For the DLL.
212    */
213   struct GNUNET_TRANSPORT_TryConnectHandle *prev;
214
215   /**
216    * For the DLL.
217    */
218   struct GNUNET_TRANSPORT_TryConnectHandle *next;
219
220   /**
221    * Peer we should try to connect to.
222    */
223   struct GNUNET_PeerIdentity pid;
224
225   /**
226    * Transport service handle this request is part of.
227    */
228   struct GNUNET_TRANSPORT_Handle *th;
229
230   /**
231    * Message transmission request to communicate to service.
232    */
233   struct GNUNET_TRANSPORT_TransmitHandle *tth;
234
235   /**
236    * Function to call upon completion (of request transmission).
237    */
238   GNUNET_TRANSPORT_TryConnectCallback cb;
239
240   /**
241    * Closure for @e cb.
242    */
243   void *cb_cls;
244
245 };
246
247
248 /**
249  * Entry in linked list for all offer-HELLO requests.
250  */
251 struct GNUNET_TRANSPORT_OfferHelloHandle
252 {
253   /**
254    * For the DLL.
255    */
256   struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
257
258   /**
259    * For the DLL.
260    */
261   struct GNUNET_TRANSPORT_OfferHelloHandle *next;
262
263   /**
264    * Transport service handle we use for transmission.
265    */
266   struct GNUNET_TRANSPORT_Handle *th;
267
268   /**
269    * Transmission handle for this request.
270    */
271   struct GNUNET_TRANSPORT_TransmitHandle *tth;
272
273   /**
274    * Function to call once we are done.
275    */
276   GNUNET_SCHEDULER_TaskCallback cont;
277
278   /**
279    * Closure for @e cont
280    */
281   void *cls;
282
283   /**
284    * The HELLO message to be transmitted.
285    */
286   struct GNUNET_MessageHeader *msg;
287 };
288
289
290 /**
291  * Handle for the transport service (includes all of the
292  * state for the transport service).
293  */
294 struct GNUNET_TRANSPORT_Handle
295 {
296
297   /**
298    * Closure for the callbacks.
299    */
300   void *cls;
301
302   /**
303    * Function to call for received data.
304    */
305   GNUNET_TRANSPORT_ReceiveCallback rec;
306
307   /**
308    * function to call on connect events
309    */
310   GNUNET_TRANSPORT_NotifyConnect nc_cb;
311
312   /**
313    * function to call on disconnect events
314    */
315   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
316
317   /**
318    * function to call on excess bandwidth events
319    */
320   GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
321
322   /**
323    * Head of DLL of control messages.
324    */
325   struct GNUNET_TRANSPORT_TransmitHandle *control_head;
326
327   /**
328    * Tail of DLL of control messages.
329    */
330   struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
331
332   /**
333    * The current HELLO message for this peer.  Updated
334    * whenever transports change their addresses.
335    */
336   struct GNUNET_MessageHeader *my_hello;
337
338   /**
339    * My client connection to the transport service.
340    */
341   struct GNUNET_CLIENT_Connection *client;
342
343   /**
344    * Handle to our registration with the client for notification.
345    */
346   struct GNUNET_CLIENT_TransmitHandle *cth;
347
348   /**
349    * Linked list of pending requests for our HELLO.
350    */
351   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;
352
353   /**
354    * Linked list of pending requests for our HELLO.
355    */
356   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;
357
358   /**
359    * Linked list of pending try connect requests head
360    */
361   struct GNUNET_TRANSPORT_TryConnectHandle *tc_head;
362
363   /**
364    * Linked list of pending try connect requests tail
365    */
366   struct GNUNET_TRANSPORT_TryConnectHandle *tc_tail;
367
368   /**
369    * Linked list of pending offer HELLO requests head
370    */
371   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;
372
373   /**
374    * Linked list of pending offer HELLO requests tail
375    */
376   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;
377
378   /**
379    * My configuration.
380    */
381   const struct GNUNET_CONFIGURATION_Handle *cfg;
382
383   /**
384    * Hash map of the current connected neighbours of this peer.
385    * Maps peer identities to `struct Neighbour` entries.
386    */
387   struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
388
389   /**
390    * Heap sorting peers with pending messages by the timestamps that
391    * specify when we could next send a message to the respective peer.
392    * Excludes control messages (which can always go out immediately).
393    * Maps time stamps to `struct Neighbour` entries.
394    */
395   struct GNUNET_CONTAINER_Heap *ready_heap;
396
397   /**
398    * Peer identity as assumed by this process, or all zeros.
399    */
400   struct GNUNET_PeerIdentity self;
401
402   /**
403    * ID of the task trying to reconnect to the service.
404    */
405   struct GNUNET_SCHEDULER_Task *reconnect_task;
406
407   /**
408    * ID of the task trying to trigger transmission for a peer while
409    * maintaining bandwidth quotas.  In use if there are no control
410    * messages and the smallest entry in the @e ready_heap has a time
411    * stamp in the future.
412    */
413   struct GNUNET_SCHEDULER_Task *quota_task;
414
415   /**
416    * Delay until we try to reconnect.
417    */
418   struct GNUNET_TIME_Relative reconnect_delay;
419
420   /**
421    * Should we check that @e self matches what the service thinks?
422    * (if #GNUNET_NO, then @e self is all zeros!).
423    */
424   int check_self;
425
426   /**
427    * Reconnect in progress
428    */
429   int reconnecting;
430 };
431
432
433 /**
434  * Schedule the task to send one message, either from the control
435  * list or the peer message queues  to the service.
436  *
437  * @param h transport service to schedule a transmission for
438  */
439 static void
440 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
441
442
443 /**
444  * Function that will schedule the job that will try
445  * to connect us again to the client.
446  *
447  * @param h transport service to reconnect
448  */
449 static void
450 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
451
452
453 /**
454  * A neighbour has not gotten a SEND_OK in a  while. Print a warning.
455  *
456  * @param cls the `struct Neighbour`
457  * @param tc scheduler context
458  */
459 static void
460 do_warn_unready (void *cls,
461                  const struct GNUNET_SCHEDULER_TaskContext *tc)
462 {
463   struct Neighbour *n = cls;
464   struct GNUNET_TIME_Relative delay;
465
466   delay = GNUNET_TIME_absolute_get_duration (n->last_payload);
467   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
468               "Lacking SEND_OK, no payload could be send to %s for %s\n",
469               GNUNET_i2s (&n->id),
470               GNUNET_STRINGS_relative_time_to_string (delay,
471                                                       GNUNET_YES));
472   n->unready_warn_task
473     = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
474                                     &do_warn_unready,
475                                     n);
476 }
477
478
479 /**
480  * Get the neighbour list entry for the given peer
481  *
482  * @param h our context
483  * @param peer peer to look up
484  * @return NULL if no such peer entry exists
485  */
486 static struct Neighbour *
487 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
488                 const struct GNUNET_PeerIdentity *peer)
489 {
490   return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
491                                             peer);
492 }
493
494
495 /**
496  * The outbound quota has changed in a way that may require
497  * us to reset the timeout.  Update the timeout.
498  *
499  * @param cls the `struct Neighbour` for which the timeout changed
500  */
501 static void
502 outbound_bw_tracker_update (void *cls)
503 {
504   struct Neighbour *n = cls;
505   struct GNUNET_TIME_Relative delay;
506
507   if (NULL == n->hn)
508     return;
509   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
510                                               n->th->notify_size + n->traffic_overhead);
511   LOG (GNUNET_ERROR_TYPE_DEBUG,
512        "New outbound delay %s us\n",
513        GNUNET_STRINGS_relative_time_to_string (delay,
514                                                GNUNET_NO));
515   GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
516       n->hn, delay.rel_value_us);
517   schedule_transmission (n->h);
518 }
519
520
521 /**
522  * Function called by the bandwidth tracker if we have excess
523  * bandwidth.
524  *
525  * @param cls the `struct Neighbour` that has excess bandwidth
526  */
527 static void
528 notify_excess_cb (void *cls)
529 {
530   struct Neighbour *n = cls;
531   struct GNUNET_TRANSPORT_Handle *h = n->h;
532
533   if (NULL != h->neb_cb)
534     h->neb_cb (h->cls,
535                &n->id);
536 }
537
538
539 /**
540  * Add neighbour to our list
541  *
542  * @return NULL if this API is currently disconnecting from the service
543  */
544 static struct Neighbour *
545 neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
546                const struct GNUNET_PeerIdentity *pid)
547 {
548   struct Neighbour *n;
549
550   LOG (GNUNET_ERROR_TYPE_DEBUG,
551        "Creating entry for neighbour `%s'.\n",
552        GNUNET_i2s (pid));
553   n = GNUNET_new (struct Neighbour);
554   n->id = *pid;
555   n->h = h;
556   n->is_ready = GNUNET_YES;
557   n->traffic_overhead = 0;
558   GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
559                                   &outbound_bw_tracker_update,
560                                   n,
561                                   GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
562                                   MAX_BANDWIDTH_CARRY_S,
563                                   &notify_excess_cb,
564                                   n);
565   GNUNET_assert (GNUNET_OK ==
566                  GNUNET_CONTAINER_multipeermap_put (h->neighbours,
567                                                     &n->id,
568                                                     n,
569                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
570   return n;
571 }
572
573
574 /**
575  * Iterator over hash map entries, for deleting state of a neighbour.
576  *
577  * @param cls the `struct GNUNET_TRANSPORT_Handle *`
578  * @param key peer identity
579  * @param value value in the hash map, the neighbour entry to delete
580  * @return #GNUNET_YES if we should continue to
581  *         iterate,
582  *         #GNUNET_NO if not.
583  */
584 static int
585 neighbour_delete (void *cls,
586                   const struct GNUNET_PeerIdentity *key,
587                   void *value)
588 {
589   struct GNUNET_TRANSPORT_Handle *handle = cls;
590   struct Neighbour *n = value;
591
592   LOG (GNUNET_ERROR_TYPE_DEBUG,
593        "Dropping entry for neighbour `%s'.\n",
594        GNUNET_i2s (key));
595   GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
596   if (NULL != handle->nd_cb)
597     handle->nd_cb (handle->cls,
598                    &n->id);
599   if (NULL != n->unready_warn_task)
600   {
601     GNUNET_SCHEDULER_cancel (n->unready_warn_task);
602     n->unready_warn_task = NULL;
603   }
604   GNUNET_assert (NULL == n->th);
605   GNUNET_assert (NULL == n->hn);
606   GNUNET_assert (GNUNET_YES ==
607                  GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
608                                                        key,
609                                                        n));
610   GNUNET_free (n);
611   return GNUNET_YES;
612 }
613
614
615 /**
616  * Function we use for handling incoming messages.
617  *
618  * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
619  * @param msg message received, NULL on timeout or fatal error
620  */
621 static void
622 demultiplexer (void *cls,
623                const struct GNUNET_MessageHeader *msg)
624 {
625   struct GNUNET_TRANSPORT_Handle *h = cls;
626   const struct DisconnectInfoMessage *dim;
627   const struct ConnectInfoMessage *cim;
628   const struct InboundMessage *im;
629   const struct GNUNET_MessageHeader *imm;
630   const struct SendOkMessage *okm;
631   const struct QuotaSetMessage *qm;
632   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
633   struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
634   struct Neighbour *n;
635   struct GNUNET_PeerIdentity me;
636   uint16_t size;
637   uint32_t bytes_msg;
638   uint32_t bytes_physical;
639
640   GNUNET_assert (NULL != h->client);
641   if (GNUNET_YES == h->reconnecting)
642   {
643     return;
644   }
645   if (NULL == msg)
646   {
647     LOG (GNUNET_ERROR_TYPE_DEBUG,
648          "Error receiving from transport service, disconnecting temporarily.\n");
649     h->reconnecting = GNUNET_YES;
650     disconnect_and_schedule_reconnect (h);
651     return;
652   }
653   GNUNET_CLIENT_receive (h->client,
654                          &demultiplexer,
655                          h,
656                          GNUNET_TIME_UNIT_FOREVER_REL);
657   size = ntohs (msg->size);
658   switch (ntohs (msg->type))
659   {
660   case GNUNET_MESSAGE_TYPE_HELLO:
661     if (GNUNET_OK !=
662         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
663                              &me))
664     {
665       GNUNET_break (0);
666       break;
667     }
668     LOG (GNUNET_ERROR_TYPE_DEBUG,
669          "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
670          (unsigned int) size,
671          GNUNET_i2s (&me));
672     GNUNET_free_non_null (h->my_hello);
673     h->my_hello = NULL;
674     if (size < sizeof (struct GNUNET_MessageHeader))
675     {
676       GNUNET_break (0);
677       break;
678     }
679     h->my_hello = GNUNET_copy_message (msg);
680     hwl = h->hwl_head;
681     while (NULL != hwl)
682     {
683       next_hwl = hwl->next;
684       hwl->rec (hwl->rec_cls,
685                 h->my_hello);
686       hwl = next_hwl;
687     }
688     break;
689   case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
690     if (size < sizeof (struct ConnectInfoMessage))
691     {
692       GNUNET_break (0);
693       h->reconnecting = GNUNET_YES;
694       disconnect_and_schedule_reconnect (h);
695       break;
696     }
697     cim = (const struct ConnectInfoMessage *) msg;
698     if (size !=
699         sizeof (struct ConnectInfoMessage))
700     {
701       GNUNET_break (0);
702       h->reconnecting = GNUNET_YES;
703       disconnect_and_schedule_reconnect (h);
704       break;
705     }
706     LOG (GNUNET_ERROR_TYPE_DEBUG,
707          "Receiving CONNECT message for `%s'.\n",
708          GNUNET_i2s (&cim->id));
709     n = neighbour_find (h, &cim->id);
710     if (NULL != n)
711     {
712       GNUNET_break (0);
713       h->reconnecting = GNUNET_YES;
714       disconnect_and_schedule_reconnect (h);
715       break;
716     }
717     n = neighbour_add (h,
718                        &cim->id);
719     LOG (GNUNET_ERROR_TYPE_DEBUG,
720          "Receiving CONNECT message for `%s' with quota %u\n",
721          GNUNET_i2s (&cim->id),
722          ntohl (cim->quota_out.value__));
723     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
724                                            cim->quota_out);
725     if (NULL != h->nc_cb)
726       h->nc_cb (h->cls,
727                 &n->id);
728     break;
729   case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
730     if (size != sizeof (struct DisconnectInfoMessage))
731     {
732       GNUNET_break (0);
733       h->reconnecting = GNUNET_YES;
734       disconnect_and_schedule_reconnect (h);
735       break;
736     }
737     dim = (const struct DisconnectInfoMessage *) msg;
738     GNUNET_break (ntohl (dim->reserved) == 0);
739     LOG (GNUNET_ERROR_TYPE_DEBUG,
740          "Receiving DISCONNECT message for `%s'.\n",
741          GNUNET_i2s (&dim->peer));
742     n = neighbour_find (h, &dim->peer);
743     if (NULL == n)
744     {
745       GNUNET_break (0);
746       h->reconnecting = GNUNET_YES;
747       disconnect_and_schedule_reconnect (h);
748       break;
749     }
750     neighbour_delete (h,
751                       &dim->peer,
752                       n);
753     break;
754   case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
755     if (size != sizeof (struct SendOkMessage))
756     {
757       GNUNET_break (0);
758       h->reconnecting = GNUNET_YES;
759       disconnect_and_schedule_reconnect (h);
760       break;
761     }
762     okm = (const struct SendOkMessage *) msg;
763     bytes_msg = ntohl (okm->bytes_msg);
764     bytes_physical = ntohl (okm->bytes_physical);
765     LOG (GNUNET_ERROR_TYPE_DEBUG,
766          "Receiving SEND_OK message, transmission %s.\n",
767          ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
768
769     n = neighbour_find (h,
770                         &okm->peer);
771     if (NULL == n)
772     {
773       /* We should never get a 'SEND_OK' for a peer that we are not
774          connected to */
775       GNUNET_break (0);
776       h->reconnecting = GNUNET_YES;
777       disconnect_and_schedule_reconnect (h);
778       break;
779     }
780     if (bytes_physical >= bytes_msg)
781     {
782       LOG (GNUNET_ERROR_TYPE_DEBUG,
783            "Overhead for %u byte message: %u\n",
784            bytes_msg,
785            bytes_physical - bytes_msg);
786       n->traffic_overhead += bytes_physical - bytes_msg;
787     }
788     GNUNET_break (GNUNET_NO == n->is_ready);
789     n->is_ready = GNUNET_YES;
790     if (NULL != n->unready_warn_task)
791     {
792       GNUNET_SCHEDULER_cancel (n->unready_warn_task);
793       n->unready_warn_task = NULL;
794     }
795     if ((NULL != n->th) && (NULL == n->hn))
796     {
797       GNUNET_assert (NULL != n->th->timeout_task);
798       GNUNET_SCHEDULER_cancel (n->th->timeout_task);
799       n->th->timeout_task = NULL;
800       /* we've been waiting for this (congestion, not quota,
801        * caused delayed transmission) */
802       n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap, n, 0);
803       schedule_transmission (h);
804     }
805     break;
806   case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
807     if (size <
808         sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
809     {
810       GNUNET_break (0);
811       h->reconnecting = GNUNET_YES;
812       disconnect_and_schedule_reconnect (h);
813       break;
814     }
815     im = (const struct InboundMessage *) msg;
816     imm = (const struct GNUNET_MessageHeader *) &im[1];
817     if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
818     {
819       GNUNET_break (0);
820       h->reconnecting = GNUNET_YES;
821       disconnect_and_schedule_reconnect (h);
822       break;
823     }
824     LOG (GNUNET_ERROR_TYPE_DEBUG,
825          "Received message of type %u from `%s'.\n",
826          ntohs (imm->type), GNUNET_i2s (&im->peer));
827     n = neighbour_find (h, &im->peer);
828     if (NULL == n)
829     {
830       GNUNET_break (0);
831       h->reconnecting = GNUNET_YES;
832       disconnect_and_schedule_reconnect (h);
833       break;
834     }
835     if (NULL != h->rec)
836       h->rec (h->cls,
837               &im->peer,
838               imm);
839     break;
840   case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
841     if (size != sizeof (struct QuotaSetMessage))
842     {
843       GNUNET_break (0);
844       h->reconnecting = GNUNET_YES;
845       disconnect_and_schedule_reconnect (h);
846       break;
847     }
848     qm = (const struct QuotaSetMessage *) msg;
849     n = neighbour_find (h, &qm->peer);
850     if (NULL == n)
851     {
852       GNUNET_break (0);
853       h->reconnecting = GNUNET_YES;
854       disconnect_and_schedule_reconnect (h);
855       break;
856     }
857     LOG (GNUNET_ERROR_TYPE_DEBUG,
858          "Receiving SET_QUOTA message for `%s' with quota %u\n",
859          GNUNET_i2s (&qm->peer),
860          ntohl (qm->quota.value__));
861     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
862                                            qm->quota);
863     break;
864   default:
865     LOG (GNUNET_ERROR_TYPE_ERROR,
866          _("Received unexpected message of type %u in %s:%u\n"),
867          ntohs (msg->type),
868          __FILE__,
869          __LINE__);
870     GNUNET_break (0);
871     break;
872   }
873 }
874
875
876 /**
877  * A transmission request could not be satisfied because of
878  * network congestion.  Notify the initiator and clean up.
879  *
880  * @param cls the `struct GNUNET_TRANSPORT_TransmitHandle`
881  * @param tc scheduler context
882  */
883 static void
884 timeout_request_due_to_congestion (void *cls,
885                                    const struct GNUNET_SCHEDULER_TaskContext *tc)
886 {
887   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
888   struct Neighbour *n = th->neighbour;
889   struct GNUNET_TIME_Relative delay;
890
891   n->th->timeout_task = NULL;
892   delay = GNUNET_TIME_absolute_get_duration (th->request_start);
893   LOG (GNUNET_ERROR_TYPE_WARNING,
894        "Discarding %u bytes of payload message after %s delay due to congestion\n",
895        th->notify_size,
896        GNUNET_STRINGS_relative_time_to_string (delay,
897                                                GNUNET_YES));
898   GNUNET_assert (th == n->th);
899   GNUNET_assert (NULL == n->hn);
900   n->th = NULL;
901   th->notify (th->notify_cls,
902               0,
903               NULL);
904   GNUNET_free (th);
905 }
906
907
908 /**
909  * Transmit message(s) to service.
910  *
911  * @param cls handle to transport
912  * @param size number of bytes available in @a buf
913  * @param buf where to copy the message
914  * @return number of bytes copied to @a buf
915  */
916 static size_t
917 transport_notify_ready (void *cls,
918                         size_t size,
919                         void *buf)
920 {
921   struct GNUNET_TRANSPORT_Handle *h = cls;
922   struct GNUNET_TRANSPORT_TransmitHandle *th;
923   struct GNUNET_TIME_Relative delay;
924   struct Neighbour *n;
925   char *cbuf;
926   struct OutboundMessage obm;
927   size_t ret;
928   size_t nret;
929   size_t mret;
930
931   GNUNET_assert (NULL != h->client);
932   h->cth = NULL;
933   if (NULL == buf)
934   {
935     /* transmission failed */
936     disconnect_and_schedule_reconnect (h);
937     return 0;
938   }
939
940   cbuf = buf;
941   ret = 0;
942   /* first send control messages */
943   while ( (NULL != (th = h->control_head)) &&
944           (th->notify_size <= size) )
945   {
946     GNUNET_CONTAINER_DLL_remove (h->control_head,
947                                  h->control_tail,
948                                  th);
949     nret = th->notify (th->notify_cls,
950                        size,
951                        &cbuf[ret]);
952     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
953     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
954       LOG (GNUNET_ERROR_TYPE_WARNING,
955            "Added %u bytes of control message at %u after %s delay\n",
956            nret,
957            ret,
958            GNUNET_STRINGS_relative_time_to_string (delay,
959                                                    GNUNET_YES));
960     else
961       LOG (GNUNET_ERROR_TYPE_DEBUG,
962            "Added %u bytes of control message at %u after %s delay\n",
963            nret,
964            ret,
965            GNUNET_STRINGS_relative_time_to_string (delay,
966                                                    GNUNET_YES));
967     GNUNET_free (th);
968     ret += nret;
969     size -= nret;
970   }
971
972   /* then, if possible and no control messages pending, send data messages */
973   while ( (NULL == h->control_head) &&
974           (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
975   {
976     if (GNUNET_YES != n->is_ready)
977     {
978       /* peer not ready, wait for notification! */
979       GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
980       n->hn = NULL;
981       GNUNET_assert (NULL == n->th->timeout_task);
982       n->th->timeout_task
983         = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
984                                         (n->th->timeout),
985                                         &timeout_request_due_to_congestion,
986                                         n->th);
987       continue;
988     }
989     th = n->th;
990     if (th->notify_size + sizeof (struct OutboundMessage) > size)
991       break;                    /* does not fit */
992     if (GNUNET_BANDWIDTH_tracker_get_delay
993         (&n->out_tracker, th->notify_size).rel_value_us > 0)
994       break;                    /* too early */
995     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
996     n->hn = NULL;
997     n->th = NULL;
998     GNUNET_assert (size >= sizeof (struct OutboundMessage));
999     mret = th->notify (th->notify_cls,
1000                        size - sizeof (struct OutboundMessage),
1001                        &cbuf[ret + sizeof (struct OutboundMessage)]);
1002     GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
1003     if (0 == mret)
1004     {
1005       GNUNET_free (th);
1006       continue;
1007     }
1008     if (NULL != n->unready_warn_task)
1009       n->unready_warn_task
1010         = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
1011                                         &do_warn_unready,
1012                                         n);
1013     n->last_payload = GNUNET_TIME_absolute_get ();
1014     n->is_ready = GNUNET_NO;
1015     GNUNET_assert (mret + sizeof (struct OutboundMessage) <
1016                    GNUNET_SERVER_MAX_MESSAGE_SIZE);
1017     obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
1018     obm.header.size = htons (mret + sizeof (struct OutboundMessage));
1019     obm.reserved = htonl (0);
1020     obm.timeout =
1021       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
1022                                  (th->timeout));
1023     obm.peer = n->id;
1024     memcpy (&cbuf[ret],
1025             &obm,
1026             sizeof (struct OutboundMessage));
1027     ret += (mret + sizeof (struct OutboundMessage));
1028     size -= (mret + sizeof (struct OutboundMessage));
1029     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
1030                                       mret);
1031     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
1032     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1033       LOG (GNUNET_ERROR_TYPE_WARNING,
1034            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
1035            mret,
1036            GNUNET_i2s (&n->id),
1037            GNUNET_STRINGS_relative_time_to_string (delay,
1038                                                    GNUNET_YES),
1039            (unsigned int) n->out_tracker.available_bytes_per_s__);
1040     else
1041       LOG (GNUNET_ERROR_TYPE_DEBUG,
1042            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
1043            mret,
1044            GNUNET_i2s (&n->id),
1045            GNUNET_STRINGS_relative_time_to_string (delay,
1046                                                    GNUNET_YES),
1047            (unsigned int) n->out_tracker.available_bytes_per_s__);
1048     GNUNET_free (th);
1049     break;
1050   }
1051   /* if there are more pending messages, try to schedule those */
1052   schedule_transmission (h);
1053   LOG (GNUNET_ERROR_TYPE_DEBUG,
1054        "Transmitting %u bytes to transport service\n",
1055        ret);
1056   return ret;
1057 }
1058
1059
1060 /**
1061  * Schedule the task to send one message, either from the control
1062  * list or the peer message queues  to the service.
1063  *
1064  * @param cls transport service to schedule a transmission for
1065  * @param tc scheduler context
1066  */
1067 static void
1068 schedule_transmission_task (void *cls,
1069                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1070 {
1071   struct GNUNET_TRANSPORT_Handle *h = cls;
1072   size_t size;
1073   struct GNUNET_TRANSPORT_TransmitHandle *th;
1074   struct Neighbour *n;
1075
1076   h->quota_task = NULL;
1077   GNUNET_assert (NULL != h->client);
1078   /* destroy all requests that have timed out */
1079   while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
1080           (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
1081   {
1082     /* notify client that the request could not be satisfied within
1083      * the given time constraints */
1084     th = n->th;
1085     n->th = NULL;
1086     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
1087     n->hn = NULL;
1088     LOG (GNUNET_ERROR_TYPE_DEBUG,
1089          "Signalling timeout for transmission to peer %s due to congestion\n",
1090          GNUNET_i2s (&n->id));
1091     GNUNET_assert (0 == th->notify (th->notify_cls,
1092                                     0,
1093                                     NULL));
1094     GNUNET_free (th);
1095   }
1096   if (NULL != h->cth)
1097     return;
1098   if (NULL != h->control_head)
1099   {
1100     size = h->control_head->notify_size;
1101   }
1102   else
1103   {
1104     n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
1105     if (NULL == n)
1106       return;                   /* no pending messages */
1107     size = n->th->notify_size + sizeof (struct OutboundMessage);
1108   }
1109   LOG (GNUNET_ERROR_TYPE_DEBUG,
1110        "Calling notify_transmit_ready\n");
1111   h->cth
1112     = GNUNET_CLIENT_notify_transmit_ready (h->client,
1113                                            size,
1114                                            GNUNET_TIME_UNIT_FOREVER_REL,
1115                                            GNUNET_NO,
1116                                            &transport_notify_ready,
1117                                            h);
1118   GNUNET_assert (NULL != h->cth);
1119 }
1120
1121
1122 /**
1123  * Schedule the task to send one message, either from the control
1124  * list or the peer message queues  to the service.
1125  *
1126  * @param h transport service to schedule a transmission for
1127  */
1128 static void
1129 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
1130 {
1131   struct GNUNET_TIME_Relative delay;
1132   struct Neighbour *n;
1133
1134   GNUNET_assert (NULL != h->client);
1135   if (NULL != h->quota_task)
1136   {
1137     GNUNET_SCHEDULER_cancel (h->quota_task);
1138     h->quota_task = NULL;
1139   }
1140   if (NULL != h->control_head)
1141     delay = GNUNET_TIME_UNIT_ZERO;
1142   else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
1143   {
1144     delay =
1145         GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1146                                             n->th->notify_size + n->traffic_overhead);
1147     n->traffic_overhead = 0;
1148   }
1149   else
1150     return;                     /* no work to be done */
1151   LOG (GNUNET_ERROR_TYPE_DEBUG,
1152        "Scheduling next transmission to service in %s\n",
1153        GNUNET_STRINGS_relative_time_to_string (delay,
1154                                                GNUNET_YES));
1155   h->quota_task =
1156       GNUNET_SCHEDULER_add_delayed (delay,
1157                                     &schedule_transmission_task,
1158                                     h);
1159 }
1160
1161
1162 /**
1163  * Queue control request for transmission to the transport
1164  * service.
1165  *
1166  * @param h handle to the transport service
1167  * @param size number of bytes to be transmitted
1168  * @param notify function to call to get the content
1169  * @param notify_cls closure for @a notify
1170  * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
1171  */
1172 static struct GNUNET_TRANSPORT_TransmitHandle *
1173 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
1174                            size_t size,
1175                            GNUNET_TRANSPORT_TransmitReadyNotify notify,
1176                            void *notify_cls)
1177 {
1178   struct GNUNET_TRANSPORT_TransmitHandle *th;
1179
1180   LOG (GNUNET_ERROR_TYPE_DEBUG,
1181        "Control transmit of %u bytes requested\n",
1182        size);
1183   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1184   th->notify = notify;
1185   th->notify_cls = notify_cls;
1186   th->notify_size = size;
1187   th->request_start = GNUNET_TIME_absolute_get ();
1188   GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
1189                                     h->control_tail,
1190                                     th);
1191   schedule_transmission (h);
1192   return th;
1193 }
1194
1195
1196 /**
1197  * Transmit START message to service.
1198  *
1199  * @param cls unused
1200  * @param size number of bytes available in @a buf
1201  * @param buf where to copy the message
1202  * @return number of bytes copied to @a buf
1203  */
1204 static size_t
1205 send_start (void *cls,
1206             size_t size,
1207             void *buf)
1208 {
1209   struct GNUNET_TRANSPORT_Handle *h = cls;
1210   struct StartMessage s;
1211   uint32_t options;
1212
1213   if (NULL == buf)
1214   {
1215     /* Can only be shutdown, just give up */
1216     LOG (GNUNET_ERROR_TYPE_DEBUG,
1217          "Shutdown while trying to transmit START request.\n");
1218     return 0;
1219   }
1220   LOG (GNUNET_ERROR_TYPE_DEBUG,
1221        "Transmitting START request.\n");
1222   GNUNET_assert (size >= sizeof (struct StartMessage));
1223   s.header.size = htons (sizeof (struct StartMessage));
1224   s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
1225   options = 0;
1226   if (h->check_self)
1227     options |= 1;
1228   if (NULL != h->rec)
1229     options |= 2;
1230   s.options = htonl (options);
1231   s.self = h->self;
1232   memcpy (buf, &s, sizeof (struct StartMessage));
1233   GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
1234                          GNUNET_TIME_UNIT_FOREVER_REL);
1235   return sizeof (struct StartMessage);
1236 }
1237
1238
1239 /**
1240  * Try again to connect to transport service.
1241  *
1242  * @param cls the handle to the transport service
1243  * @param tc scheduler context
1244  */
1245 static void
1246 reconnect (void *cls,
1247            const struct GNUNET_SCHEDULER_TaskContext *tc)
1248 {
1249   struct GNUNET_TRANSPORT_Handle *h = cls;
1250
1251   h->reconnect_task = NULL;
1252   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1253   {
1254     /* shutdown, just give up */
1255     return;
1256   }
1257   LOG (GNUNET_ERROR_TYPE_DEBUG,
1258        "Connecting to transport service.\n");
1259   GNUNET_assert (NULL == h->client);
1260   GNUNET_assert (NULL == h->control_head);
1261   GNUNET_assert (NULL == h->control_tail);
1262   h->reconnecting = GNUNET_NO;
1263   h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
1264
1265   GNUNET_assert (NULL != h->client);
1266   schedule_control_transmit (h, sizeof (struct StartMessage),
1267                              &send_start, h);
1268 }
1269
1270
1271 /**
1272  * Function that will schedule the job that will try
1273  * to connect us again to the client.
1274  *
1275  * @param h transport service to reconnect
1276  */
1277 static void
1278 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1279 {
1280   struct GNUNET_TRANSPORT_TransmitHandle *th;
1281
1282   GNUNET_assert (NULL == h->reconnect_task);
1283   if (NULL != h->cth)
1284   {
1285     GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
1286     h->cth = NULL;
1287   }
1288   if (NULL != h->client)
1289   {
1290     GNUNET_CLIENT_disconnect (h->client);
1291     h->client = NULL;
1292 /*    LOG (GNUNET_ERROR_TYPE_ERROR,
1293          "Client disconnect done \n");*/
1294   }
1295   /* Forget about all neighbours that we used to be connected to */
1296   GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
1297                                          &neighbour_delete,
1298                                          h);
1299   if (NULL != h->quota_task)
1300   {
1301     GNUNET_SCHEDULER_cancel (h->quota_task);
1302     h->quota_task = NULL;
1303   }
1304   while ((NULL != (th = h->control_head)))
1305   {
1306     GNUNET_CONTAINER_DLL_remove (h->control_head,
1307                                  h->control_tail,
1308                                  th);
1309     th->notify (th->notify_cls,
1310                 0,
1311                 NULL);
1312     GNUNET_free (th);
1313   }
1314   LOG (GNUNET_ERROR_TYPE_DEBUG,
1315        "Scheduling task to reconnect to transport service in %s.\n",
1316        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
1317                                                GNUNET_YES));
1318   h->reconnect_task =
1319       GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
1320                                     &reconnect,
1321                                     h);
1322   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
1323 }
1324
1325
1326 /**
1327  * Cancel control request for transmission to the transport service.
1328  *
1329  * @param th handle to the transport service
1330  * @param tth transmit handle to cancel
1331  */
1332 static void
1333 cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
1334                          struct GNUNET_TRANSPORT_TransmitHandle *tth)
1335 {
1336   LOG (GNUNET_ERROR_TYPE_DEBUG,
1337        "Canceling transmit of contral transmission requested\n");
1338   GNUNET_CONTAINER_DLL_remove (th->control_head,
1339                                th->control_tail,
1340                                tth);
1341   GNUNET_free (tth);
1342 }
1343
1344
1345 /**
1346  * Send #GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT message to the
1347  * service.
1348  *
1349  * @param cls the `struct GNUNET_TRANSPORT_TryConnectHandle`
1350  * @param size number of bytes available in @a buf
1351  * @param buf where to copy the message
1352  * @return number of bytes copied to @a buf
1353  */
1354 static size_t
1355 send_try_connect (void *cls,
1356                   size_t size,
1357                   void *buf)
1358 {
1359   struct GNUNET_TRANSPORT_TryConnectHandle *tch = cls;
1360   struct TransportRequestConnectMessage msg;
1361
1362   tch->tth = NULL;
1363   if (NULL == buf)
1364   {
1365     LOG (GNUNET_ERROR_TYPE_DEBUG,
1366          "Discarding  `%s' request to `%s' due to error in transport service connection.\n",
1367          "REQUEST_CONNECT",
1368          GNUNET_i2s (&tch->pid));
1369     if (NULL != tch->cb)
1370       tch->cb (tch->cb_cls,
1371                GNUNET_SYSERR);
1372     GNUNET_TRANSPORT_try_connect_cancel (tch);
1373     return 0;
1374   }
1375   LOG (GNUNET_ERROR_TYPE_DEBUG,
1376        "Transmitting `%s' request with respect to `%s'.\n",
1377        "REQUEST_CONNECT",
1378        GNUNET_i2s (&tch->pid));
1379   GNUNET_assert (size >= sizeof (struct TransportRequestConnectMessage));
1380   msg.header.size = htons (sizeof (struct TransportRequestConnectMessage));
1381   msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT);
1382   msg.reserved = htonl (0);
1383   msg.peer = tch->pid;
1384   memcpy (buf, &msg, sizeof (msg));
1385   if (NULL != tch->cb)
1386     tch->cb (tch->cb_cls, GNUNET_OK);
1387   GNUNET_TRANSPORT_try_connect_cancel (tch);
1388   return sizeof (struct TransportRequestConnectMessage);
1389 }
1390
1391
1392 /**
1393  * Ask the transport service to establish a connection to
1394  * the given peer.
1395  *
1396  * @param handle connection to transport service
1397  * @param target who we should try to connect to
1398  * @param cb callback to be called when request was transmitted to transport
1399  *         service
1400  * @param cb_cls closure for the callback
1401  * @return a `struct GNUNET_TRANSPORT_TryConnectHandle` handle or
1402  *         NULL on failure (cb will not be called)
1403  */
1404 struct GNUNET_TRANSPORT_TryConnectHandle *
1405 GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle,
1406                               const struct GNUNET_PeerIdentity *target,
1407                               GNUNET_TRANSPORT_TryConnectCallback cb,
1408                               void *cb_cls)
1409 {
1410   struct GNUNET_TRANSPORT_TryConnectHandle *tch;
1411
1412   if (NULL == handle->client)
1413     return NULL;
1414   tch = GNUNET_new (struct GNUNET_TRANSPORT_TryConnectHandle);
1415   tch->th = handle;
1416   tch->pid = *target;
1417   tch->cb = cb;
1418   tch->cb_cls = cb_cls;
1419   tch->tth = schedule_control_transmit (handle,
1420                                         sizeof (struct TransportRequestConnectMessage),
1421                                         &send_try_connect, tch);
1422   GNUNET_CONTAINER_DLL_insert (handle->tc_head,
1423                                handle->tc_tail,
1424                                tch);
1425   return tch;
1426 }
1427
1428
1429 /**
1430  * Cancel the request to transport to try a connect
1431  * Callback will not be called
1432  *
1433  * @param tch the handle to cancel
1434  */
1435 void
1436 GNUNET_TRANSPORT_try_connect_cancel (struct GNUNET_TRANSPORT_TryConnectHandle *tch)
1437 {
1438   struct GNUNET_TRANSPORT_Handle *th;
1439
1440   th = tch->th;
1441   if (NULL != tch->tth)
1442     cancel_control_transmit (th, tch->tth);
1443   GNUNET_CONTAINER_DLL_remove (th->tc_head,
1444                                th->tc_tail,
1445                                tch);
1446   GNUNET_free (tch);
1447 }
1448
1449
1450 /**
1451  * Send HELLO message to the service.
1452  *
1453  * @param cls the HELLO message to send
1454  * @param size number of bytes available in @a buf
1455  * @param buf where to copy the message
1456  * @return number of bytes copied to @a buf
1457  */
1458 static size_t
1459 send_hello (void *cls,
1460             size_t size,
1461             void *buf)
1462 {
1463   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
1464   struct GNUNET_MessageHeader *msg = ohh->msg;
1465   uint16_t ssize;
1466   struct GNUNET_SCHEDULER_TaskContext tc;
1467
1468   tc.read_ready = NULL;
1469   tc.write_ready = NULL;
1470   tc.reason = GNUNET_SCHEDULER_REASON_TIMEOUT;
1471   if (NULL == buf)
1472   {
1473     LOG (GNUNET_ERROR_TYPE_DEBUG,
1474          "Timeout while trying to transmit `%s' request.\n",
1475          "HELLO");
1476     if (NULL != ohh->cont)
1477       ohh->cont (ohh->cls,
1478                  &tc);
1479     GNUNET_free (msg);
1480     GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1481                                  ohh->th->oh_tail,
1482                                  ohh);
1483     GNUNET_free (ohh);
1484     return 0;
1485   }
1486   LOG (GNUNET_ERROR_TYPE_DEBUG,
1487        "Transmitting `%s' request.\n",
1488        "HELLO");
1489   ssize = ntohs (msg->size);
1490   GNUNET_assert (size >= ssize);
1491   memcpy (buf,
1492           msg,
1493           ssize);
1494   GNUNET_free (msg);
1495   tc.reason = GNUNET_SCHEDULER_REASON_READ_READY;
1496   if (NULL != ohh->cont)
1497     ohh->cont (ohh->cls,
1498                &tc);
1499   GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1500                                ohh->th->oh_tail,
1501                                ohh);
1502   GNUNET_free (ohh);
1503   return ssize;
1504 }
1505
1506
1507 /**
1508  * Send traffic metric message to the service.
1509  *
1510  * @param cls the message to send
1511  * @param size number of bytes available in @a buf
1512  * @param buf where to copy the message
1513  * @return number of bytes copied to @a buf
1514  */
1515 static size_t
1516 send_metric (void *cls,
1517              size_t size,
1518              void *buf)
1519 {
1520   struct TrafficMetricMessage *msg = cls;
1521   uint16_t ssize;
1522
1523   if (NULL == buf)
1524   {
1525     LOG (GNUNET_ERROR_TYPE_DEBUG,
1526          "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
1527     GNUNET_free (msg);
1528     return 0;
1529   }
1530   LOG (GNUNET_ERROR_TYPE_DEBUG,
1531        "Transmitting TRAFFIC_METRIC request.\n");
1532   ssize = ntohs (msg->header.size);
1533   GNUNET_assert (size >= ssize);
1534   memcpy (buf, msg, ssize);
1535   GNUNET_free (msg);
1536   return ssize;
1537 }
1538
1539
1540 /**
1541  * Set transport metrics for a peer and a direction
1542  *
1543  * @param handle transport handle
1544  * @param peer the peer to set the metric for
1545  * @param prop the performance metrics to set
1546  * @param delay_in inbound delay to introduce
1547  * @param delay_out outbound delay to introduce
1548  *
1549  * Note: Delay restrictions in receiving direction will be enforced
1550  * with one message delay.
1551  */
1552 void
1553 GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
1554                                      const struct GNUNET_PeerIdentity *peer,
1555                                      const struct GNUNET_ATS_Properties *prop,
1556                                      struct GNUNET_TIME_Relative delay_in,
1557                                      struct GNUNET_TIME_Relative delay_out)
1558 {
1559   struct TrafficMetricMessage *msg;
1560
1561   msg = GNUNET_new (struct TrafficMetricMessage);
1562   msg->header.size = htons (sizeof (struct TrafficMetricMessage));
1563   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
1564   msg->reserved = htonl (0);
1565   msg->peer = *peer;
1566   GNUNET_ATS_properties_hton (&msg->properties,
1567                               prop);
1568   msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
1569   msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
1570   schedule_control_transmit (handle,
1571                              sizeof (struct TrafficMetricMessage),
1572                              &send_metric,
1573                              msg);
1574 }
1575
1576
1577 /**
1578  * Offer the transport service the HELLO of another peer.  Note that
1579  * the transport service may just ignore this message if the HELLO is
1580  * malformed or useless due to our local configuration.
1581  *
1582  * @param handle connection to transport service
1583  * @param hello the hello message
1584  * @param cont continuation to call when HELLO has been sent,
1585  *      tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
1586  *      tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
1587  * @param cont_cls closure for @a cont
1588  * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
1589  *      in case of failure @a cont will not be called
1590  *
1591  */
1592 struct GNUNET_TRANSPORT_OfferHelloHandle *
1593 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1594                               const struct GNUNET_MessageHeader *hello,
1595                               GNUNET_SCHEDULER_TaskCallback cont,
1596                               void *cont_cls)
1597 {
1598   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
1599   struct GNUNET_MessageHeader *msg;
1600   struct GNUNET_PeerIdentity peer;
1601   uint16_t size;
1602
1603   if (NULL == handle->client)
1604     return NULL;
1605   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
1606   size = ntohs (hello->size);
1607   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
1608   if (GNUNET_OK !=
1609       GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
1610                            &peer))
1611   {
1612     GNUNET_break (0);
1613     return NULL;
1614   }
1615
1616   msg = GNUNET_malloc (size);
1617   memcpy (msg, hello, size);
1618   LOG (GNUNET_ERROR_TYPE_DEBUG,
1619        "Offering HELLO message of `%s' to transport for validation.\n",
1620        GNUNET_i2s (&peer));
1621
1622   ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
1623   ohh->th = handle;
1624   ohh->cont = cont;
1625   ohh->cls = cont_cls;
1626   ohh->msg = msg;
1627   ohh->tth = schedule_control_transmit (handle,
1628                                         size,
1629                                         &send_hello,
1630                                         ohh);
1631   GNUNET_CONTAINER_DLL_insert (handle->oh_head,
1632                                handle->oh_tail,
1633                                ohh);
1634   return ohh;
1635 }
1636
1637
1638 /**
1639  * Cancel the request to transport to offer the HELLO message
1640  *
1641  * @param ohh the handle for the operation to cancel
1642  */
1643 void
1644 GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
1645 {
1646   struct GNUNET_TRANSPORT_Handle *th = ohh->th;
1647
1648   cancel_control_transmit (ohh->th, ohh->tth);
1649   GNUNET_CONTAINER_DLL_remove (th->oh_head,
1650                                th->oh_tail,
1651                                ohh);
1652   GNUNET_free (ohh->msg);
1653   GNUNET_free (ohh);
1654 }
1655
1656
1657 /**
1658  * Checks if a given peer is connected to us
1659  *
1660  * @param handle connection to transport service
1661  * @param peer the peer to check
1662  * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
1663  */
1664 int
1665 GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
1666                                        const struct GNUNET_PeerIdentity *peer)
1667 {
1668   if (GNUNET_YES ==
1669       GNUNET_CONTAINER_multipeermap_contains (handle->neighbours,
1670                                               peer))
1671     return GNUNET_YES;
1672   return GNUNET_NO;
1673 }
1674
1675
1676 /**
1677  * Task to call the HelloUpdateCallback of the GetHelloHandle
1678  *
1679  * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
1680  * @param tc the scheduler task context
1681  */
1682 static void
1683 call_hello_update_cb_async (void *cls,
1684                             const struct GNUNET_SCHEDULER_TaskContext *tc)
1685 {
1686   struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
1687
1688   GNUNET_assert (NULL != ghh->handle->my_hello);
1689   GNUNET_assert (NULL != ghh->notify_task);
1690   ghh->notify_task = NULL;
1691   ghh->rec (ghh->rec_cls,
1692             ghh->handle->my_hello);
1693 }
1694
1695
1696 /**
1697  * Obtain the HELLO message for this peer.  The callback given in this function
1698  * is never called synchronously.
1699  *
1700  * @param handle connection to transport service
1701  * @param rec function to call with the HELLO, sender will be our peer
1702  *            identity; message and sender will be NULL on timeout
1703  *            (handshake with transport service pending/failed).
1704  *             cost estimate will be 0.
1705  * @param rec_cls closure for @a rec
1706  * @return handle to cancel the operation
1707  */
1708 struct GNUNET_TRANSPORT_GetHelloHandle *
1709 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
1710                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
1711                             void *rec_cls)
1712 {
1713   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
1714
1715   hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
1716   hwl->rec = rec;
1717   hwl->rec_cls = rec_cls;
1718   hwl->handle = handle;
1719   GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
1720                                handle->hwl_tail,
1721                                hwl);
1722   if (NULL != handle->my_hello)
1723     hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
1724                                                  hwl);
1725   return hwl;
1726 }
1727
1728
1729 /**
1730  * Stop receiving updates about changes to our HELLO message.
1731  *
1732  * @param ghh handle to cancel
1733  */
1734 void
1735 GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
1736 {
1737   struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
1738
1739   if (NULL != ghh->notify_task)
1740     GNUNET_SCHEDULER_cancel (ghh->notify_task);
1741   GNUNET_CONTAINER_DLL_remove (handle->hwl_head, handle->hwl_tail, ghh);
1742   GNUNET_free (ghh);
1743 }
1744
1745
1746 /**
1747  * Connect to the transport service.  Note that the connection may
1748  * complete (or fail) asynchronously.
1749  *
1750  * @param cfg configuration to use
1751  * @param self our own identity (API should check that it matches
1752  *             the identity found by transport), or NULL (no check)
1753  * @param cls closure for the callbacks
1754  * @param rec receive function to call
1755  * @param nc function to call on connect events
1756  * @param nd function to call on disconnect events
1757  * @return NULL on error
1758  */
1759 struct GNUNET_TRANSPORT_Handle *
1760 GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1761                           const struct GNUNET_PeerIdentity *self,
1762                           void *cls,
1763                           GNUNET_TRANSPORT_ReceiveCallback rec,
1764                           GNUNET_TRANSPORT_NotifyConnect nc,
1765                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1766 {
1767   return GNUNET_TRANSPORT_connect2 (cfg,
1768                                     self,
1769                                     cls,
1770                                     rec,
1771                                     nc,
1772                                     nd,
1773                                     NULL);
1774 }
1775
1776
1777 /**
1778  * Connect to the transport service.  Note that the connection may
1779  * complete (or fail) asynchronously.
1780  *
1781  * @param cfg configuration to use
1782  * @param self our own identity (API should check that it matches
1783  *             the identity found by transport), or NULL (no check)
1784  * @param cls closure for the callbacks
1785  * @param rec receive function to call
1786  * @param nc function to call on connect events
1787  * @param nd function to call on disconnect events
1788  * @param neb function to call if we have excess bandwidth to a peer
1789  * @return NULL on error
1790  */
1791 struct GNUNET_TRANSPORT_Handle *
1792 GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
1793                            const struct GNUNET_PeerIdentity *self,
1794                            void *cls,
1795                            GNUNET_TRANSPORT_ReceiveCallback rec,
1796                            GNUNET_TRANSPORT_NotifyConnect nc,
1797                            GNUNET_TRANSPORT_NotifyDisconnect nd,
1798                            GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
1799 {
1800   struct GNUNET_TRANSPORT_Handle *ret;
1801
1802   ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
1803   if (NULL != self)
1804   {
1805     ret->self = *self;
1806     ret->check_self = GNUNET_YES;
1807   }
1808   ret->cfg = cfg;
1809   ret->cls = cls;
1810   ret->rec = rec;
1811   ret->nc_cb = nc;
1812   ret->nd_cb = nd;
1813   ret->neb_cb = neb;
1814   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1815   LOG (GNUNET_ERROR_TYPE_DEBUG,
1816        "Connecting to transport service.\n");
1817   ret->client = GNUNET_CLIENT_connect ("transport",
1818                                        cfg);
1819   if (NULL == ret->client)
1820   {
1821     GNUNET_free (ret);
1822     return NULL;
1823   }
1824   ret->neighbours =
1825     GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
1826                                           GNUNET_YES);
1827   ret->ready_heap =
1828       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1829   schedule_control_transmit (ret,
1830                              sizeof (struct StartMessage),
1831                              &send_start,
1832                              ret);
1833   return ret;
1834 }
1835
1836
1837 /**
1838  * Disconnect from the transport service.
1839  *
1840  * @param handle handle to the service as returned from #GNUNET_TRANSPORT_connect()
1841  */
1842 void
1843 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1844 {
1845   LOG (GNUNET_ERROR_TYPE_DEBUG,
1846        "Transport disconnect called!\n");
1847   /* this disconnects all neighbours... */
1848   if (NULL == handle->reconnect_task)
1849     disconnect_and_schedule_reconnect (handle);
1850   /* and now we stop trying to connect again... */
1851   if (NULL != handle->reconnect_task)
1852   {
1853     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1854     handle->reconnect_task = NULL;
1855   }
1856   GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
1857   handle->neighbours = NULL;
1858   if (NULL != handle->quota_task)
1859   {
1860     GNUNET_SCHEDULER_cancel (handle->quota_task);
1861     handle->quota_task = NULL;
1862   }
1863   GNUNET_free_non_null (handle->my_hello);
1864   handle->my_hello = NULL;
1865   GNUNET_assert (NULL == handle->tc_head);
1866   GNUNET_assert (NULL == handle->tc_tail);
1867   GNUNET_assert (NULL == handle->hwl_head);
1868   GNUNET_assert (NULL == handle->hwl_tail);
1869   GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
1870   handle->ready_heap = NULL;
1871   GNUNET_free (handle);
1872 }
1873
1874
1875 /**
1876  * Check if we could queue a message of the given size for
1877  * transmission.  The transport service will take both its
1878  * internal buffers and bandwidth limits imposed by the
1879  * other peer into consideration when answering this query.
1880  *
1881  * @param handle connection to transport service
1882  * @param target who should receive the message
1883  * @param size how big is the message we want to transmit?
1884  * @param timeout after how long should we give up (and call
1885  *        notify with buf NULL and size 0)?
1886  * @param notify function to call when we are ready to
1887  *        send such a message
1888  * @param notify_cls closure for @a notify
1889  * @return NULL if someone else is already waiting to be notified
1890  *         non-NULL if the notify callback was queued (can be used to cancel
1891  *         using #GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1892  */
1893 struct GNUNET_TRANSPORT_TransmitHandle *
1894 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1895                                         const struct GNUNET_PeerIdentity *target,
1896                                         size_t size,
1897                                         struct GNUNET_TIME_Relative timeout,
1898                                         GNUNET_TRANSPORT_TransmitReadyNotify notify,
1899                                         void *notify_cls)
1900 {
1901   struct Neighbour *n;
1902   struct GNUNET_TRANSPORT_TransmitHandle *th;
1903   struct GNUNET_TIME_Relative delay;
1904
1905   n = neighbour_find (handle, target);
1906   if (NULL == n)
1907   {
1908     /* use GNUNET_TRANSPORT_try_connect first, only use this function
1909      * once a connection has been established */
1910     GNUNET_assert (0);
1911     return NULL;
1912   }
1913   if (NULL != n->th)
1914   {
1915     /* attempt to send two messages at the same time to the same peer */
1916     GNUNET_assert (0);
1917     return NULL;
1918   }
1919   GNUNET_assert (NULL == n->hn);
1920   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1921   th->neighbour = n;
1922   th->notify = notify;
1923   th->notify_cls = notify_cls;
1924   th->request_start = GNUNET_TIME_absolute_get ();
1925   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1926   th->notify_size = size;
1927   n->th = th;
1928   /* calculate when our transmission should be ready */
1929   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1930                                               size + n->traffic_overhead);
1931   n->traffic_overhead = 0;
1932   if (delay.rel_value_us > timeout.rel_value_us)
1933     delay.rel_value_us = 0;        /* notify immediately (with failure) */
1934   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1935     LOG (GNUNET_ERROR_TYPE_WARNING,
1936          "At bandwidth %u byte/s next transmission to %s in %s\n",
1937          (unsigned int) n->out_tracker.available_bytes_per_s__,
1938          GNUNET_i2s (target),
1939          GNUNET_STRINGS_relative_time_to_string (delay,
1940                                                  GNUNET_YES));
1941   else
1942     LOG (GNUNET_ERROR_TYPE_DEBUG,
1943          "At bandwidth %u byte/s next transmission to %s in %s\n",
1944          (unsigned int) n->out_tracker.available_bytes_per_s__,
1945          GNUNET_i2s (target),
1946          GNUNET_STRINGS_relative_time_to_string (delay,
1947                                                  GNUNET_YES));
1948   n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
1949                                         n,
1950                                         delay.rel_value_us);
1951   schedule_transmission (handle);
1952   return th;
1953 }
1954
1955
1956 /**
1957  * Cancel the specified transmission-ready notification.
1958  *
1959  * @param th handle returned from #GNUNET_TRANSPORT_notify_transmit_ready()
1960  */
1961 void
1962 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
1963 {
1964   struct Neighbour *n;
1965
1966   GNUNET_assert (NULL == th->next);
1967   GNUNET_assert (NULL == th->prev);
1968   n = th->neighbour;
1969   GNUNET_assert (th == n->th);
1970   n->th = NULL;
1971   if (NULL != n->hn)
1972   {
1973     GNUNET_CONTAINER_heap_remove_node (n->hn);
1974     n->hn = NULL;
1975   }
1976   else
1977   {
1978     GNUNET_assert (NULL != th->timeout_task);
1979     GNUNET_SCHEDULER_cancel (th->timeout_task);
1980     th->timeout_task = NULL;
1981   }
1982   GNUNET_free (th);
1983 }
1984
1985
1986 /* end of transport_api.c */