- fix coverity
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - test test test
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_transport_service.h"
36 #include "transport.h"
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "transport-api",__VA_ARGS__)
39
40 /**
41  * If we could not send any payload to a peer for this amount of
42  * time, we print a warning.
43  */
44 #define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES
45
46 /**
47  * How large to start with for the hashmap of neighbours.
48  */
49 #define STARTING_NEIGHBOURS_SIZE 16
50
51 /**
52  * Handle for a message that should be transmitted to the service.
53  * Used for both control messages and normal messages.
54  */
55 struct GNUNET_TRANSPORT_TransmitHandle
56 {
57
58   /**
59    * We keep all requests in a DLL.
60    */
61   struct GNUNET_TRANSPORT_TransmitHandle *next;
62
63   /**
64    * We keep all requests in a DLL.
65    */
66   struct GNUNET_TRANSPORT_TransmitHandle *prev;
67
68   /**
69    * Neighbour for this handle, NULL for control messages.
70    */
71   struct Neighbour *neighbour;
72
73   /**
74    * Function to call when @e notify_size bytes are available
75    * for transmission.
76    */
77   GNUNET_TRANSPORT_TransmitReadyNotify notify;
78
79   /**
80    * Closure for @e notify.
81    */
82   void *notify_cls;
83
84   /**
85    * Time at which this request was originally scheduled.
86    */
87   struct GNUNET_TIME_Absolute request_start;
88
89   /**
90    * Timeout for this request, 0 for control messages.
91    */
92   struct GNUNET_TIME_Absolute timeout;
93
94   /**
95    * Task to trigger request timeout if the request is stalled due to
96    * congestion.
97    */
98   struct GNUNET_SCHEDULER_Task *timeout_task;
99
100   /**
101    * How many bytes is our notify callback waiting for?
102    */
103   size_t notify_size;
104
105 };
106
107
108 /**
109  * Entry in hash table of all of our current (connected) neighbours.
110  */
111 struct Neighbour
112 {
113   /**
114    * Overall transport handle.
115    */
116   struct GNUNET_TRANSPORT_Handle *h;
117
118   /**
119    * Active transmit handle or NULL.
120    */
121   struct GNUNET_TRANSPORT_TransmitHandle *th;
122
123   /**
124    * Identity of this neighbour.
125    */
126   struct GNUNET_PeerIdentity id;
127
128   /**
129    * Outbound bandwidh tracker.
130    */
131   struct GNUNET_BANDWIDTH_Tracker out_tracker;
132
133   /**
134    * Entry in our readyness heap (which is sorted by @e next_ready
135    * value).  NULL if there is no pending transmission request for
136    * this neighbour or if we're waiting for @e is_ready to become
137    * true AFTER the @e out_tracker suggested that this peer's quota
138    * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
139    * we should immediately go back into the heap).
140    */
141   struct GNUNET_CONTAINER_HeapNode *hn;
142
143   /**
144    * Last time when this peer received payload from us.
145    */
146   struct GNUNET_TIME_Absolute last_payload;
147
148   /**
149    * Task to trigger warnings if we do not get SEND_OK after a while.
150    */
151   struct GNUNET_SCHEDULER_Task *unready_warn_task;
152
153   /**
154    * Is this peer currently ready to receive a message?
155    */
156   int is_ready;
157
158   /**
159    * Sending consumed more bytes on wire than payload was announced
160    * This overhead is added to the delay of next sending operation
161    */
162   size_t traffic_overhead;
163 };
164
165
166 /**
167  * Linked list of functions to call whenever our HELLO is updated.
168  */
169 struct GNUNET_TRANSPORT_GetHelloHandle
170 {
171
172   /**
173    * This is a doubly linked list.
174    */
175   struct GNUNET_TRANSPORT_GetHelloHandle *next;
176
177   /**
178    * This is a doubly linked list.
179    */
180   struct GNUNET_TRANSPORT_GetHelloHandle *prev;
181
182   /**
183    * Transport handle.
184    */
185   struct GNUNET_TRANSPORT_Handle *handle;
186
187   /**
188    * Callback to call once we got our HELLO.
189    */
190   GNUNET_TRANSPORT_HelloUpdateCallback rec;
191
192   /**
193    * Task for calling the HelloUpdateCallback when we already have a HELLO
194    */
195   struct GNUNET_SCHEDULER_Task *notify_task;
196
197   /**
198    * Closure for @e rec.
199    */
200   void *rec_cls;
201
202 };
203
204
205 /**
206  * Entry in linked list for all offer-HELLO requests.
207  */
208 struct GNUNET_TRANSPORT_OfferHelloHandle
209 {
210   /**
211    * For the DLL.
212    */
213   struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
214
215   /**
216    * For the DLL.
217    */
218   struct GNUNET_TRANSPORT_OfferHelloHandle *next;
219
220   /**
221    * Transport service handle we use for transmission.
222    */
223   struct GNUNET_TRANSPORT_Handle *th;
224
225   /**
226    * Transmission handle for this request.
227    */
228   struct GNUNET_TRANSPORT_TransmitHandle *tth;
229
230   /**
231    * Function to call once we are done.
232    */
233   GNUNET_SCHEDULER_TaskCallback cont;
234
235   /**
236    * Closure for @e cont
237    */
238   void *cls;
239
240   /**
241    * The HELLO message to be transmitted.
242    */
243   struct GNUNET_MessageHeader *msg;
244 };
245
246
247 /**
248  * Handle for the transport service (includes all of the
249  * state for the transport service).
250  */
251 struct GNUNET_TRANSPORT_Handle
252 {
253
254   /**
255    * Closure for the callbacks.
256    */
257   void *cls;
258
259   /**
260    * Function to call for received data.
261    */
262   GNUNET_TRANSPORT_ReceiveCallback rec;
263
264   /**
265    * function to call on connect events
266    */
267   GNUNET_TRANSPORT_NotifyConnect nc_cb;
268
269   /**
270    * function to call on disconnect events
271    */
272   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
273
274   /**
275    * function to call on excess bandwidth events
276    */
277   GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
278
279   /**
280    * Head of DLL of control messages.
281    */
282   struct GNUNET_TRANSPORT_TransmitHandle *control_head;
283
284   /**
285    * Tail of DLL of control messages.
286    */
287   struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
288
289   /**
290    * The current HELLO message for this peer.  Updated
291    * whenever transports change their addresses.
292    */
293   struct GNUNET_MessageHeader *my_hello;
294
295   /**
296    * My client connection to the transport service.
297    */
298   struct GNUNET_CLIENT_Connection *client;
299
300   /**
301    * Handle to our registration with the client for notification.
302    */
303   struct GNUNET_CLIENT_TransmitHandle *cth;
304
305   /**
306    * Linked list of pending requests for our HELLO.
307    */
308   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;
309
310   /**
311    * Linked list of pending requests for our HELLO.
312    */
313   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;
314
315   /**
316    * Linked list of pending offer HELLO requests head
317    */
318   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;
319
320   /**
321    * Linked list of pending offer HELLO requests tail
322    */
323   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;
324
325   /**
326    * My configuration.
327    */
328   const struct GNUNET_CONFIGURATION_Handle *cfg;
329
330   /**
331    * Hash map of the current connected neighbours of this peer.
332    * Maps peer identities to `struct Neighbour` entries.
333    */
334   struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
335
336   /**
337    * Heap sorting peers with pending messages by the timestamps that
338    * specify when we could next send a message to the respective peer.
339    * Excludes control messages (which can always go out immediately).
340    * Maps time stamps to `struct Neighbour` entries.
341    */
342   struct GNUNET_CONTAINER_Heap *ready_heap;
343
344   /**
345    * Peer identity as assumed by this process, or all zeros.
346    */
347   struct GNUNET_PeerIdentity self;
348
349   /**
350    * ID of the task trying to reconnect to the service.
351    */
352   struct GNUNET_SCHEDULER_Task *reconnect_task;
353
354   /**
355    * ID of the task trying to trigger transmission for a peer while
356    * maintaining bandwidth quotas.  In use if there are no control
357    * messages and the smallest entry in the @e ready_heap has a time
358    * stamp in the future.
359    */
360   struct GNUNET_SCHEDULER_Task *quota_task;
361
362   /**
363    * Delay until we try to reconnect.
364    */
365   struct GNUNET_TIME_Relative reconnect_delay;
366
367   /**
368    * Should we check that @e self matches what the service thinks?
369    * (if #GNUNET_NO, then @e self is all zeros!).
370    */
371   int check_self;
372
373   /**
374    * Reconnect in progress
375    */
376   int reconnecting;
377 };
378
379
380 /**
381  * Schedule the task to send one message, either from the control
382  * list or the peer message queues  to the service.
383  *
384  * @param h transport service to schedule a transmission for
385  */
386 static void
387 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
388
389
390 /**
391  * Function that will schedule the job that will try
392  * to connect us again to the client.
393  *
394  * @param h transport service to reconnect
395  */
396 static void
397 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
398
399
400 /**
401  * A neighbour has not gotten a SEND_OK in a  while. Print a warning.
402  *
403  * @param cls the `struct Neighbour`
404  */
405 static void
406 do_warn_unready (void *cls)
407 {
408   struct Neighbour *n = cls;
409   struct GNUNET_TIME_Relative delay;
410
411   delay = GNUNET_TIME_absolute_get_duration (n->last_payload);
412   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
413               "Lacking SEND_OK, no payload could be send to %s for %s\n",
414               GNUNET_i2s (&n->id),
415               GNUNET_STRINGS_relative_time_to_string (delay,
416                                                       GNUNET_YES));
417   n->unready_warn_task
418     = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
419                                     &do_warn_unready,
420                                     n);
421 }
422
423
424 /**
425  * Get the neighbour list entry for the given peer
426  *
427  * @param h our context
428  * @param peer peer to look up
429  * @return NULL if no such peer entry exists
430  */
431 static struct Neighbour *
432 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
433                 const struct GNUNET_PeerIdentity *peer)
434 {
435   return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
436                                             peer);
437 }
438
439
440 /**
441  * The outbound quota has changed in a way that may require
442  * us to reset the timeout.  Update the timeout.
443  *
444  * @param cls the `struct Neighbour` for which the timeout changed
445  */
446 static void
447 outbound_bw_tracker_update (void *cls)
448 {
449   struct Neighbour *n = cls;
450   struct GNUNET_TIME_Relative delay;
451
452   if (NULL == n->hn)
453     return;
454   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
455                                               n->th->notify_size + n->traffic_overhead);
456   LOG (GNUNET_ERROR_TYPE_DEBUG,
457        "New outbound delay %s us\n",
458        GNUNET_STRINGS_relative_time_to_string (delay,
459                                                GNUNET_NO));
460   GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
461       n->hn, delay.rel_value_us);
462   schedule_transmission (n->h);
463 }
464
465
466 /**
467  * Function called by the bandwidth tracker if we have excess
468  * bandwidth.
469  *
470  * @param cls the `struct Neighbour` that has excess bandwidth
471  */
472 static void
473 notify_excess_cb (void *cls)
474 {
475   struct Neighbour *n = cls;
476   struct GNUNET_TRANSPORT_Handle *h = n->h;
477
478   if (NULL != h->neb_cb)
479     h->neb_cb (h->cls,
480                &n->id);
481 }
482
483
484 /**
485  * Add neighbour to our list
486  *
487  * @return NULL if this API is currently disconnecting from the service
488  */
489 static struct Neighbour *
490 neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
491                const struct GNUNET_PeerIdentity *pid)
492 {
493   struct Neighbour *n;
494
495   LOG (GNUNET_ERROR_TYPE_DEBUG,
496        "Creating entry for neighbour `%s'.\n",
497        GNUNET_i2s (pid));
498   n = GNUNET_new (struct Neighbour);
499   n->id = *pid;
500   n->h = h;
501   n->is_ready = GNUNET_YES;
502   n->traffic_overhead = 0;
503   GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
504                                   &outbound_bw_tracker_update,
505                                   n,
506                                   GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
507                                   MAX_BANDWIDTH_CARRY_S,
508                                   &notify_excess_cb,
509                                   n);
510   GNUNET_assert (GNUNET_OK ==
511                  GNUNET_CONTAINER_multipeermap_put (h->neighbours,
512                                                     &n->id,
513                                                     n,
514                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
515   return n;
516 }
517
518
519 /**
520  * Iterator over hash map entries, for deleting state of a neighbour.
521  *
522  * @param cls the `struct GNUNET_TRANSPORT_Handle *`
523  * @param key peer identity
524  * @param value value in the hash map, the neighbour entry to delete
525  * @return #GNUNET_YES if we should continue to
526  *         iterate,
527  *         #GNUNET_NO if not.
528  */
529 static int
530 neighbour_delete (void *cls,
531                   const struct GNUNET_PeerIdentity *key,
532                   void *value)
533 {
534   struct GNUNET_TRANSPORT_Handle *handle = cls;
535   struct Neighbour *n = value;
536
537   LOG (GNUNET_ERROR_TYPE_DEBUG,
538        "Dropping entry for neighbour `%s'.\n",
539        GNUNET_i2s (key));
540   GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
541   if (NULL != handle->nd_cb)
542     handle->nd_cb (handle->cls,
543                    &n->id);
544   if (NULL != n->unready_warn_task)
545   {
546     GNUNET_SCHEDULER_cancel (n->unready_warn_task);
547     n->unready_warn_task = NULL;
548   }
549   GNUNET_assert (NULL == n->th);
550   GNUNET_assert (NULL == n->hn);
551   GNUNET_assert (GNUNET_YES ==
552                  GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
553                                                        key,
554                                                        n));
555   GNUNET_free (n);
556   return GNUNET_YES;
557 }
558
559
560 /**
561  * Function we use for handling incoming messages.
562  *
563  * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
564  * @param msg message received, NULL on timeout or fatal error
565  */
566 static void
567 demultiplexer (void *cls,
568                const struct GNUNET_MessageHeader *msg)
569 {
570   struct GNUNET_TRANSPORT_Handle *h = cls;
571   const struct DisconnectInfoMessage *dim;
572   const struct ConnectInfoMessage *cim;
573   const struct InboundMessage *im;
574   const struct GNUNET_MessageHeader *imm;
575   const struct SendOkMessage *okm;
576   const struct QuotaSetMessage *qm;
577   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
578   struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
579   struct Neighbour *n;
580   struct GNUNET_PeerIdentity me;
581   uint16_t size;
582   uint32_t bytes_msg;
583   uint32_t bytes_physical;
584
585   GNUNET_assert (NULL != h->client);
586   if (GNUNET_YES == h->reconnecting)
587   {
588     return;
589   }
590   if (NULL == msg)
591   {
592     LOG (GNUNET_ERROR_TYPE_DEBUG,
593          "Error receiving from transport service, disconnecting temporarily.\n");
594     h->reconnecting = GNUNET_YES;
595     disconnect_and_schedule_reconnect (h);
596     return;
597   }
598   GNUNET_CLIENT_receive (h->client,
599                          &demultiplexer,
600                          h,
601                          GNUNET_TIME_UNIT_FOREVER_REL);
602   size = ntohs (msg->size);
603   switch (ntohs (msg->type))
604   {
605   case GNUNET_MESSAGE_TYPE_HELLO:
606     if (GNUNET_OK !=
607         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
608                              &me))
609     {
610       GNUNET_break (0);
611       break;
612     }
613     LOG (GNUNET_ERROR_TYPE_DEBUG,
614          "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
615          (unsigned int) size,
616          GNUNET_i2s (&me));
617     GNUNET_free_non_null (h->my_hello);
618     h->my_hello = NULL;
619     if (size < sizeof (struct GNUNET_MessageHeader))
620     {
621       GNUNET_break (0);
622       break;
623     }
624     h->my_hello = GNUNET_copy_message (msg);
625     hwl = h->hwl_head;
626     while (NULL != hwl)
627     {
628       next_hwl = hwl->next;
629       hwl->rec (hwl->rec_cls,
630                 h->my_hello);
631       hwl = next_hwl;
632     }
633     break;
634   case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
635     if (size < sizeof (struct ConnectInfoMessage))
636     {
637       GNUNET_break (0);
638       h->reconnecting = GNUNET_YES;
639       disconnect_and_schedule_reconnect (h);
640       break;
641     }
642     cim = (const struct ConnectInfoMessage *) msg;
643     if (size !=
644         sizeof (struct ConnectInfoMessage))
645     {
646       GNUNET_break (0);
647       h->reconnecting = GNUNET_YES;
648       disconnect_and_schedule_reconnect (h);
649       break;
650     }
651     LOG (GNUNET_ERROR_TYPE_DEBUG,
652          "Receiving CONNECT message for `%s'.\n",
653          GNUNET_i2s (&cim->id));
654     n = neighbour_find (h, &cim->id);
655     if (NULL != n)
656     {
657       GNUNET_break (0);
658       h->reconnecting = GNUNET_YES;
659       disconnect_and_schedule_reconnect (h);
660       break;
661     }
662     n = neighbour_add (h,
663                        &cim->id);
664     LOG (GNUNET_ERROR_TYPE_DEBUG,
665          "Receiving CONNECT message for `%s' with quota %u\n",
666          GNUNET_i2s (&cim->id),
667          ntohl (cim->quota_out.value__));
668     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
669                                            cim->quota_out);
670     if (NULL != h->nc_cb)
671       h->nc_cb (h->cls,
672                 &n->id);
673     break;
674   case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
675     if (size != sizeof (struct DisconnectInfoMessage))
676     {
677       GNUNET_break (0);
678       h->reconnecting = GNUNET_YES;
679       disconnect_and_schedule_reconnect (h);
680       break;
681     }
682     dim = (const struct DisconnectInfoMessage *) msg;
683     GNUNET_break (ntohl (dim->reserved) == 0);
684     LOG (GNUNET_ERROR_TYPE_DEBUG,
685          "Receiving DISCONNECT message for `%s'.\n",
686          GNUNET_i2s (&dim->peer));
687     n = neighbour_find (h, &dim->peer);
688     if (NULL == n)
689     {
690       GNUNET_break (0);
691       h->reconnecting = GNUNET_YES;
692       disconnect_and_schedule_reconnect (h);
693       break;
694     }
695     neighbour_delete (h,
696                       &dim->peer,
697                       n);
698     break;
699   case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
700     if (size != sizeof (struct SendOkMessage))
701     {
702       GNUNET_break (0);
703       h->reconnecting = GNUNET_YES;
704       disconnect_and_schedule_reconnect (h);
705       break;
706     }
707     okm = (const struct SendOkMessage *) msg;
708     bytes_msg = ntohl (okm->bytes_msg);
709     bytes_physical = ntohl (okm->bytes_physical);
710     LOG (GNUNET_ERROR_TYPE_DEBUG,
711          "Receiving SEND_OK message, transmission to %s %s.\n",
712          GNUNET_i2s (&okm->peer),
713          ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
714
715     n = neighbour_find (h,
716                         &okm->peer);
717     if (NULL == n)
718     {
719       /* We should never get a 'SEND_OK' for a peer that we are not
720          connected to */
721       GNUNET_break (0);
722       h->reconnecting = GNUNET_YES;
723       disconnect_and_schedule_reconnect (h);
724       break;
725     }
726     if (bytes_physical > bytes_msg)
727     {
728       LOG (GNUNET_ERROR_TYPE_DEBUG,
729            "Overhead for %u byte message was %u\n",
730            bytes_msg,
731            bytes_physical - bytes_msg);
732       n->traffic_overhead += bytes_physical - bytes_msg;
733     }
734     GNUNET_break (GNUNET_NO == n->is_ready);
735     n->is_ready = GNUNET_YES;
736     if (NULL != n->unready_warn_task)
737     {
738       GNUNET_SCHEDULER_cancel (n->unready_warn_task);
739       n->unready_warn_task = NULL;
740     }
741     if ((NULL != n->th) && (NULL == n->hn))
742     {
743       GNUNET_assert (NULL != n->th->timeout_task);
744       GNUNET_SCHEDULER_cancel (n->th->timeout_task);
745       n->th->timeout_task = NULL;
746       /* we've been waiting for this (congestion, not quota,
747        * caused delayed transmission) */
748       n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
749                                             n,
750                                             0);
751     }
752     schedule_transmission (h);
753     break;
754   case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
755     if (size <
756         sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
757     {
758       GNUNET_break (0);
759       h->reconnecting = GNUNET_YES;
760       disconnect_and_schedule_reconnect (h);
761       break;
762     }
763     im = (const struct InboundMessage *) msg;
764     imm = (const struct GNUNET_MessageHeader *) &im[1];
765     if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
766     {
767       GNUNET_break (0);
768       h->reconnecting = GNUNET_YES;
769       disconnect_and_schedule_reconnect (h);
770       break;
771     }
772     LOG (GNUNET_ERROR_TYPE_DEBUG,
773          "Received message of type %u with %u bytes from `%s'.\n",
774          (unsigned int) ntohs (imm->type),
775          (unsigned int) ntohs (imm->size),
776          GNUNET_i2s (&im->peer));
777     n = neighbour_find (h, &im->peer);
778     if (NULL == n)
779     {
780       GNUNET_break (0);
781       h->reconnecting = GNUNET_YES;
782       disconnect_and_schedule_reconnect (h);
783       break;
784     }
785     if (NULL != h->rec)
786       h->rec (h->cls,
787               &im->peer,
788               imm);
789     break;
790   case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
791     if (size != sizeof (struct QuotaSetMessage))
792     {
793       GNUNET_break (0);
794       h->reconnecting = GNUNET_YES;
795       disconnect_and_schedule_reconnect (h);
796       break;
797     }
798     qm = (const struct QuotaSetMessage *) msg;
799     n = neighbour_find (h, &qm->peer);
800     if (NULL == n)
801     {
802       GNUNET_break (0);
803       h->reconnecting = GNUNET_YES;
804       disconnect_and_schedule_reconnect (h);
805       break;
806     }
807     LOG (GNUNET_ERROR_TYPE_DEBUG,
808          "Receiving SET_QUOTA message for `%s' with quota %u\n",
809          GNUNET_i2s (&qm->peer),
810          ntohl (qm->quota.value__));
811     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
812                                            qm->quota);
813     break;
814   default:
815     LOG (GNUNET_ERROR_TYPE_ERROR,
816          _("Received unexpected message of type %u in %s:%u\n"),
817          ntohs (msg->type),
818          __FILE__,
819          __LINE__);
820     GNUNET_break (0);
821     break;
822   }
823 }
824
825
826 /**
827  * A transmission request could not be satisfied because of
828  * network congestion.  Notify the initiator and clean up.
829  *
830  * @param cls the `struct GNUNET_TRANSPORT_TransmitHandle`
831  */
832 static void
833 timeout_request_due_to_congestion (void *cls)
834 {
835   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
836   struct Neighbour *n = th->neighbour;
837   struct GNUNET_TIME_Relative delay;
838
839   n->th->timeout_task = NULL;
840   delay = GNUNET_TIME_absolute_get_duration (th->request_start);
841   LOG (GNUNET_ERROR_TYPE_WARNING,
842        "Discarding %u bytes of payload message after %s delay due to congestion\n",
843        th->notify_size,
844        GNUNET_STRINGS_relative_time_to_string (delay,
845                                                GNUNET_YES));
846   GNUNET_assert (th == n->th);
847   GNUNET_assert (NULL == n->hn);
848   n->th = NULL;
849   th->notify (th->notify_cls,
850               0,
851               NULL);
852   GNUNET_free (th);
853 }
854
855
856 /**
857  * Transmit message(s) to service.
858  *
859  * @param cls handle to transport
860  * @param size number of bytes available in @a buf
861  * @param buf where to copy the message
862  * @return number of bytes copied to @a buf
863  */
864 static size_t
865 transport_notify_ready (void *cls,
866                         size_t size,
867                         void *buf)
868 {
869   struct GNUNET_TRANSPORT_Handle *h = cls;
870   struct GNUNET_TRANSPORT_TransmitHandle *th;
871   struct GNUNET_TIME_Relative delay;
872   struct Neighbour *n;
873   char *cbuf;
874   struct OutboundMessage obm;
875   size_t ret;
876   size_t nret;
877   size_t mret;
878
879   GNUNET_assert (NULL != h->client);
880   h->cth = NULL;
881   if (NULL == buf)
882   {
883     /* transmission failed */
884     disconnect_and_schedule_reconnect (h);
885     return 0;
886   }
887
888   cbuf = buf;
889   ret = 0;
890   /* first send control messages */
891   while ( (NULL != (th = h->control_head)) &&
892           (th->notify_size <= size) )
893   {
894     GNUNET_CONTAINER_DLL_remove (h->control_head,
895                                  h->control_tail,
896                                  th);
897     nret = th->notify (th->notify_cls,
898                        size,
899                        &cbuf[ret]);
900     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
901     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
902       LOG (GNUNET_ERROR_TYPE_WARNING,
903            "Added %u bytes of control message at %u after %s delay\n",
904            nret,
905            ret,
906            GNUNET_STRINGS_relative_time_to_string (delay,
907                                                    GNUNET_YES));
908     else
909       LOG (GNUNET_ERROR_TYPE_DEBUG,
910            "Added %u bytes of control message at %u after %s delay\n",
911            nret,
912            ret,
913            GNUNET_STRINGS_relative_time_to_string (delay,
914                                                    GNUNET_YES));
915     GNUNET_free (th);
916     ret += nret;
917     size -= nret;
918   }
919
920   /* then, if possible and no control messages pending, send data messages */
921   while ( (NULL == h->control_head) &&
922           (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
923   {
924     if (GNUNET_YES != n->is_ready)
925     {
926       /* peer not ready, wait for notification! */
927       GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
928       n->hn = NULL;
929       GNUNET_assert (NULL == n->th->timeout_task);
930       n->th->timeout_task
931         = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
932                                         (n->th->timeout),
933                                         &timeout_request_due_to_congestion,
934                                         n->th);
935       continue;
936     }
937     th = n->th;
938     if (th->notify_size + sizeof (struct OutboundMessage) > size)
939       break;                    /* does not fit */
940     if (GNUNET_BANDWIDTH_tracker_get_delay
941         (&n->out_tracker,
942          th->notify_size).rel_value_us > 0)
943       break;                    /* too early */
944     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
945     n->hn = NULL;
946     n->th = NULL;
947     GNUNET_assert (size >= sizeof (struct OutboundMessage));
948     mret = th->notify (th->notify_cls,
949                        size - sizeof (struct OutboundMessage),
950                        &cbuf[ret + sizeof (struct OutboundMessage)]);
951     GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
952     if (0 == mret)
953     {
954       GNUNET_free (th);
955       continue;
956     }
957     if (NULL != n->unready_warn_task)
958       n->unready_warn_task
959         = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
960                                         &do_warn_unready,
961                                         n);
962     n->last_payload = GNUNET_TIME_absolute_get ();
963     n->is_ready = GNUNET_NO;
964     GNUNET_assert (mret + sizeof (struct OutboundMessage) <
965                    GNUNET_SERVER_MAX_MESSAGE_SIZE);
966     obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
967     obm.header.size = htons (mret + sizeof (struct OutboundMessage));
968     obm.reserved = htonl (0);
969     obm.timeout =
970       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
971                                  (th->timeout));
972     obm.peer = n->id;
973     memcpy (&cbuf[ret],
974             &obm,
975             sizeof (struct OutboundMessage));
976     ret += (mret + sizeof (struct OutboundMessage));
977     size -= (mret + sizeof (struct OutboundMessage));
978     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
979                                       mret);
980     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
981     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
982       LOG (GNUNET_ERROR_TYPE_WARNING,
983            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
984            mret,
985            GNUNET_i2s (&n->id),
986            GNUNET_STRINGS_relative_time_to_string (delay,
987                                                    GNUNET_YES),
988            (unsigned int) n->out_tracker.available_bytes_per_s__);
989     else
990       LOG (GNUNET_ERROR_TYPE_DEBUG,
991            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
992            mret,
993            GNUNET_i2s (&n->id),
994            GNUNET_STRINGS_relative_time_to_string (delay,
995                                                    GNUNET_YES),
996            (unsigned int) n->out_tracker.available_bytes_per_s__);
997     GNUNET_free (th);
998     break;
999   }
1000   /* if there are more pending messages, try to schedule those */
1001   schedule_transmission (h);
1002   LOG (GNUNET_ERROR_TYPE_DEBUG,
1003        "Transmitting %u bytes to transport service\n",
1004        ret);
1005   return ret;
1006 }
1007
1008
1009 /**
1010  * Schedule the task to send one message, either from the control
1011  * list or the peer message queues  to the service.
1012  *
1013  * @param cls transport service to schedule a transmission for
1014  */
1015 static void
1016 schedule_transmission_task (void *cls)
1017 {
1018   struct GNUNET_TRANSPORT_Handle *h = cls;
1019   size_t size;
1020   struct GNUNET_TRANSPORT_TransmitHandle *th;
1021   struct Neighbour *n;
1022
1023   h->quota_task = NULL;
1024   GNUNET_assert (NULL != h->client);
1025   /* destroy all requests that have timed out */
1026   while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
1027           (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
1028   {
1029     /* notify client that the request could not be satisfied within
1030      * the given time constraints */
1031     th = n->th;
1032     n->th = NULL;
1033     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
1034     n->hn = NULL;
1035     LOG (GNUNET_ERROR_TYPE_DEBUG,
1036          "Signalling timeout for transmission to peer %s due to congestion\n",
1037          GNUNET_i2s (&n->id));
1038     GNUNET_assert (0 == th->notify (th->notify_cls,
1039                                     0,
1040                                     NULL));
1041     GNUNET_free (th);
1042   }
1043   if (NULL != h->cth)
1044     return;
1045   if (NULL != h->control_head)
1046   {
1047     size = h->control_head->notify_size;
1048   }
1049   else
1050   {
1051     n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
1052     if (NULL == n)
1053       return;                   /* no pending messages */
1054     size = n->th->notify_size + sizeof (struct OutboundMessage);
1055   }
1056   LOG (GNUNET_ERROR_TYPE_DEBUG,
1057        "Calling notify_transmit_ready\n");
1058   h->cth
1059     = GNUNET_CLIENT_notify_transmit_ready (h->client,
1060                                            size,
1061                                            GNUNET_TIME_UNIT_FOREVER_REL,
1062                                            GNUNET_NO,
1063                                            &transport_notify_ready,
1064                                            h);
1065   GNUNET_assert (NULL != h->cth);
1066 }
1067
1068
1069 /**
1070  * Schedule the task to send one message, either from the control
1071  * list or the peer message queues  to the service.
1072  *
1073  * @param h transport service to schedule a transmission for
1074  */
1075 static void
1076 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
1077 {
1078   struct GNUNET_TIME_Relative delay;
1079   struct Neighbour *n;
1080
1081   GNUNET_assert (NULL != h->client);
1082   if (NULL != h->quota_task)
1083   {
1084     GNUNET_SCHEDULER_cancel (h->quota_task);
1085     h->quota_task = NULL;
1086   }
1087   if (NULL != h->control_head)
1088     delay = GNUNET_TIME_UNIT_ZERO;
1089   else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
1090   {
1091     delay =
1092         GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1093                                             n->th->notify_size + n->traffic_overhead);
1094     n->traffic_overhead = 0;
1095   }
1096   else
1097   {
1098     LOG (GNUNET_ERROR_TYPE_DEBUG,
1099          "No work to be done, not scheduling transmission.\n");
1100     return;                     /* no work to be done */
1101   }
1102   LOG (GNUNET_ERROR_TYPE_DEBUG,
1103        "Scheduling next transmission to service in %s\n",
1104        GNUNET_STRINGS_relative_time_to_string (delay,
1105                                                GNUNET_YES));
1106   h->quota_task =
1107       GNUNET_SCHEDULER_add_delayed (delay,
1108                                     &schedule_transmission_task,
1109                                     h);
1110 }
1111
1112
1113 /**
1114  * Queue control request for transmission to the transport
1115  * service.
1116  *
1117  * @param h handle to the transport service
1118  * @param size number of bytes to be transmitted
1119  * @param notify function to call to get the content
1120  * @param notify_cls closure for @a notify
1121  * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
1122  */
1123 static struct GNUNET_TRANSPORT_TransmitHandle *
1124 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
1125                            size_t size,
1126                            GNUNET_TRANSPORT_TransmitReadyNotify notify,
1127                            void *notify_cls)
1128 {
1129   struct GNUNET_TRANSPORT_TransmitHandle *th;
1130
1131   LOG (GNUNET_ERROR_TYPE_DEBUG,
1132        "Control transmit of %u bytes requested\n",
1133        size);
1134   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1135   th->notify = notify;
1136   th->notify_cls = notify_cls;
1137   th->notify_size = size;
1138   th->request_start = GNUNET_TIME_absolute_get ();
1139   GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
1140                                     h->control_tail,
1141                                     th);
1142   schedule_transmission (h);
1143   return th;
1144 }
1145
1146
1147 /**
1148  * Transmit START message to service.
1149  *
1150  * @param cls unused
1151  * @param size number of bytes available in @a buf
1152  * @param buf where to copy the message
1153  * @return number of bytes copied to @a buf
1154  */
1155 static size_t
1156 send_start (void *cls,
1157             size_t size,
1158             void *buf)
1159 {
1160   struct GNUNET_TRANSPORT_Handle *h = cls;
1161   struct StartMessage s;
1162   uint32_t options;
1163
1164   if (NULL == buf)
1165   {
1166     /* Can only be shutdown, just give up */
1167     LOG (GNUNET_ERROR_TYPE_DEBUG,
1168          "Shutdown while trying to transmit START request.\n");
1169     return 0;
1170   }
1171   LOG (GNUNET_ERROR_TYPE_DEBUG,
1172        "Transmitting START request.\n");
1173   GNUNET_assert (size >= sizeof (struct StartMessage));
1174   s.header.size = htons (sizeof (struct StartMessage));
1175   s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
1176   options = 0;
1177   if (h->check_self)
1178     options |= 1;
1179   if (NULL != h->rec)
1180     options |= 2;
1181   s.options = htonl (options);
1182   s.self = h->self;
1183   memcpy (buf, &s, sizeof (struct StartMessage));
1184   GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
1185                          GNUNET_TIME_UNIT_FOREVER_REL);
1186   return sizeof (struct StartMessage);
1187 }
1188
1189
1190 /**
1191  * Try again to connect to transport service.
1192  *
1193  * @param cls the handle to the transport service
1194  */
1195 static void
1196 reconnect (void *cls)
1197 {
1198   struct GNUNET_TRANSPORT_Handle *h = cls;
1199
1200   h->reconnect_task = NULL;
1201   LOG (GNUNET_ERROR_TYPE_DEBUG,
1202        "Connecting to transport service.\n");
1203   GNUNET_assert (NULL == h->client);
1204   GNUNET_assert (NULL == h->control_head);
1205   GNUNET_assert (NULL == h->control_tail);
1206   h->reconnecting = GNUNET_NO;
1207   h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
1208
1209   GNUNET_assert (NULL != h->client);
1210   schedule_control_transmit (h, sizeof (struct StartMessage),
1211                              &send_start, h);
1212 }
1213
1214
1215 /**
1216  * Function that will schedule the job that will try
1217  * to connect us again to the client.
1218  *
1219  * @param h transport service to reconnect
1220  */
1221 static void
1222 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1223 {
1224   struct GNUNET_TRANSPORT_TransmitHandle *th;
1225
1226   GNUNET_assert (NULL == h->reconnect_task);
1227   if (NULL != h->cth)
1228   {
1229     GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
1230     h->cth = NULL;
1231   }
1232   if (NULL != h->client)
1233   {
1234     GNUNET_CLIENT_disconnect (h->client);
1235     h->client = NULL;
1236 /*    LOG (GNUNET_ERROR_TYPE_ERROR,
1237          "Client disconnect done \n");*/
1238   }
1239   /* Forget about all neighbours that we used to be connected to */
1240   GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
1241                                          &neighbour_delete,
1242                                          h);
1243   if (NULL != h->quota_task)
1244   {
1245     GNUNET_SCHEDULER_cancel (h->quota_task);
1246     h->quota_task = NULL;
1247   }
1248   while ((NULL != (th = h->control_head)))
1249   {
1250     GNUNET_CONTAINER_DLL_remove (h->control_head,
1251                                  h->control_tail,
1252                                  th);
1253     th->notify (th->notify_cls,
1254                 0,
1255                 NULL);
1256     GNUNET_free (th);
1257   }
1258   LOG (GNUNET_ERROR_TYPE_DEBUG,
1259        "Scheduling task to reconnect to transport service in %s.\n",
1260        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
1261                                                GNUNET_YES));
1262   h->reconnect_task =
1263       GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
1264                                     &reconnect,
1265                                     h);
1266   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
1267 }
1268
1269
1270 /**
1271  * Cancel control request for transmission to the transport service.
1272  *
1273  * @param th handle to the transport service
1274  * @param tth transmit handle to cancel
1275  */
1276 static void
1277 cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
1278                          struct GNUNET_TRANSPORT_TransmitHandle *tth)
1279 {
1280   LOG (GNUNET_ERROR_TYPE_DEBUG,
1281        "Canceling transmit of contral transmission requested\n");
1282   GNUNET_CONTAINER_DLL_remove (th->control_head,
1283                                th->control_tail,
1284                                tth);
1285   GNUNET_free (tth);
1286 }
1287
1288
1289 /**
1290  * Send HELLO message to the service.
1291  *
1292  * @param cls the HELLO message to send
1293  * @param size number of bytes available in @a buf
1294  * @param buf where to copy the message
1295  * @return number of bytes copied to @a buf
1296  */
1297 static size_t
1298 send_hello (void *cls,
1299             size_t size,
1300             void *buf)
1301 {
1302   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
1303   struct GNUNET_MessageHeader *msg = ohh->msg;
1304   uint16_t ssize;
1305
1306   if (NULL == buf)
1307   {
1308     LOG (GNUNET_ERROR_TYPE_DEBUG,
1309          "Timeout while trying to transmit `%s' request.\n",
1310          "HELLO");
1311     if (NULL != ohh->cont)
1312       ohh->cont (ohh->cls);
1313     GNUNET_free (msg);
1314     GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1315                                  ohh->th->oh_tail,
1316                                  ohh);
1317     GNUNET_free (ohh);
1318     return 0;
1319   }
1320   LOG (GNUNET_ERROR_TYPE_DEBUG,
1321        "Transmitting `%s' request.\n",
1322        "HELLO");
1323   ssize = ntohs (msg->size);
1324   GNUNET_assert (size >= ssize);
1325   memcpy (buf,
1326           msg,
1327           ssize);
1328   GNUNET_free (msg);
1329   if (NULL != ohh->cont)
1330     ohh->cont (ohh->cls);
1331   GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1332                                ohh->th->oh_tail,
1333                                ohh);
1334   GNUNET_free (ohh);
1335   return ssize;
1336 }
1337
1338
1339 /**
1340  * Send traffic metric message to the service.
1341  *
1342  * @param cls the message to send
1343  * @param size number of bytes available in @a buf
1344  * @param buf where to copy the message
1345  * @return number of bytes copied to @a buf
1346  */
1347 static size_t
1348 send_metric (void *cls,
1349              size_t size,
1350              void *buf)
1351 {
1352   struct TrafficMetricMessage *msg = cls;
1353   uint16_t ssize;
1354
1355   if (NULL == buf)
1356   {
1357     LOG (GNUNET_ERROR_TYPE_DEBUG,
1358          "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
1359     GNUNET_free (msg);
1360     return 0;
1361   }
1362   LOG (GNUNET_ERROR_TYPE_DEBUG,
1363        "Transmitting TRAFFIC_METRIC request.\n");
1364   ssize = ntohs (msg->header.size);
1365   GNUNET_assert (size >= ssize);
1366   memcpy (buf, msg, ssize);
1367   GNUNET_free (msg);
1368   return ssize;
1369 }
1370
1371
1372 /**
1373  * Set transport metrics for a peer and a direction
1374  *
1375  * @param handle transport handle
1376  * @param peer the peer to set the metric for
1377  * @param prop the performance metrics to set
1378  * @param delay_in inbound delay to introduce
1379  * @param delay_out outbound delay to introduce
1380  *
1381  * Note: Delay restrictions in receiving direction will be enforced
1382  * with one message delay.
1383  */
1384 void
1385 GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
1386                                      const struct GNUNET_PeerIdentity *peer,
1387                                      const struct GNUNET_ATS_Properties *prop,
1388                                      struct GNUNET_TIME_Relative delay_in,
1389                                      struct GNUNET_TIME_Relative delay_out)
1390 {
1391   struct TrafficMetricMessage *msg;
1392
1393   msg = GNUNET_new (struct TrafficMetricMessage);
1394   msg->header.size = htons (sizeof (struct TrafficMetricMessage));
1395   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
1396   msg->reserved = htonl (0);
1397   msg->peer = *peer;
1398   GNUNET_ATS_properties_hton (&msg->properties,
1399                               prop);
1400   msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
1401   msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
1402   schedule_control_transmit (handle,
1403                              sizeof (struct TrafficMetricMessage),
1404                              &send_metric,
1405                              msg);
1406 }
1407
1408
1409 /**
1410  * Offer the transport service the HELLO of another peer.  Note that
1411  * the transport service may just ignore this message if the HELLO is
1412  * malformed or useless due to our local configuration.
1413  *
1414  * @param handle connection to transport service
1415  * @param hello the hello message
1416  * @param cont continuation to call when HELLO has been sent,
1417  *      tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
1418  *      tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
1419  * @param cont_cls closure for @a cont
1420  * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
1421  *      in case of failure @a cont will not be called
1422  *
1423  */
1424 struct GNUNET_TRANSPORT_OfferHelloHandle *
1425 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1426                               const struct GNUNET_MessageHeader *hello,
1427                               GNUNET_SCHEDULER_TaskCallback cont,
1428                               void *cont_cls)
1429 {
1430   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
1431   struct GNUNET_MessageHeader *msg;
1432   struct GNUNET_PeerIdentity peer;
1433   uint16_t size;
1434
1435   if (NULL == handle->client)
1436     return NULL;
1437   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
1438   size = ntohs (hello->size);
1439   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
1440   if (GNUNET_OK !=
1441       GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
1442                            &peer))
1443   {
1444     GNUNET_break (0);
1445     return NULL;
1446   }
1447
1448   msg = GNUNET_malloc (size);
1449   memcpy (msg, hello, size);
1450   LOG (GNUNET_ERROR_TYPE_DEBUG,
1451        "Offering HELLO message of `%s' to transport for validation.\n",
1452        GNUNET_i2s (&peer));
1453
1454   ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
1455   ohh->th = handle;
1456   ohh->cont = cont;
1457   ohh->cls = cont_cls;
1458   ohh->msg = msg;
1459   ohh->tth = schedule_control_transmit (handle,
1460                                         size,
1461                                         &send_hello,
1462                                         ohh);
1463   GNUNET_CONTAINER_DLL_insert (handle->oh_head,
1464                                handle->oh_tail,
1465                                ohh);
1466   return ohh;
1467 }
1468
1469
1470 /**
1471  * Cancel the request to transport to offer the HELLO message
1472  *
1473  * @param ohh the handle for the operation to cancel
1474  */
1475 void
1476 GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
1477 {
1478   struct GNUNET_TRANSPORT_Handle *th = ohh->th;
1479
1480   cancel_control_transmit (ohh->th, ohh->tth);
1481   GNUNET_CONTAINER_DLL_remove (th->oh_head,
1482                                th->oh_tail,
1483                                ohh);
1484   GNUNET_free (ohh->msg);
1485   GNUNET_free (ohh);
1486 }
1487
1488
1489 /**
1490  * Checks if a given peer is connected to us
1491  *
1492  * @param handle connection to transport service
1493  * @param peer the peer to check
1494  * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
1495  */
1496 int
1497 GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
1498                                        const struct GNUNET_PeerIdentity *peer)
1499 {
1500   if (GNUNET_YES ==
1501       GNUNET_CONTAINER_multipeermap_contains (handle->neighbours,
1502                                               peer))
1503     return GNUNET_YES;
1504   return GNUNET_NO;
1505 }
1506
1507
1508 /**
1509  * Task to call the HelloUpdateCallback of the GetHelloHandle
1510  *
1511  * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
1512  */
1513 static void
1514 call_hello_update_cb_async (void *cls)
1515 {
1516   struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
1517
1518   GNUNET_assert (NULL != ghh->handle->my_hello);
1519   GNUNET_assert (NULL != ghh->notify_task);
1520   ghh->notify_task = NULL;
1521   ghh->rec (ghh->rec_cls,
1522             ghh->handle->my_hello);
1523 }
1524
1525
1526 /**
1527  * Obtain the HELLO message for this peer.  The callback given in this function
1528  * is never called synchronously.
1529  *
1530  * @param handle connection to transport service
1531  * @param rec function to call with the HELLO, sender will be our peer
1532  *            identity; message and sender will be NULL on timeout
1533  *            (handshake with transport service pending/failed).
1534  *             cost estimate will be 0.
1535  * @param rec_cls closure for @a rec
1536  * @return handle to cancel the operation
1537  */
1538 struct GNUNET_TRANSPORT_GetHelloHandle *
1539 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
1540                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
1541                             void *rec_cls)
1542 {
1543   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
1544
1545   hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
1546   hwl->rec = rec;
1547   hwl->rec_cls = rec_cls;
1548   hwl->handle = handle;
1549   GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
1550                                handle->hwl_tail,
1551                                hwl);
1552   if (NULL != handle->my_hello)
1553     hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
1554                                                  hwl);
1555   return hwl;
1556 }
1557
1558
1559 /**
1560  * Stop receiving updates about changes to our HELLO message.
1561  *
1562  * @param ghh handle to cancel
1563  */
1564 void
1565 GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
1566 {
1567   struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
1568
1569   if (NULL != ghh->notify_task)
1570     GNUNET_SCHEDULER_cancel (ghh->notify_task);
1571   GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
1572                                handle->hwl_tail,
1573                                ghh);
1574   GNUNET_free (ghh);
1575 }
1576
1577
1578 /**
1579  * Connect to the transport service.  Note that the connection may
1580  * complete (or fail) asynchronously.
1581  *
1582  * @param cfg configuration to use
1583  * @param self our own identity (API should check that it matches
1584  *             the identity found by transport), or NULL (no check)
1585  * @param cls closure for the callbacks
1586  * @param rec receive function to call
1587  * @param nc function to call on connect events
1588  * @param nd function to call on disconnect events
1589  * @return NULL on error
1590  */
1591 struct GNUNET_TRANSPORT_Handle *
1592 GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1593                           const struct GNUNET_PeerIdentity *self,
1594                           void *cls,
1595                           GNUNET_TRANSPORT_ReceiveCallback rec,
1596                           GNUNET_TRANSPORT_NotifyConnect nc,
1597                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1598 {
1599   return GNUNET_TRANSPORT_connect2 (cfg,
1600                                     self,
1601                                     cls,
1602                                     rec,
1603                                     nc,
1604                                     nd,
1605                                     NULL);
1606 }
1607
1608
1609 /**
1610  * Connect to the transport service.  Note that the connection may
1611  * complete (or fail) asynchronously.
1612  *
1613  * @param cfg configuration to use
1614  * @param self our own identity (API should check that it matches
1615  *             the identity found by transport), or NULL (no check)
1616  * @param cls closure for the callbacks
1617  * @param rec receive function to call
1618  * @param nc function to call on connect events
1619  * @param nd function to call on disconnect events
1620  * @param neb function to call if we have excess bandwidth to a peer
1621  * @return NULL on error
1622  */
1623 struct GNUNET_TRANSPORT_Handle *
1624 GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
1625                            const struct GNUNET_PeerIdentity *self,
1626                            void *cls,
1627                            GNUNET_TRANSPORT_ReceiveCallback rec,
1628                            GNUNET_TRANSPORT_NotifyConnect nc,
1629                            GNUNET_TRANSPORT_NotifyDisconnect nd,
1630                            GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
1631 {
1632   struct GNUNET_TRANSPORT_Handle *ret;
1633
1634   ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
1635   if (NULL != self)
1636   {
1637     ret->self = *self;
1638     ret->check_self = GNUNET_YES;
1639   }
1640   ret->cfg = cfg;
1641   ret->cls = cls;
1642   ret->rec = rec;
1643   ret->nc_cb = nc;
1644   ret->nd_cb = nd;
1645   ret->neb_cb = neb;
1646   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1647   LOG (GNUNET_ERROR_TYPE_DEBUG,
1648        "Connecting to transport service.\n");
1649   ret->client = GNUNET_CLIENT_connect ("transport",
1650                                        cfg);
1651   if (NULL == ret->client)
1652   {
1653     GNUNET_free (ret);
1654     return NULL;
1655   }
1656   ret->neighbours =
1657     GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
1658                                           GNUNET_YES);
1659   ret->ready_heap =
1660       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1661   schedule_control_transmit (ret,
1662                              sizeof (struct StartMessage),
1663                              &send_start,
1664                              ret);
1665   return ret;
1666 }
1667
1668
1669 /**
1670  * Disconnect from the transport service.
1671  *
1672  * @param handle handle to the service as returned from #GNUNET_TRANSPORT_connect()
1673  */
1674 void
1675 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1676 {
1677   LOG (GNUNET_ERROR_TYPE_DEBUG,
1678        "Transport disconnect called!\n");
1679   /* this disconnects all neighbours... */
1680   if (NULL == handle->reconnect_task)
1681     disconnect_and_schedule_reconnect (handle);
1682   /* and now we stop trying to connect again... */
1683   if (NULL != handle->reconnect_task)
1684   {
1685     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1686     handle->reconnect_task = NULL;
1687   }
1688   GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
1689   handle->neighbours = NULL;
1690   if (NULL != handle->quota_task)
1691   {
1692     GNUNET_SCHEDULER_cancel (handle->quota_task);
1693     handle->quota_task = NULL;
1694   }
1695   GNUNET_free_non_null (handle->my_hello);
1696   handle->my_hello = NULL;
1697   GNUNET_assert (NULL == handle->hwl_head);
1698   GNUNET_assert (NULL == handle->hwl_tail);
1699   GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
1700   handle->ready_heap = NULL;
1701   GNUNET_free (handle);
1702 }
1703
1704
1705 /**
1706  * Check if we could queue a message of the given size for
1707  * transmission.  The transport service will take both its
1708  * internal buffers and bandwidth limits imposed by the
1709  * other peer into consideration when answering this query.
1710  *
1711  * @param handle connection to transport service
1712  * @param target who should receive the message
1713  * @param size how big is the message we want to transmit?
1714  * @param timeout after how long should we give up (and call
1715  *        notify with buf NULL and size 0)?
1716  * @param notify function to call when we are ready to
1717  *        send such a message
1718  * @param notify_cls closure for @a notify
1719  * @return NULL if someone else is already waiting to be notified
1720  *         non-NULL if the notify callback was queued (can be used to cancel
1721  *         using #GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1722  */
1723 struct GNUNET_TRANSPORT_TransmitHandle *
1724 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1725                                         const struct GNUNET_PeerIdentity *target,
1726                                         size_t size,
1727                                         struct GNUNET_TIME_Relative timeout,
1728                                         GNUNET_TRANSPORT_TransmitReadyNotify notify,
1729                                         void *notify_cls)
1730 {
1731   struct Neighbour *n;
1732   struct GNUNET_TRANSPORT_TransmitHandle *th;
1733   struct GNUNET_TIME_Relative delay;
1734
1735   n = neighbour_find (handle, target);
1736   if (NULL == n)
1737   {
1738     /* only use this function
1739      * once a connection has been established */
1740     GNUNET_assert (0);
1741     return NULL;
1742   }
1743   if (NULL != n->th)
1744   {
1745     /* attempt to send two messages at the same time to the same peer */
1746     GNUNET_assert (0);
1747     return NULL;
1748   }
1749   GNUNET_assert (NULL == n->hn);
1750   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1751   th->neighbour = n;
1752   th->notify = notify;
1753   th->notify_cls = notify_cls;
1754   th->request_start = GNUNET_TIME_absolute_get ();
1755   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1756   th->notify_size = size;
1757   n->th = th;
1758   /* calculate when our transmission should be ready */
1759   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1760                                               size + n->traffic_overhead);
1761   n->traffic_overhead = 0;
1762   if (delay.rel_value_us > timeout.rel_value_us)
1763     delay.rel_value_us = 0;        /* notify immediately (with failure) */
1764   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1765     LOG (GNUNET_ERROR_TYPE_WARNING,
1766          "At bandwidth %u byte/s next transmission to %s in %s\n",
1767          (unsigned int) n->out_tracker.available_bytes_per_s__,
1768          GNUNET_i2s (target),
1769          GNUNET_STRINGS_relative_time_to_string (delay,
1770                                                  GNUNET_YES));
1771   else
1772     LOG (GNUNET_ERROR_TYPE_DEBUG,
1773          "At bandwidth %u byte/s next transmission to %s in %s\n",
1774          (unsigned int) n->out_tracker.available_bytes_per_s__,
1775          GNUNET_i2s (target),
1776          GNUNET_STRINGS_relative_time_to_string (delay,
1777                                                  GNUNET_YES));
1778   n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
1779                                         n,
1780                                         delay.rel_value_us);
1781   schedule_transmission (handle);
1782   return th;
1783 }
1784
1785
1786 /**
1787  * Cancel the specified transmission-ready notification.
1788  *
1789  * @param th handle returned from #GNUNET_TRANSPORT_notify_transmit_ready()
1790  */
1791 void
1792 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
1793 {
1794   struct Neighbour *n;
1795
1796   GNUNET_assert (NULL == th->next);
1797   GNUNET_assert (NULL == th->prev);
1798   n = th->neighbour;
1799   GNUNET_assert (th == n->th);
1800   n->th = NULL;
1801   if (NULL != n->hn)
1802   {
1803     GNUNET_CONTAINER_heap_remove_node (n->hn);
1804     n->hn = NULL;
1805   }
1806   else
1807   {
1808     GNUNET_assert (NULL != th->timeout_task);
1809     GNUNET_SCHEDULER_cancel (th->timeout_task);
1810     th->timeout_task = NULL;
1811   }
1812   GNUNET_free (th);
1813 }
1814
1815
1816 /* end of transport_api.c */