check for target repetition
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - set_quota with low bandwidth should cause peer
28  *   disconnects (currently never does that) (MINOR)
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_server_lib.h"
36 #include "gnunet_time_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "transport.h"
39
/**
 * Timeout used when queueing the request that hands another
 * peer's HELLO to the transport service.
 */
#define OFFER_HELLO_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)

/**
 * Time ARM is given to start the transport service
 * before it has to report back.
 */
#define START_SERVICE_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)

/**
 * Time ARM is given to stop the transport service
 * before it has to report back.
 */
#define STOP_SERVICE_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
57
58 /**
59  * Entry in linked list of all of our current neighbours.
60  */
61 struct NeighbourList
62 {
63
64   /**
65    * This is a linked list.
66    */
67   struct NeighbourList *next;
68
69   /**
70    * Active transmit handle, can be NULL.  Used to move
71    * from ready to wait list on disconnect and to block
72    * two transmissions to the same peer from being scheduled
73    * at the same time.
74    */
75   struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle;
76
77
78   /**
79    * Identity of this neighbour.
80    */
81   struct GNUNET_PeerIdentity id;
82
83   /**
84    * At what time did we reset last_sent last?
85    */
86   struct GNUNET_TIME_Absolute last_quota_update;
87
88   /**
89    * How many bytes have we sent since the "last_quota_update"
90    * timestamp?
91    */
92   uint64_t last_sent;
93
94   /**
95    * Global quota for outbound traffic to the neighbour in bytes/ms.
96    */
97   uint32_t quota_out;
98
99   /**
100    * Set to GNUNET_YES if we are currently allowed to
101    * transmit a message to the transport service for this
102    * peer, GNUNET_NO otherwise.
103    */
104   int transmit_ok;
105
106   /**
107    * Set to GNUNET_YES if we have received an ACK for the
108    * given peer.  Peers that receive our HELLO always respond
109    * with an ACK to let us know that we are successfully
110    * communicating.  Note that a PING can not be used for this
111    * since PINGs are only send if a HELLO address requires
112    * confirmation (and also, PINGs are not passed to the
113    * transport API itself).
114    */
115   int received_ack;
116
117 };
118
119
120 /**
121  * Linked list of requests from clients for our HELLO
122  * that were deferred.
123  */
124 struct HelloWaitList
125 {
126
127   /**
128    * This is a linked list.
129    */
130   struct HelloWaitList *next;
131
132   /**
133    * Reference back to our transport handle.
134    */
135   struct GNUNET_TRANSPORT_Handle *handle;
136
137   /**
138    * Callback to call once we got our HELLO.
139    */
140   GNUNET_TRANSPORT_ReceiveCallback rec;
141
142   /**
143    * Closure for rec.
144    */
145   void *rec_cls;
146
147   /**
148    * When to time out (call rec with NULL).
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Timeout task (used to trigger timeout,
154    * cancel if we get the HELLO in time).
155    */
156   GNUNET_SCHEDULER_TaskIdentifier task;
157
158
159 };
160
161
162 /**
163  * Opaque handle for a transmission-ready request.
164  */
165 struct GNUNET_TRANSPORT_TransmitHandle
166 {
167
168   /**
169    * We keep the transmit handles that are waiting for
170    * a transport-level connection in a doubly linked list.
171    */
172   struct GNUNET_TRANSPORT_TransmitHandle *next;
173
174   /**
175    * We keep the transmit handles that are waiting for
176    * a transport-level connection in a doubly linked list.
177    */
178   struct GNUNET_TRANSPORT_TransmitHandle *prev;
179
180   /**
181    * Handle of the main transport data structure.
182    */
183   struct GNUNET_TRANSPORT_Handle *handle;
184
185   /**
186    * Neighbour for this handle, can be NULL if the service
187    * is not yet connected to the target.
188    */
189   struct NeighbourList *neighbour;
190
191   /**
192    * Which peer is this transmission going to be for?  All
193    * zeros if it is control-traffic to the service.
194    */
195   struct GNUNET_PeerIdentity target;
196
197   /**
198    * Function to call when notify_size bytes are available
199    * for transmission.
200    */
201   GNUNET_NETWORK_TransmitReadyNotify notify;
202
203   /**
204    * Closure for notify.
205    */
206   void *notify_cls;
207
208   /**
209    * transmit_ready task Id.  The task is used to introduce the
210    * artificial delay that may be required to maintain the bandwidth
211    * limits.  Later, this will be the ID of the "transmit_timeout"
212    * task which is used to signal a timeout if the transmission could
213    * not be done in a timely fashion.
214    */
215   GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
216
217   /**
218    * Timeout for this request.
219    */
220   struct GNUNET_TIME_Absolute timeout;
221
222   /**
223    * How many bytes is our notify callback waiting for?
224    */
225   size_t notify_size;
226
227 };
228
229
230 /**
231  * Handle for the transport service (includes all of the
232  * state for the transport service).
233  */
234 struct GNUNET_TRANSPORT_Handle
235 {
236
237   /**
238    * Closure for the callbacks.
239    */
240   void *cls;
241
242   /**
243    * Function to call for received data.
244    */
245   GNUNET_TRANSPORT_ReceiveCallback rec;
246
247   /**
248    * function to call on connect events
249    */
250   GNUNET_TRANSPORT_NotifyConnect nc_cb;
251
252   /**
253    * function to call on disconnect events
254    */
255   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
256
257   /**
258    * The current HELLO message for this peer.  Updated
259    * whenever transports change their addresses.
260    */
261   struct GNUNET_HELLO_Message *my_hello;
262
263   /**
264    * My client connection to the transport service.
265    */
266   struct GNUNET_CLIENT_Connection *client;
267
268   /**
269    * Handle to our registration with the client for notification.
270    */
271   struct GNUNET_NETWORK_TransmitHandle *network_handle;
272
273   /**
274    * Linked list of transmit handles that are waiting for the
275    * transport to connect to the respective peer.  When we
276    * receive notification that the transport connected to a
277    * peer, we go over this list and check if someone has already
278    * requested a transmission to the new peer; if so, we trigger
279    * the next step.
280    */
281   struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head;
282
283   /**
284    * Linked list of transmit handles that are waiting for the
285    * transport to be ready for transmission to the respective
286    * peer.  When we
287    * receive notification that the transport disconnected from
288    * a peer, we go over this list and move the entry back to
289    * the connect_wait list.
290    */
291   struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head;
292
293   /**
294    * Linked list of pending requests for our HELLO.
295    */
296   struct HelloWaitList *hwl_head;
297
298   /**
299    * My scheduler.
300    */
301   struct GNUNET_SCHEDULER_Handle *sched;
302
303   /**
304    * My configuration.
305    */
306   struct GNUNET_CONFIGURATION_Handle *cfg;
307
308   /**
309    * Linked list of the current neighbours of this peer.
310    */
311   struct NeighbourList *neighbours;
312
313   /**
314    * ID of the task trying to reconnect to the
315    * service.
316    */
317   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
318
319   /**
320    * Delay until we try to reconnect.
321    */
322   struct GNUNET_TIME_Relative reconnect_delay;
323
324   /**
325    * Do we currently have a transmission pending?
326    * (schedule transmission was called but has not
327    * yet succeeded)?
328    */
329   int transmission_scheduled;
330 };
331
332
333 static struct NeighbourList *
334 find_neighbour (struct GNUNET_TRANSPORT_Handle *h,
335                 const struct GNUNET_PeerIdentity *peer)
336 {
337   struct NeighbourList *pos;
338
339   pos = h->neighbours;
340   while ((pos != NULL) &&
341          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
342     pos = pos->next;
343   return pos;
344 }
345
346
347 /**
348  * Schedule the task to send one message from the
349  * connect_ready list to the service.
350  */
351 static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
352
353
354 /**
355  * Transmit message to client...
356  */
357 static size_t
358 transport_notify_ready (void *cls, size_t size, void *buf)
359 {
360   struct GNUNET_TRANSPORT_Handle *h = cls;
361   struct GNUNET_TRANSPORT_TransmitHandle *th;
362   struct NeighbourList *n;
363   size_t ret;
364   char *cbuf;
365
366   h->network_handle = NULL;
367   h->transmission_scheduled = GNUNET_NO;
368   if (buf == NULL)
369     {
370 #if DEBUG_TRANSPORT
371       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
372                   "Could not transmit to transport service, cancelling pending requests\n");
373 #endif
374       th = h->connect_ready_head;
375       if (th->next != NULL)
376         th->next->prev = NULL;
377       h->connect_ready_head = th->next;
378       if (NULL != (n = th->neighbour))
379         {
380           GNUNET_assert (n->transmit_handle == th);
381           n->transmit_handle = NULL;
382         }
383       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
384       GNUNET_free (th);
385       return 0;
386     } 
387 #if DEBUG_TRANSPORT
388   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
389               "Ready to transmit %u bytes to transport service\n", size);
390 #endif
391   cbuf = buf;
392   ret = 0;
393   h->network_handle = NULL;
394   h->transmission_scheduled = GNUNET_NO;
395   do
396     {
397       th = h->connect_ready_head;
398       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
399         {
400           /* remove existing time out task (only applies if
401              this is not the first iteration of the loop) */
402           GNUNET_SCHEDULER_cancel (h->sched,
403                                    th->notify_delay_task);
404           th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
405         }
406       GNUNET_assert (th->notify_size <= size);
407       if (th->next != NULL)
408         th->next->prev = NULL;
409       h->connect_ready_head = th->next;
410       if (NULL != (n = th->neighbour))
411         {
412           GNUNET_assert (n->transmit_handle == th);
413           n->transmit_handle = NULL;
414         }
415       ret += th->notify (th->notify_cls, size, &cbuf[ret]);
416       GNUNET_free (th);
417       if (n != NULL)
418         n->last_sent += ret;
419       size -= ret;
420     }
421   while ((h->connect_ready_head != NULL) &&
422          (h->connect_ready_head->notify_size <= size));
423   if (h->connect_ready_head != NULL)
424     schedule_transmission (h);
425 #if DEBUG_TRANSPORT
426   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
427               "Transmitting %u bytes to transport service\n", ret);
428 #endif
429   return ret;
430 }
431
432
433 /**
434  * Schedule the task to send one message from the
435  * connect_ready list to the service.
436  */
437 static void
438 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
439 {
440   struct GNUNET_TRANSPORT_TransmitHandle *th;
441
442   GNUNET_assert (NULL == h->network_handle);
443   if (h->client == NULL)
444     {
445       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
446                   "Could not yet schedule transmission: we are not yet connected to the transport service!\n");
447       return; /* not yet connected */
448     }
449   th = h->connect_ready_head;
450   if (th == NULL)
451     return; /* no request pending */
452   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
453     {
454       /* remove existing time out task, will be integrated
455          with transmit_ready notification! */
456       GNUNET_SCHEDULER_cancel (h->sched,
457                                th->notify_delay_task);
458       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
459     }
460   h->transmission_scheduled = GNUNET_YES;
461   h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client,
462                                                            th->notify_size,
463                                                            GNUNET_TIME_absolute_get_remaining
464                                                            (th->timeout),
465                                                            &transport_notify_ready,
466                                                            h);
467   GNUNET_assert (NULL != h->network_handle);
468 }
469
470
471 /**
472  * Insert the given transmit handle in the given sorted
473  * doubly linked list based on timeout.
474  *
475  * @param head pointer to the head of the linked list
476  * @param th element to insert into the list
477  */
478 static void
479 insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head,
480                         struct GNUNET_TRANSPORT_TransmitHandle *th)
481 {
482   struct GNUNET_TRANSPORT_TransmitHandle *pos;
483   struct GNUNET_TRANSPORT_TransmitHandle *prev;
484
485   pos = *head;
486   prev = NULL;
487   while ((pos != NULL) && (pos->timeout.value < th->timeout.value))
488     {
489       prev = pos;
490       pos = pos->next;
491     }
492   if (prev == NULL)
493     {
494       th->next = *head;
495       if (th->next != NULL)
496         th->next->prev = th;
497       *head = th;
498     }
499   else
500     {
501       th->next = pos;
502       th->prev = prev;
503       prev->next = th;
504       if (pos != NULL)
505         pos->prev = th;
506     }
507 }
508
509
510 /**
511  * Cancel a pending notify delay task (if pending) and also remove the
512  * given transmit handle from whatever list is on.
513  *
514  * @param th handle for the transmission request to manipulate
515  */
516 static void
517 remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
518 {
519   struct GNUNET_TRANSPORT_Handle *h;
520
521   h = th->handle;
522   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
523     {
524       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
525       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
526     }
527   if (th->prev == NULL)
528     {
529       if (th == h->connect_wait_head)
530         h->connect_wait_head = th->next;
531       else
532         h->connect_ready_head = th->next;
533     }
534   else
535     {
536       th->prev->next = th->next;
537     }
538   if (th->next != NULL)
539     th->next->prev = th->prev;
540 }
541
542
543 /**
544  * Called when our transmit request timed out before any transport
545  * reported success connecting to the desired peer or before the
546  * transport was ready to receive.  Signal error and free
547  * TransmitHandle.
548  */
549 static void
550 transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
551 {
552   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
553
554   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
555   if (th->neighbour != NULL)
556     th->neighbour->transmit_handle = NULL;
557 #if DEBUG_TRANSPORT
558   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
559               "Request for transmission to peer `%s' timed out.\n",
560               GNUNET_i2s(&th->target));
561 #endif
562   remove_from_any_list (th);
563   th->notify (th->notify_cls, 0, NULL);
564   GNUNET_free (th);
565 }
566
567
568
569
570 /**
571  * Queue control request for transmission to the transport
572  * service.
573  *
574  * @param size number of bytes to be transmitted
575  * @param at_head request must be added to the head of the queue
576  *        (otherwise request will be appended)
577  * @param timeout how long this transmission can wait (at most)
578  * @param notify function to call to get the content
579  * @param notify_cls closure for notify
580  */
581 static void
582 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
583                            size_t size,
584                            int at_head,
585                            struct GNUNET_TIME_Relative timeout,
586                            GNUNET_NETWORK_TransmitReadyNotify notify,
587                            void *notify_cls)
588 {
589   struct GNUNET_TRANSPORT_TransmitHandle *th;
590
591   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
592   th->handle = h;
593   th->notify = notify;
594   th->notify_cls = notify_cls;
595   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
596   th->notify_size = size;
597   th->notify_delay_task 
598     = GNUNET_SCHEDULER_add_delayed (h->sched,
599                                     GNUNET_NO,
600                                     GNUNET_SCHEDULER_PRIORITY_KEEP,
601                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
602                                     timeout,
603                                     &transmit_timeout, th);    
604   if (at_head)
605     {
606       th->next = h->connect_ready_head;
607       h->connect_ready_head = th;
608       if (th->next != NULL)
609         th->next->prev = th;
610     }
611   else
612     {
613       insert_transmit_handle (&h->connect_ready_head, th);
614     }
615   if (GNUNET_NO == h->transmission_scheduled)
616     schedule_transmission (h);
617 }
618
619
620 /**
621  * Update the quota values for the given neighbour now.
622  */
623 static void
624 update_quota (struct NeighbourList *n)
625 {
626   struct GNUNET_TIME_Relative delta;
627   uint64_t allowed;
628   uint64_t remaining;
629
630   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
631   allowed = delta.value * n->quota_out;
632   if (n->last_sent < allowed)
633     {
634       remaining = allowed - n->last_sent;
635       if (n->quota_out > 0)
636         remaining /= n->quota_out;
637       else
638         remaining = 0;
639       if (remaining > MAX_BANDWIDTH_CARRY)
640         remaining = MAX_BANDWIDTH_CARRY;
641       n->last_sent = 0;
642       n->last_quota_update = GNUNET_TIME_absolute_get ();
643       n->last_quota_update.value -= remaining;
644     }
645   else
646     {
647       n->last_sent -= allowed;
648       n->last_quota_update = GNUNET_TIME_absolute_get ();
649     }
650 }
651
652
653 struct SetQuotaContext
654 {
655   struct GNUNET_TRANSPORT_Handle *handle;
656
657   struct GNUNET_PeerIdentity target;
658
659   GNUNET_SCHEDULER_Task cont;
660
661   void *cont_cls;
662
663   struct GNUNET_TIME_Absolute timeout;
664
665   uint32_t quota_in;
666 };
667
668
669 static size_t
670 send_set_quota (void *cls, size_t size, void *buf)
671 {
672   struct SetQuotaContext *sqc = cls;
673   struct QuotaSetMessage *msg;
674
675   if (buf == NULL)
676     {
677       GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
678                                          GNUNET_NO,
679                                          sqc->cont,
680                                          sqc->cont_cls,
681                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
682       GNUNET_free (sqc);
683       return 0;
684     }
685 #if DEBUG_TRANSPORT
686   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
687               "Transmitting `%s' request with respect to `%4s'.\n",
688               "SET_QUOTA", GNUNET_i2s (&sqc->target));
689 #endif
690   GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
691   msg = buf;
692   msg->header.size = htons (sizeof (struct QuotaSetMessage));
693   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
694   msg->quota_in = htonl (sqc->quota_in);
695   memcpy (&msg->peer, &sqc->target, sizeof (struct GNUNET_PeerIdentity));
696   if (sqc->cont != NULL)
697     GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
698                                        GNUNET_NO,
699                                        sqc->cont,
700                                        sqc->cont_cls,
701                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
702   GNUNET_free (sqc);
703   return sizeof (struct QuotaSetMessage);
704 }
705
706
707 /**
708  * Set the share of incoming bandwidth for the given
709  * peer to the specified amount.
710  *
711  * @param handle connection to transport service
712  * @param target who's bandwidth quota is being changed
713  * @param quota_in incoming bandwidth quota in bytes per ms; 0 can
714  *        be used to force all traffic to be discarded
715  * @param quota_out outgoing bandwidth quota in bytes per ms; 0 can
716  *        be used to force all traffic to be discarded
717  * @param timeout how long to wait until signaling failure if
718  *        we can not communicate the quota change
719  * @param cont continuation to call when done, will be called
720  *        either with reason "TIMEOUT" or with reason "PREREQ_DONE"
721  * @param cont_cls closure for continuation
722  */
723 void
724 GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
725                             const struct GNUNET_PeerIdentity *target,
726                             uint32_t quota_in,
727                             uint32_t quota_out,
728                             struct GNUNET_TIME_Relative timeout,
729                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
730 {
731   struct NeighbourList *n;
732   struct SetQuotaContext *sqc;
733
734   n = find_neighbour (handle, target);
735   if (n != NULL)
736     {
737       update_quota (n);
738       if (n->quota_out < quota_out)
739         n->last_quota_update = GNUNET_TIME_absolute_get ();
740       n->quota_out = quota_out;
741     }
742   sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
743   sqc->handle = handle;
744   sqc->target = *target;
745   sqc->cont = cont;
746   sqc->cont_cls = cont_cls;
747   sqc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
748   sqc->quota_in = quota_in;
749   schedule_control_transmit (handle,
750                              sizeof (struct QuotaSetMessage),
751                              GNUNET_NO, timeout, &send_set_quota, sqc);
752 }
753
754
755 /**
756  * A "get_hello" request has timed out.  Signal the client
757  * and clean up.
758  */
759 static void
760 hello_wait_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
761 {
762   struct HelloWaitList *hwl = cls;
763   struct HelloWaitList *pos;
764   struct HelloWaitList *prev;
765
766   prev = NULL;
767   pos = hwl->handle->hwl_head;
768   while (pos != hwl)
769     {
770       GNUNET_assert (pos != NULL);
771       prev = pos;
772       pos = pos->next;
773     }
774   if (prev == NULL)
775     hwl->handle->hwl_head = hwl->next;
776   else
777     prev->next = hwl->next;
778   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
779               _("Timeout trying to obtain `%s' from transport service.\n"),
780               "HELLO");
781   /* signal timeout */
782   if (hwl->rec != NULL)
783     hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
784   GNUNET_free (hwl);
785 }
786
787
788 /**
789  * Obtain the HELLO message for this peer.
790  *
791  * @param handle connection to transport service
792  * @param timeout how long to wait for the HELLO
793  * @param rec function to call with the HELLO, sender will be our peer
794  *            identity; message and sender will be NULL on timeout
795  *            (handshake with transport service pending/failed).
796  *             cost estimate will be 0.
797  * @param rec_cls closure for rec
798  */
799 void
800 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
801                             struct GNUNET_TIME_Relative timeout,
802                             GNUNET_TRANSPORT_ReceiveCallback rec,
803                             void *rec_cls)
804 {
805   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
806   struct GNUNET_PeerIdentity me;
807   struct HelloWaitList *hwl;
808
809   if (handle->my_hello == NULL)
810     {
811       hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
812       hwl->next = handle->hwl_head;
813       handle->hwl_head = hwl;
814       hwl->handle = handle;
815       hwl->rec = rec;
816       hwl->rec_cls = rec_cls;
817       hwl->timeout = GNUNET_TIME_relative_to_absolute (timeout);
818       hwl->task = GNUNET_SCHEDULER_add_delayed (handle->sched,
819                                                 GNUNET_YES,
820                                                 GNUNET_SCHEDULER_PRIORITY_KEEP,
821                                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
822                                                 timeout,
823                                                 &hello_wait_timeout, hwl);
824       return;
825     }
826   GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_key (handle->my_hello, &pk));
827   GNUNET_CRYPTO_hash (&pk,
828                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
829                       &me.hashPubKey);
830
831   rec (rec_cls,
832        GNUNET_TIME_UNIT_ZERO,
833        &me, (const struct GNUNET_MessageHeader *) handle->my_hello);
834 }
835
836
837 static size_t
838 send_hello (void *cls, size_t size, void *buf)
839 {
840   struct GNUNET_MessageHeader *hello = cls;
841   uint16_t msize;
842
843   if (buf == NULL)
844     {
845 #if DEBUG_TRANSPORT
846       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
847                   "Timeout while trying to transmit `%s' request.\n",
848                   "HELLO");
849 #endif
850       GNUNET_free (hello);
851       return 0;
852     }
853 #if DEBUG_TRANSPORT
854   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
855               "Transmitting `%s' request.\n", "HELLO");
856 #endif
857   msize = ntohs (hello->size);
858   GNUNET_assert (size >= msize);
859   memcpy (buf, hello, msize);
860   GNUNET_free (hello);
861   return msize;
862 }
863
864
865 /**
866  * Offer the transport service the HELLO of another peer.  Note that
867  * the transport service may just ignore this message if the HELLO is
868  * malformed or useless due to our local configuration.
869  *
870  * @param handle connection to transport service
871  * @param hello the hello message
872  */
873 void
874 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
875                               const struct GNUNET_MessageHeader *hello)
876 {
877   struct GNUNET_MessageHeader *hc;
878   uint16_t size;
879
880   if (handle->client == NULL)
881     {
882 #if DEBUG_TRANSPORT
883       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
884                   "Not connected to transport service, dropping offered HELLO\n");
885 #endif
886       return;
887     }
888   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
889   size = ntohs (hello->size);
890   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
891   hc = GNUNET_malloc (size);
892   memcpy (hc, hello, size);
893   schedule_control_transmit (handle,
894                              size,
895                              GNUNET_NO, OFFER_HELLO_TIMEOUT, &send_hello, hc);
896 }
897
898
899 /**
900  * Function we use for handling incoming messages.
901  */
902 static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg);
903
904
905 static size_t
906 send_start (void *cls, size_t size, void *buf)
907 {
908   struct GNUNET_MessageHeader *s = buf;
909
910   if (buf == NULL)
911     {
912 #if DEBUG_TRANSPORT
913       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
914                   "Timeout while trying to transmit `%s' request.\n",
915                   "START");
916 #endif
917       return 0;
918     }
919 #if DEBUG_TRANSPORT
920   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
921               "Transmitting `%s' request.\n", "START");
922 #endif
923   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
924   s->size = htons (sizeof (struct GNUNET_MessageHeader));
925   s->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
926   return sizeof (struct GNUNET_MessageHeader);
927 }
928
929
930 /**
931  * We're ready to transmit the request that the transport service
932  * should connect to a new peer.  In addition to sending the
933  * request, schedule the next phase for the transmission processing
934  * that caused the connect request in the first place.
935  */
936 static size_t
937 request_connect (void *cls, size_t size, void *buf)
938 {
939   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
940   struct TryConnectMessage *tcm;
941   struct GNUNET_TRANSPORT_Handle *h;
942
943   GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
944   h = th->handle;
945   if (buf == NULL)
946     {
947 #if DEBUG_TRANSPORT
948       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
949                   "Failed to transmit `%s' request for `%4s' to service.\n",
950                   "TRY_CONNECT",
951                   GNUNET_i2s(&th->target));
952 #endif
953       th->notify (th->notify_cls, 0, NULL);
954       GNUNET_free (th);
955       return 0;
956     }
957 #if DEBUG_TRANSPORT
958   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
959               "Transmitting `%s' message for `%4s'.\n",
960               "TRY_CONNECT", GNUNET_i2s (&th->target));
961 #endif
962   GNUNET_assert (size >= sizeof (struct TryConnectMessage));
963   tcm = buf;
964   tcm->header.size = htons (sizeof (struct TryConnectMessage));
965   tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
966   tcm->reserved = htonl (0);
967   memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
968   th->notify_delay_task
969     = GNUNET_SCHEDULER_add_delayed (h->sched,
970                                     GNUNET_NO,
971                                     GNUNET_SCHEDULER_PRIORITY_KEEP,
972                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
973                                     GNUNET_TIME_absolute_get_remaining
974                                     (th->timeout), &transmit_timeout, th);
975   insert_transmit_handle (&h->connect_wait_head, th);
976   return sizeof (struct TryConnectMessage);
977 }
978
979
980 /**
981  * Schedule a request to connect to the given
982  * neighbour (and if successful, add the specified
983  * handle to the wait list).
984  *
985  * @param th handle for a request to transmit once we
986  *        have connected
987  */
988 static void
989 try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
990 {
991   GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);  
992   schedule_control_transmit (th->handle,
993                              sizeof (struct TryConnectMessage),
994                              GNUNET_NO,
995                              GNUNET_TIME_absolute_get_remaining (th->timeout),
996                              &request_connect, th);
997 }
998
999
1000 /**
1001  * Remove neighbour from our list
1002  */
1003 static void
1004 remove_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1005                   const struct GNUNET_PeerIdentity *peer)
1006 {
1007   struct NeighbourList *prev;
1008   struct NeighbourList *pos;
1009   struct GNUNET_TRANSPORT_TransmitHandle *th;
1010
1011   prev = NULL;
1012   pos = h->neighbours;
1013   while ((pos != NULL) &&
1014          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
1015     {
1016       prev = pos;
1017       pos = pos->next;
1018     }
1019   if (pos == NULL)
1020     {
1021       GNUNET_break (0);
1022       return;
1023     }
1024   if (prev == NULL)
1025     h->neighbours = pos->next;
1026   else
1027     prev->next = pos->next;
1028   if (NULL != (th = pos->transmit_handle))
1029     {
1030       pos->transmit_handle = NULL;
1031       th->neighbour = NULL;
1032       remove_from_any_list (th);
1033       try_connect (th);
1034     }
1035   if (h->nc_cb != NULL)
1036     h->nd_cb (h->cls, peer);
1037   GNUNET_free (pos);
1038 }
1039
1040
1041 /**
1042  * Try again to connect to transport service.
1043  */
1044 static void
1045 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1046 {
1047   struct GNUNET_TRANSPORT_Handle *h = cls;
1048   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1049   struct NeighbourList *n;
1050
1051   /* Forget about all neighbours that we used to be connected
1052      to */
1053   while (NULL != (n = h->neighbours))
1054     remove_neighbour (h, &n->id);
1055 #if DEBUG_TRANSPORT
1056   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
1057 #endif
1058   GNUNET_assert (h->client == NULL);
1059   h->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1060   h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
1061   GNUNET_assert (h->client != NULL);
1062   /* make sure we don't send "START" twice,
1063      remove existing entry from queue (if present) */
1064   pos = h->connect_ready_head;
1065   while (pos != NULL)
1066     {
1067       if (pos->notify == &send_start)
1068         {
1069           if (pos->prev == NULL)
1070             h->connect_ready_head = pos->next;
1071           else
1072             pos->prev->next = pos->next;
1073           if (pos->next != NULL)
1074             pos->next->prev = pos->prev;
1075           GNUNET_assert (pos->neighbour == NULL);
1076           GNUNET_free (pos);
1077           break;
1078         }
1079       pos = pos->next;
1080     }
1081   schedule_control_transmit (h,
1082                              sizeof (struct GNUNET_MessageHeader),
1083                              GNUNET_YES,
1084                              GNUNET_TIME_UNIT_FOREVER_REL, &send_start, NULL);
1085   GNUNET_CLIENT_receive (h->client,
1086                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1087 }
1088
1089
1090 /**
1091  * Function that will schedule the job that will try
1092  * to connect us again to the client.
1093  */
1094 static void
1095 schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1096 {
1097 #if DEBUG_TRANSPORT
1098   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1099               "Scheduling task to reconnect to transport service in %llu ms.\n",
1100               h->reconnect_delay.value);
1101 #endif
1102   GNUNET_assert (h->client == NULL);
1103   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1104   h->reconnect_task
1105     = GNUNET_SCHEDULER_add_delayed (h->sched,
1106                                     GNUNET_NO,
1107                                     GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1108                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1109                                     h->reconnect_delay, &reconnect, h);
1110   h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS;
1111 }
1112
1113
1114 /**
1115  * We are connected to the respective peer, check the
1116  * bandwidth limits and schedule the transmission.
1117  */
1118 static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th);
1119
1120
1121 /**
1122  * Function called by the scheduler when the timeout
1123  * for bandwidth availablility for the target
1124  * neighbour is reached.
1125  */
1126 static void
1127 transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1128 {
1129   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1130
1131   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1132   schedule_request (th);
1133 }
1134
1135
1136 /**
1137  * Remove the given transmit handle from the wait list.  Does NOT free
1138  * it.
1139  */
1140 static void
1141 remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
1142 {
1143   if (th->prev == NULL)
1144     th->handle->connect_wait_head = th->next;
1145   else
1146     th->prev->next = th->next;
1147   if (th->next != NULL)
1148     th->next->prev = th->prev;
1149 }
1150
1151
1152 /**
1153  * We are connected to the respective peer, check the
1154  * bandwidth limits and schedule the transmission.
1155  */
1156 static void
1157 schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th)
1158 {
1159   struct GNUNET_TRANSPORT_Handle *h;
1160   struct GNUNET_TIME_Relative duration;
1161   struct NeighbourList *n;
1162   uint64_t available;
1163
1164   h = th->handle;
1165   n = th->neighbour;
1166   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1167     {
1168       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
1169       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1170     }
1171   /* check outgoing quota */
1172   duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1173   if (duration.value > MIN_QUOTA_REFRESH_TIME)
1174     {
1175       update_quota (n);
1176       duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1177     }
1178   available = duration.value * n->quota_out;
1179   if (available < n->last_sent + th->notify_size)
1180     {
1181       /* calculate how much bandwidth we'd still need to
1182          accumulate and based on that how long we'll have
1183          to wait... */
1184       available = n->last_sent + th->notify_size - available;
1185       duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1186                                                 available / n->quota_out);
1187       if (th->timeout.value <
1188           GNUNET_TIME_relative_to_absolute (duration).value)
1189         {
1190           /* signal timeout! */
1191 #if DEBUG_TRANSPORT
1192           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1193                       "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long.  Signaling timeout.\n",
1194                       duration.value,
1195                       GNUNET_i2s(&th->target));
1196 #endif
1197           remove_from_wait_list (th);
1198           th->notify (th->notify_cls, 0, NULL);
1199           GNUNET_free (th);
1200           return;
1201         }
1202 #if DEBUG_TRANSPORT
1203       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1204                   "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
1205                   GNUNET_i2s(&th->target),
1206                   duration.value);
1207 #endif
1208       th->notify_delay_task
1209         = GNUNET_SCHEDULER_add_delayed (h->sched,
1210                                         GNUNET_NO,
1211                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1212                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1213                                         duration, &transmit_ready, th);
1214       return;
1215     }
1216 #if DEBUG_TRANSPORT
1217   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1218               "Bandwidth available for transmission to `%4s'\n",
1219               GNUNET_i2s (&n->id));
1220 #endif
1221   if (GNUNET_NO == n->transmit_ok)
1222     {
1223       /* we may be ready, but transport service is not;
1224          wait for SendOkMessage or timeout */
1225 #if DEBUG_TRANSPORT
1226       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1227                   "Need to wait for transport service `%s' message\n",
1228                   "SEND_OK");
1229 #endif
1230       th->notify_delay_task
1231         = GNUNET_SCHEDULER_add_delayed (h->sched,
1232                                         GNUNET_NO,
1233                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1234                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1235                                         GNUNET_TIME_absolute_get_remaining
1236                                         (th->timeout), &transmit_timeout, th);
1237       return;
1238     }
1239   n->transmit_ok = GNUNET_NO;
1240   remove_from_wait_list (th);
1241 #if DEBUG_TRANSPORT
1242   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving message for `%4s' to ready list\n",
1243               GNUNET_i2s(&n->id));
1244 #endif
1245   insert_transmit_handle (&h->connect_ready_head, th);
1246   if (GNUNET_NO == h->transmission_scheduled)
1247     schedule_transmission (h);
1248 }
1249
1250
1251 /**
1252  * Add neighbour to our list
1253  */
1254 static void
1255 add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1256                uint32_t quota_out,
1257                struct GNUNET_TIME_Relative latency,
1258                const struct GNUNET_PeerIdentity *pid)
1259 {
1260   struct NeighbourList *n;
1261   struct GNUNET_TRANSPORT_TransmitHandle *prev;
1262   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1263   struct GNUNET_TRANSPORT_TransmitHandle *next;
1264
1265   /* check for duplicates */
1266   if (NULL != find_neighbour (h, pid))
1267     {
1268       GNUNET_break (0);
1269       return;
1270     }
1271 #if DEBUG_TRANSPORT
1272   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1273               "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid));
1274 #endif
1275   n = GNUNET_malloc (sizeof (struct NeighbourList));
1276   n->id = *pid;
1277   n->last_quota_update = GNUNET_TIME_absolute_get ();
1278   n->quota_out = quota_out;
1279   n->next = h->neighbours;
1280   n->transmit_ok = GNUNET_YES;
1281   h->neighbours = n;
1282   if (h->nc_cb != NULL)
1283     h->nc_cb (h->cls, &n->id, latency);
1284   prev = NULL;
1285   pos = h->connect_wait_head;
1286   while (pos != NULL)
1287     {
1288       next = pos->next;
1289       if (0 == memcmp (pid,
1290                        &pos->target, sizeof (struct GNUNET_PeerIdentity)))
1291         {
1292           pos->neighbour = n;
1293           GNUNET_assert (NULL == n->transmit_handle);
1294           n->transmit_handle = pos;
1295           if (prev == NULL)
1296             h->connect_wait_head = next;
1297           else
1298             prev->next = next;
1299           if (GNUNET_YES == n->received_ack)
1300             {
1301 #if DEBUG_TRANSPORT
1302               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303                           "Found pending request for `%4s' will trigger it now.\n",
1304                           GNUNET_i2s (&pos->target));
1305 #endif
1306               if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1307                 {
1308                   GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
1309                   pos->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1310                 }
1311               schedule_request (pos);
1312             }
1313           else
1314             {
1315 #if DEBUG_TRANSPORT
1316               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1317                           "Found pending request for `%4s' but still need `%s' before proceeding.\n",
1318                           GNUNET_i2s (&pos->target),
1319                           "ACK");
1320 #endif
1321             }
1322           break;
1323         }
1324       prev = pos;
1325       pos = next;
1326     }
1327 }
1328
1329
1330 /**
1331  * Connect to the transport service.  Note that the connection may
1332  * complete (or fail) asynchronously.
1333  *
1334
1335  * @param sched scheduler to use
1336  * @param cfg configuration to use
1337  * @param cls closure for the callbacks
1338  * @param rec receive function to call
1339  * @param nc function to call on connect events
1340  * @param dc function to call on disconnect events
1341  */
1342 struct GNUNET_TRANSPORT_Handle *
1343 GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched,
1344                           struct GNUNET_CONFIGURATION_Handle *cfg,
1345                           void *cls,
1346                           GNUNET_TRANSPORT_ReceiveCallback rec,
1347                           GNUNET_TRANSPORT_NotifyConnect nc,
1348                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1349 {
1350   struct GNUNET_TRANSPORT_Handle *ret;
1351
1352   GNUNET_ARM_start_service ("peerinfo",
1353                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1354   GNUNET_ARM_start_service ("transport",
1355                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1356   ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
1357   ret->sched = sched;
1358   ret->cfg = cfg;
1359   ret->cls = cls;
1360   ret->rec = rec;
1361   ret->nc_cb = nc;
1362   ret->nd_cb = nd;
1363   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1364   schedule_reconnect (ret);
1365   return ret;
1366 }
1367
1368
1369 /**
1370  * These stop activities must be run in a fresh
1371  * scheduler that is NOT in shutdown mode.
1372  */
1373 static void
1374 stop_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1375 {
1376   struct GNUNET_TRANSPORT_Handle *handle = cls;
1377   GNUNET_ARM_stop_service ("transport",
1378                            handle->cfg,
1379                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1380   GNUNET_ARM_stop_service ("peerinfo",
1381                            handle->cfg,
1382                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1383 }
1384
1385
1386 /**
1387  * Disconnect from the transport service.
1388  */
1389 void
1390 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1391 {
1392   struct GNUNET_TRANSPORT_TransmitHandle *th;
1393   struct NeighbourList *n;
1394   struct HelloWaitList *hwl;
1395   struct GNUNET_CLIENT_Connection *client;
1396
1397 #if DEBUG_TRANSPORT
1398   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
1399 #endif
1400   while (NULL != (th = handle->connect_ready_head))
1401     {
1402       handle->connect_ready_head = th->next;
1403       th->notify (th->notify_cls, 0, NULL);
1404       GNUNET_free (th);
1405     }
1406   while (NULL != (th = handle->connect_wait_head))
1407     {
1408       handle->connect_wait_head = th->next;
1409       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1410         {
1411           GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
1412           th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1413         }
1414       th->notify (th->notify_cls, 0, NULL);
1415       GNUNET_free (th);
1416     }
1417   while (NULL != (n = handle->neighbours))
1418     {
1419       handle->neighbours = n->next;
1420       GNUNET_free (n);
1421     }
1422   while (NULL != (hwl = handle->hwl_head))
1423     {
1424       handle->hwl_head = hwl->next;
1425       GNUNET_SCHEDULER_cancel (handle->sched, hwl->task);
1426       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1427                   _("Disconnect while trying to obtain `%s' from transport service.\n"),
1428                   "HELLO");
1429       if (hwl->rec != NULL)
1430         hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
1431       GNUNET_free (hwl);
1432     }
1433   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1434     {
1435       GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
1436       handle->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1437     }
1438   GNUNET_free_non_null (handle->my_hello);
1439   handle->my_hello = NULL;
1440   GNUNET_SCHEDULER_run (&stop_task, handle);
1441   if (NULL != (client = handle->client))
1442     {
1443 #if DEBUG_TRANSPORT
1444       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1445                   "Disconnecting from transport service for good.\n");
1446 #endif
1447       handle->client = NULL;
1448       GNUNET_CLIENT_disconnect (client);
1449     }
1450   if (client == NULL)
1451     GNUNET_free (handle);
1452 }
1453
1454
1455 /**
1456  * Type of a function to call when we receive a message
1457  * from the service.
1458  *
1459  * @param cls closure
1460  * @param msg message received, NULL on timeout or fatal error
1461  */
1462 static void
1463 demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1464 {
1465   struct GNUNET_TRANSPORT_Handle *h = cls;
1466   const struct DisconnectInfoMessage *dim;
1467   const struct ConnectInfoMessage *cim;
1468   const struct InboundMessage *im;
1469   const struct GNUNET_MessageHeader *imm;
1470   const struct SendOkMessage *okm;
1471   struct HelloWaitList *hwl;
1472   struct NeighbourList *n;
1473   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey;
1474   struct GNUNET_PeerIdentity me;
1475   struct GNUNET_TRANSPORT_TransmitHandle *th;
1476   uint16_t size;
1477
1478   if ((msg == NULL) || (h->client == NULL))
1479     {
1480       if (h->client != NULL)
1481         {
1482 #if DEBUG_TRANSPORT
1483           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1484                       "Error receiving from transport service, disconnecting temporarily.\n");
1485 #endif
1486           if (h->network_handle != NULL)
1487             {
1488               GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1489               h->network_handle = NULL;
1490               h->transmission_scheduled = GNUNET_NO;
1491               th = h->connect_ready_head;
1492               /* add timeout again, we cancelled the transmit_ready task! */
1493               GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1494               th->notify_delay_task 
1495                 = GNUNET_SCHEDULER_add_delayed (h->sched,
1496                                                 GNUNET_NO,
1497                                                 GNUNET_SCHEDULER_PRIORITY_KEEP,
1498                                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1499                                                 GNUNET_TIME_absolute_get_remaining(th->timeout),
1500                                                 &transmit_timeout, 
1501                                                 th);    
1502             }
1503           GNUNET_CLIENT_disconnect (h->client);
1504           h->client = NULL;
1505           schedule_reconnect (h);
1506         }
1507       else
1508         {
1509           /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
1510              finish clean up work! */
1511           GNUNET_free (h);
1512         }
1513       return;
1514     }
1515   GNUNET_CLIENT_receive (h->client,
1516                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1517   size = ntohs (msg->size);
1518   switch (ntohs (msg->type))
1519     {
1520     case GNUNET_MESSAGE_TYPE_HELLO:
1521       if (GNUNET_OK !=
1522           GNUNET_HELLO_get_key ((const struct GNUNET_HELLO_Message *) msg,
1523                                 &pkey))
1524         {
1525           GNUNET_break (0);
1526           break;
1527         }
1528       GNUNET_CRYPTO_hash (&pkey,
1529                           sizeof (struct
1530                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1531                           &me.hashPubKey);
1532 #if DEBUG_TRANSPORT
1533       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1534                   "Receiving (my own) `%s' message, I am `%4s'.\n",
1535                   "HELLO", GNUNET_i2s (&me));
1536 #endif
1537       GNUNET_free_non_null (h->my_hello);
1538       h->my_hello = NULL;
1539       if (size < sizeof (struct GNUNET_MessageHeader))
1540         {
1541           GNUNET_break (0);
1542           break;
1543         }
1544       h->my_hello = GNUNET_malloc (size);
1545       memcpy (h->my_hello, msg, size);
1546       while (NULL != (hwl = h->hwl_head))
1547         {
1548           h->hwl_head = hwl->next;
1549           GNUNET_SCHEDULER_cancel (h->sched, hwl->task);
1550           GNUNET_TRANSPORT_get_hello (h,
1551                                       GNUNET_TIME_UNIT_ZERO,
1552                                       hwl->rec, hwl->rec_cls);
1553           GNUNET_free (hwl);
1554         }
1555       break;
1556     case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
1557       if (size != sizeof (struct ConnectInfoMessage))
1558         {
1559           GNUNET_break (0);
1560           break;
1561         }
1562       cim = (const struct ConnectInfoMessage *) msg;
1563 #if DEBUG_TRANSPORT
1564       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1565                   "Receiving `%s' message for `%4s'.\n",
1566                   "CONNECT", GNUNET_i2s (&cim->id));
1567 #endif
1568       add_neighbour (h,
1569                      ntohl (cim->quota_out),
1570                      GNUNET_TIME_relative_ntoh (cim->latency), &cim->id);
1571       break;
1572     case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
1573       if (size != sizeof (struct DisconnectInfoMessage))
1574         {
1575           GNUNET_break (0);
1576           break;
1577         }
1578       dim = (const struct DisconnectInfoMessage *) msg;
1579 #if DEBUG_TRANSPORT
1580       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1581                   "Receiving `%s' message for `%4s'.\n",
1582                   "DISCONNECT", GNUNET_i2s (&dim->peer));
1583 #endif
1584       remove_neighbour (h, &dim->peer);
1585       break;
1586     case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
1587       if (size != sizeof (struct SendOkMessage))
1588         {
1589           GNUNET_break (0);
1590           break;
1591         }
1592       okm = (const struct SendOkMessage *) msg;
1593 #if DEBUG_TRANSPORT
1594       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1595                   "Receiving `%s' message, transmission %s.\n", "SEND_OK",
1596                   ntohl(okm->success) == GNUNET_OK ? "succeeded" : "failed");
1597 #endif
1598       n = find_neighbour (h, &okm->peer);
1599       GNUNET_assert (n != NULL);
1600       n->transmit_ok = GNUNET_YES;
1601       if (n->transmit_handle != NULL)
1602         {
1603 #if DEBUG_TRANSPORT
1604           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1605                       "Processing pending message\n");
1606 #endif
1607           GNUNET_SCHEDULER_cancel (h->sched,
1608                                    n->transmit_handle->notify_delay_task);
1609           n->transmit_handle->notify_delay_task =
1610             GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1611           GNUNET_assert (GNUNET_YES == n->received_ack);
1612           schedule_request (n->transmit_handle);
1613         }
1614       break;
1615     case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
1616 #if DEBUG_TRANSPORT
1617       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1618                   "Receiving `%s' message.\n", "RECV");
1619 #endif
1620       if (size <
1621           sizeof (struct InboundMessage) +
1622           sizeof (struct GNUNET_MessageHeader))
1623         {
1624           GNUNET_break (0);
1625           break;
1626         }
1627       im = (const struct InboundMessage *) msg;
1628       imm = (const struct GNUNET_MessageHeader *) &im[1];
1629       if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
1630         {
1631           GNUNET_break (0);
1632           break;
1633         }
1634       switch (ntohs (imm->type))
1635         {
1636         case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
1637 #if DEBUG_TRANSPORT
1638           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1639                       "Receiving `%s' message from `%4s'.\n",
1640                       "ACK", GNUNET_i2s (&im->peer));
1641 #endif
1642           n = find_neighbour (h, &im->peer);
1643           if (n == NULL)
1644             {
1645               GNUNET_break (0);
1646               break;
1647             }
1648           if (n->received_ack == GNUNET_NO)
1649             {
1650               n->received_ack = GNUNET_YES;
1651               if (NULL != n->transmit_handle)
1652                 {
1653 #if DEBUG_TRANSPORT
1654                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1655                               "Peer connected, scheduling delayed message for deliverery now.\n");
1656 #endif
1657                   schedule_request (n->transmit_handle);
1658                 }
1659             }
1660           break;
1661         default:
1662 #if DEBUG_TRANSPORT
1663           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1664                       "Received message of type %u from `%4s'.\n",
1665                       ntohs (imm->type), GNUNET_i2s (&im->peer));
1666 #endif
1667           if (h->rec != NULL)
1668             h->rec (h->cls,
1669                     GNUNET_TIME_relative_ntoh (im->latency), &im->peer, imm);
1670           break;
1671         }
1672       break;
1673     default:
1674       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1675                   _
1676                   ("Received unexpected message of type %u in %s:%u\n"),
1677                   ntohs (msg->type), __FILE__, __LINE__);
1678       GNUNET_break (0);
1679       break;
1680     }
1681 }
1682
1683
1684 struct ClientTransmitWrapper
1685 {
1686   GNUNET_NETWORK_TransmitReadyNotify notify;
1687   void *notify_cls;
1688   struct GNUNET_TRANSPORT_TransmitHandle *th;
1689 };
1690
1691
1692 /**
1693  * Transmit message of a client destined for another
1694  * peer to the service.
1695  */
1696 static size_t
1697 client_notify_wrapper (void *cls, size_t size, void *buf)
1698 {
1699   struct ClientTransmitWrapper *ctw = cls;
1700   struct OutboundMessage *obm;
1701   struct GNUNET_MessageHeader *hdr;
1702   size_t ret;
1703
1704   if (size == 0)
1705     {
1706 #if DEBUG_TRANSPORT
1707       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1708                   "Transmission request could not be satisfied.\n");
1709 #endif
1710       ret = ctw->notify (ctw->notify_cls, 0, NULL);
1711       GNUNET_assert (ret == 0);
1712       GNUNET_free (ctw);
1713       return 0;
1714     }
1715   GNUNET_assert (size >= sizeof (struct OutboundMessage));
1716   obm = buf;
1717   ret = ctw->notify (ctw->notify_cls,
1718                      size - sizeof (struct OutboundMessage),
1719                      (void *) &obm[1]);
1720   if (ret == 0)
1721     {
1722       /* Need to reset flag, no SEND means no SEND_OK! */
1723       ctw->th->neighbour->transmit_ok = GNUNET_YES;
1724       GNUNET_free (ctw);
1725       return 0;
1726     }
1727   GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
1728   hdr = (struct GNUNET_MessageHeader *) &obm[1];
1729   GNUNET_assert (ntohs (hdr->size) == ret);
1730   GNUNET_assert (ret + sizeof (struct OutboundMessage) <
1731                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1732 #if DEBUG_TRANSPORT
1733   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1734               "Transmitting `%s' message with data for `%4s'\n",
1735               "SEND", GNUNET_i2s (&ctw->th->target));
1736 #endif
1737   ret += sizeof (struct OutboundMessage);
1738   obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
1739   obm->header.size = htons (ret);
1740   obm->reserved = htonl (0);
1741   obm->peer = ctw->th->target;
1742   GNUNET_free (ctw);
1743   return ret;
1744 }
1745
1746
1747
1748 /**
1749  * Check if we could queue a message of the given size for
1750  * transmission.  The transport service will take both its
1751  * internal buffers and bandwidth limits imposed by the
1752  * other peer into consideration when answering this query.
1753  *
1754  * @param handle connection to transport service
1755  * @param target who should receive the message
1756  * @param size how big is the message we want to transmit?
1757  * @param timeout after how long should we give up (and call
1758  *        notify with buf NULL and size 0)?
1759  * @param notify function to call when we are ready to
1760  *        send such a message
1761  * @param notify_cls closure for notify
1762  * @return NULL if someone else is already waiting to be notified
1763  *         non-NULL if the notify callback was queued (can be used to cancel
1764  *         using GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1765  */
1766 struct GNUNET_TRANSPORT_TransmitHandle *
1767 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
1768                                         *handle,
1769                                         const struct GNUNET_PeerIdentity
1770                                         *target, size_t size,
1771                                         struct GNUNET_TIME_Relative timeout,
1772                                         GNUNET_NETWORK_TransmitReadyNotify
1773                                         notify, void *notify_cls)
1774 {
1775   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1776   struct GNUNET_TRANSPORT_TransmitHandle *th;
1777   struct NeighbourList *n;
1778   struct ClientTransmitWrapper *ctw;
1779
1780   if (size + sizeof (struct OutboundMessage) >=
1781       GNUNET_SERVER_MAX_MESSAGE_SIZE)
1782     {
1783       GNUNET_break (0);
1784       return NULL;
1785     }
1786 #if DEBUG_TRANSPORT
1787   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1788               "Asking transport service for transmission of %u bytes to peer `%4s'.\n",
1789               size, GNUNET_i2s (target));
1790 #endif
1791   n = find_neighbour (handle, target);
1792   if (n->transmit_handle != NULL)
1793     return NULL; /* already have a request pending for this peer! */
1794   ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper));
1795   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
1796   ctw->notify = notify;
1797   ctw->notify_cls = notify_cls;
1798   ctw->th = th;
1799   th->handle = handle;
1800   th->target = *target;
1801   th->notify = &client_notify_wrapper;
1802   th->notify_cls = ctw;
1803   th->notify_size = size + sizeof (struct OutboundMessage);
1804   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1805   th->neighbour = n;
1806   if (NULL == n)
1807     {
1808       pos = handle->connect_wait_head;
1809       while (pos != NULL)
1810         {
1811           GNUNET_assert (0 != memcmp (target,
1812                                       &pos->target,
1813                                       sizeof (struct GNUNET_PeerIdentity)));
1814           pos = pos->next;
1815         }
1816 #if DEBUG_TRANSPORT
1817       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1818                   "Will now try to connect to `%4s'.\n", GNUNET_i2s (target));
1819 #endif
1820       try_connect (th);
1821       return th;
1822     }
1823
1824 #if DEBUG_TRANSPORT
1825   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1826               "Transmission request queued for transmission to transport service.\n");
1827 #endif
1828   GNUNET_assert (NULL == n->transmit_handle);
1829   n->transmit_handle = th;
1830   if (GNUNET_YES != n->received_ack)
1831     {
1832 #if DEBUG_TRANSPORT
1833       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1834                   "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n",
1835                   GNUNET_i2s (target), timeout.value);
1836 #endif
1837       th->notify_delay_task
1838         = GNUNET_SCHEDULER_add_delayed (handle->sched,
1839                                         GNUNET_NO,
1840                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1841                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1842                                         timeout, &transmit_timeout, th);
1843       return th;
1844     }
1845   
1846 #if DEBUG_TRANSPORT
1847   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1848               "Peer `%4s' is ready to receive, scheduling message for delivery now.\n",
1849               GNUNET_i2s (target));
1850 #endif
1851   schedule_request (th);
1852   return th;
1853 }
1854
1855
1856 /**
1857  * Cancel the specified transmission-ready
1858  * notification.
1859  */
1860 void
1861 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
1862                                                GNUNET_TRANSPORT_TransmitHandle
1863                                                *th)
1864 {
1865   struct GNUNET_TRANSPORT_Handle *h;
1866
1867 #if DEBUG_TRANSPORT
1868   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1869               "Transmission request of %u bytes to `%4s' was cancelled.\n",
1870               th->notify_size - sizeof(struct OutboundMessage),
1871               GNUNET_i2s (&th->target));
1872 #endif
1873   GNUNET_assert (th->notify == &client_notify_wrapper);
1874   remove_from_any_list (th);
1875   h = th->handle;
1876   if ((h->connect_ready_head == NULL) && (h->network_handle != NULL))
1877     {
1878       GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1879       h->network_handle = NULL;
1880       h->transmission_scheduled = GNUNET_NO;
1881     }
1882   GNUNET_free (th->notify_cls);
1883   GNUNET_free (th);
1884 }
1885
1886
1887 /* end of transport_api.c */