ng
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - set_quota with low bandwidth should cause peer
28  *   disconnects (currently never does that) (MINOR)
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_server_lib.h"
36 #include "gnunet_time_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "transport.h"
39
40 /**
41  * After how long do we give up on transmitting a HELLO
42  * to the service?
43  */
44 #define OFFER_HELLO_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
45
46 /**
47  * How long should ARM wait when starting up the
48  * transport service before reporting back?
49  */
50 #define START_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
51
52 /**
53  * How long should ARM wait when stopping the
54  * transport service before reporting back?
55  */
56 #define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
57
58 /**
59  * Entry in linked list of all of our current neighbours.
60  */
61 struct NeighbourList
62 {
63
64   /**
65    * This is a linked list.
66    */
67   struct NeighbourList *next;
68
69   /**
70    * Active transmit handle, can be NULL.  Used to move
71    * from ready to wait list on disconnect and to block
72    * two transmissions to the same peer from being scheduled
73    * at the same time.
74    */
75   struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle;
76
77
78   /**
79    * Identity of this neighbour.
80    */
81   struct GNUNET_PeerIdentity id;
82
83   /**
84    * At what time did we reset last_sent last?
85    */
86   struct GNUNET_TIME_Absolute last_quota_update;
87
88   /**
89    * How many bytes have we sent since the "last_quota_update"
90    * timestamp?
91    */
92   uint64_t last_sent;
93
94   /**
95    * Global quota for outbound traffic to the neighbour in bytes/ms.
96    */
97   uint32_t quota_out;
98
99   /**
100    * Set to GNUNET_YES if we are currently allowed to
101    * transmit a message to the transport service for this
102    * peer, GNUNET_NO otherwise.
103    */
104   int transmit_ok;
105
106   /**
107    * Set to GNUNET_YES if we have received an ACK for the
108    * given peer.  Peers that receive our HELLO always respond
109    * with an ACK to let us know that we are successfully
110    * communicating.  Note that a PING can not be used for this
111    * since PINGs are only send if a HELLO address requires
112    * confirmation (and also, PINGs are not passed to the
113    * transport API itself).
114    */
115   int received_ack;
116
117 };
118
119
120 /**
121  * Linked list of requests from clients for our HELLO
122  * that were deferred.
123  */
124 struct HelloWaitList
125 {
126
127   /**
128    * This is a linked list.
129    */
130   struct HelloWaitList *next;
131
132   /**
133    * Reference back to our transport handle.
134    */
135   struct GNUNET_TRANSPORT_Handle *handle;
136
137   /**
138    * Callback to call once we got our HELLO.
139    */
140   GNUNET_TRANSPORT_ReceiveCallback rec;
141
142   /**
143    * Closure for rec.
144    */
145   void *rec_cls;
146
147   /**
148    * When to time out (call rec with NULL).
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Timeout task (used to trigger timeout,
154    * cancel if we get the HELLO in time).
155    */
156   GNUNET_SCHEDULER_TaskIdentifier task;
157
158
159 };
160
161
162 /**
163  * Opaque handle for a transmission-ready request.
164  */
165 struct GNUNET_TRANSPORT_TransmitHandle
166 {
167
168   /**
169    * We keep the transmit handles that are waiting for
170    * a transport-level connection in a doubly linked list.
171    */
172   struct GNUNET_TRANSPORT_TransmitHandle *next;
173
174   /**
175    * We keep the transmit handles that are waiting for
176    * a transport-level connection in a doubly linked list.
177    */
178   struct GNUNET_TRANSPORT_TransmitHandle *prev;
179
180   /**
181    * Handle of the main transport data structure.
182    */
183   struct GNUNET_TRANSPORT_Handle *handle;
184
185   /**
186    * Neighbour for this handle, can be NULL if the service
187    * is not yet connected to the target.
188    */
189   struct NeighbourList *neighbour;
190
191   /**
192    * Which peer is this transmission going to be for?  All
193    * zeros if it is control-traffic to the service.
194    */
195   struct GNUNET_PeerIdentity target;
196
197   /**
198    * Function to call when notify_size bytes are available
199    * for transmission.
200    */
201   GNUNET_NETWORK_TransmitReadyNotify notify;
202
203   /**
204    * Closure for notify.
205    */
206   void *notify_cls;
207
208   /**
209    * transmit_ready task Id.  The task is used to introduce
210    * the artificial delay that may be required to maintain
211    * the bandwidth limits.
212    */
213   GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
214
215   /**
216    * Timeout for this request.
217    */
218   struct GNUNET_TIME_Absolute timeout;
219
220   /**
221    * How many bytes is our notify callback waiting for?
222    */
223   size_t notify_size;
224
225 };
226
227
228 /**
229  * Handle for the transport service (includes all of the
230  * state for the transport service).
231  */
232 struct GNUNET_TRANSPORT_Handle
233 {
234
235   /**
236    * Closure for the callbacks.
237    */
238   void *cls;
239
240   /**
241    * Function to call for received data.
242    */
243   GNUNET_TRANSPORT_ReceiveCallback rec;
244
245   /**
246    * function to call on connect events
247    */
248   GNUNET_TRANSPORT_NotifyConnect nc_cb;
249
250   /**
251    * function to call on disconnect events
252    */
253   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
254
255   /**
256    * The current HELLO message for this peer.  Updated
257    * whenever transports change their addresses.
258    */
259   struct GNUNET_HELLO_Message *my_hello;
260
261   /**
262    * My client connection to the transport service.
263    */
264   struct GNUNET_CLIENT_Connection *client;
265
266   /**
267    * Handle to our registration with the client for notification.
268    */
269   struct GNUNET_NETWORK_TransmitHandle *network_handle;
270
271   /**
272    * Linked list of transmit handles that are waiting for the
273    * transport to connect to the respective peer.  When we
274    * receive notification that the transport connected to a
275    * peer, we go over this list and check if someone has already
276    * requested a transmission to the new peer; if so, we trigger
277    * the next step.
278    */
279   struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head;
280
281   /**
282    * Linked list of transmit handles that are waiting for the
283    * transport to be ready for transmission to the respective
284    * peer.  When we
285    * receive notification that the transport disconnected from
286    * a peer, we go over this list and move the entry back to
287    * the connect_wait list.
288    */
289   struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head;
290
291   /**
292    * Linked list of pending requests for our HELLO.
293    */
294   struct HelloWaitList *hwl_head;
295
296   /**
297    * My scheduler.
298    */
299   struct GNUNET_SCHEDULER_Handle *sched;
300
301   /**
302    * My configuration.
303    */
304   struct GNUNET_CONFIGURATION_Handle *cfg;
305
306   /**
307    * Linked list of the current neighbours of this peer.
308    */
309   struct NeighbourList *neighbours;
310
311   /**
312    * ID of the task trying to reconnect to the
313    * service.
314    */
315   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
316
317   /**
318    * Delay until we try to reconnect.
319    */
320   struct GNUNET_TIME_Relative reconnect_delay;
321
322   /**
323    * Do we currently have a transmission pending?
324    * (schedule transmission was called but has not
325    * yet succeeded)?
326    */
327   int transmission_scheduled;
328 };
329
330
331 static struct NeighbourList *
332 find_neighbour (struct GNUNET_TRANSPORT_Handle *h,
333                 const struct GNUNET_PeerIdentity *peer)
334 {
335   struct NeighbourList *pos;
336
337   pos = h->neighbours;
338   while ((pos != NULL) &&
339          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
340     pos = pos->next;
341   return pos;
342 }
343
344
345 /**
346  * Schedule the task to send one message from the
347  * connect_ready list to the service.
348  */
349 static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
350
351
352 /**
353  * Transmit message to client...
354  */
355 static size_t
356 transport_notify_ready (void *cls, size_t size, void *buf)
357 {
358   struct GNUNET_TRANSPORT_Handle *h = cls;
359   struct GNUNET_TRANSPORT_TransmitHandle *th;
360   struct NeighbourList *n;
361   size_t ret;
362   char *cbuf;
363
364 #if DEBUG_TRANSPORT
365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
366               "Ready to transmit %u bytes to transport service\n", size);
367 #endif
368   h->network_handle = NULL;
369   h->transmission_scheduled = GNUNET_NO;
370   if (buf == NULL)
371     {
372       th = h->connect_ready_head;
373       if (th->next != NULL)
374         th->next->prev = NULL;
375       h->connect_ready_head = th->next;
376       if (NULL != (n = th->neighbour))
377         {
378           GNUNET_assert (n->transmit_handle == th);
379           n->transmit_handle = NULL;
380         }
381       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
382       GNUNET_free (th);
383       return 0;
384     }
385   cbuf = buf;
386   ret = 0;
387   h->network_handle = NULL;
388   h->transmission_scheduled = GNUNET_NO;
389   do
390     {
391       th = h->connect_ready_head;
392       GNUNET_assert (th->notify_size <= size);
393       if (th->next != NULL)
394         th->next->prev = NULL;
395       h->connect_ready_head = th->next;
396       if (NULL != (n = th->neighbour))
397         {
398           GNUNET_assert (n->transmit_handle == th);
399           n->transmit_handle = NULL;
400         }
401       ret += th->notify (th->notify_cls, size, &cbuf[ret]);
402       GNUNET_free (th);
403       if (n != NULL)
404         n->last_sent += ret;
405       size -= ret;
406     }
407   while ((h->connect_ready_head != NULL) &&
408          (h->connect_ready_head->notify_size <= size));
409   if (h->connect_ready_head != NULL)
410     schedule_transmission (h);
411 #if DEBUG_TRANSPORT
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413               "Transmitting %u bytes to transport service\n", ret);
414 #endif
415   return ret;
416 }
417
418
419 /**
420  * Schedule the task to send one message from the
421  * connect_ready list to the service.
422  */
423 static void
424 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
425 {
426   struct GNUNET_TRANSPORT_TransmitHandle *th;
427
428   GNUNET_assert (NULL == h->network_handle);
429   if (h->client == NULL)
430     {
431 #if DEBUG_TRANSPORT
432       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433                   "Not yet connected to transport service, need to wait.\n");
434 #endif
435       return;
436     }
437   th = h->connect_ready_head;
438   if (th == NULL)
439     {
440 #if DEBUG_TRANSPORT
441       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
442                   "Schedule transmission called, but no request is pending.\n");
443 #endif
444       return;
445     }
446   h->transmission_scheduled = GNUNET_YES;
447 #if DEBUG_TRANSPORT
448   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449               "Asking client API for transmission of %u bytes\n",
450               th->notify_size);
451 #endif
452   h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client,
453                                                            th->notify_size,
454                                                            GNUNET_TIME_absolute_get_remaining
455                                                            (th->timeout),
456                                                            &transport_notify_ready,
457                                                            h);
458   GNUNET_assert (NULL != h->network_handle);
459 }
460
461
462 /**
463  * Insert the given transmit handle in the given sorted
464  * doubly linked list based on timeout.
465  *
466  * @param head pointer to the head of the linked list
467  * @param th element to insert into the list
468  */
469 static void
470 insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head,
471                         struct GNUNET_TRANSPORT_TransmitHandle *th)
472 {
473   struct GNUNET_TRANSPORT_TransmitHandle *pos;
474   struct GNUNET_TRANSPORT_TransmitHandle *prev;
475
476   pos = *head;
477   prev = NULL;
478   while ((pos != NULL) && (pos->timeout.value < th->timeout.value))
479     {
480       prev = pos;
481       pos = pos->next;
482     }
483   if (prev == NULL)
484     {
485       th->next = *head;
486       if (th->next != NULL)
487         th->next->prev = th;
488       *head = th;
489     }
490   else
491     {
492       th->next = pos;
493       th->prev = prev;
494       prev->next = th;
495       if (pos != NULL)
496         pos->prev = th;
497     }
498 }
499
500
501 /**
502  * Queue control request for transmission to the transport
503  * service.
504  *
505  * @param size number of bytes to be transmitted
506  * @param at_head request must be added to the head of the queue
507  *        (otherwise request will be appended)
508  * @param timeout how long this transmission can wait (at most)
509  * @param notify function to call to get the content
510  * @param notify_cls closure for notify
511  */
512 static void
513 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
514                            size_t size,
515                            int at_head,
516                            struct GNUNET_TIME_Relative timeout,
517                            GNUNET_NETWORK_TransmitReadyNotify notify,
518                            void *notify_cls)
519 {
520   struct GNUNET_TRANSPORT_TransmitHandle *th;
521 #if DEBUG_TRANSPORT
522   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523               "Queueing %u bytes control transmission request.\n", size);
524 #endif
525   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
526   th->handle = h;
527   th->notify = notify;
528   th->notify_cls = notify_cls;
529   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
530   th->notify_size = size;
531   if (at_head)
532     {
533       th->next = h->connect_ready_head;
534       h->connect_ready_head = th;
535       if (th->next != NULL)
536         th->next->prev = th;
537     }
538   else
539     {
540       insert_transmit_handle (&h->connect_ready_head, th);
541     }
542   if (GNUNET_NO == h->transmission_scheduled)
543     schedule_transmission (h);
544 }
545
546
547 /**
548  * Update the quota values for the given neighbour now.
549  */
550 static void
551 update_quota (struct NeighbourList *n)
552 {
553   struct GNUNET_TIME_Relative delta;
554   uint64_t allowed;
555   uint64_t remaining;
556
557   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
558   allowed = delta.value * n->quota_out;
559   if (n->last_sent < allowed)
560     {
561       remaining = allowed - n->last_sent;
562       if (n->quota_out > 0)
563         remaining /= n->quota_out;
564       else
565         remaining = 0;
566       if (remaining > MAX_BANDWIDTH_CARRY)
567         remaining = MAX_BANDWIDTH_CARRY;
568       n->last_sent = 0;
569       n->last_quota_update = GNUNET_TIME_absolute_get ();
570       n->last_quota_update.value -= remaining;
571     }
572   else
573     {
574       n->last_sent -= allowed;
575       n->last_quota_update = GNUNET_TIME_absolute_get ();
576     }
577 }
578
579
580 struct SetQuotaContext
581 {
582   struct GNUNET_TRANSPORT_Handle *handle;
583
584   struct GNUNET_PeerIdentity target;
585
586   GNUNET_SCHEDULER_Task cont;
587
588   void *cont_cls;
589
590   struct GNUNET_TIME_Absolute timeout;
591
592   uint32_t quota_in;
593 };
594
595
596 static size_t
597 send_set_quota (void *cls, size_t size, void *buf)
598 {
599   struct SetQuotaContext *sqc = cls;
600   struct QuotaSetMessage *msg;
601
602   if (buf == NULL)
603     {
604       GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
605                                          GNUNET_NO,
606                                          sqc->cont,
607                                          sqc->cont_cls,
608                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
609       GNUNET_free (sqc);
610       return 0;
611     }
612 #if DEBUG_TRANSPORT
613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
614               "Transmitting `%s' request with respect to `%4s'.\n",
615               "SET_QUOTA", GNUNET_i2s (&sqc->target));
616 #endif
617   GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
618   msg = buf;
619   msg->header.size = htons (sizeof (struct QuotaSetMessage));
620   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
621   msg->quota_in = htonl (sqc->quota_in);
622   memcpy (&msg->peer, &sqc->target, sizeof (struct GNUNET_PeerIdentity));
623   if (sqc->cont != NULL)
624     GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
625                                        GNUNET_NO,
626                                        sqc->cont,
627                                        sqc->cont_cls,
628                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
629   GNUNET_free (sqc);
630   return sizeof (struct QuotaSetMessage);
631 }
632
633
634 /**
635  * Set the share of incoming bandwidth for the given
636  * peer to the specified amount.
637  *
638  * @param handle connection to transport service
639  * @param target who's bandwidth quota is being changed
640  * @param quota_in incoming bandwidth quota in bytes per ms; 0 can
641  *        be used to force all traffic to be discarded
642  * @param quota_out outgoing bandwidth quota in bytes per ms; 0 can
643  *        be used to force all traffic to be discarded
644  * @param timeout how long to wait until signaling failure if
645  *        we can not communicate the quota change
646  * @param cont continuation to call when done, will be called
647  *        either with reason "TIMEOUT" or with reason "PREREQ_DONE"
648  * @param cont_cls closure for continuation
649  */
650 void
651 GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
652                             const struct GNUNET_PeerIdentity *target,
653                             uint32_t quota_in,
654                             uint32_t quota_out,
655                             struct GNUNET_TIME_Relative timeout,
656                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
657 {
658   struct NeighbourList *n;
659   struct SetQuotaContext *sqc;
660
661   n = find_neighbour (handle, target);
662   if (n != NULL)
663     {
664       update_quota (n);
665       if (n->quota_out < quota_out)
666         n->last_quota_update = GNUNET_TIME_absolute_get ();
667       n->quota_out = quota_out;
668     }
669   sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
670   sqc->handle = handle;
671   sqc->target = *target;
672   sqc->cont = cont;
673   sqc->cont_cls = cont_cls;
674   sqc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
675   sqc->quota_in = quota_in;
676   schedule_control_transmit (handle,
677                              sizeof (struct QuotaSetMessage),
678                              GNUNET_NO, timeout, &send_set_quota, sqc);
679 }
680
681
682 /**
683  * A "get_hello" request has timed out.  Signal the client
684  * and clean up.
685  */
686 static void
687 hello_wait_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
688 {
689   struct HelloWaitList *hwl = cls;
690   struct HelloWaitList *pos;
691   struct HelloWaitList *prev;
692
693   prev = NULL;
694   pos = hwl->handle->hwl_head;
695   while (pos != hwl)
696     {
697       GNUNET_assert (pos != NULL);
698       prev = pos;
699       pos = pos->next;
700     }
701   if (prev == NULL)
702     hwl->handle->hwl_head = hwl->next;
703   else
704     prev->next = hwl->next;
705   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
706               _("Timeout trying to obtain `%s' from transport service.\n"),
707               "HELLO");
708   /* signal timeout */
709   if (hwl->rec != NULL)
710     hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
711   GNUNET_free (hwl);
712 }
713
714
715 /**
716  * Obtain the HELLO message for this peer.
717  *
718  * @param handle connection to transport service
719  * @param timeout how long to wait for the HELLO
720  * @param rec function to call with the HELLO, sender will be our peer
721  *            identity; message and sender will be NULL on timeout
722  *            (handshake with transport service pending/failed).
723  *             cost estimate will be 0.
724  * @param rec_cls closure for rec
725  */
726 void
727 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
728                             struct GNUNET_TIME_Relative timeout,
729                             GNUNET_TRANSPORT_ReceiveCallback rec,
730                             void *rec_cls)
731 {
732   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
733   struct GNUNET_PeerIdentity me;
734   struct HelloWaitList *hwl;
735
736   if (handle->my_hello == NULL)
737     {
738       hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
739       hwl->next = handle->hwl_head;
740       handle->hwl_head = hwl;
741       hwl->handle = handle;
742       hwl->rec = rec;
743       hwl->rec_cls = rec_cls;
744       hwl->timeout = GNUNET_TIME_relative_to_absolute (timeout);
745       hwl->task = GNUNET_SCHEDULER_add_delayed (handle->sched,
746                                                 GNUNET_YES,
747                                                 GNUNET_SCHEDULER_PRIORITY_KEEP,
748                                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
749                                                 timeout,
750                                                 &hello_wait_timeout, hwl);
751       return;
752     }
753   GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_key (handle->my_hello, &pk));
754   GNUNET_CRYPTO_hash (&pk,
755                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
756                       &me.hashPubKey);
757
758   rec (rec_cls,
759        GNUNET_TIME_UNIT_ZERO,
760        &me, (const struct GNUNET_MessageHeader *) handle->my_hello);
761 }
762
763
764 static size_t
765 send_hello (void *cls, size_t size, void *buf)
766 {
767   struct GNUNET_MessageHeader *hello = cls;
768   uint16_t msize;
769
770   if (buf == NULL)
771     {
772 #if DEBUG_TRANSPORT
773       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774                   "Timeout while trying to transmit `%s' request.\n",
775                   "HELLO");
776 #endif
777       GNUNET_free (hello);
778       return 0;
779     }
780 #if DEBUG_TRANSPORT
781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782               "Transmitting `%s' request.\n", "HELLO");
783 #endif
784   msize = ntohs (hello->size);
785   GNUNET_assert (size >= msize);
786   memcpy (buf, hello, msize);
787   GNUNET_free (hello);
788   return msize;
789 }
790
791
792 /**
793  * Offer the transport service the HELLO of another peer.  Note that
794  * the transport service may just ignore this message if the HELLO is
795  * malformed or useless due to our local configuration.
796  *
797  * @param handle connection to transport service
798  * @param hello the hello message
799  */
800 void
801 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
802                               const struct GNUNET_MessageHeader *hello)
803 {
804   struct GNUNET_MessageHeader *hc;
805   uint16_t size;
806
807   if (handle->client == NULL)
808     {
809 #if DEBUG_TRANSPORT
810       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811                   "Not connected to transport service, dropping offered HELLO\n");
812 #endif
813       return;
814     }
815   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
816   size = ntohs (hello->size);
817   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
818   hc = GNUNET_malloc (size);
819   memcpy (hc, hello, size);
820   schedule_control_transmit (handle,
821                              size,
822                              GNUNET_NO, OFFER_HELLO_TIMEOUT, &send_hello, hc);
823 }
824
825
826 /**
827  * Function we use for handling incoming messages.
828  */
829 static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg);
830
831
832 static size_t
833 send_start (void *cls, size_t size, void *buf)
834 {
835   struct GNUNET_MessageHeader *s = buf;
836
837   if (buf == NULL)
838     return 0;
839 #if DEBUG_TRANSPORT
840   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
841               "Transmitting `%s' request.\n", "START");
842 #endif
843   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
844   s->size = htons (sizeof (struct GNUNET_MessageHeader));
845   s->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
846   return sizeof (struct GNUNET_MessageHeader);
847 }
848
849
850 /**
851  * Try again to connect to transport service.
852  */
853 static void
854 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
855 {
856   struct GNUNET_TRANSPORT_Handle *h = cls;
857   struct GNUNET_TRANSPORT_TransmitHandle *pos;
858   struct NeighbourList *n;
859
860   while (NULL != (n = h->neighbours))
861     {
862       h->neighbours = n->next;
863       pos = n->transmit_handle;
864       if (pos != NULL)
865         {
866           pos->neighbour = NULL;
867           pos->next = h->connect_wait_head;
868           h->connect_wait_head = pos;
869           if (pos->next != NULL)
870             pos->next->prev = pos;
871           pos->prev = NULL;
872         }
873       GNUNET_free (n);
874     }
875   h->connect_ready_head = NULL;
876 #if DEBUG_TRANSPORT
877   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
878 #endif
879   GNUNET_assert (h->client == NULL);
880   h->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
881   h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
882   GNUNET_assert (h->client != NULL);
883   /* make sure we don't send "START" twice,
884      remove existing entry from queue (if present) */
885   pos = h->connect_ready_head;
886   while (pos != NULL)
887     {
888       if (pos->notify == &send_start)
889         {
890           if (pos->prev == NULL)
891             h->connect_ready_head = pos->next;
892           else
893             pos->prev->next = pos->next;
894           if (pos->next != NULL)
895             pos->next->prev = pos->prev;
896           GNUNET_assert (pos->neighbour == NULL);
897           GNUNET_free (pos);
898           break;
899         }
900       pos = pos->next;
901     }
902   schedule_control_transmit (h,
903                              sizeof (struct GNUNET_MessageHeader),
904                              GNUNET_YES,
905                              GNUNET_TIME_UNIT_FOREVER_REL, &send_start, NULL);
906   GNUNET_CLIENT_receive (h->client,
907                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
908 }
909
910
911 /**
912  * Function that will schedule the job that will try
913  * to connect us again to the client.
914  */
915 static void
916 schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
917 {
918 #if DEBUG_TRANSPORT
919   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920               "Scheduling task to reconnect to transport service in %llu ms.\n",
921               h->reconnect_delay.value);
922 #endif
923   GNUNET_assert (h->client == NULL);
924   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
925   h->reconnect_task
926     = GNUNET_SCHEDULER_add_delayed (h->sched,
927                                     GNUNET_NO,
928                                     GNUNET_SCHEDULER_PRIORITY_DEFAULT,
929                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
930                                     h->reconnect_delay, &reconnect, h);
931   h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS;
932 }
933
934
935 /**
936  * Remove the given transmit handle from the wait list.  Does NOT free
937  * it.
938  */
939 static void
940 remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
941 {
942   if (th->prev == NULL)
943     th->handle->connect_wait_head = th->next;
944   else
945     th->prev->next = th->next;
946   if (th->next != NULL)
947     th->next->prev = th->prev;
948 }
949
950
951 /**
952  * We are connected to the respective peer, check the
953  * bandwidth limits and schedule the transmission.
954  */
955 static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th);
956
957
958 /**
959  * Function called by the scheduler when the timeout
960  * for bandwidth availablility for the target
961  * neighbour is reached.
962  */
963 static void
964 transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
965 {
966   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
967
968   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
969   schedule_request (th);
970 }
971
972
973 /**
974  * Called when our transmit request timed out before any transport
975  * reported success connecting to the desired peer or before the
976  * transport was ready to receive.  Signal error and free
977  * TransmitHandle.
978  */
979 static void
980 transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
981 {
982   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
983
984   if (th->neighbour != NULL)
985     th->neighbour->transmit_handle = NULL;
986 #if DEBUG_TRANSPORT
987   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission request timed out.\n");
988 #endif
989   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
990   remove_from_wait_list (th);
991   th->notify (th->notify_cls, 0, NULL);
992   GNUNET_free (th);
993 }
994
995
996 /**
997  * We are connected to the respective peer, check the
998  * bandwidth limits and schedule the transmission.
999  */
1000 static void
1001 schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th)
1002 {
1003   struct GNUNET_TRANSPORT_Handle *h;
1004   struct GNUNET_TIME_Relative duration;
1005   struct NeighbourList *n;
1006   uint64_t available;
1007
1008   h = th->handle;
1009   n = th->neighbour;
1010   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1011     {
1012       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
1013       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1014     }
1015   /* check outgoing quota */
1016   duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1017   if (duration.value > MIN_QUOTA_REFRESH_TIME)
1018     {
1019       update_quota (n);
1020       duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1021     }
1022   available = duration.value * n->quota_out;
1023   if (available < n->last_sent + th->notify_size)
1024     {
1025       /* calculate how much bandwidth we'd still need to
1026          accumulate and based on that how long we'll have
1027          to wait... */
1028       available = n->last_sent + th->notify_size - available;
1029       duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1030                                                 available / n->quota_out);
1031       if (th->timeout.value <
1032           GNUNET_TIME_relative_to_absolute (duration).value)
1033         {
1034           /* signal timeout! */
1035 #if DEBUG_TRANSPORT
1036           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037                       "Would need %llu ms before bandwidth is available for delivery, that is too long.  Signaling timeout.\n",
1038                       duration.value);
1039 #endif
1040           remove_from_wait_list (th);
1041           th->notify (th->notify_cls, 0, NULL);
1042           GNUNET_free (th);
1043           return;
1044         }
1045 #if DEBUG_TRANSPORT
1046       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047                   "Need more bandwidth, delaying delivery by %llu ms\n",
1048                   duration.value);
1049 #endif
1050       th->notify_delay_task
1051         = GNUNET_SCHEDULER_add_delayed (h->sched,
1052                                         GNUNET_NO,
1053                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1054                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1055                                         duration, &transmit_ready, th);
1056       return;
1057     }
1058 #if DEBUG_TRANSPORT
1059   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060               "Bandwidth available for transmission to `%4s'\n",
1061               GNUNET_i2s (&n->id));
1062 #endif
1063   if (GNUNET_NO == n->transmit_ok)
1064     {
1065       /* we may be ready, but transport service is not;
1066          wait for SendOkMessage or timeout */
1067 #if DEBUG_TRANSPORT
1068       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069                   "Need to wait for transport service `%s' message\n",
1070                   "SEND_OK");
1071 #endif
1072       th->notify_delay_task
1073         = GNUNET_SCHEDULER_add_delayed (h->sched,
1074                                         GNUNET_NO,
1075                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1076                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1077                                         GNUNET_TIME_absolute_get_remaining
1078                                         (th->timeout), &transmit_timeout, th);
1079       return;
1080     }
1081   n->transmit_ok = GNUNET_NO;
1082   remove_from_wait_list (th);
1083 #if DEBUG_TRANSPORT
1084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving message to ready list\n");
1085 #endif
1086   insert_transmit_handle (&h->connect_ready_head, th);
1087   if (GNUNET_NO == h->transmission_scheduled)
1088     schedule_transmission (h);
1089 }
1090
1091
1092 /**
1093  * Add neighbour to our list
1094  */
1095 static void
1096 add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1097                uint32_t quota_out,
1098                struct GNUNET_TIME_Relative latency,
1099                const struct GNUNET_PeerIdentity *pid)
1100 {
1101   struct NeighbourList *n;
1102   struct GNUNET_TRANSPORT_TransmitHandle *prev;
1103   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1104   struct GNUNET_TRANSPORT_TransmitHandle *next;
1105
1106   /* check for duplicates */
1107   if (NULL != find_neighbour (h, pid))
1108     {
1109       GNUNET_break (0);
1110       return;
1111     }
1112 #if DEBUG_TRANSPORT
1113   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1114               "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid));
1115 #endif
1116   n = GNUNET_malloc (sizeof (struct NeighbourList));
1117   n->id = *pid;
1118   n->last_quota_update = GNUNET_TIME_absolute_get ();
1119   n->quota_out = quota_out;
1120   n->next = h->neighbours;
1121   n->transmit_ok = GNUNET_YES;
1122   h->neighbours = n;
1123   if (h->nc_cb != NULL)
1124     h->nc_cb (h->cls, &n->id, latency);
1125   prev = NULL;
1126   pos = h->connect_wait_head;
1127   while (pos != NULL)
1128     {
1129       next = pos->next;
1130 #if DEBUG_TRANSPORT
1131       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1132                   "Found entry in connect_wait_head for `%4s'.\n",
1133                   GNUNET_i2s (&pos->target));
1134 #endif
1135       if (0 == memcmp (pid,
1136                        &pos->target, sizeof (struct GNUNET_PeerIdentity)))
1137         {
1138 #if DEBUG_TRANSPORT
1139           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140                       "Found pending request for new connection, will trigger now.\n");
1141 #endif
1142           pos->neighbour = n;
1143           if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1144             {
1145               GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
1146               pos->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1147             }
1148           GNUNET_assert (NULL == n->transmit_handle);
1149           n->transmit_handle = pos;
1150           if (GNUNET_YES == n->received_ack)
1151             {
1152 #if DEBUG_TRANSPORT
1153               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154                           "`%s' already received, scheduling request\n",
1155                           "ACK");
1156 #endif
1157               schedule_request (pos);
1158             }
1159           else
1160             {
1161 #if DEBUG_TRANSPORT
1162               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163                           "Still need to wait to receive `%s' message\n",
1164                           "ACK");
1165 #endif
1166               pos->notify_delay_task
1167                 = GNUNET_SCHEDULER_add_delayed (h->sched,
1168                                                 GNUNET_NO,
1169                                                 GNUNET_SCHEDULER_PRIORITY_KEEP,
1170                                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1171                                                 GNUNET_TIME_absolute_get_remaining
1172                                                 (pos->timeout),
1173                                                 &transmit_timeout, pos);
1174             }
1175           if (prev == NULL)
1176             h->connect_wait_head = next;
1177           else
1178             prev->next = next;
1179           break;
1180         }
1181       prev = pos;
1182       pos = next;
1183     }
1184 }
1185
1186
1187 /**
1188  * Connect to the transport service.  Note that the connection may
1189  * complete (or fail) asynchronously.
1190  *
1191
1192  * @param sched scheduler to use
1193  * @param cfg configuration to use
1194  * @param cls closure for the callbacks
1195  * @param rec receive function to call
1196  * @param nc function to call on connect events
1197  * @param dc function to call on disconnect events
1198  */
1199 struct GNUNET_TRANSPORT_Handle *
1200 GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched,
1201                           struct GNUNET_CONFIGURATION_Handle *cfg,
1202                           void *cls,
1203                           GNUNET_TRANSPORT_ReceiveCallback rec,
1204                           GNUNET_TRANSPORT_NotifyConnect nc,
1205                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1206 {
1207   struct GNUNET_TRANSPORT_Handle *ret;
1208
1209   GNUNET_ARM_start_service ("peerinfo",
1210                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1211   GNUNET_ARM_start_service ("transport",
1212                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1213   ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
1214   ret->sched = sched;
1215   ret->cfg = cfg;
1216   ret->cls = cls;
1217   ret->rec = rec;
1218   ret->nc_cb = nc;
1219   ret->nd_cb = nd;
1220   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1221   schedule_reconnect (ret);
1222   return ret;
1223 }
1224
1225
1226 /**
1227  * These stop activities must be run in a fresh
1228  * scheduler that is NOT in shutdown mode.
1229  */
1230 static void
1231 stop_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1232 {
1233   struct GNUNET_TRANSPORT_Handle *handle = cls;
1234   GNUNET_ARM_stop_service ("transport",
1235                            handle->cfg,
1236                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1237   GNUNET_ARM_stop_service ("peerinfo",
1238                            handle->cfg,
1239                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1240 }
1241
1242
1243 /**
1244  * Disconnect from the transport service.
1245  */
1246 void
1247 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1248 {
1249   struct GNUNET_TRANSPORT_TransmitHandle *th;
1250   struct NeighbourList *n;
1251   struct HelloWaitList *hwl;
1252   struct GNUNET_CLIENT_Connection *client;
1253
1254 #if DEBUG_TRANSPORT
1255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
1256 #endif
1257   while (NULL != (th = handle->connect_ready_head))
1258     {
1259       handle->connect_ready_head = th->next;
1260       th->notify (th->notify_cls, 0, NULL);
1261       GNUNET_free (th);
1262     }
1263
1264   while (NULL != (th = handle->connect_wait_head))
1265     {
1266       handle->connect_wait_head = th->next;
1267       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1268         {
1269           GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
1270           th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1271         }
1272       th->notify (th->notify_cls, 0, NULL);
1273       GNUNET_free (th);
1274     }
1275   while (NULL != (n = handle->neighbours))
1276     {
1277       handle->neighbours = n->next;
1278       GNUNET_free (n);
1279     }
1280   while (NULL != (hwl = handle->hwl_head))
1281     {
1282       handle->hwl_head = hwl->next;
1283       GNUNET_SCHEDULER_cancel (handle->sched, hwl->task);
1284       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1285                   _
1286                   ("Disconnect while trying to obtain HELLO from transport service.\n"));
1287       if (hwl->rec != NULL)
1288         hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
1289       GNUNET_free (hwl);
1290     }
1291   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1292     {
1293       GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
1294       handle->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1295     }
1296   GNUNET_free_non_null (handle->my_hello);
1297   handle->my_hello = NULL;
1298   GNUNET_SCHEDULER_run (&stop_task, handle);
1299   if (NULL != (client = handle->client))
1300     {
1301 #if DEBUG_TRANSPORT
1302       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303                   "Disconnecting from transport service for good.\n");
1304 #endif
1305       handle->client = NULL;
1306       GNUNET_CLIENT_disconnect (client);
1307     }
1308   if (client == NULL)
1309     GNUNET_free (handle);
1310 }
1311
1312
1313 /**
1314  * We're ready to transmit the request that the transport service
1315  * should connect to a new peer.  In addition to sending the
1316  * request, schedule the next phase for the transmission processing
1317  * that caused the connect request in the first place.
1318  */
1319 static size_t
1320 request_connect (void *cls, size_t size, void *buf)
1321 {
1322   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1323   struct TryConnectMessage *tcm;
1324   struct GNUNET_TRANSPORT_Handle *h;
1325
1326   h = th->handle;
1327   if (buf == NULL)
1328     {
1329 #if DEBUG_TRANSPORT
1330       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331                   "Failed to transmit connect request to service.\n");
1332 #endif
1333       th->notify (th->notify_cls, 0, NULL);
1334       GNUNET_free (th);
1335       return 0;
1336     }
1337 #if DEBUG_TRANSPORT
1338   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1339               "Transmitting `%s' message for `%4s'.\n",
1340               "TRY_CONNECT", GNUNET_i2s (&th->target));
1341 #endif
1342   GNUNET_assert (size >= sizeof (struct TryConnectMessage));
1343   tcm = buf;
1344   tcm->header.size = htons (sizeof (struct TryConnectMessage));
1345   tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
1346   tcm->reserved = htonl (0);
1347   memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
1348   th->notify_delay_task
1349     = GNUNET_SCHEDULER_add_delayed (h->sched,
1350                                     GNUNET_NO,
1351                                     GNUNET_SCHEDULER_PRIORITY_KEEP,
1352                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1353                                     GNUNET_TIME_absolute_get_remaining (th->
1354                                                                         timeout),
1355                                     &transmit_timeout, th);
1356   insert_transmit_handle (&h->connect_wait_head, th);
1357   return sizeof (struct TryConnectMessage);
1358 }
1359
1360
1361 /**
1362  * Schedule a request to connect to the given
1363  * neighbour (and if successful, add the specified
1364  * handle to the wait list).
1365  */
1366 static void
1367 try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
1368 {
1369   schedule_control_transmit (th->handle,
1370                              sizeof (struct TryConnectMessage),
1371                              GNUNET_NO,
1372                              GNUNET_TIME_absolute_get_remaining (th->timeout),
1373                              &request_connect, th);
1374 }
1375
1376
1377 /**
1378  * Cancel a pending notify transmit task
1379  * and also remove the given transmit handle
1380  * from whatever list is on.
1381  */
1382 static void
1383 remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
1384 {
1385   struct GNUNET_TRANSPORT_Handle *h;
1386
1387   h = th->handle;
1388   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1389     {
1390       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
1391       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1392     }
1393   if (th->prev == NULL)
1394     {
1395       if (th == h->connect_wait_head)
1396         h->connect_wait_head = th->next;
1397       else
1398         h->connect_ready_head = th->next;
1399     }
1400   else
1401     th->prev->next = th->next;
1402   if (th->next != NULL)
1403     th->next->prev = th->prev;
1404 }
1405
1406
1407 /**
1408  * Remove neighbour from our list
1409  */
1410 static void
1411 remove_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1412                   const struct GNUNET_PeerIdentity *peer)
1413 {
1414   struct NeighbourList *prev;
1415   struct NeighbourList *pos;
1416   struct GNUNET_TRANSPORT_TransmitHandle *th;
1417
1418   prev = NULL;
1419   pos = h->neighbours;
1420   while ((pos != NULL) &&
1421          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
1422     {
1423       prev = pos;
1424       pos = pos->next;
1425     }
1426   if (pos == NULL)
1427     {
1428       GNUNET_break (0);
1429       return;
1430     }
1431   if (prev == NULL)
1432     h->neighbours = pos->next;
1433   else
1434     prev->next = pos->next;
1435   if (NULL != (th = pos->transmit_handle))
1436     {
1437       pos->transmit_handle = NULL;
1438       th->neighbour = NULL;
1439       remove_from_any_list (th);
1440       try_connect (th);
1441     }
1442   if (h->nc_cb != NULL)
1443     h->nd_cb (h->cls, peer);
1444   GNUNET_free (pos);
1445 }
1446
1447
1448 /**
1449  * Type of a function to call when we receive a message
1450  * from the service.
1451  *
1452  * @param cls closure
1453  * @param msg message received, NULL on timeout or fatal error
1454  */
1455 static void
1456 demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1457 {
1458   struct GNUNET_TRANSPORT_Handle *h = cls;
1459   const struct DisconnectInfoMessage *dim;
1460   const struct ConnectInfoMessage *cim;
1461   const struct InboundMessage *im;
1462   const struct GNUNET_MessageHeader *imm;
1463   const struct SendOkMessage *okm;
1464   struct HelloWaitList *hwl;
1465   struct NeighbourList *n;
1466   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey;
1467   struct GNUNET_PeerIdentity me;
1468   uint16_t size;
1469
1470   if ((msg == NULL) || (h->client == NULL))
1471     {
1472       if (h->client != NULL)
1473         {
1474 #if DEBUG_TRANSPORT
1475           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1476                       "Error receiving from transport service, disconnecting temporarily.\n");
1477 #endif
1478           if (h->network_handle != NULL)
1479             {
1480               GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1481               h->network_handle = NULL;
1482               h->transmission_scheduled = GNUNET_NO;
1483             }
1484           GNUNET_CLIENT_disconnect (h->client);
1485           h->client = NULL;
1486           schedule_reconnect (h);
1487         }
1488       else
1489         {
1490           /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
1491              finish clean up work! */
1492           GNUNET_free (h);
1493         }
1494       return;
1495     }
1496   GNUNET_CLIENT_receive (h->client,
1497                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1498   size = ntohs (msg->size);
1499   switch (ntohs (msg->type))
1500     {
1501     case GNUNET_MESSAGE_TYPE_HELLO:
1502       if (GNUNET_OK !=
1503           GNUNET_HELLO_get_key ((const struct GNUNET_HELLO_Message *) msg,
1504                                 &pkey))
1505         {
1506           GNUNET_break (0);
1507           break;
1508         }
1509       GNUNET_CRYPTO_hash (&pkey,
1510                           sizeof (struct
1511                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1512                           &me.hashPubKey);
1513 #if DEBUG_TRANSPORT
1514       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1515                   "Receiving (my own) `%s' message, I am `%4s'.\n",
1516                   "HELLO", GNUNET_i2s (&me));
1517 #endif
1518       GNUNET_free_non_null (h->my_hello);
1519       h->my_hello = NULL;
1520       if (size < sizeof (struct GNUNET_MessageHeader))
1521         {
1522           GNUNET_break (0);
1523           break;
1524         }
1525       h->my_hello = GNUNET_malloc (size);
1526       memcpy (h->my_hello, msg, size);
1527       while (NULL != (hwl = h->hwl_head))
1528         {
1529           h->hwl_head = hwl->next;
1530           GNUNET_SCHEDULER_cancel (h->sched, hwl->task);
1531           GNUNET_TRANSPORT_get_hello (h,
1532                                       GNUNET_TIME_UNIT_ZERO,
1533                                       hwl->rec, hwl->rec_cls);
1534           GNUNET_free (hwl);
1535         }
1536       break;
1537     case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
1538       if (size != sizeof (struct ConnectInfoMessage))
1539         {
1540           GNUNET_break (0);
1541           break;
1542         }
1543       cim = (const struct ConnectInfoMessage *) msg;
1544 #if DEBUG_TRANSPORT
1545       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1546                   "Receiving `%s' message for `%4s'.\n",
1547                   "CONNECT", GNUNET_i2s (&cim->id));
1548 #endif
1549       add_neighbour (h,
1550                      ntohl (cim->quota_out),
1551                      GNUNET_TIME_relative_ntoh (cim->latency), &cim->id);
1552       break;
1553     case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
1554       if (size != sizeof (struct DisconnectInfoMessage))
1555         {
1556           GNUNET_break (0);
1557           break;
1558         }
1559       dim = (const struct DisconnectInfoMessage *) msg;
1560 #if DEBUG_TRANSPORT
1561       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1562                   "Receiving `%s' message for `%4s'.\n",
1563                   "DISCONNECT", GNUNET_i2s (&dim->peer));
1564 #endif
1565       remove_neighbour (h, &dim->peer);
1566       break;
1567     case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
1568 #if DEBUG_TRANSPORT
1569       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1570                   "Receiving `%s' message.\n", "SEND_OK");
1571 #endif
1572       if (size != sizeof (struct SendOkMessage))
1573         {
1574           GNUNET_break (0);
1575           break;
1576         }
1577       okm = (const struct SendOkMessage *) msg;
1578       n = find_neighbour (h, &okm->peer);
1579       GNUNET_assert (n != NULL);
1580       n->transmit_ok = GNUNET_YES;
1581       if (n->transmit_handle != NULL)
1582         {
1583 #if DEBUG_TRANSPORT
1584           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1585                       "Processing pending message\n");
1586 #endif
1587           GNUNET_SCHEDULER_cancel (h->sched,
1588                                    n->transmit_handle->notify_delay_task);
1589           n->transmit_handle->notify_delay_task =
1590             GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1591           GNUNET_assert (GNUNET_YES == n->received_ack);
1592           schedule_request (n->transmit_handle);
1593         }
1594       break;
1595     case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
1596 #if DEBUG_TRANSPORT
1597       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1598                   "Receiving `%s' message.\n", "RECV");
1599 #endif
1600       if (size <
1601           sizeof (struct InboundMessage) +
1602           sizeof (struct GNUNET_MessageHeader))
1603         {
1604           GNUNET_break (0);
1605           break;
1606         }
1607       im = (const struct InboundMessage *) msg;
1608       imm = (const struct GNUNET_MessageHeader *) &im[1];
1609       if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
1610         {
1611           GNUNET_break (0);
1612           break;
1613         }
1614       switch (ntohs (imm->type))
1615         {
1616         case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
1617 #if DEBUG_TRANSPORT
1618           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1619                       "Receiving `%s' message from `%4s'.\n",
1620                       "ACK", GNUNET_i2s (&im->peer));
1621 #endif
1622           n = find_neighbour (h, &im->peer);
1623           if (n == NULL)
1624             {
1625               GNUNET_break (0);
1626               break;
1627             }
1628           if (n->received_ack == GNUNET_NO)
1629             {
1630               n->received_ack = GNUNET_YES;
1631               if (NULL != n->transmit_handle)
1632                 {
1633 #if DEBUG_TRANSPORT
1634                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635                               "Peer connected, scheduling delayed message for deliverery now.\n");
1636 #endif
1637                   schedule_request (n->transmit_handle);
1638                 }
1639             }
1640           break;
1641         default:
1642 #if DEBUG_TRANSPORT
1643           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1644                       "Received message of type %u from `%4s'.\n",
1645                       ntohs (imm->type), GNUNET_i2s (&im->peer));
1646 #endif
1647           if (h->rec != NULL)
1648             h->rec (h->cls,
1649                     GNUNET_TIME_relative_ntoh (im->latency), &im->peer, imm);
1650           break;
1651         }
1652       break;
1653     default:
1654       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1655                   _
1656                   ("Received unexpected message of type %u from `%4s' in %s:%u\n"),
1657                   ntohs (msg->type), GNUNET_i2s (&im->peer), __FILE__,
1658                   __LINE__);
1659       GNUNET_break (0);
1660       break;
1661     }
1662 }
1663
1664
/**
 * Closure for the notify wrapper that stands between the client's
 * transmit callback and the transmit handle: it lets the wrapper
 * invoke the client's callback on the payload part of the buffer.
 */
struct ClientTransmitWrapper
{
  /* the client's callback, called to fill in the message payload */
  GNUNET_NETWORK_TransmitReadyNotify notify;
  /* closure passed to 'notify' */
  void *notify_cls;
  /* transmit handle this wrapper was created for */
  struct GNUNET_TRANSPORT_TransmitHandle *th;
};
1671
1672
1673 /**
1674  * Transmit message of a client destined for another
1675  * peer to the service.
1676  */
1677 static size_t
1678 client_notify_wrapper (void *cls, size_t size, void *buf)
1679 {
1680   struct ClientTransmitWrapper *ctw = cls;
1681   struct OutboundMessage *obm;
1682   struct GNUNET_MessageHeader *hdr;
1683   size_t ret;
1684
1685   if (size == 0)
1686     {
1687 #if DEBUG_TRANSPORT
1688       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1689                   "Transmission request could not be satisfied.\n");
1690 #endif
1691       ret = ctw->notify (ctw->notify_cls, 0, NULL);
1692       GNUNET_assert (ret == 0);
1693       GNUNET_free (ctw);
1694       return 0;
1695     }
1696   GNUNET_assert (size >= sizeof (struct OutboundMessage));
1697   obm = buf;
1698   ret = ctw->notify (ctw->notify_cls,
1699                      size - sizeof (struct OutboundMessage),
1700                      (void *) &obm[1]);
1701   if (ret == 0)
1702     {
1703       /* Need to reset flag, no SEND means no SEND_OK! */
1704       ctw->th->neighbour->transmit_ok = GNUNET_YES;
1705       GNUNET_free (ctw);
1706       return 0;
1707     }
1708   GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
1709   hdr = (struct GNUNET_MessageHeader *) &obm[1];
1710   GNUNET_assert (ntohs (hdr->size) == ret);
1711   GNUNET_assert (ret + sizeof (struct OutboundMessage) <
1712                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1713 #if DEBUG_TRANSPORT
1714   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1715               "Transmitting `%s' message with data for `%4s'\n",
1716               "SEND", GNUNET_i2s (&ctw->th->target));
1717 #endif
1718   ret += sizeof (struct OutboundMessage);
1719   obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
1720   obm->header.size = htons (ret);
1721   obm->reserved = htonl (0);
1722   obm->peer = ctw->th->target;
1723   GNUNET_free (ctw);
1724   return ret;
1725 }
1726
1727
1728
1729 /**
1730  * Check if we could queue a message of the given size for
1731  * transmission.  The transport service will take both its
1732  * internal buffers and bandwidth limits imposed by the
1733  * other peer into consideration when answering this query.
1734  *
1735  * @param handle connection to transport service
1736  * @param target who should receive the message
1737  * @param size how big is the message we want to transmit?
1738  * @param timeout after how long should we give up (and call
1739  *        notify with buf NULL and size 0)?
1740  * @param notify function to call when we are ready to
1741  *        send such a message
1742  * @param notify_cls closure for notify
1743  * @return NULL if someone else is already waiting to be notified
1744  *         non-NULL if the notify callback was queued (can be used to cancel
1745  *         using GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1746  */
1747 struct GNUNET_TRANSPORT_TransmitHandle *
1748 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
1749                                         *handle,
1750                                         const struct GNUNET_PeerIdentity
1751                                         *target, size_t size,
1752                                         struct GNUNET_TIME_Relative timeout,
1753                                         GNUNET_NETWORK_TransmitReadyNotify
1754                                         notify, void *notify_cls)
1755 {
1756   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1757   struct GNUNET_TRANSPORT_TransmitHandle *th;
1758   struct NeighbourList *n;
1759   struct ClientTransmitWrapper *ctw;
1760
1761   if (size + sizeof (struct OutboundMessage) >=
1762       GNUNET_SERVER_MAX_MESSAGE_SIZE)
1763     return NULL;
1764 #if DEBUG_TRANSPORT
1765   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1766               "Asking transport service for transmission of %u bytes to peer `%4s'.\n",
1767               size, GNUNET_i2s (target));
1768 #endif
1769   n = find_neighbour (handle, target);
1770   ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper));
1771   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
1772   ctw->notify = notify;
1773   ctw->notify_cls = notify_cls;
1774   ctw->th = th;
1775   th->handle = handle;
1776   th->target = *target;
1777   th->notify = &client_notify_wrapper;
1778   th->notify_cls = ctw;
1779   th->notify_size = size + sizeof (struct OutboundMessage);
1780   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1781   th->neighbour = n;
1782   if (NULL == n)
1783     {
1784 #if DEBUG_TRANSPORT
1785       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1786                   "Transmission request could not be satisfied (not yet connected), adding it to pending request list.\n");
1787 #endif
1788       pos = handle->connect_wait_head;
1789       while (pos != NULL)
1790         {
1791           GNUNET_assert (0 != memcmp (target,
1792                                       &pos->target,
1793                                       sizeof (struct GNUNET_PeerIdentity)));
1794           pos = pos->next;
1795         }
1796 #if DEBUG_TRANSPORT
1797       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1798                   "Will now try to connect to `%4s'.\n", GNUNET_i2s (target));
1799 #endif
1800       try_connect (th);
1801     }
1802   else
1803     {
1804 #if DEBUG_TRANSPORT
1805       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1806                   "Transmission request queued for transmission to transport service.\n");
1807 #endif
1808       GNUNET_assert (NULL == n->transmit_handle);
1809       n->transmit_handle = th;
1810       if (GNUNET_YES == n->received_ack)
1811         {
1812 #if DEBUG_TRANSPORT
1813           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1814                       "Peer `%4s' is connected, scheduling for delivery now.\n",
1815                       GNUNET_i2s (target));
1816 #endif
1817           schedule_request (th);
1818         }
1819       else
1820         {
1821 #if DEBUG_TRANSPORT
1822           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1823                       "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llums) only.\n",
1824                       GNUNET_i2s (target), timeout.value);
1825 #endif
1826           th->notify_delay_task
1827             = GNUNET_SCHEDULER_add_delayed (handle->sched,
1828                                             GNUNET_NO,
1829                                             GNUNET_SCHEDULER_PRIORITY_KEEP,
1830                                             GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1831                                             timeout, &transmit_timeout, th);
1832         }
1833     }
1834   return th;
1835 }
1836
1837
1838 /**
1839  * Cancel the specified transmission-ready
1840  * notification.
1841  */
1842 void
1843 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
1844                                                GNUNET_TRANSPORT_TransmitHandle
1845                                                *th)
1846 {
1847   struct GNUNET_TRANSPORT_Handle *h;
1848
1849   GNUNET_assert (th->notify == &client_notify_wrapper);
1850   remove_from_any_list (th);
1851   h = th->handle;
1852   if ((h->connect_ready_head == NULL) && (h->network_handle != NULL))
1853     {
1854       GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1855       h->network_handle = NULL;
1856       h->transmission_scheduled = GNUNET_NO;
1857     }
1858   GNUNET_free (th->notify_cls);
1859   GNUNET_free (th);
1860 }
1861
1862
1863 /* end of transport_api.c */