fix
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_client_lib.h"
28 #include "gnunet_constants.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_arm_service.h"
31 #include "gnunet_hello_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_server_lib.h"
34 #include "gnunet_time_lib.h"
35 #include "gnunet_transport_service.h"
36 #include "transport.h"
37
38 /**
39  * After how long do we give up on transmitting a HELLO
40  * to the service?
41  */
42 #define OFFER_HELLO_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
43
44 /**
45  * After how long do we automatically retry an unsuccessful
46  * CONNECT request?
47  */
48 #define CONNECT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 750)
49
50 /**
51  * How long should ARM wait when starting up the
52  * transport service before reporting back?
53  */
54 #define START_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
55
56 /**
57  * How long should ARM wait when stopping the
58  * transport service before reporting back?
59  */
60 #define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
61
62
63 /**
64  * What stage are we in for transmission processing?
65  */
66 enum TransmitStage
67   {
68     /**
69      * No active message.
70      */
71     TS_NEW = 0,
72
73     /**
74      * Message in local queue, not given to service.
75      */
76     TS_QUEUED = 1,
77
78     /**
79      * Message given to service, not confirmed (no SEND_OK).
80      */
81     TS_TRANSMITTED = 2,
82
83     /**
84      * One message was given to service and before it was confirmed,
85      * another one was already queued (waiting for SEND_OK to pass on
86      * to service).
87      */
88     TS_TRANSMITTED_QUEUED = 3
89   };
90
91
92 /**
93  * Handle for a transmission-ready request.
94  */
95 struct GNUNET_TRANSPORT_TransmitHandle
96 {
97
98   /**
99    * Neighbour for this handle, NULL for control-traffic.
100    */
101   struct NeighbourList *neighbour;
102
103   /**
104    * Function to call when notify_size bytes are available
105    * for transmission.
106    */
107   GNUNET_CONNECTION_TransmitReadyNotify notify;
108
109   /**
110    * Closure for notify.
111    */
112   void *notify_cls;
113
114   /**
115    * transmit_ready task Id.  The task is used to introduce the
116    * artificial delay that may be required to maintain the bandwidth
117    * limits.  Later, this will be the ID of the "transmit_timeout"
118    * task which is used to signal a timeout if the transmission could
119    * not be done in a timely fashion.
120    */
121   GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
122
123   /**
124    * Timeout for this request.
125    */
126   struct GNUNET_TIME_Absolute timeout;
127
128   /**
129    * How many bytes is our notify callback waiting for?
130    */
131   size_t notify_size;
132
133   /**
134    * How important is this message?
135    */
136   unsigned int priority;
137
138 };
139
140
141 /**
142  * Handle for a control message queue entry.
143  */
144 struct ControlMessage
145 {
146
147   /**
148    * This is a doubly-linked list.
149    */
150   struct ControlMessage *next;
151
152   /**
153    * This is a doubly-linked list.
154    */
155   struct ControlMessage *prev;
156
157   /**
158    * Overall transport handle.
159    */
160   struct GNUNET_TRANSPORT_Handle *h;
161
162   /**
163    * Function to call when notify_size bytes are available
164    * for transmission.
165    */
166   GNUNET_CONNECTION_TransmitReadyNotify notify;
167
168   /**
169    * Closure for notify.
170    */
171   void *notify_cls;
172
173   /**
174    * transmit_ready task Id.  The task is used to introduce the
175    * artificial delay that may be required to maintain the bandwidth
176    * limits.  Later, this will be the ID of the "transmit_timeout"
177    * task which is used to signal a timeout if the transmission could
178    * not be done in a timely fashion.
179    */
180   GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
181
182   /**
183    * How many bytes is our notify callback waiting for?
184    */
185   size_t notify_size;
186
187 };
188
189
190 /**
191  * Entry in linked list of all of our current neighbours.
192  */
193 struct NeighbourList
194 {
195
196   /**
197    * This is a linked list.
198    */
199   struct NeighbourList *next;
200
201   /**
202    * Overall transport handle.
203    */
204   struct GNUNET_TRANSPORT_Handle *h;
205
206   /**
207    * Active transmit handle; available if 'transmit_forbidden'
208    * is GNUNET_NO.
209    */
210   struct GNUNET_TRANSPORT_TransmitHandle transmit_handle;
211
212   /**
213    * Identity of this neighbour.
214    */
215   struct GNUNET_PeerIdentity id;
216
217   /**
218    * At what time did we reset last_sent last?
219    */
220   struct GNUNET_TIME_Absolute last_quota_update;
221
222   /**
223    * How many bytes have we sent since the "last_quota_update"
224    * timestamp?
225    */
226   uint64_t last_sent;
227
228   /**
229    * Quota for outbound traffic to the neighbour in bytes/ms.
230    */
231   uint32_t quota_out;
232
233   /**
234    * Set to GNUNET_NO if we are currently allowed to accept a
235    * message to the transport service for this peer, GNUNET_YES
236    * if we have one and are waiting for transmission, GNUNET_SYSERR
237    * if we are waiting for confirmation AND have already accepted
238    * yet another message.
239    */
240   enum TransmitStage transmit_stage;
241
242   /**
243    * Have we received a notification that this peer is connected
244    * to us right now?
245    */
246   int is_connected;
247
248 };
249
250
251 /**
252  * Linked list of requests from clients for our HELLO that were
253  * deferred.
254  */
255 struct HelloWaitList
256 {
257
258   /**
259    * This is a linked list.
260    */
261   struct HelloWaitList *next;
262
263   /**
264    * Reference back to our transport handle.
265    */
266   struct GNUNET_TRANSPORT_Handle *handle;
267
268   /**
269    * Callback to call once we got our HELLO.
270    */
271   GNUNET_TRANSPORT_HelloUpdateCallback rec;
272
273   /**
274    * Closure for rec.
275    */
276   void *rec_cls;
277
278 };
279
280
281 /**
282  * Handle for the transport service (includes all of the
283  * state for the transport service).
284  */
285 struct GNUNET_TRANSPORT_Handle
286 {
287
288   /**
289    * Closure for the callbacks.
290    */
291   void *cls;
292
293   /**
294    * Function to call for received data.
295    */
296   GNUNET_TRANSPORT_ReceiveCallback rec;
297
298   /**
299    * function to call on connect events
300    */
301   GNUNET_TRANSPORT_NotifyConnect nc_cb;
302
303   /**
304    * function to call on disconnect events
305    */
306   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
307
308   /**
309    * Head of DLL of control messages.
310    */
311   struct ControlMessage *control_head;
312
313   /**
314    * Tail of DLL of control messages.
315    */
316   struct ControlMessage *control_tail;
317
318   /**
319    * The current HELLO message for this peer.  Updated
320    * whenever transports change their addresses.
321    */
322   struct GNUNET_HELLO_Message *my_hello;
323
324   /**
325    * My client connection to the transport service.
326    */
327   struct GNUNET_CLIENT_Connection *client;
328
329   /**
330    * Handle to our registration with the client for notification.
331    */
332   struct GNUNET_CLIENT_TransmitHandle *network_handle;
333
334   /**
335    * Linked list of pending requests for our HELLO.
336    */
337   struct HelloWaitList *hwl_head;
338
339   /**
340    * My scheduler.
341    */
342   struct GNUNET_SCHEDULER_Handle *sched;
343
344   /**
345    * My configuration.
346    */
347   const struct GNUNET_CONFIGURATION_Handle *cfg;
348
349   /**
350    * Linked list of the current neighbours of this peer.
351    */
352   struct NeighbourList *neighbours;
353
354   /**
355    * ID of the task trying to reconnect to the service.
356    */
357   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
358
359   /**
360    * ID of the task trying to trigger transmission for a peer
361    * while maintaining bandwidth quotas.
362    */
363   GNUNET_SCHEDULER_TaskIdentifier quota_task;
364
365   /**
366    * Delay until we try to reconnect.
367    */
368   struct GNUNET_TIME_Relative reconnect_delay;
369
370 };
371
372
373 // FIXME: replace with hash map!
374 /**
375  * Get the neighbour list entry for the given peer
376  *
377  * @param h our context
378  * @param peer peer to look up
379  * @return NULL if no such peer entry exists
380  */
381 static struct NeighbourList *
382 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
383                 const struct GNUNET_PeerIdentity *peer)
384 {
385   struct NeighbourList *pos;
386
387   pos = h->neighbours;
388   while ((pos != NULL) &&
389          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
390     pos = pos->next;
391   return pos;
392 }
393
394
395 /**
396  * Schedule the task to send one message, either from the control
397  * list or the peer message queues  to the service.
398  */
399 static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
400
401
402 /**
403  * Function called by the scheduler when the timeout for bandwidth
404  * availablility for the target neighbour is reached.
405  *
406  * @param cls the 'struct GNUNET_TRANSPORT_Handle*'
407  * @param tc scheduler context
408  */
409 static void
410 quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
411 {
412   struct GNUNET_TRANSPORT_Handle *h = cls;
413
414   h->quota_task = GNUNET_SCHEDULER_NO_TASK;
415   schedule_transmission (h);
416 }
417
418
419 /**
420  * Update the quota values for the given neighbour now.
421  *
422  * @param n neighbour to update
423  */
424 static void
425 update_quota (struct NeighbourList *n)
426 {
427   struct GNUNET_TIME_Relative delta;
428   uint64_t allowed;
429   uint64_t remaining;
430
431   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
432   allowed = delta.value * n->quota_out;
433   if (n->last_sent < allowed)
434     {
435       remaining = allowed - n->last_sent;
436       if (n->quota_out > 0)
437         remaining /= n->quota_out;
438       else
439         remaining = 0;
440       if (remaining > MAX_BANDWIDTH_CARRY)
441         remaining = MAX_BANDWIDTH_CARRY;
442       n->last_sent = 0;
443       n->last_quota_update = GNUNET_TIME_absolute_get ();
444       n->last_quota_update.value -= remaining;
445     }
446   else
447     {
448       n->last_sent -= allowed;
449       n->last_quota_update = GNUNET_TIME_absolute_get ();
450     }
451 }
452
453
454 /**
455  * Figure out which transmission to a peer can be done right now.
456  * If none can, schedule a task to call 'schedule_transmission'
457  * whenever a peer transmission can be done in the future and
458  * return NULL.  Otherwise return the next transmission to be
459  * performed.
460  *
461  * @param h handle to transport
462  * @return NULL to wait longer before doing any peer transmissions
463  */
464 static struct GNUNET_TRANSPORT_TransmitHandle *
465 schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h)
466 {
467   struct GNUNET_TRANSPORT_TransmitHandle *ret;
468   struct GNUNET_TRANSPORT_TransmitHandle *th;
469   struct NeighbourList *n;
470   struct NeighbourList *next;
471   struct GNUNET_TIME_Relative retry_time;
472   struct GNUNET_TIME_Relative duration;
473   uint64_t available;
474
475   if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
476     {
477       GNUNET_SCHEDULER_cancel (h->sched,
478                                h->quota_task);
479       h->quota_task = GNUNET_SCHEDULER_NO_TASK;
480     }
481   retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
482   ret = NULL;
483   next = h->neighbours;
484   while (NULL != (n = next))
485     {
486       next = n->next;
487       if (n->transmit_stage != TS_QUEUED)
488         continue; /* not eligible */
489       th = &n->transmit_handle;
490       /* check outgoing quota */
491       duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
492       if (duration.value > MIN_QUOTA_REFRESH_TIME)
493         {
494           update_quota (n);
495           duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
496         }
497       available = duration.value * n->quota_out;
498       if (available < n->last_sent + th->notify_size)
499         {
500           /* calculate how much bandwidth we'd still need to
501              accumulate and based on that how long we'll have
502              to wait... */
503           available = n->last_sent + th->notify_size - available;
504           duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
505                                                     available / n->quota_out);
506           if (duration.value == 0)
507             duration = GNUNET_TIME_UNIT_MILLISECONDS;
508           if (th->timeout.value <
509               GNUNET_TIME_relative_to_absolute (duration).value)
510             {
511               /* signal timeout! */
512 #if DEBUG_TRANSPORT
513               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
514                           "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long.  Signaling timeout.\n",
515                           duration.value, GNUNET_i2s (&n->id));
516 #endif
517               if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
518                 {
519                   GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
520                   th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
521                 }             
522               n->transmit_stage = TS_NEW;
523               if (NULL != th->notify)
524                 GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
525               continue;
526             }
527 #if DEBUG_TRANSPORT
528           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
529                       "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
530                       GNUNET_i2s (&n->id), duration.value);
531 #endif
532           retry_time = GNUNET_TIME_relative_min (retry_time,
533                                                  duration);
534           continue;
535         }
536 #if DEBUG_TRANSPORT
537       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
538                   "Bandwidth available for transmission to `%4s'\n",
539                   GNUNET_i2s (&n->id));
540 #endif
541       if ( (ret == NULL) ||
542            (ret->priority < th->priority) )
543         ret = th;
544     }
545   if (ret == NULL)
546     h->quota_task = GNUNET_SCHEDULER_add_delayed (h->sched,
547                                                   retry_time,
548                                                   &quota_transmit_ready,
549                                                   h);
550   return ret;
551 }
552
553
554 /**
555  * Transmit message(s) to service.
556  *
557  * @param cls handle to transport 
558  * @param size number of bytes available in buf
559  * @param buf where to copy the message
560  * @return number of bytes copied to buf
561  */
562 static size_t
563 transport_notify_ready (void *cls, size_t size, void *buf)
564 {
565   struct GNUNET_TRANSPORT_Handle *h = cls;
566   struct ControlMessage *cm;
567   struct GNUNET_TRANSPORT_TransmitHandle *th;
568   struct NeighbourList *n;
569   struct OutboundMessage obm;
570   size_t ret;
571   size_t mret;
572   char *cbuf;
573
574   h->network_handle = NULL;
575   if (buf == NULL)
576     {
577       schedule_transmission (h);
578       return 0;
579     }
580 #if DEBUG_TRANSPORT
581   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
582               "Ready to transmit %u bytes to transport service\n", size);
583 #endif
584   cbuf = buf;
585   ret = 0;
586   while ( (NULL != (cm = h->control_head)) &&
587           (cm->notify_size <= size) )
588     {
589       if (cm->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
590         {
591           GNUNET_SCHEDULER_cancel (h->sched, cm->notify_delay_task);
592           cm->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
593         }
594       GNUNET_CONTAINER_DLL_remove (h->control_head,
595                                    h->control_tail,
596                                    cm);
597       ret += cm->notify (cm->notify_cls, size, &cbuf[ret]);
598       GNUNET_free (cm);
599       size -= ret;
600     }
601   while ( (NULL != (th = schedule_peer_transmission (h))) &&
602           (th->notify_size <= size) )
603     {
604       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
605         {
606           GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
607           th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
608         }
609       n = th->neighbour;
610       switch (n->transmit_stage)
611         {
612         case TS_NEW:
613           GNUNET_break (0);
614           break;
615         case TS_QUEUED:
616           n->transmit_stage = TS_TRANSMITTED;
617           break;
618         case TS_TRANSMITTED:
619           GNUNET_break (0);
620           break;
621         case TS_TRANSMITTED_QUEUED:
622           GNUNET_break (0);
623           break;
624         default:
625           GNUNET_break (0);
626         }
627       GNUNET_assert (size >= sizeof (struct OutboundMessage));
628       mret = th->notify (th->notify_cls, 
629                          size - sizeof (struct OutboundMessage),
630                          &cbuf[ret + sizeof (struct OutboundMessage)]);
631       GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
632       if (mret != 0)    
633         {
634           obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
635           obm.header.size = htons (mret + sizeof (struct OutboundMessage));
636           obm.priority = htonl (th->priority);
637           obm.timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (th->timeout));
638           obm.peer = n->id;
639           memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage));
640           ret += (mret + sizeof (struct OutboundMessage));
641           size -= (mret + sizeof (struct OutboundMessage));
642         }
643     }
644   schedule_transmission (h);
645 #if DEBUG_TRANSPORT
646   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
647               "Transmitting %u bytes to transport service\n", ret);
648 #endif
649   return ret;
650 }
651
652
653 /**
654  * Schedule the task to send one message, either from the control
655  * list or the peer message queues  to the service.
656  */
657 static void
658 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
659 {  
660   size_t size;
661   struct GNUNET_TIME_Relative timeout;
662   struct GNUNET_TRANSPORT_TransmitHandle *th;
663
664   if (NULL != h->network_handle)
665     return;
666   if (h->client == NULL)
667     {
668       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
669                   "Could not yet schedule transmission: we are not yet connected to the transport service!\n");
670       return;                   /* not yet connected */
671     }
672   if (NULL != h->control_head) 
673     {
674       size = h->control_head->notify_size;
675       timeout = GNUNET_TIME_UNIT_FOREVER_REL;
676     }
677   else
678     {
679       th = schedule_peer_transmission (h);
680       if (th == NULL)
681         {
682           /* no transmission ready right now */
683           return;
684         }
685       size = th->notify_size;
686       timeout = GNUNET_TIME_absolute_get_remaining (th->timeout);
687     }
688   h->network_handle = 
689     GNUNET_CLIENT_notify_transmit_ready (h->client,
690                                          size,
691                                          timeout,
692                                          GNUNET_NO,
693                                          &transport_notify_ready,
694                                          h);
695   GNUNET_assert (NULL != h->network_handle);
696 }
697
698
699 /**
700  * Called when our transmit request timed out before any transport
701  * reported success connecting to the desired peer or before the
702  * transport was ready to receive.  Signal error and free
703  * TransmitHandle.
704  */
705 static void
706 control_transmit_timeout (void *cls,
707                           const struct GNUNET_SCHEDULER_TaskContext *tc)
708 {
709   struct ControlMessage *th = cls;
710
711   th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
712   if (NULL != th->notify)
713     th->notify (th->notify_cls, 0, NULL);
714   GNUNET_CONTAINER_DLL_remove (th->h->control_head,
715                                th->h->control_tail,
716                                th);
717   GNUNET_free (th);
718 }
719
720
721 /**
722  * Queue control request for transmission to the transport
723  * service.
724  *
725  * @param h handle to the transport service
726  * @param size number of bytes to be transmitted
727  * @param at_head request must be added to the head of the queue
728  *        (otherwise request will be appended)
729  * @param timeout how long this transmission can wait (at most)
730  * @param notify function to call to get the content
731  * @param notify_cls closure for notify
732  */
733 static void
734 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
735                            size_t size,
736                            int at_head,
737                            struct GNUNET_TIME_Relative timeout,
738                            GNUNET_CONNECTION_TransmitReadyNotify notify,
739                            void *notify_cls)
740 {
741   struct ControlMessage *th;
742
743 #if DEBUG_TRANSPORT
744   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
745               "Control transmit of %u bytes within %llums requested\n",
746               size, (unsigned long long) timeout.value);
747 #endif
748   th = GNUNET_malloc (sizeof (struct ControlMessage));
749   th->h = h;
750   th->notify = notify;
751   th->notify_cls = notify_cls;
752   th->notify_size = size;
753   th->notify_delay_task
754     = GNUNET_SCHEDULER_add_delayed (h->sched,
755                                     timeout, &control_transmit_timeout, th);
756   if (at_head)    
757     GNUNET_CONTAINER_DLL_insert (h->control_head,
758                                  h->control_tail,
759                                  th);
760   else
761     GNUNET_CONTAINER_DLL_insert_after (h->control_head,
762                                        h->control_tail,
763                                        h->control_tail,
764                                        th);
765   schedule_transmission (h);
766 }
767
768
769 struct SetQuotaContext
770 {
771   struct GNUNET_TRANSPORT_Handle *handle;
772
773   struct GNUNET_PeerIdentity target;
774
775   GNUNET_SCHEDULER_Task cont;
776
777   void *cont_cls;
778
779   struct GNUNET_TIME_Absolute timeout;
780
781   uint32_t quota_in;
782 };
783
784
785 /**
786  * Send SET_QUOTA message to the service.
787  *
788  * @param cls the 'struct SetQuotaContext'
789  * @param size number of bytes available in buf
790  * @param buf where to copy the message
791  * @return number of bytes copied to buf
792  */
793 static size_t
794 send_set_quota (void *cls, size_t size, void *buf)
795 {
796   struct SetQuotaContext *sqc = cls;
797   struct QuotaSetMessage *msg;
798
799   if (buf == NULL)
800     {
801       GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
802                                          sqc->cont,
803                                          sqc->cont_cls,
804                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
805       GNUNET_free (sqc);
806       return 0;
807     }
808 #if DEBUG_TRANSPORT
809   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
810               "Transmitting `%s' request with respect to `%4s'.\n",
811               "SET_QUOTA", 
812               GNUNET_i2s (&sqc->target));
813 #endif
814   GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
815   msg = buf;
816   msg->header.size = htons (sizeof (struct QuotaSetMessage));
817   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
818   msg->quota_in = htonl (sqc->quota_in);
819   memcpy (&msg->peer, &sqc->target, sizeof (struct GNUNET_PeerIdentity));
820   if (sqc->cont != NULL)
821     GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
822                                        sqc->cont,
823                                        sqc->cont_cls,
824                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
825   GNUNET_free (sqc);
826   return sizeof (struct QuotaSetMessage);
827 }
828
829
830 /**
831  * Set the share of incoming bandwidth for the given
832  * peer to the specified amount.
833  *
834  * @param handle connection to transport service
835  * @param target who's bandwidth quota is being changed
836  * @param quota_in incoming bandwidth quota in bytes per ms
837  * @param quota_out outgoing bandwidth quota in bytes per ms
838  * @param timeout how long to wait until signaling failure if
839  *        we can not communicate the quota change
840  * @param cont continuation to call when done, will be called
841  *        either with reason "TIMEOUT" or with reason "PREREQ_DONE"
842  * @param cont_cls closure for continuation
843  */
844 void
845 GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
846                             const struct GNUNET_PeerIdentity *target,
847                             uint32_t quota_in,
848                             uint32_t quota_out,
849                             struct GNUNET_TIME_Relative timeout,
850                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
851 {
852   struct NeighbourList *n;
853   struct SetQuotaContext *sqc;
854
855   n = neighbour_find (handle, target);
856   if (n != NULL)
857     {
858       update_quota (n);
859       if (n->quota_out < quota_out)
860         n->last_quota_update = GNUNET_TIME_absolute_get ();
861       n->quota_out = quota_out;
862     }
863   sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
864   sqc->handle = handle;
865   sqc->target = *target;
866   sqc->cont = cont;
867   sqc->cont_cls = cont_cls;
868   sqc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
869   sqc->quota_in = quota_in;
870   schedule_control_transmit (handle,
871                              sizeof (struct QuotaSetMessage),
872                              GNUNET_NO, timeout, &send_set_quota, sqc);
873 }
874
875
876 /**
877  * Obtain the HELLO message for this peer.
878  *
879  * @param handle connection to transport service
880  * @param timeout how long to wait for the HELLO
881  * @param rec function to call with the HELLO, sender will be our peer
882  *            identity; message and sender will be NULL on timeout
883  *            (handshake with transport service pending/failed).
884  *             cost estimate will be 0.
885  * @param rec_cls closure for rec
886  */
887 void
888 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
889                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
890                             void *rec_cls)
891 {
892   struct HelloWaitList *hwl;
893
894   hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
895   hwl->next = handle->hwl_head;
896   handle->hwl_head = hwl;
897   hwl->handle = handle;
898   hwl->rec = rec;
899   hwl->rec_cls = rec_cls;
900   if (handle->my_hello == NULL)
901     return;    
902   rec (rec_cls, (const struct GNUNET_MessageHeader *) handle->my_hello);
903 }
904
905
906
907 /**
908  * Stop receiving updates about changes to our HELLO message.
909  *
910  * @param handle connection to transport service
911  * @param rec function previously registered to be called with the HELLOs
912  * @param rec_cls closure for rec
913  */
914 void
915 GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle,
916                                    GNUNET_TRANSPORT_HelloUpdateCallback rec,
917                                    void *rec_cls)
918 {
919   struct HelloWaitList *pos;
920   struct HelloWaitList *prev;
921
922   prev = NULL;
923   pos = handle->hwl_head;
924   while (pos != NULL)
925     {
926       if ( (pos->rec == rec) &&
927            (pos->rec_cls == rec_cls) )
928         break;
929       prev = pos;
930       pos = pos->next;
931     }
932   GNUNET_break (pos != NULL);
933   if (pos == NULL)
934     return;
935   if (prev == NULL)
936     handle->hwl_head = pos->next;
937   else
938     prev->next = pos->next;
939   GNUNET_free (pos);
940 }
941
942
943 /**
944  * Send HELLO message to the service.
945  *
946  * @param cls the HELLO message to send
947  * @param size number of bytes available in buf
948  * @param buf where to copy the message
949  * @return number of bytes copied to buf
950  */
951 static size_t
952 send_hello (void *cls, size_t size, void *buf)
953 {
954   struct GNUNET_MessageHeader *hello = cls;
955   uint16_t msize;
956
957   if (buf == NULL)
958     {
959 #if DEBUG_TRANSPORT
960       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
961                   "Timeout while trying to transmit `%s' request.\n",
962                   "HELLO");
963 #endif
964       GNUNET_free (hello);
965       return 0;
966     }
967 #if DEBUG_TRANSPORT
968   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
969               "Transmitting `%s' request.\n", "HELLO");
970 #endif
971   msize = ntohs (hello->size);
972   GNUNET_assert (size >= msize);
973   memcpy (buf, hello, msize);
974   GNUNET_free (hello);
975   return msize;
976 }
977
978
979 /**
980  * Offer the transport service the HELLO of another peer.  Note that
981  * the transport service may just ignore this message if the HELLO is
982  * malformed or useless due to our local configuration.
983  *
984  * @param handle connection to transport service
985  * @param hello the hello message
986  */
987 void
988 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
989                               const struct GNUNET_MessageHeader *hello)
990 {
991   struct GNUNET_MessageHeader *hc;
992   uint16_t size;
993
994   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
995   size = ntohs (hello->size);
996   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
997   hc = GNUNET_malloc (size);
998   memcpy (hc, hello, size);
999   schedule_control_transmit (handle,
1000                              size,
1001                              GNUNET_NO, OFFER_HELLO_TIMEOUT, &send_hello, hc);
1002 }
1003
1004
1005 /**
1006  * Transmit START message to service.
1007  *
1008  * @param cls unused
1009  * @param size number of bytes available in buf
1010  * @param buf where to copy the message
1011  * @return number of bytes copied to buf
1012  */
1013 static size_t
1014 send_start (void *cls, size_t size, void *buf)
1015 {
1016   struct GNUNET_MessageHeader *s = buf;
1017
1018   if (buf == NULL)
1019     {
1020       /* Can only be shutdown, just give up */
1021 #if DEBUG_TRANSPORT
1022       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1023                   "Shutdown while trying to transmit `%s' request.\n",
1024                   "START");
1025 #endif
1026       return 0;
1027     }
1028 #if DEBUG_TRANSPORT
1029   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030               "Transmitting `%s' request.\n", "START");
1031 #endif
1032   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
1033   s->size = htons (sizeof (struct GNUNET_MessageHeader));
1034   s->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
1035   return sizeof (struct GNUNET_MessageHeader);
1036 }
1037
1038
1039 /**
1040  * Free neighbour. 
1041  * 
1042  * @param h our state
1043  * @param n the entry to free
1044  */
1045 static void
1046 neighbour_free (struct NeighbourList *n)
1047 {
1048   struct GNUNET_TRANSPORT_Handle *h;
1049   struct NeighbourList *prev;
1050   struct NeighbourList *pos;
1051
1052   h = n->h;
1053 #if DEBUG_TRANSPORT
1054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055               "Removing neighbour `%s' from list of connected peers.\n",
1056               GNUNET_i2s (&n->id));
1057 #endif
1058   GNUNET_break (n->is_connected == GNUNET_NO);
1059   GNUNET_break (n->transmit_stage == TS_NEW);
1060
1061   prev = NULL;
1062   pos = h->neighbours;
1063   while (pos != n)
1064     {
1065       prev = pos;
1066       pos = pos->next;
1067     }
1068   if (prev == NULL)
1069     h->neighbours = n->next;
1070   else
1071     prev->next = n->next;
1072   GNUNET_free (n);
1073 }
1074
1075
1076 /**
1077  * Mark neighbour as disconnected. 
1078  * 
1079  * @param n the entry to mark as disconnected
1080  */
1081 static void
1082 neighbour_disconnect (struct NeighbourList *n)
1083 {
1084   struct GNUNET_TRANSPORT_Handle *h = n->h;
1085 #if DEBUG_TRANSPORT
1086   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1087               "Removing neighbour `%s' from list of connected peers.\n",
1088               GNUNET_i2s (&n->id));
1089 #endif
1090   GNUNET_break (n->is_connected == GNUNET_YES);
1091   n->is_connected = GNUNET_NO;
1092   if (h->nc_cb != NULL)
1093     h->nd_cb (h->cls, &n->id);
1094   if (n->transmit_stage == TS_NEW)
1095     neighbour_free (n);
1096 }
1097
1098
1099 /**
1100  * Function we use for handling incoming messages.
1101  *
1102  * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
1103  * @param msg message received, NULL on timeout or fatal error
1104  */
1105 static void demultiplexer (void *cls, 
1106                            const struct GNUNET_MessageHeader *msg);
1107
1108
1109 /**
1110  * Try again to connect to transport service.
1111  *
1112  * @param cls the handle to the transport service
1113  * @param tc scheduler context
1114  */
1115 static void
1116 reconnect (void *cls, 
1117            const struct GNUNET_SCHEDULER_TaskContext *tc)
1118 {
1119   struct GNUNET_TRANSPORT_Handle *h = cls;
1120   struct ControlMessage *pos;
1121   struct NeighbourList *n;
1122
1123   if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1124     {
1125       /* shutdown, just give up */
1126       return;
1127     }
1128   /* Forget about all neighbours that we used to be connected to */
1129   n = h->neighbours;
1130   while (NULL != n)
1131     {
1132       neighbour_disconnect (n);
1133       n = n->next;
1134     }
1135 #if DEBUG_TRANSPORT
1136   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
1137 #endif
1138   GNUNET_assert (h->client == NULL);
1139   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1140   h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
1141   GNUNET_assert (h->client != NULL);
1142   /* make sure we don't send "START" twice, remove existing entry from
1143      queue (if present) */
1144   pos = h->control_head;
1145   while (pos != NULL)
1146     {
1147       if (pos->notify == &send_start)
1148         {
1149           GNUNET_CONTAINER_DLL_remove (h->control_head,
1150                                        h->control_tail,
1151                                        pos);
1152           if (GNUNET_SCHEDULER_NO_TASK != pos->notify_delay_task)
1153             {
1154               GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
1155               pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1156             }
1157           GNUNET_free (pos);
1158           break;
1159         }
1160       pos = pos->next;
1161     }
1162   schedule_control_transmit (h,
1163                              sizeof (struct GNUNET_MessageHeader),
1164                              GNUNET_YES,
1165                              GNUNET_TIME_UNIT_FOREVER_REL, &send_start, NULL);
1166   GNUNET_CLIENT_receive (h->client,
1167                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1168 }
1169
1170
1171 /**
1172  * Function that will schedule the job that will try
1173  * to connect us again to the client.
1174  *
1175  * @param h transport service to reconnect
1176  */
1177 static void
1178 schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1179 {
1180 #if DEBUG_TRANSPORT
1181   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1182               "Scheduling task to reconnect to transport service in %llu ms.\n",
1183               h->reconnect_delay.value);
1184 #endif
1185   GNUNET_assert (h->client == NULL);
1186   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
1187   h->reconnect_task
1188     = GNUNET_SCHEDULER_add_delayed (h->sched,
1189                                     h->reconnect_delay, &reconnect, h);
1190   if (h->reconnect_delay.value == 0)
1191     {
1192       h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1193     }
1194   else 
1195     {
1196       h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2);
1197       h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
1198                                                      h->reconnect_delay);
1199     }
1200 }
1201
1202
1203 /**
1204  * Add neighbour to our list
1205  */
1206 static struct NeighbourList *
1207 neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
1208                const struct GNUNET_PeerIdentity *pid)
1209 {
1210   struct NeighbourList *n;
1211
1212   /* check for duplicates */
1213   if (NULL != (n = neighbour_find (h, pid)))
1214     {
1215       GNUNET_break (0);
1216       return n;
1217     }
1218 #if DEBUG_TRANSPORT
1219   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1220               "Creating entry for neighbour `%4s'.\n", 
1221               GNUNET_i2s (pid));
1222 #endif
1223   n = GNUNET_malloc (sizeof (struct NeighbourList));
1224   n->id = *pid;
1225   n->last_quota_update = GNUNET_TIME_absolute_get ();
1226   n->next = h->neighbours;
1227   n->quota_out = GNUNET_CONSTANTS_DEFAULT_BPM_IN_OUT;
1228   n->h = h;
1229   h->neighbours = n;  
1230   return n;
1231 }
1232
1233
1234 /**
1235  * Connect to the transport service.  Note that the connection may
1236  * complete (or fail) asynchronously.
1237  *
1238  * @param sched scheduler to use
1239  * @param cfg configuration to use
1240  * @param cls closure for the callbacks
1241  * @param rec receive function to call
1242  * @param nc function to call on connect events
1243  * @param nd function to call on disconnect events
1244  */
1245 struct GNUNET_TRANSPORT_Handle *
1246 GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched,
1247                           const struct GNUNET_CONFIGURATION_Handle *cfg,
1248                           void *cls,
1249                           GNUNET_TRANSPORT_ReceiveCallback rec,
1250                           GNUNET_TRANSPORT_NotifyConnect nc,
1251                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1252 {
1253   struct GNUNET_TRANSPORT_Handle *ret;
1254
1255   GNUNET_ARM_start_services (cfg, sched, "peerinfo", "transport", NULL);
1256   ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
1257   ret->sched = sched;
1258   ret->cfg = cfg;
1259   ret->cls = cls;
1260   ret->rec = rec;
1261   ret->nc_cb = nc;
1262   ret->nd_cb = nd;
1263   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1264   schedule_reconnect (ret);
1265   return ret;
1266 }
1267
1268
1269 /**
1270  * Disconnect from the transport service.
1271  */
1272 void
1273 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1274 {
1275   struct GNUNET_TRANSPORT_TransmitHandle *th;
1276   struct NeighbourList *n;
1277   struct HelloWaitList *hwl;
1278   struct GNUNET_CLIENT_Connection *client;
1279
1280 #if DEBUG_TRANSPORT
1281   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
1282 #endif
1283   while (NULL != (n = handle->neighbours))
1284     {
1285       handle->neighbours = n->next;
1286       switch (n->transmit_stage)
1287         {
1288         case TS_NEW:
1289         case TS_TRANSMITTED:
1290           /* nothing to do */
1291           break;
1292         case TS_QUEUED:
1293         case TS_TRANSMITTED_QUEUED:
1294           th = &n->transmit_handle;
1295           if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1296             {
1297               GNUNET_SCHEDULER_cancel (handle->sched,
1298                                        th->notify_delay_task);
1299               th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1300             }
1301           GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));        
1302           break;
1303         default:
1304           GNUNET_break (0);
1305         }
1306       GNUNET_free (n);
1307     }
1308   while (NULL != (hwl = handle->hwl_head))
1309     {
1310       handle->hwl_head = hwl->next;
1311       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1312                   _
1313                   ("Disconnect while notification for `%s' still registered.\n"),
1314                   "HELLO");
1315       if (hwl->rec != NULL)
1316         hwl->rec (hwl->rec_cls, NULL);
1317       GNUNET_free (hwl);
1318     }
1319   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1320     {
1321       GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
1322       handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1323     }
1324   if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK)
1325     {
1326       GNUNET_SCHEDULER_cancel (handle->sched, handle->quota_task);
1327       handle->quota_task = GNUNET_SCHEDULER_NO_TASK;
1328     }
1329   GNUNET_free_non_null (handle->my_hello);
1330   handle->my_hello = NULL;
1331   GNUNET_ARM_stop_services (handle->cfg, handle->sched, "transport",
1332                             "peerinfo", NULL);
1333   if (NULL != handle->network_handle)
1334     {
1335       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->network_handle);
1336       handle->network_handle = NULL;
1337     }
1338   if (NULL != (client = handle->client))
1339     {
1340 #if DEBUG_TRANSPORT
1341       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1342                   "Disconnecting from transport service for good.\n");
1343 #endif
1344       handle->client = NULL;
1345       GNUNET_CLIENT_disconnect (client);
1346     }
1347   GNUNET_free (handle);
1348 }
1349
1350
1351 /**
1352  * Function we use for handling incoming messages.
1353  *
1354  * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
1355  * @param msg message received, NULL on timeout or fatal error
1356  */
1357 static void
1358 demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1359 {
1360   struct GNUNET_TRANSPORT_Handle *h = cls;
1361   const struct DisconnectInfoMessage *dim;
1362   const struct ConnectInfoMessage *cim;
1363   const struct InboundMessage *im;
1364   const struct GNUNET_MessageHeader *imm;
1365   const struct SendOkMessage *okm;
1366   struct HelloWaitList *hwl;
1367   struct HelloWaitList *next_hwl;
1368   struct NeighbourList *n;
1369   struct GNUNET_PeerIdentity me;
1370   uint16_t size;
1371
1372   if (h->client == NULL)
1373     {
1374       /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
1375          finish clean up work! */
1376       GNUNET_free (h);
1377       return;
1378     }
1379   if (msg == NULL) 
1380     {
1381 #if DEBUG_TRANSPORT
1382       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1383                   "Error receiving from transport service, disconnecting temporarily.\n");
1384 #endif
1385       if (h->network_handle != NULL)
1386         {
1387           GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle);
1388           h->network_handle = NULL;
1389         }
1390       GNUNET_CLIENT_disconnect (h->client);
1391       h->client = NULL;
1392       schedule_reconnect (h);
1393       return;
1394     }
1395   GNUNET_CLIENT_receive (h->client,
1396                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1397   size = ntohs (msg->size);
1398   switch (ntohs (msg->type))
1399     {
1400     case GNUNET_MESSAGE_TYPE_HELLO:
1401       if (GNUNET_OK !=
1402           GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
1403                                &me))
1404         {
1405           GNUNET_break (0);
1406           break;
1407         }
1408 #if DEBUG_TRANSPORT
1409       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1410                   "Receiving (my own) `%s' message, I am `%4s'.\n",
1411                   "HELLO", GNUNET_i2s (&me));
1412 #endif
1413       GNUNET_free_non_null (h->my_hello);
1414       h->my_hello = NULL;
1415       if (size < sizeof (struct GNUNET_MessageHeader))
1416         {
1417           GNUNET_break (0);
1418           break;
1419         }
1420       h->my_hello = GNUNET_malloc (size);
1421       memcpy (h->my_hello, msg, size);
1422       hwl = h->hwl_head;
1423       while (NULL != hwl)
1424         {
1425           next_hwl = hwl->next;
1426           hwl->rec (hwl->rec_cls,
1427                     (const struct GNUNET_MessageHeader *) h->my_hello);
1428           hwl = next_hwl;
1429         }
1430       break;
1431     case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
1432       if (size != sizeof (struct ConnectInfoMessage))
1433         {
1434           GNUNET_break (0);
1435           break;
1436         }
1437       cim = (const struct ConnectInfoMessage *) msg;
1438 #if DEBUG_TRANSPORT
1439       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1440                   "Receiving `%s' message for `%4s'.\n",
1441                   "CONNECT", GNUNET_i2s (&cim->id));
1442 #endif
1443       n = neighbour_find (h, &cim->id);
1444       if (n == NULL)
1445         n = neighbour_add (h,
1446                            &cim->id);
1447       GNUNET_break (n->is_connected == GNUNET_NO);
1448       n->is_connected = GNUNET_YES;
1449       if (h->nc_cb != NULL)
1450         h->nc_cb (h->cls, &n->id,
1451                   GNUNET_TIME_relative_ntoh (cim->latency), 
1452                   ntohs (cim->distance));
1453       break;
1454     case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
1455       if (size != sizeof (struct DisconnectInfoMessage))
1456         {
1457           GNUNET_break (0);
1458           break;
1459         }
1460       dim = (const struct DisconnectInfoMessage *) msg;
1461 #if DEBUG_TRANSPORT
1462       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1463                   "Receiving `%s' message for `%4s'.\n",
1464                   "DISCONNECT",
1465                   GNUNET_i2s (&dim->peer));
1466 #endif
1467       n = neighbour_find (h, &cim->id);
1468       GNUNET_break (n != NULL);
1469       if (n != NULL)
1470         neighbour_disconnect (n);      
1471       break;
1472     case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
1473       if (size != sizeof (struct SendOkMessage))
1474         {
1475           GNUNET_break (0);
1476           break;
1477         }
1478       okm = (const struct SendOkMessage *) msg;
1479 #if DEBUG_TRANSPORT
1480       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1481                   "Receiving `%s' message, transmission %s.\n", "SEND_OK",
1482                   ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
1483 #endif
1484       n = neighbour_find (h, &okm->peer);
1485       GNUNET_assert (n != NULL);
1486       switch (n->transmit_stage)
1487         {
1488         case TS_NEW:
1489           GNUNET_break (0);
1490           break;
1491         case TS_QUEUED:
1492           GNUNET_break (0);
1493           break;
1494         case TS_TRANSMITTED:
1495           n->transmit_stage = TS_NEW;
1496           break;
1497         case TS_TRANSMITTED_QUEUED:
1498           n->transmit_stage = TS_QUEUED;
1499           schedule_transmission (h);
1500           break;
1501         default:
1502           GNUNET_break (0);
1503         }
1504       break;
1505     case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
1506 #if DEBUG_TRANSPORT
1507       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1508                   "Receiving `%s' message.\n", "RECV");
1509 #endif
1510       if (size <
1511           sizeof (struct InboundMessage) +
1512           sizeof (struct GNUNET_MessageHeader))
1513         {
1514           GNUNET_break (0);
1515           break;
1516         }
1517       im = (const struct InboundMessage *) msg;
1518       imm = (const struct GNUNET_MessageHeader *) &im[1];
1519       if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
1520         {
1521           GNUNET_break (0);
1522           break;
1523         }
1524 #if DEBUG_TRANSPORT
1525       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1526                   "Received message of type %u from `%4s'.\n",
1527                   ntohs (imm->type), GNUNET_i2s (&im->peer));
1528 #endif      
1529       n = neighbour_find (h, &im->peer);
1530       if (n == NULL)
1531         n = neighbour_add (h, &im->peer);
1532       if (n == NULL) 
1533         break;
1534       if (h->rec != NULL)
1535         h->rec (h->cls, &im->peer, imm,
1536                 GNUNET_TIME_relative_ntoh (im->latency), ntohs(im->distance));
1537       break;
1538     default:
1539       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1540                   _
1541                   ("Received unexpected message of type %u in %s:%u\n"),
1542                   ntohs (msg->type), __FILE__, __LINE__);
1543       GNUNET_break (0);
1544       break;
1545     }
1546 }
1547
1548
1549 /**
1550  * Called when our transmit request timed out before any transport
1551  * reported success connecting to the desired peer or before the
1552  * transport was ready to receive.  Signal error and free
1553  * TransmitHandle.
1554  *
1555  * @param cls the 'struct GNUNET_TRANSPORT_TransmitHandle*' that is timing out
1556  * @param tc scheduler context
1557  */
1558 static void
1559 peer_transmit_timeout (void *cls,
1560                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1561 {
1562   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1563   struct NeighbourList *n;
1564
1565   th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1566   n = th->neighbour;
1567   switch (n->transmit_stage)
1568     {
1569     case TS_NEW:
1570       GNUNET_break (0);
1571       break;
1572     case TS_QUEUED:
1573       n->transmit_stage = TS_NEW;
1574       if (n->is_connected == GNUNET_NO)
1575         neighbour_free (n);
1576       break;
1577     case TS_TRANSMITTED:
1578       GNUNET_break (0);
1579       break;
1580     case TS_TRANSMITTED_QUEUED:
1581       n->transmit_stage = TS_TRANSMITTED;
1582       break;
1583     default:
1584       GNUNET_break (0);
1585     }
1586   if (NULL != th->notify)
1587     th->notify (th->notify_cls, 0, NULL);
1588 }
1589
1590
1591 /**
1592  * Check if we could queue a message of the given size for
1593  * transmission.  The transport service will take both its
1594  * internal buffers and bandwidth limits imposed by the
1595  * other peer into consideration when answering this query.
1596  *
1597  * @param handle connection to transport service
1598  * @param target who should receive the message
1599  * @param size how big is the message we want to transmit?
1600  * @param priority how important is the message?
1601  * @param timeout after how long should we give up (and call
1602  *        notify with buf NULL and size 0)?
1603  * @param notify function to call when we are ready to
1604  *        send such a message
1605  * @param notify_cls closure for notify
1606  * @return NULL if someone else is already waiting to be notified
1607  *         non-NULL if the notify callback was queued (can be used to cancel
1608  *         using GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1609  */
1610 struct GNUNET_TRANSPORT_TransmitHandle *
1611 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
1612                                         *handle,
1613                                         const struct GNUNET_PeerIdentity
1614                                         *target, size_t size,
1615                                         unsigned int priority,
1616                                         struct GNUNET_TIME_Relative timeout,
1617                                         GNUNET_CONNECTION_TransmitReadyNotify
1618                                         notify, void *notify_cls)
1619 {
1620   struct GNUNET_TRANSPORT_TransmitHandle *th;
1621   struct NeighbourList *n;
1622
1623   if (size + sizeof (struct OutboundMessage) >=
1624       GNUNET_SERVER_MAX_MESSAGE_SIZE)
1625     {
1626 #if DEBUG_TRANSPORT
1627       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1628                   "Message size is %d, max allowed is %d.\n",
1629                   size + sizeof (struct OutboundMessage), GNUNET_SERVER_MAX_MESSAGE_SIZE);
1630 #endif
1631       GNUNET_break (0);
1632       return NULL;
1633     }
1634 #if DEBUG_TRANSPORT
1635   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1636               "Asking transport service for transmission of %u bytes to peer `%4s'.\n",
1637               size, GNUNET_i2s (target));
1638 #endif
1639   n = neighbour_find (handle, target);
1640   if (n == NULL)
1641     n = neighbour_add (handle, target);
1642   if (n == NULL) 
1643     return NULL;
1644   switch (n->transmit_stage)
1645     {
1646     case TS_NEW:
1647       n->transmit_stage = TS_QUEUED;
1648       break;
1649     case TS_QUEUED:
1650       GNUNET_break (0);
1651       return NULL;
1652     case TS_TRANSMITTED:
1653       n->transmit_stage = TS_TRANSMITTED_QUEUED;
1654       break;
1655     case TS_TRANSMITTED_QUEUED:
1656       GNUNET_break (0);
1657       return NULL;
1658     default:
1659       GNUNET_break (0);
1660       return NULL;
1661     }
1662   th = &n->transmit_handle;
1663   th->neighbour = n;
1664   th->notify = notify;
1665   th->notify_cls = notify_cls;
1666   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1667   th->notify_size = size + sizeof (struct OutboundMessage);
1668   th->priority = priority;
1669   th->notify_delay_task
1670     = GNUNET_SCHEDULER_add_delayed (handle->sched, timeout,
1671                                     &peer_transmit_timeout, th);
1672   schedule_transmission (handle);
1673   return th;
1674 }
1675
1676
1677 /**
1678  * Cancel the specified transmission-ready notification.
1679  */
1680 void
1681 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
1682                                                GNUNET_TRANSPORT_TransmitHandle
1683                                                *th)
1684 {
1685   struct NeighbourList *n;
1686
1687   n = th->neighbour;
1688 #if DEBUG_TRANSPORT
1689   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1690               "Transmission request of %u bytes to `%4s' was cancelled.\n",
1691               th->notify_size - sizeof (struct OutboundMessage),
1692               GNUNET_i2s (&n->id));
1693 #endif
1694   switch (n->transmit_stage)
1695     {
1696     case TS_NEW:
1697       GNUNET_break (0);
1698       break;
1699     case TS_QUEUED:
1700       n->transmit_stage = TS_NEW;
1701       if (n->is_connected == GNUNET_NO)
1702         neighbour_free (n);
1703       break;
1704     case TS_TRANSMITTED:
1705       GNUNET_break (0);
1706       break;
1707     case TS_TRANSMITTED_QUEUED:
1708       n->transmit_stage = TS_TRANSMITTED;
1709       break;
1710     default:
1711       GNUNET_break (0);
1712     }
1713 }
1714
1715
1716 /* end of transport_api.c */