(no commit message)
[oweals/gnunet.git] / src / transport / transport_api_new.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - adjust testcases to use new 'try connect' style (should be easy, breaks API compatibility!)
28  * - adjust core service to use new 'try connect' style (should be MUCH nicer there as well!)
29  * - test test test
30  */
31 #include "platform.h"
32 #include "gnunet_bandwidth_lib.h"
33 #include "gnunet_client_lib.h"
34 #include "gnunet_constants.h"
35 #include "gnunet_container_lib.h"
36 #include "gnunet_arm_service.h"
37 #include "gnunet_hello_lib.h"
38 #include "gnunet_protocols.h"
39 #include "gnunet_server_lib.h"
40 #include "gnunet_time_lib.h"
41 #include "gnunet_transport_service.h"
42 #include "transport.h"
43
44 /**
45  * How large to start with for the hashmap of neighbours.
46  */
47 #define STARTING_NEIGHBOURS_SIZE 16
48
49
50 /**
51  * Handle for a message that should be transmitted to the service.
52  * Used for both control messages and normal messages.
53  */
54 struct GNUNET_TRANSPORT_TransmitHandle
55 {
56
57   /**
58    * We keep all requests in a DLL.
59    */
60   struct GNUNET_TRANSPORT_TransmitHandle *next;
61
62   /**
63    * We keep all requests in a DLL.
64    */
65   struct GNUNET_TRANSPORT_TransmitHandle *prev;
66
67   /**
68    * Neighbour for this handle, NULL for control messages.
69    */
70   struct Neighbour *neighbour;
71
72   /**
73    * Function to call when notify_size bytes are available
74    * for transmission.
75    */
76   GNUNET_CONNECTION_TransmitReadyNotify notify;
77
78   /**
79    * Closure for notify.
80    */
81   void *notify_cls;
82
83   /**
84    * Timeout for this request, 0 for control messages.
85    */
86   struct GNUNET_TIME_Absolute timeout;
87
88   /**
89    * Task to trigger request timeout if the request is stalled due to 
90    * congestion.
91    */
92   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
93
94   /**
95    * How many bytes is our notify callback waiting for?
96    */
97   size_t notify_size;
98
99   /**
100    * How important is this message? Not used for control messages.
101    */
102   uint32_t priority;
103
104 };
105
106
107 /**
108  * Entry in hash table of all of our current neighbours.
109  */
110 struct Neighbour
111 {
112   /**
113    * Overall transport handle.
114    */
115   struct GNUNET_TRANSPORT_Handle *h;
116
117   /**
118    * Active transmit handle or NULL.
119    */
120   struct GNUNET_TRANSPORT_TransmitHandle *th;
121
122   /**
123    * Identity of this neighbour.
124    */
125   struct GNUNET_PeerIdentity id;
126
127   /**
128    * Outbound bandwidh tracker.
129    */
130   struct GNUNET_BANDWIDTH_Tracker out_tracker;
131
132   /**
133    * Entry in our readyness heap (which is sorted by 'next_ready'
134    * value).  NULL if there is no pending transmission request for
135    * this neighbour or if we're waiting for 'is_ready' to become
136    * true AFTER the 'out_tracker' suggested that this peer's quota
137    * has been satisfied (so once 'is_ready' goes to GNUNET_YES,
138    * we should immediately go back into the heap).
139    */
140   struct GNUNET_CONTAINER_HeapNode *hn;
141
142   /**
143    * Is this peer currently ready to receive a message?
144    */
145   int is_ready;
146
147 };
148
149
150 /**
151  * Linked list of functions to call whenever our HELLO is updated.
152  */
153 struct HelloWaitList
154 {
155
156   /**
157    * This is a doubly linked list.
158    */
159   struct HelloWaitList *next;
160
161   /**
162    * This is a doubly linked list.
163    */
164   struct HelloWaitList *prev;
165
166   /**
167    * Callback to call once we got our HELLO.
168    */
169   GNUNET_TRANSPORT_HelloUpdateCallback rec;
170
171   /**
172    * Closure for rec.
173    */
174   void *rec_cls;
175
176 };
177
178
179 /**
180  * Handle for the transport service (includes all of the
181  * state for the transport service).
182  */
183 struct GNUNET_TRANSPORT_Handle
184 {
185
186   /**
187    * Closure for the callbacks.
188    */
189   void *cls;
190
191   /**
192    * Function to call for received data.
193    */
194   GNUNET_TRANSPORT_ReceiveCallback rec;
195
196   /**
197    * function to call on connect events
198    */
199   GNUNET_TRANSPORT_NotifyConnect nc_cb;
200
201   /**
202    * function to call on disconnect events
203    */
204   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
205
206   /**
207    * Head of DLL of control messages.
208    */
209   struct GNUNET_TRANSPORT_TransmitHandle *control_head;
210
211   /**
212    * Tail of DLL of control messages.
213    */
214   struct GNUNET_TRANSPORT_TransmitHandle *control_tail;
215
216   /**
217    * The current HELLO message for this peer.  Updated
218    * whenever transports change their addresses.
219    */
220   struct GNUNET_HELLO_Message *my_hello;
221
222   /**
223    * My client connection to the transport service.
224    */
225   struct GNUNET_CLIENT_Connection *client;
226
227   /**
228    * Handle to our registration with the client for notification.
229    */
230   struct GNUNET_CLIENT_TransmitHandle *cth;
231
232   /**
233    * Linked list of pending requests for our HELLO.
234    */
235   struct HelloWaitList *hwl_head;
236
237   /**
238    * Linked list of pending requests for our HELLO.
239    */
240   struct HelloWaitList *hwl_tail;
241
242   /**
243    * My configuration.
244    */
245   const struct GNUNET_CONFIGURATION_Handle *cfg;
246
247   /**
248    * Hash map of the current connected neighbours of this peer.
249    * Maps peer identities to 'struct Neighbour' entries.
250    */
251   struct GNUNET_CONTAINER_MultiHashMap *neighbours;
252
253   /**
254    * Heap sorting peers with pending messages by the timestamps that
255    * specify when we could next send a message to the respective peer.
256    * Excludes control messages (which can always go out immediately).
257    * Maps time stamps to 'struct Neighbour' entries.
258    */ 
259   struct GNUNET_CONTAINER_Heap *ready_heap;
260
261   /**
262    * Peer identity as assumed by this process, or all zeros.
263    */
264   struct GNUNET_PeerIdentity self;
265
266   /**
267    * ID of the task trying to reconnect to the service.
268    */
269   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
270
271   /**
272    * ID of the task trying to trigger transmission for a peer while
273    * maintaining bandwidth quotas.  In use if there are no control
274    * messages and the smallest entry in the 'ready_heap' has a time
275    * stamp in the future.
276    */
277   GNUNET_SCHEDULER_TaskIdentifier quota_task;
278
279   /**
280    * Delay until we try to reconnect.
281    */
282   struct GNUNET_TIME_Relative reconnect_delay;
283
284   /**
285    * Should we check that 'self' matches what the service thinks?
286    * (if GNUNET_NO, then 'self' is all zeros!).
287    */
288   int check_self;
289 };
290
291
292 /**
293  * Schedule the task to send one message, either from the control
294  * list or the peer message queues  to the service.
295  *
296  * @param h transport service to schedule a transmission for
297  */
298 static void
299 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
300
301
302 /**
303  * Function that will schedule the job that will try
304  * to connect us again to the client.
305  *
306  * @param h transport service to reconnect
307  */
308 static void
309 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h);
310
311
312 /**
313  * Get the neighbour list entry for the given peer
314  *
315  * @param h our context
316  * @param peer peer to look up
317  * @return NULL if no such peer entry exists
318  */
319 static struct Neighbour *
320 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
321                 const struct GNUNET_PeerIdentity *peer)
322 {
323   return GNUNET_CONTAINER_multihashmap_get(h->neighbours, &peer->hashPubKey);
324 }
325
326
327 /**
328  * Add neighbour to our list
329  *
330  * @return NULL if this API is currently disconnecting from the service
331  */
332 static struct Neighbour *
333 neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
334                const struct GNUNET_PeerIdentity *pid)
335 {
336   struct Neighbour *n;
337
338 #if DEBUG_TRANSPORT
339   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
340               "Creating entry for neighbour `%4s'.\n",
341               GNUNET_i2s (pid));
342 #endif
343   n = GNUNET_malloc (sizeof (struct Neighbour));
344   n->id = *pid;
345   n->h = h;
346   n->is_ready = GNUNET_YES;
347   GNUNET_BANDWIDTH_tracker_init (&n->out_tracker,
348                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
349                                  MAX_BANDWIDTH_CARRY_S);
350   GNUNET_CONTAINER_multihashmap_put (h->neighbours,
351                                      &pid->hashPubKey,
352                                      n,
353                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
354   return n;
355 }
356
357
358 /**
359  * Iterator over hash map entries, for deleting state of a neighbour.
360  *
361  * @param cls the 'struct GNUNET_TRANSPORT_Handle*'
362  * @param key peer identity
363  * @param value value in the hash map, the neighbour entry to delete
364  * @return GNUNET_YES if we should continue to
365  *         iterate,
366  *         GNUNET_NO if not.
367  */
368 static int
369 neighbour_delete (void *cls,
370                   const GNUNET_HashCode * key,
371                   void *value)
372 {
373   struct GNUNET_TRANSPORT_Handle *handle = cls;
374   struct Neighbour *n = value;
375
376   if (NULL != handle->nd_cb)
377     handle->nd_cb (handle->cls,
378                    &n->id);
379   GNUNET_assert (NULL == n->th);
380   GNUNET_assert (NULL == n->hn);
381   GNUNET_CONTAINER_multihashmap_remove (handle->neighbours,
382                                         key,
383                                         n);
384   GNUNET_free (n);
385   return GNUNET_YES;
386 }
387
388
389 /**
390  * Function we use for handling incoming messages.
391  *
392  * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
393  * @param msg message received, NULL on timeout or fatal error
394  */
395 static void
396 demultiplexer (void *cls,
397                const struct GNUNET_MessageHeader *msg)
398 {
399   struct GNUNET_TRANSPORT_Handle *h = cls;
400   const struct DisconnectInfoMessage *dim;
401   const struct ConnectInfoMessage *cim;
402   const struct InboundMessage *im;
403   const struct GNUNET_MessageHeader *imm;
404   const struct SendOkMessage *okm;
405   struct HelloWaitList *hwl;
406   struct HelloWaitList *next_hwl;
407   struct Neighbour *n;
408   struct GNUNET_PeerIdentity me;
409   uint16_t size;
410   uint32_t ats_count;
411
412   GNUNET_assert (h->client != NULL);
413   if (msg == NULL)
414     {
415 #if DEBUG_TRANSPORT
416       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
417                   "Error receiving from transport service, disconnecting temporarily.\n");
418 #endif
419       disconnect_and_schedule_reconnect (h);
420       return;
421     }
422   GNUNET_CLIENT_receive (h->client,
423                          &demultiplexer, h, 
424                          GNUNET_TIME_UNIT_FOREVER_REL);
425   size = ntohs (msg->size);
426   switch (ntohs (msg->type))
427     {
428     case GNUNET_MESSAGE_TYPE_HELLO:
429       if (GNUNET_OK !=
430           GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
431                                &me))
432         {
433           GNUNET_break (0);
434           break;
435         }
436 #if DEBUG_TRANSPORT
437       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438                   "Receiving (my own) `%s' message, I am `%4s'.\n",
439                   "HELLO", GNUNET_i2s (&me));
440 #endif
441       GNUNET_free_non_null (h->my_hello);
442       h->my_hello = NULL;
443       if (size < sizeof (struct GNUNET_MessageHeader))
444         {
445           GNUNET_break (0);
446           break;
447         }
448       h->my_hello = GNUNET_malloc (size);
449       memcpy (h->my_hello, msg, size);
450       hwl = h->hwl_head;
451       while (NULL != hwl)
452         {
453           next_hwl = hwl->next;
454           hwl->rec (hwl->rec_cls,
455                     (const struct GNUNET_MessageHeader *) h->my_hello);
456           hwl = next_hwl;
457         }
458       break;
459     case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
460       if (size < sizeof (struct ConnectInfoMessage))
461         {
462           GNUNET_break (0);
463           break;
464         }
465       cim = (const struct ConnectInfoMessage *) msg;
466       ats_count = ntohl (cim->ats_count);
467       if (size != sizeof (struct ConnectInfoMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information))
468         {
469           GNUNET_break (0);
470           break;
471         }
472 #if DEBUG_TRANSPORT
473       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
474                   "Receiving `%s' message for `%4s'.\n",
475                   "CONNECT", GNUNET_i2s (&cim->id));
476 #endif
477       n = neighbour_find (h, &cim->id);
478       if (n != NULL)
479         {
480           GNUNET_break (0);
481           break;
482         }
483       n = neighbour_add (h, &cim->id);
484       if (h->nc_cb != NULL)
485         h->nc_cb (h->cls, &n->id,
486                   &cim->ats, ats_count);
487       break;
488     case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
489       if (size != sizeof (struct DisconnectInfoMessage))
490         {
491           GNUNET_break (0);
492           break;
493         }
494       dim = (const struct DisconnectInfoMessage *) msg;
495       GNUNET_break (ntohl (dim->reserved) == 0);
496 #if DEBUG_TRANSPORT_DISCONNECT
497       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498                   "Receiving `%s' message for `%4s'.\n",
499                   "DISCONNECT",
500                   GNUNET_i2s (&dim->peer));
501 #endif
502       n = neighbour_find (h, &dim->peer);
503       if (n == NULL)
504         {
505           GNUNET_break (0);
506           break;
507         }
508       neighbour_delete (h, &dim->peer.hashPubKey, n);
509       break;
510     case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
511       if (size != sizeof (struct SendOkMessage))
512         {
513           GNUNET_break (0);
514           break;
515         }
516       okm = (const struct SendOkMessage *) msg;
517 #if DEBUG_TRANSPORT
518       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
519                   "Receiving `%s' message, transmission %s.\n", "SEND_OK",
520                   ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
521 #endif
522       n = neighbour_find (h, &okm->peer);
523       if (n == NULL)
524         break;  
525       GNUNET_break (GNUNET_NO == n->is_ready);
526       n->is_ready = GNUNET_YES;
527       if ( (n->th != NULL) &&
528            (n->hn == NULL) )
529         {
530           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->th->timeout_task);
531           GNUNET_SCHEDULER_cancel (n->th->timeout_task);
532           n->th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
533           /* we've been waiting for this (congestion, not quota, 
534              caused delayed transmission) */
535           n->hn = GNUNET_CONTAINER_heap_insert (h->ready_heap,
536                                                 n, 0);
537           schedule_transmission (h);
538         }
539       break;
540     case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
541 #if DEBUG_TRANSPORT
542       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
543                   "Receiving `%s' message.\n", "RECV");
544 #endif
545       if (size <
546           sizeof (struct InboundMessage) +
547           sizeof (struct GNUNET_MessageHeader))
548         {
549           GNUNET_break (0);
550           break;
551         }
552       im = (const struct InboundMessage *) msg;
553       GNUNET_break (0 == ntohl (im->reserved));
554       ats_count = ntohl(im->ats_count);
555       imm = (const struct GNUNET_MessageHeader *) &((&(im->ats))[ats_count+1]);
556
557       if (ntohs (imm->size) + sizeof (struct InboundMessage) + ats_count * sizeof (struct GNUNET_TRANSPORT_ATS_Information) != size)
558         {
559           GNUNET_break (0);
560           break;
561         }
562 #if DEBUG_TRANSPORT
563       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
564                   "Received message of type %u from `%4s'.\n",
565                   ntohs (imm->type), GNUNET_i2s (&im->peer));
566 #endif
567       n = neighbour_find (h, &im->peer);
568       if (n == NULL)
569         {
570           GNUNET_break (0);
571           break;
572         }
573       if (h->rec != NULL)
574         h->rec (h->cls, &im->peer, imm,
575                 &im->ats, ats_count);
576       break;
577     default:
578       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
579                   _
580                   ("Received unexpected message of type %u in %s:%u\n"),
581                   ntohs (msg->type), __FILE__, __LINE__);
582       GNUNET_break (0);
583       break;
584     }
585 }
586
587
588 /**
589  * A transmission request could not be satisfied because of
590  * network congestion.  Notify the initiator and clean up.
591  *
592  * @param cls the 'struct GNUNET_TRANSPORT_TransmitHandle'
593  * @param tc scheduler context
594  */
595 static void
596 timeout_request_due_to_congestion (void *cls,
597                                    const struct GNUNET_SCHEDULER_TaskContext *tc)
598 {
599   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
600   struct Neighbour *n = th->neighbour;
601
602   n->th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
603   GNUNET_assert (th == n->th);
604   GNUNET_assert (NULL == n->hn);
605   n->th = NULL;
606   th->notify (th->notify_cls, 0, NULL);
607   GNUNET_free (th);
608 }
609
610
611 /**
612  * Transmit message(s) to service.
613  *
614  * @param cls handle to transport
615  * @param size number of bytes available in buf
616  * @param buf where to copy the message
617  * @return number of bytes copied to buf
618  */
619 static size_t
620 transport_notify_ready (void *cls, size_t size, void *buf)
621 {
622   struct GNUNET_TRANSPORT_Handle *h = cls;
623   struct GNUNET_TRANSPORT_TransmitHandle *th;
624   struct Neighbour *n;
625   char *cbuf;
626   struct OutboundMessage obm;
627   size_t ret;
628   size_t nret;
629   size_t mret;
630
631   GNUNET_assert (NULL != h->client);
632   h->cth = NULL;
633   if (NULL == buf)
634     {
635       /* transmission failed */
636       disconnect_and_schedule_reconnect (h);
637       return 0;
638     }
639
640   cbuf = buf;
641   ret = 0;
642   /* first send control messages */
643   while ( (NULL != (th = h->control_head)) &&
644           (th->notify_size <= size) )
645     {
646       GNUNET_CONTAINER_DLL_remove (h->control_head,
647                                    h->control_tail,
648                                    th);
649       nret = th->notify (th->notify_cls, size, &cbuf[ret]);
650 #if DEBUG_TRANSPORT
651       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
652                   "Added %u bytes of control message at %u\n",
653                   nret,
654                   ret);
655 #endif
656       GNUNET_free (th);
657       ret += nret;
658       size -= nret;
659     }
660
661   /* then, if possible and no control messages pending, send data messages */
662   while ( (NULL == h->control_head) &&
663           (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) )
664     {
665        if (GNUNET_YES != n->is_ready)
666         {
667           /* peer not ready, wait for notification! */
668           GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
669           n->hn = NULL;
670           GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == n->th->timeout_task);
671           n->th->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (n->th->timeout),
672                                                               &timeout_request_due_to_congestion,
673                                                               n->th);
674           continue;
675         }
676       th = n->th;
677       if (th->notify_size + sizeof (struct OutboundMessage) > size)
678         break; /* does not fit */
679       if (GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, th->notify_size).rel_value > 0)
680         break; /* too early */
681       GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
682       n->hn = NULL;
683       n->th = NULL;
684       n->is_ready = GNUNET_NO;
685       GNUNET_assert (size >= sizeof (struct OutboundMessage));
686       mret = th->notify (th->notify_cls,
687                          size - sizeof (struct OutboundMessage),
688                          &cbuf[ret + sizeof (struct OutboundMessage)]);
689       GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
690       if (mret != 0)    
691         {
692           GNUNET_assert (mret + sizeof (struct OutboundMessage) < GNUNET_SERVER_MAX_MESSAGE_SIZE);
693           obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
694           obm.header.size = htons (mret + sizeof (struct OutboundMessage));
695           obm.priority = htonl (th->priority);
696           obm.timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (th->timeout));
697           obm.peer = n->id;
698           memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage));
699           ret += (mret + sizeof (struct OutboundMessage));
700           size -= (mret + sizeof (struct OutboundMessage));
701           GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret);
702         }
703       GNUNET_free (th);
704     }
705   /* if there are more pending messages, try to schedule those */
706   schedule_transmission (h);
707 #if DEBUG_TRANSPORT
708   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
709               "Transmitting %u bytes to transport service\n", ret);
710 #endif
711   return ret;
712 }
713
714
715 /**
716  * Schedule the task to send one message, either from the control
717  * list or the peer message queues  to the service.
718  *
719  * @param cls transport service to schedule a transmission for
720  * @param tc scheduler context
721  */
722 static void
723 schedule_transmission_task (void *cls,
724                             const struct GNUNET_SCHEDULER_TaskContext *tc)
725 {
726   struct GNUNET_TRANSPORT_Handle *h = cls;
727   size_t size;
728   struct GNUNET_TRANSPORT_TransmitHandle *th;
729   struct Neighbour *n;
730
731   h->quota_task = GNUNET_SCHEDULER_NO_TASK;
732   GNUNET_assert (NULL != h->client);
733   /* destroy all requests that have timed out */
734   while ( (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap))) &&
735           (GNUNET_TIME_absolute_get_remaining (n->th->timeout).rel_value == 0) )
736     {
737       /* notify client that the request could not be satisfied within
738          the given time constraints */
739       th = n->th;
740       n->th = NULL;
741       GNUNET_assert (n == GNUNET_CONTAINER_heap_remove_root (h->ready_heap));
742       n->hn = NULL;
743 #if DEBUG_TRANSPORT
744       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745                   "Signalling timeout for transmission to peer %s due to congestion\n",
746                   GNUNET_i2s (&n->id));
747 #endif
748       GNUNET_assert (0 == 
749                      th->notify (th->notify_cls, 0, NULL));
750       GNUNET_free (th);      
751     }
752   if (NULL != h->cth)
753     return;
754   if (NULL != h->control_head)
755     {
756       size = h->control_head->notify_size;
757     }
758   else
759     {
760       n = GNUNET_CONTAINER_heap_peek (h->ready_heap);
761       if (NULL == n)
762         return; /* no pending messages */
763       size = n->th->notify_size + sizeof (struct OutboundMessage);
764     }  
765 #if DEBUG_TRANSPORT
766   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
767               "Calling notify_transmit_ready\n");
768 #endif
769   h->cth =
770     GNUNET_CLIENT_notify_transmit_ready (h->client,
771                                          size,
772                                          GNUNET_TIME_UNIT_FOREVER_REL,
773                                          GNUNET_NO,
774                                          &transport_notify_ready,
775                                          h);
776   GNUNET_assert (NULL != h->cth);
777 }
778
779
780 /**
781  * Schedule the task to send one message, either from the control
782  * list or the peer message queues  to the service.
783  *
784  * @param h transport service to schedule a transmission for
785  */
786 static void
787 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
788 {
789   struct GNUNET_TIME_Relative delay;
790   struct Neighbour *n;
791
792   GNUNET_assert (NULL != h->client);
793   if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
794     {
795       GNUNET_SCHEDULER_cancel (h->quota_task);
796       h->quota_task = GNUNET_SCHEDULER_NO_TASK;
797     }
798   if (NULL != h->control_head)
799     delay = GNUNET_TIME_UNIT_ZERO;
800   else if (NULL != (n = GNUNET_CONTAINER_heap_peek (h->ready_heap)))    
801     delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, n->th->notify_size);
802   else
803     return; /* no work to be done */
804   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
805               "Scheduling next transmission to service in %llu ms\n",
806               (unsigned long long) delay.rel_value);
807   h->quota_task = GNUNET_SCHEDULER_add_delayed (delay,
808                                                 &schedule_transmission_task,
809                                                 h);
810 }
811
812
813 /**
814  * Queue control request for transmission to the transport
815  * service.
816  *
817  * @param h handle to the transport service
818  * @param size number of bytes to be transmitted
819  * @param notify function to call to get the content
820  * @param notify_cls closure for notify
821  */
822 static void
823 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
824                            size_t size,
825                            GNUNET_CONNECTION_TransmitReadyNotify notify,
826                            void *notify_cls)
827 {
828   struct GNUNET_TRANSPORT_TransmitHandle *th;
829
830 #if DEBUG_TRANSPORT
831   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
832               "Control transmit of %u bytes requested\n",
833               size);
834 #endif
835   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
836   th->notify = notify;
837   th->notify_cls = notify_cls;
838   th->notify_size = size;
839   GNUNET_CONTAINER_DLL_insert_tail (h->control_head,
840                                     h->control_tail,
841                                     th);
842   schedule_transmission (h);
843 }
844
845
846 /**
847  * Transmit START message to service.
848  *
849  * @param cls unused
850  * @param size number of bytes available in buf
851  * @param buf where to copy the message
852  * @return number of bytes copied to buf
853  */
854 static size_t
855 send_start (void *cls, size_t size, void *buf)
856 {
857   struct GNUNET_TRANSPORT_Handle *h = cls;
858   struct StartMessage s;
859
860   if (buf == NULL)
861     {
862       /* Can only be shutdown, just give up */
863 #if DEBUG_TRANSPORT
864       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
865                   "Shutdown while trying to transmit `%s' request.\n",
866                   "START");
867 #endif
868       return 0;
869     }
870 #if DEBUG_TRANSPORT
871   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
872               "Transmitting `%s' request.\n", "START");
873 #endif
874   GNUNET_assert (size >= sizeof (struct StartMessage));
875   s.header.size = htons (sizeof (struct StartMessage));
876   s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
877   s.do_check = htonl (h->check_self);
878   s.self = h->self;
879   memcpy (buf, &s, sizeof (struct StartMessage));
880   GNUNET_CLIENT_receive (h->client,
881                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
882   return sizeof (struct StartMessage);
883 }
884
885
886 /**
887  * Try again to connect to transport service.
888  *
889  * @param cls the handle to the transport service
890  * @param tc scheduler context
891  */
892 static void
893 reconnect (void *cls,
894            const struct GNUNET_SCHEDULER_TaskContext *tc)
895 {
896   struct GNUNET_TRANSPORT_Handle *h = cls;
897
898   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
899   if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
900     {
901       /* shutdown, just give up */
902       return;
903     }
904 #if DEBUG_TRANSPORT
905   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
906               "Connecting to transport service.\n");
907 #endif
908   GNUNET_assert (h->client == NULL);
909   GNUNET_assert (h->control_head == NULL);
910   GNUNET_assert (h->control_tail == NULL);
911   h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
912   GNUNET_assert (h->client != NULL);
913   schedule_control_transmit (h,
914                              sizeof (struct StartMessage),
915                              &send_start, h);
916 }
917
918
919 /**
920  * Function that will schedule the job that will try
921  * to connect us again to the client.
922  *
923  * @param h transport service to reconnect
924  */
925 static void
926 disconnect_and_schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
927 {
928   struct GNUNET_TRANSPORT_TransmitHandle *th;
929
930   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
931   /* Forget about all neighbours that we used to be connected to */
932   GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, 
933                                         &neighbour_delete, 
934                                         h);
935   if (NULL != h->cth)
936     {
937       GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth);
938       h->cth = NULL;
939     }
940   if (NULL != h->client)
941     {
942       GNUNET_CLIENT_disconnect (h->client, GNUNET_YES);
943       h->client = NULL;
944     }
945   if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
946     {
947       GNUNET_SCHEDULER_cancel (h->quota_task);
948       h->quota_task = GNUNET_SCHEDULER_NO_TASK;
949     }
950   while ( (NULL != (th = h->control_head)))
951     {
952       GNUNET_CONTAINER_DLL_remove (h->control_head,
953                                    h->control_tail,
954                                    th);
955       th->notify (th->notify_cls, 0, NULL);
956       GNUNET_free (th);
957     }
958 #if DEBUG_TRANSPORT
959   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
960               "Scheduling task to reconnect to transport service in %llu ms.\n",
961               h->reconnect_delay.rel_value);
962 #endif
963   h->reconnect_task
964     = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay,
965                                     &reconnect, h);
966   if (h->reconnect_delay.rel_value == 0)
967     {
968       h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
969     }
970   else
971     {
972       h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2);
973       h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
974                                                      h->reconnect_delay);
975     }
976 }
977
978
979 /**
980  * Closure for 'send_set_quota'.
981  */
982 struct SetQuotaContext
983 {
984
985   /**
986    * Identity of the peer impacted by the quota change.
987    */
988   struct GNUNET_PeerIdentity target;
989
990   /**
991    * Quota to transmit.
992    */
993   struct GNUNET_BANDWIDTH_Value32NBO quota_in;
994 };
995
996
997 /**
998  * Send SET_QUOTA message to the service.
999  *
1000  * @param cls the 'struct SetQuotaContext'
1001  * @param size number of bytes available in buf
1002  * @param buf where to copy the message
1003  * @return number of bytes copied to buf
1004  */
1005 static size_t
1006 send_set_quota (void *cls, size_t size, void *buf)
1007 {
1008   struct SetQuotaContext *sqc = cls;
1009   struct QuotaSetMessage msg;
1010
1011   if (buf == NULL)
1012     {
1013       GNUNET_free (sqc);
1014       return 0;
1015     }
1016 #if DEBUG_TRANSPORT
1017   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1018               "Transmitting `%s' request with respect to `%4s'.\n",
1019               "SET_QUOTA",
1020               GNUNET_i2s (&sqc->target));
1021 #endif
1022   GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
1023   msg.header.size = htons (sizeof (struct QuotaSetMessage));
1024   msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
1025   msg.quota = sqc->quota_in;
1026   msg.peer = sqc->target;
1027   memcpy (buf, &msg, sizeof (msg));
1028   GNUNET_free (sqc);
1029   return sizeof (struct QuotaSetMessage);
1030 }
1031
1032
1033 /**
1034  * Set the share of incoming bandwidth for the given
1035  * peer to the specified amount.
1036  *
1037  * @param handle connection to transport service
1038  * @param target who's bandwidth quota is being changed
1039  * @param quota_in incoming bandwidth quota in bytes per ms
1040  * @param quota_out outgoing bandwidth quota in bytes per ms
1041  */
1042 void
1043 GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
1044                             const struct GNUNET_PeerIdentity *target,
1045                             struct GNUNET_BANDWIDTH_Value32NBO quota_in,
1046                             struct GNUNET_BANDWIDTH_Value32NBO quota_out)
1047 {
1048   struct Neighbour *n;
1049   struct SetQuotaContext *sqc;
1050    
1051   n = neighbour_find (handle, target);
1052   if (NULL == n)
1053     {
1054       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1055                   "Quota changed to %u for peer `%s', but I have no such neighbour!\n",
1056                   (unsigned int) ntohl (quota_out.value__),
1057                   GNUNET_i2s (target));
1058       return;
1059     }
1060   GNUNET_assert (NULL != handle->client);
1061 #if DEBUG_TRANSPORT
1062   if (ntohl (quota_out.value__) != n->out_tracker.available_bytes_per_s__)
1063     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064                 "Quota changed from %u to %u for peer `%s'\n",
1065                 (unsigned int) n->out_tracker.available_bytes_per_s__,
1066                 (unsigned int) ntohl (quota_out.value__),
1067                 GNUNET_i2s (target));
1068   else
1069     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1070                 "Quota remains at %u for peer `%s'\n",
1071                 (unsigned int) n->out_tracker.available_bytes_per_s__,
1072                 GNUNET_i2s (target));
1073 #endif
1074   GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
1075                                          quota_out);
1076   sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
1077   sqc->target = *target;
1078   sqc->quota_in = quota_in;
1079   schedule_control_transmit (handle,
1080                              sizeof (struct QuotaSetMessage),
1081                              &send_set_quota, sqc);
1082 }
1083
1084
1085 /**
1086  * Send REQUEST_CONNECT message to the service.
1087  *
1088  * @param cls the 'struct GNUNET_PeerIdentity'
1089  * @param size number of bytes available in buf
1090  * @param buf where to copy the message
1091  * @return number of bytes copied to buf
1092  */
1093 static size_t
1094 send_try_connect (void *cls, size_t size, void *buf)
1095 {
1096   struct GNUNET_PeerIdentity *pid = cls;
1097   struct TransportRequestConnectMessage msg;
1098
1099   if (buf == NULL)
1100     {
1101       GNUNET_free (pid);
1102       return 0;
1103     }
1104 #if DEBUG_TRANSPORT
1105   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1106               "Transmitting `%s' request with respect to `%4s'.\n",
1107               "REQUEST_CONNECT",
1108               GNUNET_i2s (pid));
1109 #endif
1110   GNUNET_assert (size >= sizeof (struct TransportRequestConnectMessage));
1111   msg.header.size = htons (sizeof (struct TransportRequestConnectMessage));
1112   msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT);
1113   msg.reserved = htonl (0);
1114   msg.peer = *pid;
1115   memcpy (buf, &msg, sizeof (msg));
1116   GNUNET_free (pid);
1117   return sizeof (struct TransportRequestConnectMessage);
1118 }
1119
1120
1121 /**
1122  * Ask the transport service to establish a connection to 
1123  * the given peer.
1124  *
1125  * @param handle connection to transport service
1126  * @param target who we should try to connect to
1127  */
1128 void
1129 GNUNET_TRANSPORT_try_connect (struct GNUNET_TRANSPORT_Handle *handle,
1130                               const struct GNUNET_PeerIdentity *target)
1131 {
1132   struct GNUNET_PeerIdentity *pid;
1133
1134   if (NULL == handle->client)
1135     return;
1136   pid = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
1137   *pid = *target;
1138   schedule_control_transmit (handle,
1139                              sizeof (struct TransportRequestConnectMessage),
1140                              &send_try_connect, pid);
1141 }
1142
1143
1144 /**
1145  * Send HELLO message to the service.
1146  *
1147  * @param cls the HELLO message to send
1148  * @param size number of bytes available in buf
1149  * @param buf where to copy the message
1150  * @return number of bytes copied to buf
1151  */
1152 static size_t
1153 send_hello (void *cls, size_t size, void *buf)
1154 {
1155   struct GNUNET_MessageHeader *msg = cls;
1156   uint16_t ssize;
1157
1158   if (buf == NULL)
1159     {
1160 #if DEBUG_TRANSPORT_TIMEOUT
1161       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1162                   "Timeout while trying to transmit `%s' request.\n",
1163                   "HELLO");
1164 #endif
1165       GNUNET_free (msg);
1166       return 0;
1167     }
1168 #if DEBUG_TRANSPORT
1169   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1170               "Transmitting `%s' request.\n", "HELLO");
1171 #endif
1172   ssize = ntohs (msg->size);
1173   GNUNET_assert (size >= ssize);
1174   memcpy (buf, msg, ssize);
1175   GNUNET_free (msg);
1176   return ssize;
1177 }
1178
1179
1180 /**
1181  * Offer the transport service the HELLO of another peer.  Note that
1182  * the transport service may just ignore this message if the HELLO is
1183  * malformed or useless due to our local configuration.
1184  *
1185  * @param handle connection to transport service
1186  * @param hello the hello message
1187  * @param cont continuation to call when HELLO has been sent
1188  * @param cls closure for continuation
1189  *
1190  */
1191 void
1192 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1193                               const struct GNUNET_MessageHeader *hello,
1194                               GNUNET_SCHEDULER_Task cont,
1195                               void *cls)
1196 {
1197   uint16_t size;
1198   struct GNUNET_PeerIdentity peer;
1199   struct GNUNET_MessageHeader *msg;
1200
1201   if (NULL == handle->client)
1202     return;
1203   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
1204   size = ntohs (hello->size);
1205   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
1206   if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) hello,
1207                                         &peer))
1208     {
1209       GNUNET_break (0);
1210       return;
1211     }
1212   msg = GNUNET_malloc(size);
1213   memcpy (msg, hello, size);
1214 #if DEBUG_TRANSPORT
1215   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1216               "Offering `%s' message of `%4s' to transport for validation.\n",
1217               "HELLO",
1218               GNUNET_i2s (&peer));
1219 #endif
1220   schedule_control_transmit (handle,
1221                              size,
1222                              &send_hello, msg);
1223 }
1224
1225
1226 /**
1227  * Obtain the HELLO message for this peer.
1228  *
1229  * @param handle connection to transport service
1230  * @param rec function to call with the HELLO, sender will be our peer
1231  *            identity; message and sender will be NULL on timeout
1232  *            (handshake with transport service pending/failed).
1233  *             cost estimate will be 0.
1234  * @param rec_cls closure for rec
1235  */
1236 void
1237 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
1238                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
1239                             void *rec_cls)
1240 {
1241   struct HelloWaitList *hwl;
1242
1243   hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
1244   hwl->rec = rec;
1245   hwl->rec_cls = rec_cls;
1246   GNUNET_CONTAINER_DLL_insert (handle->hwl_head,
1247                                handle->hwl_tail,
1248                                hwl);
1249   if (handle->my_hello == NULL)
1250     return;
1251   rec (rec_cls, (const struct GNUNET_MessageHeader *) handle->my_hello);
1252 }
1253
1254
1255 /**
1256  * Stop receiving updates about changes to our HELLO message.
1257  *
1258  * @param handle connection to transport service
1259  * @param rec function previously registered to be called with the HELLOs
1260  * @param rec_cls closure for rec
1261  */
1262 void
1263 GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle,
1264                                    GNUNET_TRANSPORT_HelloUpdateCallback rec,
1265                                    void *rec_cls)
1266 {
1267   struct HelloWaitList *pos;
1268
1269   pos = handle->hwl_head;
1270   while (pos != NULL)
1271     {
1272       if ( (pos->rec == rec) &&
1273            (pos->rec_cls == rec_cls) )
1274         break;
1275       pos = pos->next;
1276     }
1277   GNUNET_break (pos != NULL);
1278   if (pos == NULL)
1279     return;
1280   GNUNET_CONTAINER_DLL_remove (handle->hwl_head,
1281                                handle->hwl_tail,
1282                                pos);
1283   GNUNET_free (pos);
1284 }
1285
1286
1287 /**
1288  * Connect to the transport service.  Note that the connection may
1289  * complete (or fail) asynchronously.
1290  *
1291  * @param cfg configuration to use
1292  * @param self our own identity (API should check that it matches
1293  *             the identity found by transport), or NULL (no check)
1294  * @param cls closure for the callbacks
1295  * @param rec receive function to call
1296  * @param nc function to call on connect events
1297  * @param nd function to call on disconnect events
1298  */
1299 struct GNUNET_TRANSPORT_Handle *
1300 GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1301                           const struct GNUNET_PeerIdentity *self,
1302                           void *cls,
1303                           GNUNET_TRANSPORT_ReceiveCallback rec,
1304                           GNUNET_TRANSPORT_NotifyConnect nc,
1305                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1306 {
1307   struct GNUNET_TRANSPORT_Handle *ret;
1308
1309   ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
1310   if (self != NULL)
1311     {
1312       ret->self = *self;
1313       ret->check_self = GNUNET_YES;
1314     }
1315   ret->cfg = cfg;
1316   ret->cls = cls;
1317   ret->rec = rec;
1318   ret->nc_cb = nc;
1319   ret->nd_cb = nd;
1320   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1321   ret->neighbours = GNUNET_CONTAINER_multihashmap_create(STARTING_NEIGHBOURS_SIZE);
1322   ret->ready_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
1323   ret->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, ret);
1324   return ret;
1325 }
1326
1327
1328 /**
1329  * Disconnect from the transport service.
1330  *
1331  * @param handle handle to the service as returned from GNUNET_TRANSPORT_connect
1332  */
1333 void
1334 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1335 {
1336 #if DEBUG_TRANSPORT
1337   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, 
1338               "Transport disconnect called!\n");
1339 #endif
1340   /* this disconnects all neighbours... */
1341   if (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
1342     disconnect_and_schedule_reconnect (handle);
1343   /* and now we stop trying to connect again... */
1344   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1345     {
1346       GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1347       handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1348     }  
1349   GNUNET_CONTAINER_multihashmap_destroy (handle->neighbours);
1350   handle->neighbours = NULL;
1351   if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK)
1352     {
1353       GNUNET_SCHEDULER_cancel (handle->quota_task);
1354       handle->quota_task = GNUNET_SCHEDULER_NO_TASK;
1355     }
1356   GNUNET_free_non_null (handle->my_hello);
1357   handle->my_hello = NULL;
1358   GNUNET_assert (handle->hwl_head == NULL);
1359   GNUNET_assert (handle->hwl_tail == NULL);
1360   GNUNET_free (handle);
1361 }
1362
1363
1364 /**
1365  * Check if we could queue a message of the given size for
1366  * transmission.  The transport service will take both its
1367  * internal buffers and bandwidth limits imposed by the
1368  * other peer into consideration when answering this query.
1369  *
1370  * @param handle connection to transport service
1371  * @param target who should receive the message
1372  * @param size how big is the message we want to transmit?
1373  * @param priority how important is the message?
1374  * @param timeout after how long should we give up (and call
1375  *        notify with buf NULL and size 0)?
1376  * @param notify function to call when we are ready to
1377  *        send such a message
1378  * @param notify_cls closure for notify
1379  * @return NULL if someone else is already waiting to be notified
1380  *         non-NULL if the notify callback was queued (can be used to cancel
1381  *         using GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1382  */
1383 struct GNUNET_TRANSPORT_TransmitHandle *
1384 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle *handle,
1385                                         const struct GNUNET_PeerIdentity *target,
1386                                         size_t size,
1387                                         uint32_t priority,
1388                                         struct GNUNET_TIME_Relative timeout,
1389                                         GNUNET_CONNECTION_TransmitReadyNotify notify, 
1390                                         void *notify_cls)
1391 {
1392   struct Neighbour *n;
1393   struct GNUNET_TRANSPORT_TransmitHandle *th;
1394   struct GNUNET_TIME_Relative delay;
1395   
1396   n = neighbour_find (handle, target);
1397   if (NULL == n)
1398     {
1399       /* use GNUNET_TRANSPORT_try_connect first, only use this function
1400          once a connection has been established */
1401       GNUNET_break (0);
1402       return NULL;
1403     }
1404   if (NULL != n->th)
1405     {
1406       /* attempt to send two messages at the same time to the same peer */
1407       GNUNET_break (0);
1408       return NULL;
1409     }
1410   GNUNET_assert (NULL == n->hn);
1411   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
1412   th->neighbour = n;
1413   th->notify = notify;
1414   th->notify_cls = notify_cls;
1415   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1416   th->notify_size = size;
1417   th->priority = priority;
1418   n->th = th;
1419   /* calculate when our transmission should be ready */
1420   delay = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker, size);
1421   if (delay.rel_value > timeout.rel_value)
1422     delay.rel_value = 0; /* notify immediately (with failure) */
1423 #if DEBUG_TRANSPORT
1424   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1425               "Bandwidth tracker allows next transmission to peer %s in %llu ms\n",
1426               GNUNET_i2s (target),
1427               (unsigned long long) delay.rel_value);
1428 #endif
1429   n->hn = GNUNET_CONTAINER_heap_insert (handle->ready_heap,
1430                                         n, 
1431                                         delay.rel_value);
1432   schedule_transmission (handle);
1433   return th;
1434 }
1435
1436
1437 /**
1438  * Cancel the specified transmission-ready notification.
1439  *
1440  * @param th handle returned from GNUNET_TRANSPORT_notify_transmit_ready
1441  */
1442 void
1443 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct GNUNET_TRANSPORT_TransmitHandle *th)
1444 {
1445   struct Neighbour *n;
1446
1447   GNUNET_assert (NULL == th->next);
1448   GNUNET_assert (NULL == th->prev);
1449   n = th->neighbour;
1450   GNUNET_assert (th == n->th);
1451   n->th = NULL;
1452   if (n->hn != NULL)
1453     {
1454       GNUNET_CONTAINER_heap_remove_node (n->hn);
1455       n->hn = NULL;
1456     }
1457   else
1458     {
1459       GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != th->timeout_task);
1460       GNUNET_SCHEDULER_cancel (th->timeout_task);
1461       th->timeout_task = GNUNET_SCHEDULER_NO_TASK;
1462     }
1463   GNUNET_free (th);                                        
1464 }
1465
1466
1467 /* end of transport_api.c */