small API change: do no longer pass rarely needed GNUNET_SCHEDULER_TaskContext to...
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      Copyright (C) 2009-2013 GNUnet e.V.
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18      Boston, MA 02110-1301, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - test test test
28  */
29 #include "platform.h"
30 #include "gnunet_util_lib.h"
31 #include "gnunet_constants.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_transport_service.h"
36 #include "transport.h"
37
38 #define LOG(kind,...) GNUNET_log_from (kind, "transport-api",__VA_ARGS__)
39
40 /**
41  * If we could not send any payload to a peer for this amount of
42  * time, we print a warning.
43  */
44 #define UNREADY_WARN_TIME GNUNET_TIME_UNIT_MINUTES
45
46 /**
47  * How large to start with for the hashmap of neighbours.
48  */
49 #define STARTING_NEIGHBOURS_SIZE 16
50
51 /**
52  * Handle for a message that should be transmitted to the service.
53  * Used for both control messages and normal messages.
54  */
55 struct GNUNET_TRANSPORT_TransmitHandle
56 {
57
58   /**
59    * We keep all requests in a DLL.
60    */
61   struct GNUNET_TRANSPORT_TransmitHandle *next;
62
63   /**
64    * We keep all requests in a DLL.
65    */
66   struct GNUNET_TRANSPORT_TransmitHandle *prev;
67
68   /**
69    * Neighbour for this handle, NULL for control messages.
70    */
71   struct Neighbour *neighbour;
72
73   /**
74    * Function to call when @e notify_size bytes are available
75    * for transmission.
76    */
77   GNUNET_TRANSPORT_TransmitReadyNotify notify;
78
79   /**
80    * Closure for @e notify.
81    */
82   void *notify_cls;
83
84   /**
85    * Time at which this request was originally scheduled.
86    */
87   struct GNUNET_TIME_Absolute request_start;
88
89   /**
90    * Timeout for this request, 0 for control messages.
91    */
92   struct GNUNET_TIME_Absolute timeout;
93
94   /**
95    * Task to trigger request timeout if the request is stalled due to
96    * congestion.
97    */
98   struct GNUNET_SCHEDULER_Task *timeout_task;
99
100   /**
101    * How many bytes is our notify callback waiting for?
102    */
103   size_t notify_size;
104
105 };
106
107
108 /**
109  * Entry in hash table of all of our current (connected) neighbours.
110  */
111 struct Neighbour
112 {
113   /**
114    * Overall transport handle.
115    */
116   struct GNUNET_TRANSPORT_Handle *h;
117
118   /**
119    * Active transmit handle or NULL.
120    */
121   struct GNUNET_TRANSPORT_TransmitHandle *th;
122
123   /**
124    * Identity of this neighbour.
125    */
126   struct GNUNET_PeerIdentity id;
127
128   /**
129    * Outbound bandwidh tracker.
130    */
131   struct GNUNET_BANDWIDTH_Tracker out_tracker;
132
133   /**
134    * Entry in our readyness heap (which is sorted by @e next_ready
135    * value).  NULL if there is no pending transmission request for
136    * this neighbour or if we're waiting for @e is_ready to become
137    * true AFTER the @e out_tracker suggested that this peer's quota
138    * has been satisfied (so once @e is_ready goes to #GNUNET_YES,
139    * we should immediately go back into the heap).
140    */
141   struct GNUNET_CONTAINER_HeapNode *hn;
142
143   /**
144    * Last time when this peer received payload from us.
145    */
146   struct GNUNET_TIME_Absolute last_payload;
147
148   /**
149    * Task to trigger warnings if we do not get SEND_OK after a while.
150    */
151   struct GNUNET_SCHEDULER_Task *unready_warn_task;
152
153   /**
154    * Is this peer currently ready to receive a message?
155    */
156   int is_ready;
157
158   /**
159    * Sending consumed more bytes on wire than payload was announced
160    * This overhead is added to the delay of next sending operation
161    */
162   size_t traffic_overhead;
163 };
164
165
166 /**
167  * Linked list of functions to call whenever our HELLO is updated.
168  */
169 struct GNUNET_TRANSPORT_GetHelloHandle
170 {
171
172   /**
173    * This is a doubly linked list.
174    */
175   struct GNUNET_TRANSPORT_GetHelloHandle *next;
176
177   /**
178    * This is a doubly linked list.
179    */
180   struct GNUNET_TRANSPORT_GetHelloHandle *prev;
181
182   /**
183    * Transport handle.
184    */
185   struct GNUNET_TRANSPORT_Handle *handle;
186
187   /**
188    * Callback to call once we got our HELLO.
189    */
190   GNUNET_TRANSPORT_HelloUpdateCallback rec;
191
192   /**
193    * Task for calling the HelloUpdateCallback when we already have a HELLO
194    */
195   struct GNUNET_SCHEDULER_Task *notify_task;
196
197   /**
198    * Closure for @e rec.
199    */
200   void *rec_cls;
201
202 };
203
204
205 /**
206  * Entry in linked list for all offer-HELLO requests.
207  */
208 struct GNUNET_TRANSPORT_OfferHelloHandle
209 {
210   /**
211    * For the DLL.
212    */
213   struct GNUNET_TRANSPORT_OfferHelloHandle *prev;
214
215   /**
216    * For the DLL.
217    */
218   struct GNUNET_TRANSPORT_OfferHelloHandle *next;
219
220   /**
221    * Transport service handle we use for transmission.
222    */
223   struct GNUNET_TRANSPORT_Handle *th;
224
225   /**
226    * Transmission handle for this request.
227    */
228   struct GNUNET_TRANSPORT_TransmitHandle *tth;
229
230   /**
231    * Function to call once we are done.
232    */
233   GNUNET_SCHEDULER_TaskCallback cont;
234
235   /**
236    * Closure for @e cont
237    */
238   void *cls;
239
240   /**
241    * The HELLO message to be transmitted.
242    */
243   struct GNUNET_MessageHeader *msg;
244 };
245
246
247 /**
248  * Handle for the transport service (includes all of the
249  * state for the transport service).
250  */
251 struct GNUNET_TRANSPORT_Handle
252 {
253
254   /**
255    * Closure for the callbacks.
256    */
257   void *cls;
258
259   /**
260    * Function to call for received data.
261    */
262   GNUNET_TRANSPORT_ReceiveCallback rec;
263
264   /**
265    * function to call on connect events
266    */
267   GNUNET_TRANSPORT_NotifyConnect nc_cb;
268
269   /**
270    * function to call on disconnect events
271    */
272   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
273
274   /**
275    * function to call on excess bandwidth events
276    */
277   GNUNET_TRANSPORT_NotifyExcessBandwidth neb_cb;
278
279   /**
280    * Head of DLL of control messages.
281    */
282   struct GNUNET_TRANSPORT_TransmitHandle *control_head;
283
284   /**
285    * Tail of DLL of control messages.
286    */
287   struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
288
289   /**
290    * The current HELLO message for this peer.  Updated
291    * whenever transports change their addresses.
292    */
293   struct GNUNET_MessageHeader *my_hello;
294
295   /**
296    * My client connection to the transport service.
297    */
298   struct GNUNET_CLIENT_Connection *client;
299
300   /**
301    * Handle to our registration with the client for notification.
302    */
303   struct GNUNET_CLIENT_TransmitHandle *cth;
304
305   /**
306    * Linked list of pending requests for our HELLO.
307    */
308   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_head;
309
310   /**
311    * Linked list of pending requests for our HELLO.
312    */
313   struct GNUNET_TRANSPORT_GetHelloHandle *hwl_tail;
314
315   /**
316    * Linked list of pending offer HELLO requests head
317    */
318   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_head;
319
320   /**
321    * Linked list of pending offer HELLO requests tail
322    */
323   struct GNUNET_TRANSPORT_OfferHelloHandle *oh_tail;
324
325   /**
326    * My configuration.
327    */
328   const struct GNUNET_CONFIGURATION_Handle *cfg;
329
330   /**
331    * Hash map of the current connected neighbours of this peer.
332    * Maps peer identities to `struct Neighbour` entries.
333    */
334   struct GNUNET_CONTAINER_MultiPeerMap *neighbours;
335
336   /**
337    * Heap sorting peers with pending messages by the timestamps that
338    * specify when we could next send a message to the respective peer.
339    * Excludes control messages (which can always go out immediately).
340    * Maps time stamps to `struct Neighbour` entries.
341    */
342   struct GNUNET_CONTAINER_Heap *ready_heap;
343
344   /**
345    * Peer identity as assumed by this process, or all zeros.
346    */
347   struct GNUNET_PeerIdentity self;
348
349   /**
350    * ID of the task trying to reconnect to the service.
351    */
352   struct GNUNET_SCHEDULER_Task *reconnect_task;
353
354   /**
355    * ID of the task trying to trigger transmission for a peer while
356    * maintaining bandwidth quotas.  In use if there are no control
357    * messages and the smallest entry in the @e ready_heap has a time
358    * stamp in the future.
359    */
360   struct GNUNET_SCHEDULER_Task *quota_task;
361
362   /**
363    * Delay until we try to reconnect.
364    */
365   struct GNUNET_TIME_Relative reconnect_delay;
366
367   /**
368    * Should we check that @e self matches what the service thinks?
369    * (if #GNUNET_NO, then @e self is all zeros!).
370    */
371   int check_self;
372
373   /**
374    * Reconnect in progress
375    */
376   int reconnecting;
377 };
378
379
380 /**
381  * Schedule the task to send one message, either from the control
382  * list or the peer message queues  to the service.
383  *
384  * @param h transport service to schedule a transmission for
385  */
386 static void
387 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
388
389
390 /**
391  * Function that will schedule the job that will try
392  * to connect us again to the client.
393  *
394  * @param h transport service to reconnect
395  */
396 static void
397 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
398
399
400 /**
401  * A neighbour has not gotten a SEND_OK in a  while. Print a warning.
402  *
403  * @param cls the `struct Neighbour`
404  */
405 static void
406 do_warn_unready (void *cls)
407 {
408   struct Neighbour *n = cls;
409   struct GNUNET_TIME_Relative delay;
410
411   delay = GNUNET_TIME_absolute_get_duration (n->last_payload);
412   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
413               "Lacking SEND_OK, no payload could be send to %s for %s\n",
414               GNUNET_i2s (&n->id),
415               GNUNET_STRINGS_relative_time_to_string (delay,
416                                                       GNUNET_YES));
417   n->unready_warn_task
418     = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
419                                     &do_warn_unready,
420                                     n);
421 }
422
423
424 /**
425  * Get the neighbour list entry for the given peer
426  *
427  * @param h our context
428  * @param peer peer to look up
429  * @return NULL if no such peer entry exists
430  */
431 static struct Neighbour *
432 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
433                 const struct GNUNET_PeerIdentity *peer)
434 {
435   return GNUNET_CONTAINER_multipeermap_get (h->neighbours,
436                                             peer);
437 }
438
439
440 /**
441  * The outbound quota has changed in a way that may require
442  * us to reset the timeout.  Update the timeout.
443  *
444  * @param cls the `struct Neighbour` for which the timeout changed
445  */
446 static void
447 outbound_bw_tracker_update (void *cls)
448 {
449   struct Neighbour *n = cls;
450   struct GNUNET_TIME_Relative delay;
451
452   if (NULL == n->hn)
453     return;
454   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
455                                               n->th->notify_size + n->traffic_overhead);
456   LOG (GNUNET_ERROR_TYPE_DEBUG,
457        "New outbound delay %s us\n",
458        GNUNET_STRINGS_relative_time_to_string (delay,
459                                                GNUNET_NO));
460   GNUNET_CONTAINER_heap_update_cost (n->h->ready_heap,
461       n->hn, delay.rel_value_us);
462   schedule_transmission (n->h);
463 }
464
465
466 /**
467  * Function called by the bandwidth tracker if we have excess
468  * bandwidth.
469  *
470  * @param cls the `struct Neighbour` that has excess bandwidth
471  */
472 static void
473 notify_excess_cb (void *cls)
474 {
475   struct Neighbour *n = cls;
476   struct GNUNET_TRANSPORT_Handle *h = n->h;
477
478   if (NULL != h->neb_cb)
479     h->neb_cb (h->cls,
480                &n->id);
481 }
482
483
484 /**
485  * Add neighbour to our list
486  *
487  * @return NULL if this API is currently disconnecting from the service
488  */
489 static struct Neighbour *
490 neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
491                const struct GNUNET_PeerIdentity *pid)
492 {
493   struct Neighbour *n;
494
495   LOG (GNUNET_ERROR_TYPE_DEBUG,
496        "Creating entry for neighbour `%s'.\n",
497        GNUNET_i2s (pid));
498   n = GNUNET_new (struct Neighbour);
499   n->id = *pid;
500   n->h = h;
501   n->is_ready = GNUNET_YES;
502   n->traffic_overhead = 0;
503   GNUNET_BANDWIDTH_tracker_init2 (&n->out_tracker,
504                                   &outbound_bw_tracker_update,
505                                   n,
506                                   GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
507                                   MAX_BANDWIDTH_CARRY_S,
508                                   &notify_excess_cb,
509                                   n);
510   GNUNET_assert (GNUNET_OK ==
511                  GNUNET_CONTAINER_multipeermap_put (h->neighbours,
512                                                     &n->id,
513                                                     n,
514                                                     GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
515   return n;
516 }
517
518
519 /**
520  * Iterator over hash map entries, for deleting state of a neighbour.
521  *
522  * @param cls the `struct GNUNET_TRANSPORT_Handle *`
523  * @param key peer identity
524  * @param value value in the hash map, the neighbour entry to delete
525  * @return #GNUNET_YES if we should continue to
526  *         iterate,
527  *         #GNUNET_NO if not.
528  */
529 static int
530 neighbour_delete (void *cls,
531                   const struct GNUNET_PeerIdentity *key,
532                   void *value)
533 {
534   struct GNUNET_TRANSPORT_Handle *handle = cls;
535   struct Neighbour *n = value;
536
537   LOG (GNUNET_ERROR_TYPE_DEBUG,
538        "Dropping entry for neighbour `%s'.\n",
539        GNUNET_i2s (key));
540   GNUNET_BANDWIDTH_tracker_notification_stop (&n->out_tracker);
541   if (NULL != handle->nd_cb)
542     handle->nd_cb (handle->cls,
543                    &n->id);
544   if (NULL != n->unready_warn_task)
545   {
546     GNUNET_SCHEDULER_cancel (n->unready_warn_task);
547     n->unready_warn_task = NULL;
548   }
549   GNUNET_assert (NULL == n->th);
550   GNUNET_assert (NULL == n->hn);
551   GNUNET_assert (GNUNET_YES ==
552                  GNUNET_CONTAINER_multipeermap_remove (handle->neighbours,
553                                                        key,
554                                                        n));
555   GNUNET_free (n);
556   return GNUNET_YES;
557 }
558
559
560 /**
561  * Function we use for handling incoming messages.
562  *
563  * @param cls closure, a `struct GNUNET_TRANSPORT_Handle *`
564  * @param msg message received, NULL on timeout or fatal error
565  */
566 static void
567 demultiplexer (void *cls,
568                const struct GNUNET_MessageHeader *msg)
569 {
570   struct GNUNET_TRANSPORT_Handle *h = cls;
571   const struct DisconnectInfoMessage *dim;
572   const struct ConnectInfoMessage *cim;
573   const struct InboundMessage *im;
574   const struct GNUNET_MessageHeader *imm;
575   const struct SendOkMessage *okm;
576   const struct QuotaSetMessage *qm;
577   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
578   struct GNUNET_TRANSPORT_GetHelloHandle *next_hwl;
579   struct Neighbour *n;
580   struct GNUNET_PeerIdentity me;
581   uint16_t size;
582   uint32_t bytes_msg;
583   uint32_t bytes_physical;
584
585   GNUNET_assert (NULL != h->client);
586   if (GNUNET_YES == h->reconnecting)
587   {
588     return;
589   }
590   if (NULL == msg)
591   {
592     LOG (GNUNET_ERROR_TYPE_DEBUG,
593          "Error receiving from transport service, disconnecting temporarily.\n");
594     h->reconnecting = GNUNET_YES;
595     disconnect_and_schedule_reconnect (h);
596     return;
597   }
598   GNUNET_CLIENT_receive (h->client,
599                          &demultiplexer,
600                          h,
601                          GNUNET_TIME_UNIT_FOREVER_REL);
602   size = ntohs (msg->size);
603   switch (ntohs (msg->type))
604   {
605   case GNUNET_MESSAGE_TYPE_HELLO:
606     if (GNUNET_OK !=
607         GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
608                              &me))
609     {
610       GNUNET_break (0);
611       break;
612     }
613     LOG (GNUNET_ERROR_TYPE_DEBUG,
614          "Receiving (my own) HELLO message (%u bytes), I am `%s'.\n",
615          (unsigned int) size,
616          GNUNET_i2s (&me));
617     GNUNET_free_non_null (h->my_hello);
618     h->my_hello = NULL;
619     if (size < sizeof (struct GNUNET_MessageHeader))
620     {
621       GNUNET_break (0);
622       break;
623     }
624     h->my_hello = GNUNET_copy_message (msg);
625     hwl = h->hwl_head;
626     while (NULL != hwl)
627     {
628       next_hwl = hwl->next;
629       hwl->rec (hwl->rec_cls,
630                 h->my_hello);
631       hwl = next_hwl;
632     }
633     break;
634   case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
635     if (size < sizeof (struct ConnectInfoMessage))
636     {
637       GNUNET_break (0);
638       h->reconnecting = GNUNET_YES;
639       disconnect_and_schedule_reconnect (h);
640       break;
641     }
642     cim = (const struct ConnectInfoMessage *) msg;
643     if (size !=
644         sizeof (struct ConnectInfoMessage))
645     {
646       GNUNET_break (0);
647       h->reconnecting = GNUNET_YES;
648       disconnect_and_schedule_reconnect (h);
649       break;
650     }
651     LOG (GNUNET_ERROR_TYPE_DEBUG,
652          "Receiving CONNECT message for `%s'.\n",
653          GNUNET_i2s (&cim->id));
654     n = neighbour_find (h, &cim->id);
655     if (NULL != n)
656     {
657       GNUNET_break (0);
658       h->reconnecting = GNUNET_YES;
659       disconnect_and_schedule_reconnect (h);
660       break;
661     }
662     n = neighbour_add (h,
663                        &cim->id);
664     LOG (GNUNET_ERROR_TYPE_DEBUG,
665          "Receiving CONNECT message for `%s' with quota %u\n",
666          GNUNET_i2s (&cim->id),
667          ntohl (cim->quota_out.value__));
668     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
669                                            cim->quota_out);
670     if (NULL != h->nc_cb)
671       h->nc_cb (h->cls,
672                 &n->id);
673     break;
674   case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
675     if (size != sizeof (struct DisconnectInfoMessage))
676     {
677       GNUNET_break (0);
678       h->reconnecting = GNUNET_YES;
679       disconnect_and_schedule_reconnect (h);
680       break;
681     }
682     dim = (const struct DisconnectInfoMessage *) msg;
683     GNUNET_break (ntohl (dim->reserved) == 0);
684     LOG (GNUNET_ERROR_TYPE_DEBUG,
685          "Receiving DISCONNECT message for `%s'.\n",
686          GNUNET_i2s (&dim->peer));
687     n = neighbour_find (h, &dim->peer);
688     if (NULL == n)
689     {
690       GNUNET_break (0);
691       h->reconnecting = GNUNET_YES;
692       disconnect_and_schedule_reconnect (h);
693       break;
694     }
695     neighbour_delete (h,
696                       &dim->peer,
697                       n);
698     break;
699   case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
700     if (size != sizeof (struct SendOkMessage))
701     {
702       GNUNET_break (0);
703       h->reconnecting = GNUNET_YES;
704       disconnect_and_schedule_reconnect (h);
705       break;
706     }
707     okm = (const struct SendOkMessage *) msg;
708     bytes_msg = ntohl (okm->bytes_msg);
709     bytes_physical = ntohl (okm->bytes_physical);
710     LOG (GNUNET_ERROR_TYPE_DEBUG,
711          "Receiving SEND_OK message, transmission to %s %s.\n",
712          GNUNET_i2s (&okm->peer),
713          ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
714
715     n = neighbour_find (h,
716                         &okm->peer);
717     if (NULL == n)
718     {
719       /* We should never get a 'SEND_OK' for a peer that we are not
720          connected to */
721       GNUNET_break (0);
722       h->reconnecting = GNUNET_YES;
723       disconnect_and_schedule_reconnect (h);
724       break;
725     }
726     if (bytes_physical > bytes_msg)
727     {
728       LOG (GNUNET_ERROR_TYPE_DEBUG,
729            "Overhead for %u byte message was %u\n",
730            bytes_msg,
731            bytes_physical - bytes_msg);
732       n->traffic_overhead += bytes_physical - bytes_msg;
733     }
734     GNUNET_break (GNUNET_NO == n->is_ready);
735     n->is_ready = GNUNET_YES;
736     if (NULL != n->unready_warn_task)
737     {
738       GNUNET_SCHEDULER_cancel (n->unready_warn_task);
739       n->unready_warn_task = NULL;
740     }
741     if ((NULL != n->th) && (NULL == n->hn))
742     {
743       GNUNET_assert (NULL != n->th->timeout_task);
744       GNUNET_SCHEDULER_cancel (n->th->timeout_task);
745       n->th->timeout_task = NULL;
746       /* we've been waiting for this (congestion, not quota,
747        * caused delayed transmission) */
748       n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
749                                             n,
750                                             0);
751     }
752     schedule_transmission (h);
753     break;
754   case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
755     if (size <
756         sizeof (struct InboundMessage) + sizeof (struct GNUNET_MessageHeader))
757     {
758       GNUNET_break (0);
759       h->reconnecting = GNUNET_YES;
760       disconnect_and_schedule_reconnect (h);
761       break;
762     }
763     im = (const struct InboundMessage *) msg;
764     imm = (const struct GNUNET_MessageHeader *) &im[1];
765     if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
766     {
767       GNUNET_break (0);
768       h->reconnecting = GNUNET_YES;
769       disconnect_and_schedule_reconnect (h);
770       break;
771     }
772     LOG (GNUNET_ERROR_TYPE_DEBUG,
773          "Received message of type %u with %u bytes from `%s'.\n",
774          (unsigned int) ntohs (imm->type),
775          (unsigned int) ntohs (imm->size),
776          GNUNET_i2s (&im->peer));
777     n = neighbour_find (h, &im->peer);
778     if (NULL == n)
779     {
780       GNUNET_break (0);
781       h->reconnecting = GNUNET_YES;
782       disconnect_and_schedule_reconnect (h);
783       break;
784     }
785     if (NULL != h->rec)
786       h->rec (h->cls,
787               &im->peer,
788               imm);
789     break;
790   case GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA:
791     if (size != sizeof (struct QuotaSetMessage))
792     {
793       GNUNET_break (0);
794       h->reconnecting = GNUNET_YES;
795       disconnect_and_schedule_reconnect (h);
796       break;
797     }
798     qm = (const struct QuotaSetMessage *) msg;
799     n = neighbour_find (h, &qm->peer);
800     if (NULL == n)
801     {
802       GNUNET_break (0);
803       h->reconnecting = GNUNET_YES;
804       disconnect_and_schedule_reconnect (h);
805       break;
806     }
807     LOG (GNUNET_ERROR_TYPE_DEBUG,
808          "Receiving SET_QUOTA message for `%s' with quota %u\n",
809          GNUNET_i2s (&qm->peer),
810          ntohl (qm->quota.value__));
811     GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
812                                            qm->quota);
813     break;
814   default:
815     LOG (GNUNET_ERROR_TYPE_ERROR,
816          _("Received unexpected message of type %u in %s:%u\n"),
817          ntohs (msg->type),
818          __FILE__,
819          __LINE__);
820     GNUNET_break (0);
821     break;
822   }
823 }
824
825
826 /**
827  * A transmission request could not be satisfied because of
828  * network congestion.  Notify the initiator and clean up.
829  *
830  * @param cls the `struct GNUNET_TRANSPORT_TransmitHandle`
831  */
832 static void
833 timeout_request_due_to_congestion (void *cls)
834 {
835   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
836   struct Neighbour *n = th->neighbour;
837   struct GNUNET_TIME_Relative delay;
838
839   n->th->timeout_task = NULL;
840   delay = GNUNET_TIME_absolute_get_duration (th->request_start);
841   LOG (GNUNET_ERROR_TYPE_WARNING,
842        "Discarding %u bytes of payload message after %s delay due to congestion\n",
843        th->notify_size,
844        GNUNET_STRINGS_relative_time_to_string (delay,
845                                                GNUNET_YES));
846   GNUNET_assert (th == n->th);
847   GNUNET_assert (NULL == n->hn);
848   n->th = NULL;
849   th->notify (th->notify_cls,
850               0,
851               NULL);
852   GNUNET_free (th);
853 }
854
855
856 /**
857  * Transmit message(s) to service.
858  *
859  * @param cls handle to transport
860  * @param size number of bytes available in @a buf
861  * @param buf where to copy the message
862  * @return number of bytes copied to @a buf
863  */
864 static size_t
865 transport_notify_ready (void *cls,
866                         size_t size,
867                         void *buf)
868 {
869   struct GNUNET_TRANSPORT_Handle *h = cls;
870   struct GNUNET_TRANSPORT_TransmitHandle *th;
871   struct GNUNET_TIME_Relative delay;
872   struct Neighbour *n;
873   char *cbuf;
874   struct OutboundMessage obm;
875   size_t ret;
876   size_t nret;
877   size_t mret;
878
879   GNUNET_assert (NULL != h->client);
880   h->cth = NULL;
881   if (NULL == buf)
882   {
883     /* transmission failed */
884     disconnect_and_schedule_reconnect (h);
885     return 0;
886   }
887
888   cbuf = buf;
889   ret = 0;
890   /* first send control messages */
891   while ( (NULL != (th = h->control_head)) &&
892           (th->notify_size <= size) )
893   {
894     GNUNET_CONTAINER_DLL_remove (h->control_head,
895                                  h->control_tail,
896                                  th);
897     nret = th->notify (th->notify_cls,
898                        size,
899                        &cbuf[ret]);
900     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
901     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
902       LOG (GNUNET_ERROR_TYPE_WARNING,
903            "Added %u bytes of control message at %u after %s delay\n",
904            nret,
905            ret,
906            GNUNET_STRINGS_relative_time_to_string (delay,
907                                                    GNUNET_YES));
908     else
909       LOG (GNUNET_ERROR_TYPE_DEBUG,
910            "Added %u bytes of control message at %u after %s delay\n",
911            nret,
912            ret,
913            GNUNET_STRINGS_relative_time_to_string (delay,
914                                                    GNUNET_YES));
915     GNUNET_free (th);
916     ret += nret;
917     size -= nret;
918   }
919
920   /* then, if possible and no control messages pending, send data messages */
921   while ( (NULL == h->control_head) &&
922           (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
923   {
924     if (GNUNET_YES != n->is_ready)
925     {
926       /* peer not ready, wait for notification! */
927       GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
928       n->hn = NULL;
929       GNUNET_assert (NULL == n->th->timeout_task);
930       n->th->timeout_task
931         = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
932                                         (n->th->timeout),
933                                         &timeout_request_due_to_congestion,
934                                         n->th);
935       continue;
936     }
937     th = n->th;
938     if (th->notify_size + sizeof (struct OutboundMessage) > size)
939       break;                    /* does not fit */
940     if (GNUNET_BANDWIDTH_tracker_get_delay
941         (&n->out_tracker,
942          th->notify_size).rel_value_us > 0)
943       break;                    /* too early */
944     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
945     n->hn = NULL;
946     n->th = NULL;
947     GNUNET_assert (size >= sizeof (struct OutboundMessage));
948     mret = th->notify (th->notify_cls,
949                        size - sizeof (struct OutboundMessage),
950                        &cbuf[ret + sizeof (struct OutboundMessage)]);
951     GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
952     if (0 == mret)
953     {
954       GNUNET_free (th);
955       continue;
956     }
957     if (NULL != n->unready_warn_task)
958       n->unready_warn_task
959         = GNUNET_SCHEDULER_add_delayed (UNREADY_WARN_TIME,
960                                         &do_warn_unready,
961                                         n);
962     n->last_payload = GNUNET_TIME_absolute_get ();
963     n->is_ready = GNUNET_NO;
964     GNUNET_assert (mret + sizeof (struct OutboundMessage) <
965                    GNUNET_SERVER_MAX_MESSAGE_SIZE);
966     obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
967     obm.header.size = htons (mret + sizeof (struct OutboundMessage));
968     obm.reserved = htonl (0);
969     obm.timeout =
970       GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining
971                                  (th->timeout));
972     obm.peer = n->id;
973     memcpy (&cbuf[ret],
974             &obm,
975             sizeof (struct OutboundMessage));
976     ret += (mret + sizeof (struct OutboundMessage));
977     size -= (mret + sizeof (struct OutboundMessage));
978     GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker,
979                                       mret);
980     delay = GNUNET_TIME_absolute_get_duration (th->request_start);
981     if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
982       LOG (GNUNET_ERROR_TYPE_WARNING,
983            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
984            mret,
985            GNUNET_i2s (&n->id),
986            GNUNET_STRINGS_relative_time_to_string (delay,
987                                                    GNUNET_YES),
988            (unsigned int) n->out_tracker.available_bytes_per_s__);
989     else
990       LOG (GNUNET_ERROR_TYPE_DEBUG,
991            "Added %u bytes of payload message for %s after %s delay at %u b/s\n",
992            mret,
993            GNUNET_i2s (&n->id),
994            GNUNET_STRINGS_relative_time_to_string (delay,
995                                                    GNUNET_YES),
996            (unsigned int) n->out_tracker.available_bytes_per_s__);
997     GNUNET_free (th);
998     break;
999   }
1000   /* if there are more pending messages, try to schedule those */
1001   schedule_transmission (h);
1002   LOG (GNUNET_ERROR_TYPE_DEBUG,
1003        "Transmitting %u bytes to transport service\n",
1004        ret);
1005   return ret;
1006 }
1007
1008
1009 /**
1010  * Schedule the task to send one message, either from the control
1011  * list or the peer message queues  to the service.
1012  *
1013  * @param cls transport service to schedule a transmission for
1014  */
1015 static void
1016 schedule_transmission_task (void *cls)
1017 {
1018   struct GNUNET_TRANSPORT_Handle *h = cls;
1019   size_t size;
1020   struct GNUNET_TRANSPORT_TransmitHandle *th;
1021   struct Neighbour *n;
1022
1023   h->quota_task = NULL;
1024   GNUNET_assert (NULL != h->client);
1025   /* destroy all requests that have timed out */
1026   while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
1027           (0 == GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value_us) )
1028   {
1029     /* notify client that the request could not be satisfied within
1030      * the given time constraints */
1031     th = n->th;
1032     n->th = NULL;
1033     GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
1034     n->hn = NULL;
1035     LOG (GNUNET_ERROR_TYPE_DEBUG,
1036          "Signalling timeout for transmission to peer %s due to congestion\n",
1037          GNUNET_i2s (&n->id));
1038     GNUNET_assert (0 == th->notify (th->notify_cls,
1039                                     0,
1040                                     NULL));
1041     GNUNET_free (th);
1042   }
1043   if (NULL != h->cth)
1044     return;
1045   if (NULL != h->control_head)
1046   {
1047     size = h->control_head->notify_size;
1048   }
1049   else
1050   {
1051     n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
1052     if (NULL == n)
1053       return;                   /* no pending messages */
1054     size = n->th->notify_size + sizeof (struct OutboundMessage);
1055   }
1056   LOG (GNUNET_ERROR_TYPE_DEBUG,
1057        "Calling notify_transmit_ready\n");
1058   h->cth
1059     = GNUNET_CLIENT_notify_transmit_ready (h->client,
1060                                            size,
1061                                            GNUNET_TIME_UNIT_FOREVER_REL,
1062                                            GNUNET_NO,
1063                                            &transport_notify_ready,
1064                                            h);
1065   GNUNET_assert (NULL != h->cth);
1066 }
1067
1068
1069 /**
1070  * Schedule the task to send one message, either from the control
1071  * list or the peer message queues  to the service.
1072  *
1073  * @param h transport service to schedule a transmission for
1074  */
1075 static void
1076 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
1077 {
1078   struct GNUNET_TIME_Relative delay;
1079   struct Neighbour *n;
1080
1081   GNUNET_assert (NULL != h->client);
1082   if (NULL != h->quota_task)
1083   {
1084     GNUNET_SCHEDULER_cancel (h->quota_task);
1085     h->quota_task = NULL;
1086   }
1087   if (NULL != h->control_head)
1088     delay = GNUNET_TIME_UNIT_ZERO;
1089   else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))
1090   {
1091     delay =
1092         GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1093                                             n->th->notify_size + n->traffic_overhead);
1094     n->traffic_overhead = 0;
1095   }
1096   else
1097   {
1098     LOG (GNUNET_ERROR_TYPE_DEBUG,
1099          "No work to be done, not scheduling transmission.\n");
1100     return;                     /* no work to be done */
1101   }
1102   LOG (GNUNET_ERROR_TYPE_DEBUG,
1103        "Scheduling next transmission to service in %s\n",
1104        GNUNET_STRINGS_relative_time_to_string (delay,
1105                                                GNUNET_YES));
1106   h->quota_task =
1107       GNUNET_SCHEDULER_add_delayed (delay,
1108                                     &schedule_transmission_task,
1109                                     h);
1110 }
1111
1112
1113 /**
1114  * Queue control request for transmission to the transport
1115  * service.
1116  *
1117  * @param h handle to the transport service
1118  * @param size number of bytes to be transmitted
1119  * @param notify function to call to get the content
1120  * @param notify_cls closure for @a notify
1121  * @return a `struct GNUNET_TRANSPORT_TransmitHandle`
1122  */
1123 static struct GNUNET_TRANSPORT_TransmitHandle *
1124 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
1125                            size_t size,
1126                            GNUNET_TRANSPORT_TransmitReadyNotify notify,
1127                            void *notify_cls)
1128 {
1129   struct GNUNET_TRANSPORT_TransmitHandle *th;
1130
1131   LOG (GNUNET_ERROR_TYPE_DEBUG,
1132        "Control transmit of %u bytes requested\n",
1133        size);
1134   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1135   th->notify = notify;
1136   th->notify_cls = notify_cls;
1137   th->notify_size = size;
1138   th->request_start = GNUNET_TIME_absolute_get ();
1139   GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
1140                                     h->control_tail,
1141                                     th);
1142   schedule_transmission (h);
1143   return th;
1144 }
1145
1146
1147 /**
1148  * Transmit START message to service.
1149  *
1150  * @param cls unused
1151  * @param size number of bytes available in @a buf
1152  * @param buf where to copy the message
1153  * @return number of bytes copied to @a buf
1154  */
1155 static size_t
1156 send_start (void *cls,
1157             size_t size,
1158             void *buf)
1159 {
1160   struct GNUNET_TRANSPORT_Handle *h = cls;
1161   struct StartMessage s;
1162   uint32_t options;
1163
1164   if (NULL == buf)
1165   {
1166     /* Can only be shutdown, just give up */
1167     LOG (GNUNET_ERROR_TYPE_DEBUG,
1168          "Shutdown while trying to transmit START request.\n");
1169     return 0;
1170   }
1171   LOG (GNUNET_ERROR_TYPE_DEBUG,
1172        "Transmitting START request.\n");
1173   GNUNET_assert (size >= sizeof (struct StartMessage));
1174   s.header.size = htons (sizeof (struct StartMessage));
1175   s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
1176   options = 0;
1177   if (h->check_self)
1178     options |= 1;
1179   if (NULL != h->rec)
1180     options |= 2;
1181   s.options = htonl (options);
1182   s.self = h->self;
1183   memcpy (buf, &s, sizeof (struct StartMessage));
1184   GNUNET_CLIENT_receive (h->client, &demultiplexer, h,
1185                          GNUNET_TIME_UNIT_FOREVER_REL);
1186   return sizeof (struct StartMessage);
1187 }
1188
1189
1190 /**
1191  * Try again to connect to transport service.
1192  *
1193  * @param cls the handle to the transport service
1194  */
1195 static void
1196 reconnect (void *cls)
1197 {
1198   struct GNUNET_TRANSPORT_Handle *h = cls;
1199   const struct GNUNET_SCHEDULER_TaskContext *tc;
1200
1201   h->reconnect_task = NULL;
1202   tc = GNUNET_SCHEDULER_get_task_context ();
1203   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
1204   {
1205     /* shutdown, just give up */
1206     return;
1207   }
1208   LOG (GNUNET_ERROR_TYPE_DEBUG,
1209        "Connecting to transport service.\n");
1210   GNUNET_assert (NULL == h->client);
1211   GNUNET_assert (NULL == h->control_head);
1212   GNUNET_assert (NULL == h->control_tail);
1213   h->reconnecting = GNUNET_NO;
1214   h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
1215
1216   GNUNET_assert (NULL != h->client);
1217   schedule_control_transmit (h, sizeof (struct StartMessage),
1218                              &send_start, h);
1219 }
1220
1221
1222 /**
1223  * Function that will schedule the job that will try
1224  * to connect us again to the client.
1225  *
1226  * @param h transport service to reconnect
1227  */
1228 static void
1229 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1230 {
1231   struct GNUNET_TRANSPORT_TransmitHandle *th;
1232
1233   GNUNET_assert (NULL == h->reconnect_task);
1234   if (NULL != h->cth)
1235   {
1236     GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
1237     h->cth = NULL;
1238   }
1239   if (NULL != h->client)
1240   {
1241     GNUNET_CLIENT_disconnect (h->client);
1242     h->client = NULL;
1243 /*    LOG (GNUNET_ERROR_TYPE_ERROR,
1244          "Client disconnect done \n");*/
1245   }
1246   /* Forget about all neighbours that we used to be connected to */
1247   GNUNET_CONTAINER_multipeermap_iterate (h->neighbours,
1248                                          &neighbour_delete,
1249                                          h);
1250   if (NULL != h->quota_task)
1251   {
1252     GNUNET_SCHEDULER_cancel (h->quota_task);
1253     h->quota_task = NULL;
1254   }
1255   while ((NULL != (th = h->control_head)))
1256   {
1257     GNUNET_CONTAINER_DLL_remove (h->control_head,
1258                                  h->control_tail,
1259                                  th);
1260     th->notify (th->notify_cls,
1261                 0,
1262                 NULL);
1263     GNUNET_free (th);
1264   }
1265   LOG (GNUNET_ERROR_TYPE_DEBUG,
1266        "Scheduling task to reconnect to transport service in %s.\n",
1267        GNUNET_STRINGS_relative_time_to_string (h->reconnect_delay,
1268                                                GNUNET_YES));
1269   h->reconnect_task =
1270       GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
1271                                     &reconnect,
1272                                     h);
1273   h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay);
1274 }
1275
1276
1277 /**
1278  * Cancel control request for transmission to the transport service.
1279  *
1280  * @param th handle to the transport service
1281  * @param tth transmit handle to cancel
1282  */
1283 static void
1284 cancel_control_transmit (struct GNUNET_TRANSPORT_Handle *th,
1285                          struct GNUNET_TRANSPORT_TransmitHandle *tth)
1286 {
1287   LOG (GNUNET_ERROR_TYPE_DEBUG,
1288        "Canceling transmit of contral transmission requested\n");
1289   GNUNET_CONTAINER_DLL_remove (th->control_head,
1290                                th->control_tail,
1291                                tth);
1292   GNUNET_free (tth);
1293 }
1294
1295
1296 /**
1297  * Send HELLO message to the service.
1298  *
1299  * @param cls the HELLO message to send
1300  * @param size number of bytes available in @a buf
1301  * @param buf where to copy the message
1302  * @return number of bytes copied to @a buf
1303  */
1304 static size_t
1305 send_hello (void *cls,
1306             size_t size,
1307             void *buf)
1308 {
1309   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh = cls;
1310   struct GNUNET_MessageHeader *msg = ohh->msg;
1311   uint16_t ssize;
1312
1313   if (NULL == buf)
1314   {
1315     LOG (GNUNET_ERROR_TYPE_DEBUG,
1316          "Timeout while trying to transmit `%s' request.\n",
1317          "HELLO");
1318     if (NULL != ohh->cont)
1319       ohh->cont (ohh->cls);
1320     GNUNET_free (msg);
1321     GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1322                                  ohh->th->oh_tail,
1323                                  ohh);
1324     GNUNET_free (ohh);
1325     return 0;
1326   }
1327   LOG (GNUNET_ERROR_TYPE_DEBUG,
1328        "Transmitting `%s' request.\n",
1329        "HELLO");
1330   ssize = ntohs (msg->size);
1331   GNUNET_assert (size >= ssize);
1332   memcpy (buf,
1333           msg,
1334           ssize);
1335   GNUNET_free (msg);
1336   if (NULL != ohh->cont)
1337     ohh->cont (ohh->cls);
1338   GNUNET_CONTAINER_DLL_remove (ohh->th->oh_head,
1339                                ohh->th->oh_tail,
1340                                ohh);
1341   GNUNET_free (ohh);
1342   return ssize;
1343 }
1344
1345
1346 /**
1347  * Send traffic metric message to the service.
1348  *
1349  * @param cls the message to send
1350  * @param size number of bytes available in @a buf
1351  * @param buf where to copy the message
1352  * @return number of bytes copied to @a buf
1353  */
1354 static size_t
1355 send_metric (void *cls,
1356              size_t size,
1357              void *buf)
1358 {
1359   struct TrafficMetricMessage *msg = cls;
1360   uint16_t ssize;
1361
1362   if (NULL == buf)
1363   {
1364     LOG (GNUNET_ERROR_TYPE_DEBUG,
1365          "Timeout while trying to transmit TRAFFIC_METRIC request.\n");
1366     GNUNET_free (msg);
1367     return 0;
1368   }
1369   LOG (GNUNET_ERROR_TYPE_DEBUG,
1370        "Transmitting TRAFFIC_METRIC request.\n");
1371   ssize = ntohs (msg->header.size);
1372   GNUNET_assert (size >= ssize);
1373   memcpy (buf, msg, ssize);
1374   GNUNET_free (msg);
1375   return ssize;
1376 }
1377
1378
1379 /**
1380  * Set transport metrics for a peer and a direction
1381  *
1382  * @param handle transport handle
1383  * @param peer the peer to set the metric for
1384  * @param prop the performance metrics to set
1385  * @param delay_in inbound delay to introduce
1386  * @param delay_out outbound delay to introduce
1387  *
1388  * Note: Delay restrictions in receiving direction will be enforced
1389  * with one message delay.
1390  */
1391 void
1392 GNUNET_TRANSPORT_set_traffic_metric (struct GNUNET_TRANSPORT_Handle *handle,
1393                                      const struct GNUNET_PeerIdentity *peer,
1394                                      const struct GNUNET_ATS_Properties *prop,
1395                                      struct GNUNET_TIME_Relative delay_in,
1396                                      struct GNUNET_TIME_Relative delay_out)
1397 {
1398   struct TrafficMetricMessage *msg;
1399
1400   msg = GNUNET_new (struct TrafficMetricMessage);
1401   msg->header.size = htons (sizeof (struct TrafficMetricMessage));
1402   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRAFFIC_METRIC);
1403   msg->reserved = htonl (0);
1404   msg->peer = *peer;
1405   GNUNET_ATS_properties_hton (&msg->properties,
1406                               prop);
1407   msg->delay_in = GNUNET_TIME_relative_hton (delay_in);
1408   msg->delay_out = GNUNET_TIME_relative_hton (delay_out);
1409   schedule_control_transmit (handle,
1410                              sizeof (struct TrafficMetricMessage),
1411                              &send_metric,
1412                              msg);
1413 }
1414
1415
1416 /**
1417  * Offer the transport service the HELLO of another peer.  Note that
1418  * the transport service may just ignore this message if the HELLO is
1419  * malformed or useless due to our local configuration.
1420  *
1421  * @param handle connection to transport service
1422  * @param hello the hello message
1423  * @param cont continuation to call when HELLO has been sent,
1424  *      tc reason #GNUNET_SCHEDULER_REASON_TIMEOUT for fail
1425  *      tc reasong #GNUNET_SCHEDULER_REASON_READ_READY for success
1426  * @param cont_cls closure for @a cont
1427  * @return a `struct GNUNET_TRANSPORT_OfferHelloHandle` handle or NULL on failure,
1428  *      in case of failure @a cont will not be called
1429  *
1430  */
1431 struct GNUNET_TRANSPORT_OfferHelloHandle *
1432 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1433                               const struct GNUNET_MessageHeader *hello,
1434                               GNUNET_SCHEDULER_TaskCallback cont,
1435                               void *cont_cls)
1436 {
1437   struct GNUNET_TRANSPORT_OfferHelloHandle *ohh;
1438   struct GNUNET_MessageHeader *msg;
1439   struct GNUNET_PeerIdentity peer;
1440   uint16_t size;
1441
1442   if (NULL == handle->client)
1443     return NULL;
1444   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
1445   size = ntohs (hello->size);
1446   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
1447   if (GNUNET_OK !=
1448       GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) hello,
1449                            &peer))
1450   {
1451     GNUNET_break (0);
1452     return NULL;
1453   }
1454
1455   msg = GNUNET_malloc (size);
1456   memcpy (msg, hello, size);
1457   LOG (GNUNET_ERROR_TYPE_DEBUG,
1458        "Offering HELLO message of `%s' to transport for validation.\n",
1459        GNUNET_i2s (&peer));
1460
1461   ohh = GNUNET_new (struct GNUNET_TRANSPORT_OfferHelloHandle);
1462   ohh->th = handle;
1463   ohh->cont = cont;
1464   ohh->cls = cont_cls;
1465   ohh->msg = msg;
1466   ohh->tth = schedule_control_transmit (handle,
1467                                         size,
1468                                         &send_hello,
1469                                         ohh);
1470   GNUNET_CONTAINER_DLL_insert (handle->oh_head,
1471                                handle->oh_tail,
1472                                ohh);
1473   return ohh;
1474 }
1475
1476
1477 /**
1478  * Cancel the request to transport to offer the HELLO message
1479  *
1480  * @param ohh the handle for the operation to cancel
1481  */
1482 void
1483 GNUNET_TRANSPORT_offer_hello_cancel (struct GNUNET_TRANSPORT_OfferHelloHandle *ohh)
1484 {
1485   struct GNUNET_TRANSPORT_Handle *th = ohh->th;
1486
1487   cancel_control_transmit (ohh->th, ohh->tth);
1488   GNUNET_CONTAINER_DLL_remove (th->oh_head,
1489                                th->oh_tail,
1490                                ohh);
1491   GNUNET_free (ohh->msg);
1492   GNUNET_free (ohh);
1493 }
1494
1495
1496 /**
1497  * Checks if a given peer is connected to us
1498  *
1499  * @param handle connection to transport service
1500  * @param peer the peer to check
1501  * @return #GNUNET_YES (connected) or #GNUNET_NO (disconnected)
1502  */
1503 int
1504 GNUNET_TRANSPORT_check_peer_connected (struct GNUNET_TRANSPORT_Handle *handle,
1505                                        const struct GNUNET_PeerIdentity *peer)
1506 {
1507   if (GNUNET_YES ==
1508       GNUNET_CONTAINER_multipeermap_contains (handle->neighbours,
1509                                               peer))
1510     return GNUNET_YES;
1511   return GNUNET_NO;
1512 }
1513
1514
1515 /**
1516  * Task to call the HelloUpdateCallback of the GetHelloHandle
1517  *
1518  * @param cls the `struct GNUNET_TRANSPORT_GetHelloHandle`
1519  */
1520 static void
1521 call_hello_update_cb_async (void *cls)
1522 {
1523   struct GNUNET_TRANSPORT_GetHelloHandle *ghh = cls;
1524
1525   GNUNET_assert (NULL != ghh->handle->my_hello);
1526   GNUNET_assert (NULL != ghh->notify_task);
1527   ghh->notify_task = NULL;
1528   ghh->rec (ghh->rec_cls,
1529             ghh->handle->my_hello);
1530 }
1531
1532
1533 /**
1534  * Obtain the HELLO message for this peer.  The callback given in this function
1535  * is never called synchronously.
1536  *
1537  * @param handle connection to transport service
1538  * @param rec function to call with the HELLO, sender will be our peer
1539  *            identity; message and sender will be NULL on timeout
1540  *            (handshake with transport service pending/failed).
1541  *             cost estimate will be 0.
1542  * @param rec_cls closure for @a rec
1543  * @return handle to cancel the operation
1544  */
1545 struct GNUNET_TRANSPORT_GetHelloHandle *
1546 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
1547                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
1548                             void *rec_cls)
1549 {
1550   struct GNUNET_TRANSPORT_GetHelloHandle *hwl;
1551
1552   hwl = GNUNET_new (struct GNUNET_TRANSPORT_GetHelloHandle);
1553   hwl->rec = rec;
1554   hwl->rec_cls = rec_cls;
1555   hwl->handle = handle;
1556   GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
1557                                handle->hwl_tail,
1558                                hwl);
1559   if (NULL != handle->my_hello)
1560     hwl->notify_task = GNUNET_SCHEDULER_add_now (&call_hello_update_cb_async,
1561                                                  hwl);
1562   return hwl;
1563 }
1564
1565
1566 /**
1567  * Stop receiving updates about changes to our HELLO message.
1568  *
1569  * @param ghh handle to cancel
1570  */
1571 void
1572 GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_GetHelloHandle *ghh)
1573 {
1574   struct GNUNET_TRANSPORT_Handle *handle = ghh->handle;
1575
1576   if (NULL != ghh->notify_task)
1577     GNUNET_SCHEDULER_cancel (ghh->notify_task);
1578   GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
1579                                handle->hwl_tail,
1580                                ghh);
1581   GNUNET_free (ghh);
1582 }
1583
1584
1585 /**
1586  * Connect to the transport service.  Note that the connection may
1587  * complete (or fail) asynchronously.
1588  *
1589  * @param cfg configuration to use
1590  * @param self our own identity (API should check that it matches
1591  *             the identity found by transport), or NULL (no check)
1592  * @param cls closure for the callbacks
1593  * @param rec receive function to call
1594  * @param nc function to call on connect events
1595  * @param nd function to call on disconnect events
1596  * @return NULL on error
1597  */
1598 struct GNUNET_TRANSPORT_Handle *
1599 GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1600                           const struct GNUNET_PeerIdentity *self,
1601                           void *cls,
1602                           GNUNET_TRANSPORT_ReceiveCallback rec,
1603                           GNUNET_TRANSPORT_NotifyConnect nc,
1604                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1605 {
1606   return GNUNET_TRANSPORT_connect2 (cfg,
1607                                     self,
1608                                     cls,
1609                                     rec,
1610                                     nc,
1611                                     nd,
1612                                     NULL);
1613 }
1614
1615
1616 /**
1617  * Connect to the transport service.  Note that the connection may
1618  * complete (or fail) asynchronously.
1619  *
1620  * @param cfg configuration to use
1621  * @param self our own identity (API should check that it matches
1622  *             the identity found by transport), or NULL (no check)
1623  * @param cls closure for the callbacks
1624  * @param rec receive function to call
1625  * @param nc function to call on connect events
1626  * @param nd function to call on disconnect events
1627  * @param neb function to call if we have excess bandwidth to a peer
1628  * @return NULL on error
1629  */
1630 struct GNUNET_TRANSPORT_Handle *
1631 GNUNET_TRANSPORT_connect2 (const struct GNUNET_CONFIGURATION_Handle *cfg,
1632                            const struct GNUNET_PeerIdentity *self,
1633                            void *cls,
1634                            GNUNET_TRANSPORT_ReceiveCallback rec,
1635                            GNUNET_TRANSPORT_NotifyConnect nc,
1636                            GNUNET_TRANSPORT_NotifyDisconnect nd,
1637                            GNUNET_TRANSPORT_NotifyExcessBandwidth neb)
1638 {
1639   struct GNUNET_TRANSPORT_Handle *ret;
1640
1641   ret = GNUNET_new (struct GNUNET_TRANSPORT_Handle);
1642   if (NULL != self)
1643   {
1644     ret->self = *self;
1645     ret->check_self = GNUNET_YES;
1646   }
1647   ret->cfg = cfg;
1648   ret->cls = cls;
1649   ret->rec = rec;
1650   ret->nc_cb = nc;
1651   ret->nd_cb = nd;
1652   ret->neb_cb = neb;
1653   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1654   LOG (GNUNET_ERROR_TYPE_DEBUG,
1655        "Connecting to transport service.\n");
1656   ret->client = GNUNET_CLIENT_connect ("transport",
1657                                        cfg);
1658   if (NULL == ret->client)
1659   {
1660     GNUNET_free (ret);
1661     return NULL;
1662   }
1663   ret->neighbours =
1664     GNUNET_CONTAINER_multipeermap_create (STARTING_NEIGHBOURS_SIZE,
1665                                           GNUNET_YES);
1666   ret->ready_heap =
1667       GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1668   schedule_control_transmit (ret,
1669                              sizeof (struct StartMessage),
1670                              &send_start,
1671                              ret);
1672   return ret;
1673 }
1674
1675
1676 /**
1677  * Disconnect from the transport service.
1678  *
1679  * @param handle handle to the service as returned from #GNUNET_TRANSPORT_connect()
1680  */
1681 void
1682 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1683 {
1684   LOG (GNUNET_ERROR_TYPE_DEBUG,
1685        "Transport disconnect called!\n");
1686   /* this disconnects all neighbours... */
1687   if (NULL == handle->reconnect_task)
1688     disconnect_and_schedule_reconnect (handle);
1689   /* and now we stop trying to connect again... */
1690   if (NULL != handle->reconnect_task)
1691   {
1692     GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1693     handle->reconnect_task = NULL;
1694   }
1695   GNUNET_CONTAINER_multipeermap_destroy (handle->neighbours);
1696   handle->neighbours = NULL;
1697   if (NULL != handle->quota_task)
1698   {
1699     GNUNET_SCHEDULER_cancel (handle->quota_task);
1700     handle->quota_task = NULL;
1701   }
1702   GNUNET_free_non_null (handle->my_hello);
1703   handle->my_hello = NULL;
1704   GNUNET_assert (NULL == handle->hwl_head);
1705   GNUNET_assert (NULL == handle->hwl_tail);
1706   GNUNET_CONTAINER_heap_destroy (handle->ready_heap);
1707   handle->ready_heap = NULL;
1708   GNUNET_free (handle);
1709 }
1710
1711
1712 /**
1713  * Check if we could queue a message of the given size for
1714  * transmission.  The transport service will take both its
1715  * internal buffers and bandwidth limits imposed by the
1716  * other peer into consideration when answering this query.
1717  *
1718  * @param handle connection to transport service
1719  * @param target who should receive the message
1720  * @param size how big is the message we want to transmit?
1721  * @param timeout after how long should we give up (and call
1722  *        notify with buf NULL and size 0)?
1723  * @param notify function to call when we are ready to
1724  *        send such a message
1725  * @param notify_cls closure for @a notify
1726  * @return NULL if someone else is already waiting to be notified
1727  *         non-NULL if the notify callback was queued (can be used to cancel
1728  *         using #GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1729  */
1730 struct GNUNET_TRANSPORT_TransmitHandle *
1731 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1732                                         const struct GNUNET_PeerIdentity *target,
1733                                         size_t size,
1734                                         struct GNUNET_TIME_Relative timeout,
1735                                         GNUNET_TRANSPORT_TransmitReadyNotify notify,
1736                                         void *notify_cls)
1737 {
1738   struct Neighbour *n;
1739   struct GNUNET_TRANSPORT_TransmitHandle *th;
1740   struct GNUNET_TIME_Relative delay;
1741
1742   n = neighbour_find (handle, target);
1743   if (NULL == n)
1744   {
1745     /* only use this function
1746      * once a connection has been established */
1747     GNUNET_assert (0);
1748     return NULL;
1749   }
1750   if (NULL != n->th)
1751   {
1752     /* attempt to send two messages at the same time to the same peer */
1753     GNUNET_assert (0);
1754     return NULL;
1755   }
1756   GNUNET_assert (NULL == n->hn);
1757   th = GNUNET_new (struct GNUNET_TRANSPORT_TransmitHandle);
1758   th->neighbour = n;
1759   th->notify = notify;
1760   th->notify_cls = notify_cls;
1761   th->request_start = GNUNET_TIME_absolute_get ();
1762   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1763   th->notify_size = size;
1764   n->th = th;
1765   /* calculate when our transmission should be ready */
1766   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
1767                                               size + n->traffic_overhead);
1768   n->traffic_overhead = 0;
1769   if (delay.rel_value_us > timeout.rel_value_us)
1770     delay.rel_value_us = 0;        /* notify immediately (with failure) */
1771   if (delay.rel_value_us > GNUNET_CONSTANTS_LATENCY_WARN.rel_value_us)
1772     LOG (GNUNET_ERROR_TYPE_WARNING,
1773          "At bandwidth %u byte/s next transmission to %s in %s\n",
1774          (unsigned int) n->out_tracker.available_bytes_per_s__,
1775          GNUNET_i2s (target),
1776          GNUNET_STRINGS_relative_time_to_string (delay,
1777                                                  GNUNET_YES));
1778   else
1779     LOG (GNUNET_ERROR_TYPE_DEBUG,
1780          "At bandwidth %u byte/s next transmission to %s in %s\n",
1781          (unsigned int) n->out_tracker.available_bytes_per_s__,
1782          GNUNET_i2s (target),
1783          GNUNET_STRINGS_relative_time_to_string (delay,
1784                                                  GNUNET_YES));
1785   n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
1786                                         n,
1787                                         delay.rel_value_us);
1788   schedule_transmission (handle);
1789   return th;
1790 }
1791
1792
1793 /**
1794  * Cancel the specified transmission-ready notification.
1795  *
1796  * @param th handle returned from #GNUNET_TRANSPORT_notify_transmit_ready()
1797  */
1798 void
1799 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
1800 {
1801   struct Neighbour *n;
1802
1803   GNUNET_assert (NULL == th->next);
1804   GNUNET_assert (NULL == th->prev);
1805   n = th->neighbour;
1806   GNUNET_assert (th == n->th);
1807   n->th = NULL;
1808   if (NULL != n->hn)
1809   {
1810     GNUNET_CONTAINER_heap_remove_node (n->hn);
1811     n->hn = NULL;
1812   }
1813   else
1814   {
1815     GNUNET_assert (NULL != th->timeout_task);
1816     GNUNET_SCHEDULER_cancel (th->timeout_task);
1817     th->timeout_task = NULL;
1818   }
1819   GNUNET_free (th);
1820 }
1821
1822
1823 /* end of transport_api.c */