debug code
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - set_quota with low bandwidth should cause peer
28  *   disconnects (currently never does that) (MINOR)
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_server_lib.h"
36 #include "gnunet_time_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "transport.h"
39
/**
 * Upper bound on the time a HELLO offered by the application may
 * sit in the queue towards the transport service before we drop it.
 */
#define OFFER_HELLO_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)

/**
 * Time ARM is given to bring up the transport service
 * before it has to report back.
 */
#define START_SERVICE_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)

/**
 * Time ARM is given to shut down the transport service
 * before it has to report back.
 */
#define STOP_SERVICE_TIMEOUT \
  GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
57
58 /**
59  * Entry in linked list of all of our current neighbours.
60  */
61 struct NeighbourList
62 {
63
64   /**
65    * This is a linked list.
66    */
67   struct NeighbourList *next;
68
69   /**
70    * Active transmit handle, can be NULL.  Used to move
71    * from ready to wait list on disconnect and to block
72    * two transmissions to the same peer from being scheduled
73    * at the same time.
74    */
75   struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle;
76
77
78   /**
79    * Identity of this neighbour.
80    */
81   struct GNUNET_PeerIdentity id;
82
83   /**
84    * At what time did we reset last_sent last?
85    */
86   struct GNUNET_TIME_Absolute last_quota_update;
87
88   /**
89    * How many bytes have we sent since the "last_quota_update"
90    * timestamp?
91    */
92   uint64_t last_sent;
93
94   /**
95    * Global quota for outbound traffic to the neighbour in bytes/ms.
96    */
97   uint32_t quota_out;
98
99   /**
100    * Set to GNUNET_YES if we are currently allowed to
101    * transmit a message to the transport service for this
102    * peer, GNUNET_NO otherwise.
103    */
104   int transmit_ok;
105
106   /**
107    * Set to GNUNET_YES if we have received an ACK for the
108    * given peer.  Peers that receive our HELLO always respond
109    * with an ACK to let us know that we are successfully
110    * communicating.  Note that a PING can not be used for this
111    * since PINGs are only send if a HELLO address requires
112    * confirmation (and also, PINGs are not passed to the
113    * transport API itself).
114    */
115   int received_ack;
116
117 };
118
119
120 /**
121  * Linked list of requests from clients for our HELLO
122  * that were deferred.
123  */
124 struct HelloWaitList
125 {
126
127   /**
128    * This is a linked list.
129    */
130   struct HelloWaitList *next;
131
132   /**
133    * Reference back to our transport handle.
134    */
135   struct GNUNET_TRANSPORT_Handle *handle;
136
137   /**
138    * Callback to call once we got our HELLO.
139    */
140   GNUNET_TRANSPORT_ReceiveCallback rec;
141
142   /**
143    * Closure for rec.
144    */
145   void *rec_cls;
146
147   /**
148    * When to time out (call rec with NULL).
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Timeout task (used to trigger timeout,
154    * cancel if we get the HELLO in time).
155    */
156   GNUNET_SCHEDULER_TaskIdentifier task;
157
158
159 };
160
161
162 /**
163  * Opaque handle for a transmission-ready request.
164  */
165 struct GNUNET_TRANSPORT_TransmitHandle
166 {
167
168   /**
169    * We keep the transmit handles that are waiting for
170    * a transport-level connection in a doubly linked list.
171    */
172   struct GNUNET_TRANSPORT_TransmitHandle *next;
173
174   /**
175    * We keep the transmit handles that are waiting for
176    * a transport-level connection in a doubly linked list.
177    */
178   struct GNUNET_TRANSPORT_TransmitHandle *prev;
179
180   /**
181    * Handle of the main transport data structure.
182    */
183   struct GNUNET_TRANSPORT_Handle *handle;
184
185   /**
186    * Neighbour for this handle, can be NULL if the service
187    * is not yet connected to the target.
188    */
189   struct NeighbourList *neighbour;
190
191   /**
192    * Which peer is this transmission going to be for?  All
193    * zeros if it is control-traffic to the service.
194    */
195   struct GNUNET_PeerIdentity target;
196
197   /**
198    * Function to call when notify_size bytes are available
199    * for transmission.
200    */
201   GNUNET_NETWORK_TransmitReadyNotify notify;
202
203   /**
204    * Closure for notify.
205    */
206   void *notify_cls;
207
208   /**
209    * transmit_ready task Id.  The task is used to introduce
210    * the artificial delay that may be required to maintain
211    * the bandwidth limits.
212    */
213   GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
214
215   /**
216    * Timeout for this request.
217    */
218   struct GNUNET_TIME_Absolute timeout;
219
220   /**
221    * How many bytes is our notify callback waiting for?
222    */
223   size_t notify_size;
224
225 };
226
227
228 /**
229  * Handle for the transport service (includes all of the
230  * state for the transport service).
231  */
232 struct GNUNET_TRANSPORT_Handle
233 {
234
235   /**
236    * Closure for the callbacks.
237    */
238   void *cls;
239
240   /**
241    * Function to call for received data.
242    */
243   GNUNET_TRANSPORT_ReceiveCallback rec;
244
245   /**
246    * function to call on connect events
247    */
248   GNUNET_TRANSPORT_NotifyConnect nc_cb;
249
250   /**
251    * function to call on disconnect events
252    */
253   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
254
255   /**
256    * The current HELLO message for this peer.  Updated
257    * whenever transports change their addresses.
258    */
259   struct GNUNET_HELLO_Message *my_hello;
260
261   /**
262    * My client connection to the transport service.
263    */
264   struct GNUNET_CLIENT_Connection *client;
265
266   /**
267    * Handle to our registration with the client for notification.
268    */
269   struct GNUNET_NETWORK_TransmitHandle *network_handle;
270
271   /**
272    * Linked list of transmit handles that are waiting for the
273    * transport to connect to the respective peer.  When we
274    * receive notification that the transport connected to a
275    * peer, we go over this list and check if someone has already
276    * requested a transmission to the new peer; if so, we trigger
277    * the next step.
278    */
279   struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head;
280
281   /**
282    * Linked list of transmit handles that are waiting for the
283    * transport to be ready for transmission to the respective
284    * peer.  When we
285    * receive notification that the transport disconnected from
286    * a peer, we go over this list and move the entry back to
287    * the connect_wait list.
288    */
289   struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head;
290
291   /**
292    * Linked list of pending requests for our HELLO.
293    */
294   struct HelloWaitList *hwl_head;
295
296   /**
297    * My scheduler.
298    */
299   struct GNUNET_SCHEDULER_Handle *sched;
300
301   /**
302    * My configuration.
303    */
304   struct GNUNET_CONFIGURATION_Handle *cfg;
305
306   /**
307    * Linked list of the current neighbours of this peer.
308    */
309   struct NeighbourList *neighbours;
310
311   /**
312    * ID of the task trying to reconnect to the
313    * service.
314    */
315   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
316
317   /**
318    * Delay until we try to reconnect.
319    */
320   struct GNUNET_TIME_Relative reconnect_delay;
321
322   /**
323    * Do we currently have a transmission pending?
324    * (schedule transmission was called but has not
325    * yet succeeded)?
326    */
327   int transmission_scheduled;
328 };
329
330
331 static struct NeighbourList *
332 find_neighbour (struct GNUNET_TRANSPORT_Handle *h,
333                 const struct GNUNET_PeerIdentity *peer)
334 {
335   struct NeighbourList *pos;
336
337   pos = h->neighbours;
338   while ((pos != NULL) &&
339          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
340     pos = pos->next;
341   return pos;
342 }
343
344
345 /**
346  * Schedule the task to send one message from the
347  * connect_ready list to the service.
348  */
349 static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
350
351
352 /**
353  * Transmit message to client...
354  */
355 static size_t
356 transport_notify_ready (void *cls, size_t size, void *buf)
357 {
358   struct GNUNET_TRANSPORT_Handle *h = cls;
359   struct GNUNET_TRANSPORT_TransmitHandle *th;
360   struct NeighbourList *n;
361   size_t ret;
362   char *cbuf;
363
364 #if DEBUG_TRANSPORT
365   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
366               "Ready to transmit %u bytes to transport service\n", size);
367 #endif
368   h->network_handle = NULL;
369   h->transmission_scheduled = GNUNET_NO;
370   if (buf == NULL)
371     {
372       th = h->connect_ready_head;
373       if (th->next != NULL)
374         th->next->prev = NULL;
375       h->connect_ready_head = th->next;
376       if (NULL != (n = th->neighbour))
377         {
378           GNUNET_assert (n->transmit_handle == th);
379           n->transmit_handle = NULL;
380         }
381       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
382       GNUNET_free (th);
383       return 0;
384     }
385   cbuf = buf;
386   ret = 0;
387   h->network_handle = NULL;
388   h->transmission_scheduled = GNUNET_NO;
389   do
390     {
391       th = h->connect_ready_head;
392       GNUNET_assert (th->notify_size <= size);
393       if (th->next != NULL)
394         th->next->prev = NULL;
395       h->connect_ready_head = th->next;
396       if (NULL != (n = th->neighbour))
397         {
398           GNUNET_assert (n->transmit_handle == th);
399           n->transmit_handle = NULL;
400         }
401       ret += th->notify (th->notify_cls, size, &cbuf[ret]);
402       GNUNET_free (th);
403       if (n != NULL)
404         n->last_sent += ret;
405       size -= ret;
406     }
407   while ((h->connect_ready_head != NULL) &&
408          (h->connect_ready_head->notify_size <= size));
409   if (h->connect_ready_head != NULL)
410     schedule_transmission (h);
411 #if DEBUG_TRANSPORT
412   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
413               "Transmitting %u bytes to transport service\n", ret);
414 #endif
415   return ret;
416 }
417
418
419 /**
420  * Schedule the task to send one message from the
421  * connect_ready list to the service.
422  */
423 static void
424 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
425 {
426   struct GNUNET_TRANSPORT_TransmitHandle *th;
427
428   GNUNET_assert (NULL == h->network_handle);
429   if (h->client == NULL)
430     {
431 #if DEBUG_TRANSPORT
432       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
433                   "Not yet connected to transport service, need to wait.\n");
434 #endif
435       return;
436     }
437   th = h->connect_ready_head;
438   if (th == NULL)
439     {
440 #if DEBUG_TRANSPORT
441       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
442                   "Schedule transmission called, but no request is pending.\n");
443 #endif
444       return;
445     }
446   h->transmission_scheduled = GNUNET_YES;
447 #if DEBUG_TRANSPORT
448   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
449               "Asking client API for transmission of %u bytes\n",
450               th->notify_size);
451 #endif
452   h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client,
453                                                            th->notify_size,
454                                                            GNUNET_TIME_absolute_get_remaining
455                                                            (th->timeout),
456                                                            &transport_notify_ready,
457                                                            h);
458   GNUNET_assert (NULL != h->network_handle);
459 }
460
461
462 /**
463  * Insert the given transmit handle in the given sorted
464  * doubly linked list based on timeout.
465  *
466  * @param head pointer to the head of the linked list
467  * @param th element to insert into the list
468  */
469 static void
470 insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head,
471                         struct GNUNET_TRANSPORT_TransmitHandle *th)
472 {
473   struct GNUNET_TRANSPORT_TransmitHandle *pos;
474   struct GNUNET_TRANSPORT_TransmitHandle *prev;
475
476   pos = *head;
477   prev = NULL;
478   while ((pos != NULL) && (pos->timeout.value < th->timeout.value))
479     {
480       prev = pos;
481       pos = pos->next;
482     }
483   if (prev == NULL)
484     {
485       th->next = *head;
486       if (th->next != NULL)
487         th->next->prev = th;
488       *head = th;
489     }
490   else
491     {
492       th->next = pos;
493       th->prev = prev;
494       prev->next = th;
495       if (pos != NULL)
496         pos->prev = th;
497     }
498 }
499
500
501 /**
502  * Queue control request for transmission to the transport
503  * service.
504  *
505  * @param size number of bytes to be transmitted
506  * @param at_head request must be added to the head of the queue
507  *        (otherwise request will be appended)
508  * @param timeout how long this transmission can wait (at most)
509  * @param notify function to call to get the content
510  * @param notify_cls closure for notify
511  */
512 static void
513 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
514                            size_t size,
515                            int at_head,
516                            struct GNUNET_TIME_Relative timeout,
517                            GNUNET_NETWORK_TransmitReadyNotify notify,
518                            void *notify_cls)
519 {
520   struct GNUNET_TRANSPORT_TransmitHandle *th;
521 #if DEBUG_TRANSPORT
522   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
523               "Queueing %u bytes control transmission request.\n", size);
524 #endif
525   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
526   th->handle = h;
527   th->notify = notify;
528   th->notify_cls = notify_cls;
529   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
530   th->notify_size = size;
531   if (at_head)
532     {
533       th->next = h->connect_ready_head;
534       h->connect_ready_head = th;
535       if (th->next != NULL)
536         th->next->prev = th;
537     }
538   else
539     {
540       insert_transmit_handle (&h->connect_ready_head, th);
541     }
542   if (GNUNET_NO == h->transmission_scheduled)
543     schedule_transmission (h);
544 }
545
546
547 /**
548  * Update the quota values for the given neighbour now.
549  */
550 static void
551 update_quota (struct NeighbourList *n)
552 {
553   struct GNUNET_TIME_Relative delta;
554   uint64_t allowed;
555   uint64_t remaining;
556
557   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
558   allowed = delta.value * n->quota_out;
559   if (n->last_sent < allowed)
560     {
561       remaining = allowed - n->last_sent;
562       if (n->quota_out > 0)
563         remaining /= n->quota_out;
564       else
565         remaining = 0;
566       if (remaining > MAX_BANDWIDTH_CARRY)
567         remaining = MAX_BANDWIDTH_CARRY;
568       n->last_sent = 0;
569       n->last_quota_update = GNUNET_TIME_absolute_get ();
570       n->last_quota_update.value -= remaining;
571     }
572   else
573     {
574       n->last_sent -= allowed;
575       n->last_quota_update = GNUNET_TIME_absolute_get ();
576     }
577 }
578
579
580 struct SetQuotaContext
581 {
582   struct GNUNET_TRANSPORT_Handle *handle;
583
584   struct GNUNET_PeerIdentity target;
585
586   GNUNET_SCHEDULER_Task cont;
587
588   void *cont_cls;
589
590   struct GNUNET_TIME_Absolute timeout;
591
592   uint32_t quota_in;
593 };
594
595
596 static size_t
597 send_set_quota (void *cls, size_t size, void *buf)
598 {
599   struct SetQuotaContext *sqc = cls;
600   struct QuotaSetMessage *msg;
601
602   if (buf == NULL)
603     {
604       GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
605                                          GNUNET_NO,
606                                          sqc->cont,
607                                          sqc->cont_cls,
608                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
609       GNUNET_free (sqc);
610       return 0;
611     }
612 #if DEBUG_TRANSPORT
613   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
614               "Transmitting `%s' request with respect to `%4s'.\n",
615               "SET_QUOTA", GNUNET_i2s (&sqc->target));
616 #endif
617   GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
618   msg = buf;
619   msg->header.size = htons (sizeof (struct QuotaSetMessage));
620   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
621   msg->quota_in = htonl (sqc->quota_in);
622   memcpy (&msg->peer, &sqc->target, sizeof (struct GNUNET_PeerIdentity));
623   if (sqc->cont != NULL)
624     GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
625                                        GNUNET_NO,
626                                        sqc->cont,
627                                        sqc->cont_cls,
628                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
629   GNUNET_free (sqc);
630   return sizeof (struct QuotaSetMessage);
631 }
632
633
634 /**
635  * Set the share of incoming bandwidth for the given
636  * peer to the specified amount.
637  *
638  * @param handle connection to transport service
639  * @param target who's bandwidth quota is being changed
640  * @param quota_in incoming bandwidth quota in bytes per ms; 0 can
641  *        be used to force all traffic to be discarded
642  * @param quota_out outgoing bandwidth quota in bytes per ms; 0 can
643  *        be used to force all traffic to be discarded
644  * @param timeout how long to wait until signaling failure if
645  *        we can not communicate the quota change
646  * @param cont continuation to call when done, will be called
647  *        either with reason "TIMEOUT" or with reason "PREREQ_DONE"
648  * @param cont_cls closure for continuation
649  */
650 void
651 GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
652                             const struct GNUNET_PeerIdentity *target,
653                             uint32_t quota_in,
654                             uint32_t quota_out,
655                             struct GNUNET_TIME_Relative timeout,
656                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
657 {
658   struct NeighbourList *n;
659   struct SetQuotaContext *sqc;
660
661   n = find_neighbour (handle, target);
662   if (n != NULL)
663     {
664       update_quota (n);
665       if (n->quota_out < quota_out)
666         n->last_quota_update = GNUNET_TIME_absolute_get ();
667       n->quota_out = quota_out;
668     }
669   sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
670   sqc->handle = handle;
671   sqc->target = *target;
672   sqc->cont = cont;
673   sqc->cont_cls = cont_cls;
674   sqc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
675   sqc->quota_in = quota_in;
676   schedule_control_transmit (handle,
677                              sizeof (struct QuotaSetMessage),
678                              GNUNET_NO, timeout, &send_set_quota, sqc);
679 }
680
681
682 /**
683  * A "get_hello" request has timed out.  Signal the client
684  * and clean up.
685  */
686 static void
687 hello_wait_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
688 {
689   struct HelloWaitList *hwl = cls;
690   struct HelloWaitList *pos;
691   struct HelloWaitList *prev;
692
693   prev = NULL;
694   pos = hwl->handle->hwl_head;
695   while (pos != hwl)
696     {
697       GNUNET_assert (pos != NULL);
698       prev = pos;
699       pos = pos->next;
700     }
701   if (prev == NULL)
702     hwl->handle->hwl_head = hwl->next;
703   else
704     prev->next = hwl->next;
705   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
706               _("Timeout trying to obtain `%s' from transport service.\n"),
707               "HELLO");
708   /* signal timeout */
709   if (hwl->rec != NULL)
710     hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
711   GNUNET_free (hwl);
712 }
713
714
715 /**
716  * Obtain the HELLO message for this peer.
717  *
718  * @param handle connection to transport service
719  * @param timeout how long to wait for the HELLO
720  * @param rec function to call with the HELLO, sender will be our peer
721  *            identity; message and sender will be NULL on timeout
722  *            (handshake with transport service pending/failed).
723  *             cost estimate will be 0.
724  * @param rec_cls closure for rec
725  */
726 void
727 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
728                             struct GNUNET_TIME_Relative timeout,
729                             GNUNET_TRANSPORT_ReceiveCallback rec,
730                             void *rec_cls)
731 {
732   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
733   struct GNUNET_PeerIdentity me;
734   struct HelloWaitList *hwl;
735
736   if (handle->my_hello == NULL)
737     {
738       hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
739       hwl->next = handle->hwl_head;
740       handle->hwl_head = hwl;
741       hwl->handle = handle;
742       hwl->rec = rec;
743       hwl->rec_cls = rec_cls;
744       hwl->timeout = GNUNET_TIME_relative_to_absolute (timeout);
745       hwl->task = GNUNET_SCHEDULER_add_delayed (handle->sched,
746                                                 GNUNET_YES,
747                                                 GNUNET_SCHEDULER_PRIORITY_KEEP,
748                                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
749                                                 timeout,
750                                                 &hello_wait_timeout, hwl);
751       return;
752     }
753   GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_key (handle->my_hello, &pk));
754   GNUNET_CRYPTO_hash (&pk,
755                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
756                       &me.hashPubKey);
757
758   rec (rec_cls,
759        GNUNET_TIME_UNIT_ZERO,
760        &me, (const struct GNUNET_MessageHeader *) handle->my_hello);
761 }
762
763
764 static size_t
765 send_hello (void *cls, size_t size, void *buf)
766 {
767   struct GNUNET_MessageHeader *hello = cls;
768   uint16_t msize;
769
770   if (buf == NULL)
771     {
772 #if DEBUG_TRANSPORT
773       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
774                   "Timeout while trying to transmit `%s' request.\n",
775                   "HELLO");
776 #endif
777       GNUNET_free (hello);
778       return 0;
779     }
780 #if DEBUG_TRANSPORT
781   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
782               "Transmitting `%s' request.\n", "HELLO");
783 #endif
784   msize = ntohs (hello->size);
785   GNUNET_assert (size >= msize);
786   memcpy (buf, hello, msize);
787   GNUNET_free (hello);
788   return msize;
789 }
790
791
792 /**
793  * Offer the transport service the HELLO of another peer.  Note that
794  * the transport service may just ignore this message if the HELLO is
795  * malformed or useless due to our local configuration.
796  *
797  * @param handle connection to transport service
798  * @param hello the hello message
799  */
800 void
801 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
802                               const struct GNUNET_MessageHeader *hello)
803 {
804   struct GNUNET_MessageHeader *hc;
805   uint16_t size;
806
807   if (handle->client == NULL)
808     {
809 #if DEBUG_TRANSPORT
810       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
811                   "Not connected to transport service, dropping offered HELLO\n");
812 #endif
813       return;
814     }
815   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
816   size = ntohs (hello->size);
817   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
818   hc = GNUNET_malloc (size);
819   memcpy (hc, hello, size);
820   schedule_control_transmit (handle,
821                              size,
822                              GNUNET_NO, OFFER_HELLO_TIMEOUT, &send_hello, hc);
823 }
824
825
826 /**
827  * Function we use for handling incoming messages.
828  */
829 static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg);
830
831
832 static size_t
833 send_start (void *cls, size_t size, void *buf)
834 {
835   struct GNUNET_MessageHeader *s = buf;
836
837   if (buf == NULL)
838     return 0;
839 #if DEBUG_TRANSPORT
840   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
841               "Transmitting `%s' request.\n", "START");
842 #endif
843   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
844   s->size = htons (sizeof (struct GNUNET_MessageHeader));
845   s->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
846   return sizeof (struct GNUNET_MessageHeader);
847 }
848
849
850 /**
851  * Try again to connect to transport service.
852  */
853 static void
854 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
855 {
856   struct GNUNET_TRANSPORT_Handle *h = cls;
857   struct GNUNET_TRANSPORT_TransmitHandle *pos;
858   struct NeighbourList *n;
859
860   while (NULL != (n = h->neighbours))
861     {
862       h->neighbours = n->next;
863       pos = n->transmit_handle;
864       if (pos != NULL)
865         {
866           pos->neighbour = NULL;
867           pos->next = h->connect_wait_head;
868           h->connect_wait_head = pos;
869           if (pos->next != NULL)
870             pos->next->prev = pos;
871           pos->prev = NULL;
872         }
873       GNUNET_free (n);
874     }
875   h->connect_ready_head = NULL;
876 #if DEBUG_TRANSPORT
877   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
878 #endif
879   GNUNET_assert (h->client == NULL);
880   h->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
881   h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
882   GNUNET_assert (h->client != NULL);
883   /* make sure we don't send "START" twice,
884      remove existing entry from queue (if present) */
885   pos = h->connect_ready_head;
886   while (pos != NULL)
887     {
888       if (pos->notify == &send_start)
889         {
890           if (pos->prev == NULL)
891             h->connect_ready_head = pos->next;
892           else
893             pos->prev->next = pos->next;
894           if (pos->next != NULL)
895             pos->next->prev = pos->prev;
896           GNUNET_assert (pos->neighbour == NULL);
897           GNUNET_free (pos);
898           break;
899         }
900       pos = pos->next;
901     }
902   schedule_control_transmit (h,
903                              sizeof (struct GNUNET_MessageHeader),
904                              GNUNET_YES,
905                              GNUNET_TIME_UNIT_FOREVER_REL, &send_start, NULL);
906   GNUNET_CLIENT_receive (h->client,
907                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
908 }
909
910
911 /**
912  * Function that will schedule the job that will try
913  * to connect us again to the client.
914  */
915 static void
916 schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
917 {
918 #if DEBUG_TRANSPORT
919   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
920               "Scheduling task to reconnect to transport service in %llu ms.\n",
921               h->reconnect_delay.value);
922 #endif
923   GNUNET_assert (h->client == NULL);
924   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
925   h->reconnect_task
926     = GNUNET_SCHEDULER_add_delayed (h->sched,
927                                     GNUNET_NO,
928                                     GNUNET_SCHEDULER_PRIORITY_DEFAULT,
929                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
930                                     h->reconnect_delay, &reconnect, h);
931   h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS;
932 }
933
934
935 /**
936  * Remove the given transmit handle from the wait list.  Does NOT free
937  * it.
938  */
939 static void
940 remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
941 {
942   if (th->prev == NULL)
943     th->handle->connect_wait_head = th->next;
944   else
945     th->prev->next = th->next;
946   if (th->next != NULL)
947     th->next->prev = th->prev;
948 }
949
950
951 /**
952  * We are connected to the respective peer, check the
953  * bandwidth limits and schedule the transmission.
954  */
955 static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th);
956
957
958 /**
959  * Function called by the scheduler when the timeout
960  * for bandwidth availablility for the target
961  * neighbour is reached.
962  */
963 static void
964 transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
965 {
966   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
967
968   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
969   schedule_request (th);
970 }
971
972
973 /**
974  * Called when our transmit request timed out before any transport
975  * reported success connecting to the desired peer or before the
976  * transport was ready to receive.  Signal error and free
977  * TransmitHandle.
978  */
979 static void
980 transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
981 {
982   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
983
984   if (th->neighbour != NULL)
985     th->neighbour->transmit_handle = NULL;
986 #if DEBUG_TRANSPORT
987   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission request timed out.\n");
988 #endif
989   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
990   remove_from_wait_list (th);
991   th->notify (th->notify_cls, 0, NULL);
992   GNUNET_free (th);
993 }
994
995
996 /**
997  * We are connected to the respective peer, check the
998  * bandwidth limits and schedule the transmission.
999  */
1000 static void
1001 schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th)
1002 {
1003   struct GNUNET_TRANSPORT_Handle *h;
1004   struct GNUNET_TIME_Relative duration;
1005   struct NeighbourList *n;
1006   uint64_t available;
1007
1008   h = th->handle;
1009   n = th->neighbour;
1010   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1011     {
1012       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
1013       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1014     }
1015   /* check outgoing quota */
1016   duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1017   if (duration.value > MIN_QUOTA_REFRESH_TIME)
1018     {
1019       update_quota (n);
1020       duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1021     }
1022   available = duration.value * n->quota_out;
1023   if (available < n->last_sent + th->notify_size)
1024     {
1025       /* calculate how much bandwidth we'd still need to
1026          accumulate and based on that how long we'll have
1027          to wait... */
1028       available = n->last_sent + th->notify_size - available;
1029       duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1030                                                 available / n->quota_out);
1031       if (th->timeout.value <
1032           GNUNET_TIME_relative_to_absolute (duration).value)
1033         {
1034           /* signal timeout! */
1035 #if DEBUG_TRANSPORT
1036           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1037                       "Would need %llu ms before bandwidth is available for delivery, that is too long.  Signaling timeout.\n",
1038                       duration.value);
1039 #endif
1040           remove_from_wait_list (th);
1041           th->notify (th->notify_cls, 0, NULL);
1042           GNUNET_free (th);
1043           return;
1044         }
1045 #if DEBUG_TRANSPORT
1046       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1047                   "Need more bandwidth, delaying delivery by %llu ms\n",
1048                   duration.value);
1049 #endif
1050       th->notify_delay_task
1051         = GNUNET_SCHEDULER_add_delayed (h->sched,
1052                                         GNUNET_NO,
1053                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1054                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1055                                         duration, &transmit_ready, th);
1056       return;
1057     }
1058 #if DEBUG_TRANSPORT
1059   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1060               "Bandwidth available for transmission to `%4s'\n",
1061               GNUNET_i2s (&n->id));
1062 #endif
1063   if (GNUNET_NO == n->transmit_ok)
1064     {
1065       /* we may be ready, but transport service is not;
1066          wait for SendOkMessage or timeout */
1067 #if DEBUG_TRANSPORT
1068       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1069                   "Need to wait for transport service `%s' message\n",
1070                   "SEND_OK");
1071 #endif
1072       th->notify_delay_task
1073         = GNUNET_SCHEDULER_add_delayed (h->sched,
1074                                         GNUNET_NO,
1075                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1076                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1077                                         GNUNET_TIME_absolute_get_remaining
1078                                         (th->timeout), &transmit_timeout, th);
1079       return;
1080     }
1081   n->transmit_ok = GNUNET_NO;
1082   remove_from_wait_list (th);
1083 #if DEBUG_TRANSPORT
1084   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving message to ready list\n");
1085 #endif
1086   insert_transmit_handle (&h->connect_ready_head, th);
1087   if (GNUNET_NO == h->transmission_scheduled)
1088     schedule_transmission (h);
1089 }
1090
1091
1092 /**
1093  * Add neighbour to our list
1094  */
1095 static void
1096 add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1097                uint32_t quota_out,
1098                struct GNUNET_TIME_Relative latency,
1099                const struct GNUNET_PeerIdentity *pid)
1100 {
1101   struct NeighbourList *n;
1102   struct GNUNET_TRANSPORT_TransmitHandle *prev;
1103   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1104   struct GNUNET_TRANSPORT_TransmitHandle *next;
1105
1106   /* check for duplicates */
1107   if (NULL != find_neighbour (h, pid))
1108     {
1109       GNUNET_break (0);
1110       return;
1111     }
1112 #if DEBUG_TRANSPORT
1113   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1114               "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid));
1115 #endif
1116   n = GNUNET_malloc (sizeof (struct NeighbourList));
1117   n->id = *pid;
1118   n->last_quota_update = GNUNET_TIME_absolute_get ();
1119   n->quota_out = quota_out;
1120   n->next = h->neighbours;
1121   n->transmit_ok = GNUNET_YES;
1122   h->neighbours = n;
1123   if (h->nc_cb != NULL)
1124     h->nc_cb (h->cls, &n->id, latency);
1125   prev = NULL;
1126   pos = h->connect_wait_head;
1127   while (pos != NULL)
1128     {
1129       next = pos->next;
1130 #if DEBUG_TRANSPORT
1131       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1132                   "Found entry in connect_wait_head for `%4s'.\n",
1133                   GNUNET_i2s (&pos->target));
1134 #endif
1135       if (0 == memcmp (pid,
1136                        &pos->target, sizeof (struct GNUNET_PeerIdentity)))
1137         {
1138 #if DEBUG_TRANSPORT
1139           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140                       "Found pending request for new connection, will trigger now.\n");
1141 #endif
1142           pos->neighbour = n;
1143           if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1144             {
1145               GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
1146               pos->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1147             }
1148           GNUNET_assert (NULL == n->transmit_handle);
1149           n->transmit_handle = pos;
1150           if (GNUNET_YES == n->received_ack)
1151             {
1152 #if DEBUG_TRANSPORT
1153               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154                           "`%s' already received, scheduling request\n",
1155                           "ACK");
1156 #endif
1157               schedule_request (pos);
1158             }
1159           else
1160             {
1161 #if DEBUG_TRANSPORT
1162               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1163                           "Still need to wait to receive `%s' message\n",
1164                           "ACK");
1165 #endif
1166               pos->notify_delay_task
1167                 = GNUNET_SCHEDULER_add_delayed (h->sched,
1168                                                 GNUNET_NO,
1169                                                 GNUNET_SCHEDULER_PRIORITY_KEEP,
1170                                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1171                                                 GNUNET_TIME_absolute_get_remaining
1172                                                 (pos->timeout),
1173                                                 &transmit_timeout, pos);
1174             }
1175           if (prev == NULL)
1176             h->connect_wait_head = next;
1177           else
1178             prev->next = next;
1179           break;
1180         }
1181       prev = pos;
1182       pos = next;
1183     }
1184 }
1185
1186
1187 /**
1188  * Connect to the transport service.  Note that the connection may
1189  * complete (or fail) asynchronously.
1190  *
1191
1192  * @param sched scheduler to use
1193  * @param cfg configuration to use
1194  * @param cls closure for the callbacks
1195  * @param rec receive function to call
1196  * @param nc function to call on connect events
1197  * @param dc function to call on disconnect events
1198  */
1199 struct GNUNET_TRANSPORT_Handle *
1200 GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched,
1201                           struct GNUNET_CONFIGURATION_Handle *cfg,
1202                           void *cls,
1203                           GNUNET_TRANSPORT_ReceiveCallback rec,
1204                           GNUNET_TRANSPORT_NotifyConnect nc,
1205                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1206 {
1207   struct GNUNET_TRANSPORT_Handle *ret;
1208
1209   GNUNET_ARM_start_service ("peerinfo",
1210                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1211   GNUNET_ARM_start_service ("transport",
1212                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1213   ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
1214   ret->sched = sched;
1215   ret->cfg = cfg;
1216   ret->cls = cls;
1217   ret->rec = rec;
1218   ret->nc_cb = nc;
1219   ret->nd_cb = nd;
1220   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1221   schedule_reconnect (ret);
1222   return ret;
1223 }
1224
1225
1226 /**
1227  * These stop activities must be run in a fresh
1228  * scheduler that is NOT in shutdown mode.
1229  */
1230 static void
1231 stop_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1232 {
1233   struct GNUNET_TRANSPORT_Handle *handle = cls;
1234   GNUNET_ARM_stop_service ("transport",
1235                            handle->cfg,
1236                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1237   GNUNET_ARM_stop_service ("peerinfo",
1238                            handle->cfg,
1239                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1240 }
1241
1242
1243 /**
1244  * Disconnect from the transport service.
1245  */
1246 void
1247 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1248 {
1249   struct GNUNET_TRANSPORT_TransmitHandle *th;
1250   struct NeighbourList *n;
1251   struct HelloWaitList *hwl;
1252   struct GNUNET_CLIENT_Connection *client;
1253
1254 #if DEBUG_TRANSPORT
1255   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
1256 #endif
1257   while (NULL != (th = handle->connect_ready_head))
1258     {
1259       handle->connect_ready_head = th->next;
1260       th->notify (th->notify_cls, 0, NULL);
1261       GNUNET_free (th);
1262     }
1263
1264   while (NULL != (th = handle->connect_wait_head))
1265     {
1266       handle->connect_wait_head = th->next;
1267       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1268         {
1269           GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
1270           th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1271         }
1272       th->notify (th->notify_cls, 0, NULL);
1273       GNUNET_free (th);
1274     }
1275   while (NULL != (n = handle->neighbours))
1276     {
1277       handle->neighbours = n->next;
1278       GNUNET_free (n);
1279     }
1280   while (NULL != (hwl = handle->hwl_head))
1281     {
1282       handle->hwl_head = hwl->next;
1283       GNUNET_SCHEDULER_cancel (handle->sched, hwl->task);
1284       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1285                   _
1286                   ("Disconnect while trying to obtain HELLO from transport service.\n"));
1287       if (hwl->rec != NULL)
1288         hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
1289       GNUNET_free (hwl);
1290     }
1291   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1292     {
1293       GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
1294       handle->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1295     }
1296   GNUNET_free_non_null (handle->my_hello);
1297   handle->my_hello = NULL;
1298   GNUNET_SCHEDULER_run (&stop_task, handle);
1299   if (NULL != (client = handle->client))
1300     {
1301 #if DEBUG_TRANSPORT
1302       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1303                   "Disconnecting from transport service for good.\n");
1304 #endif
1305       handle->client = NULL;
1306       GNUNET_CLIENT_disconnect (client);
1307     }
1308   if (client == NULL)
1309     GNUNET_free (handle);
1310 }
1311
1312
1313 /**
1314  * We're ready to transmit the request that the transport service
1315  * should connect to a new peer.  In addition to sending the
1316  * request, schedule the next phase for the transmission processing
1317  * that caused the connect request in the first place.
1318  */
1319 static size_t
1320 request_connect (void *cls, size_t size, void *buf)
1321 {
1322   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1323   struct TryConnectMessage *tcm;
1324   struct GNUNET_TRANSPORT_Handle *h;
1325
1326   h = th->handle;
1327   if (buf == NULL)
1328     {
1329 #if DEBUG_TRANSPORT
1330       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1331                   "Failed to transmit connect request to service.\n");
1332 #endif
1333       th->notify (th->notify_cls, 0, NULL);
1334       GNUNET_free (th);
1335       return 0;
1336     }
1337 #if DEBUG_TRANSPORT
1338   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1339               "Transmitting `%s' message for `%4s'.\n",
1340               "TRY_CONNECT", GNUNET_i2s (&th->target));
1341 #endif
1342   GNUNET_assert (size >= sizeof (struct TryConnectMessage));
1343   tcm = buf;
1344   tcm->header.size = htons (sizeof (struct TryConnectMessage));
1345   tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
1346   tcm->reserved = htonl (0);
1347   memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
1348   th->notify_delay_task
1349     = GNUNET_SCHEDULER_add_delayed (h->sched,
1350                                     GNUNET_NO,
1351                                     GNUNET_SCHEDULER_PRIORITY_KEEP,
1352                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1353                                     GNUNET_TIME_absolute_get_remaining
1354                                     (th->timeout), &transmit_timeout, th);
1355   insert_transmit_handle (&h->connect_wait_head, th);
1356   return sizeof (struct TryConnectMessage);
1357 }
1358
1359
1360 /**
1361  * Schedule a request to connect to the given
1362  * neighbour (and if successful, add the specified
1363  * handle to the wait list).
1364  */
1365 static void
1366 try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
1367 {
1368   schedule_control_transmit (th->handle,
1369                              sizeof (struct TryConnectMessage),
1370                              GNUNET_NO,
1371                              GNUNET_TIME_absolute_get_remaining (th->timeout),
1372                              &request_connect, th);
1373 }
1374
1375
1376 /**
1377  * Cancel a pending notify transmit task
1378  * and also remove the given transmit handle
1379  * from whatever list is on.
1380  */
1381 static void
1382 remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
1383 {
1384   struct GNUNET_TRANSPORT_Handle *h;
1385
1386   h = th->handle;
1387   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1388     {
1389       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
1390       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1391     }
1392   if (th->prev == NULL)
1393     {
1394       if (th == h->connect_wait_head)
1395         h->connect_wait_head = th->next;
1396       else
1397         h->connect_ready_head = th->next;
1398     }
1399   else
1400     th->prev->next = th->next;
1401   if (th->next != NULL)
1402     th->next->prev = th->prev;
1403 }
1404
1405
1406 /**
1407  * Remove neighbour from our list
1408  */
1409 static void
1410 remove_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1411                   const struct GNUNET_PeerIdentity *peer)
1412 {
1413   struct NeighbourList *prev;
1414   struct NeighbourList *pos;
1415   struct GNUNET_TRANSPORT_TransmitHandle *th;
1416
1417   prev = NULL;
1418   pos = h->neighbours;
1419   while ((pos != NULL) &&
1420          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
1421     {
1422       prev = pos;
1423       pos = pos->next;
1424     }
1425   if (pos == NULL)
1426     {
1427       GNUNET_break (0);
1428       return;
1429     }
1430   if (prev == NULL)
1431     h->neighbours = pos->next;
1432   else
1433     prev->next = pos->next;
1434   if (NULL != (th = pos->transmit_handle))
1435     {
1436       pos->transmit_handle = NULL;
1437       th->neighbour = NULL;
1438       remove_from_any_list (th);
1439       try_connect (th);
1440     }
1441   if (h->nc_cb != NULL)
1442     h->nd_cb (h->cls, peer);
1443   GNUNET_free (pos);
1444 }
1445
1446
1447 /**
1448  * Type of a function to call when we receive a message
1449  * from the service.
1450  *
1451  * @param cls closure
1452  * @param msg message received, NULL on timeout or fatal error
1453  */
1454 static void
1455 demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1456 {
1457   struct GNUNET_TRANSPORT_Handle *h = cls;
1458   const struct DisconnectInfoMessage *dim;
1459   const struct ConnectInfoMessage *cim;
1460   const struct InboundMessage *im;
1461   const struct GNUNET_MessageHeader *imm;
1462   const struct SendOkMessage *okm;
1463   struct HelloWaitList *hwl;
1464   struct NeighbourList *n;
1465   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey;
1466   struct GNUNET_PeerIdentity me;
1467   uint16_t size;
1468
1469   if ((msg == NULL) || (h->client == NULL))
1470     {
1471       if (h->client != NULL)
1472         {
1473 #if DEBUG_TRANSPORT
1474           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1475                       "Error receiving from transport service, disconnecting temporarily.\n");
1476 #endif
1477           if (h->network_handle != NULL)
1478             {
1479               GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1480               h->network_handle = NULL;
1481               h->transmission_scheduled = GNUNET_NO;
1482             }
1483           GNUNET_CLIENT_disconnect (h->client);
1484           h->client = NULL;
1485           schedule_reconnect (h);
1486         }
1487       else
1488         {
1489           /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
1490              finish clean up work! */
1491           GNUNET_free (h);
1492         }
1493       return;
1494     }
1495   GNUNET_CLIENT_receive (h->client,
1496                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1497   size = ntohs (msg->size);
1498   switch (ntohs (msg->type))
1499     {
1500     case GNUNET_MESSAGE_TYPE_HELLO:
1501       if (GNUNET_OK !=
1502           GNUNET_HELLO_get_key ((const struct GNUNET_HELLO_Message *) msg,
1503                                 &pkey))
1504         {
1505           GNUNET_break (0);
1506           break;
1507         }
1508       GNUNET_CRYPTO_hash (&pkey,
1509                           sizeof (struct
1510                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1511                           &me.hashPubKey);
1512 #if DEBUG_TRANSPORT
1513       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1514                   "Receiving (my own) `%s' message, I am `%4s'.\n",
1515                   "HELLO", GNUNET_i2s (&me));
1516 #endif
1517       GNUNET_free_non_null (h->my_hello);
1518       h->my_hello = NULL;
1519       if (size < sizeof (struct GNUNET_MessageHeader))
1520         {
1521           GNUNET_break (0);
1522           break;
1523         }
1524       h->my_hello = GNUNET_malloc (size);
1525       memcpy (h->my_hello, msg, size);
1526       while (NULL != (hwl = h->hwl_head))
1527         {
1528           h->hwl_head = hwl->next;
1529           GNUNET_SCHEDULER_cancel (h->sched, hwl->task);
1530           GNUNET_TRANSPORT_get_hello (h,
1531                                       GNUNET_TIME_UNIT_ZERO,
1532                                       hwl->rec, hwl->rec_cls);
1533           GNUNET_free (hwl);
1534         }
1535       break;
1536     case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
1537       if (size != sizeof (struct ConnectInfoMessage))
1538         {
1539           GNUNET_break (0);
1540           break;
1541         }
1542       cim = (const struct ConnectInfoMessage *) msg;
1543 #if DEBUG_TRANSPORT
1544       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1545                   "Receiving `%s' message for `%4s'.\n",
1546                   "CONNECT", GNUNET_i2s (&cim->id));
1547 #endif
1548       add_neighbour (h,
1549                      ntohl (cim->quota_out),
1550                      GNUNET_TIME_relative_ntoh (cim->latency), &cim->id);
1551       break;
1552     case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
1553       if (size != sizeof (struct DisconnectInfoMessage))
1554         {
1555           GNUNET_break (0);
1556           break;
1557         }
1558       dim = (const struct DisconnectInfoMessage *) msg;
1559 #if DEBUG_TRANSPORT
1560       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1561                   "Receiving `%s' message for `%4s'.\n",
1562                   "DISCONNECT", GNUNET_i2s (&dim->peer));
1563 #endif
1564       remove_neighbour (h, &dim->peer);
1565       break;
1566     case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
1567       if (size != sizeof (struct SendOkMessage))
1568         {
1569           GNUNET_break (0);
1570           break;
1571         }
1572       okm = (const struct SendOkMessage *) msg;
1573 #if DEBUG_TRANSPORT
1574       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1575                   "Receiving `%s' message, transmission %s.\n", "SEND_OK",
1576                   ntohl(okm->success) == GNUNET_OK ? "succeeded" : "failed");
1577 #endif
1578       n = find_neighbour (h, &okm->peer);
1579       GNUNET_assert (n != NULL);
1580       n->transmit_ok = GNUNET_YES;
1581       if (n->transmit_handle != NULL)
1582         {
1583 #if DEBUG_TRANSPORT
1584           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1585                       "Processing pending message\n");
1586 #endif
1587           GNUNET_SCHEDULER_cancel (h->sched,
1588                                    n->transmit_handle->notify_delay_task);
1589           n->transmit_handle->notify_delay_task =
1590             GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1591           GNUNET_assert (GNUNET_YES == n->received_ack);
1592           schedule_request (n->transmit_handle);
1593         }
1594       break;
1595     case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
1596 #if DEBUG_TRANSPORT
1597       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1598                   "Receiving `%s' message.\n", "RECV");
1599 #endif
1600       if (size <
1601           sizeof (struct InboundMessage) +
1602           sizeof (struct GNUNET_MessageHeader))
1603         {
1604           GNUNET_break (0);
1605           break;
1606         }
1607       im = (const struct InboundMessage *) msg;
1608       imm = (const struct GNUNET_MessageHeader *) &im[1];
1609       if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
1610         {
1611           GNUNET_break (0);
1612           break;
1613         }
1614       switch (ntohs (imm->type))
1615         {
1616         case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
1617 #if DEBUG_TRANSPORT
1618           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1619                       "Receiving `%s' message from `%4s'.\n",
1620                       "ACK", GNUNET_i2s (&im->peer));
1621 #endif
1622           n = find_neighbour (h, &im->peer);
1623           if (n == NULL)
1624             {
1625               GNUNET_break (0);
1626               break;
1627             }
1628           if (n->received_ack == GNUNET_NO)
1629             {
1630               n->received_ack = GNUNET_YES;
1631               if (NULL != n->transmit_handle)
1632                 {
1633 #if DEBUG_TRANSPORT
1634                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635                               "Peer connected, scheduling delayed message for deliverery now.\n");
1636 #endif
1637                   schedule_request (n->transmit_handle);
1638                 }
1639             }
1640           break;
1641         default:
1642 #if DEBUG_TRANSPORT
1643           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1644                       "Received message of type %u from `%4s'.\n",
1645                       ntohs (imm->type), GNUNET_i2s (&im->peer));
1646 #endif
1647           if (h->rec != NULL)
1648             h->rec (h->cls,
1649                     GNUNET_TIME_relative_ntoh (im->latency), &im->peer, imm);
1650           break;
1651         }
1652       break;
1653     default:
1654       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1655                   _
1656                   ("Received unexpected message of type %u in %s:%u\n"),
1657                   ntohs (msg->type), __FILE__, __LINE__);
1658       GNUNET_break (0);
1659       break;
1660     }
1661 }
1662
1663
1664 struct ClientTransmitWrapper
1665 {
1666   GNUNET_NETWORK_TransmitReadyNotify notify;
1667   void *notify_cls;
1668   struct GNUNET_TRANSPORT_TransmitHandle *th;
1669 };
1670
1671
1672 /**
1673  * Transmit message of a client destined for another
1674  * peer to the service.
1675  */
1676 static size_t
1677 client_notify_wrapper (void *cls, size_t size, void *buf)
1678 {
1679   struct ClientTransmitWrapper *ctw = cls;
1680   struct OutboundMessage *obm;
1681   struct GNUNET_MessageHeader *hdr;
1682   size_t ret;
1683
1684   if (size == 0)
1685     {
1686 #if DEBUG_TRANSPORT
1687       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1688                   "Transmission request could not be satisfied.\n");
1689 #endif
1690       ret = ctw->notify (ctw->notify_cls, 0, NULL);
1691       GNUNET_assert (ret == 0);
1692       GNUNET_free (ctw);
1693       return 0;
1694     }
1695   GNUNET_assert (size >= sizeof (struct OutboundMessage));
1696   obm = buf;
1697   ret = ctw->notify (ctw->notify_cls,
1698                      size - sizeof (struct OutboundMessage),
1699                      (void *) &obm[1]);
1700   if (ret == 0)
1701     {
1702       /* Need to reset flag, no SEND means no SEND_OK! */
1703       ctw->th->neighbour->transmit_ok = GNUNET_YES;
1704       GNUNET_free (ctw);
1705       return 0;
1706     }
1707   GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
1708   hdr = (struct GNUNET_MessageHeader *) &obm[1];
1709   GNUNET_assert (ntohs (hdr->size) == ret);
1710   GNUNET_assert (ret + sizeof (struct OutboundMessage) <
1711                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1712 #if DEBUG_TRANSPORT
1713   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1714               "Transmitting `%s' message with data for `%4s'\n",
1715               "SEND", GNUNET_i2s (&ctw->th->target));
1716 #endif
1717   ret += sizeof (struct OutboundMessage);
1718   obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
1719   obm->header.size = htons (ret);
1720   obm->reserved = htonl (0);
1721   obm->peer = ctw->th->target;
1722   GNUNET_free (ctw);
1723   return ret;
1724 }
1725
1726
1727
1728 /**
1729  * Check if we could queue a message of the given size for
1730  * transmission.  The transport service will take both its
1731  * internal buffers and bandwidth limits imposed by the
1732  * other peer into consideration when answering this query.
1733  *
1734  * @param handle connection to transport service
1735  * @param target who should receive the message
1736  * @param size how big is the message we want to transmit?
1737  * @param timeout after how long should we give up (and call
1738  *        notify with buf NULL and size 0)?
1739  * @param notify function to call when we are ready to
1740  *        send such a message
1741  * @param notify_cls closure for notify
1742  * @return NULL if someone else is already waiting to be notified
1743  *         non-NULL if the notify callback was queued (can be used to cancel
1744  *         using GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1745  */
1746 struct GNUNET_TRANSPORT_TransmitHandle *
1747 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
1748                                         *handle,
1749                                         const struct GNUNET_PeerIdentity
1750                                         *target, size_t size,
1751                                         struct GNUNET_TIME_Relative timeout,
1752                                         GNUNET_NETWORK_TransmitReadyNotify
1753                                         notify, void *notify_cls)
1754 {
1755   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1756   struct GNUNET_TRANSPORT_TransmitHandle *th;
1757   struct NeighbourList *n;
1758   struct ClientTransmitWrapper *ctw;
1759
1760   if (size + sizeof (struct OutboundMessage) >=
1761       GNUNET_SERVER_MAX_MESSAGE_SIZE)
1762     return NULL;
1763 #if DEBUG_TRANSPORT
1764   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1765               "Asking transport service for transmission of %u bytes to peer `%4s'.\n",
1766               size, GNUNET_i2s (target));
1767 #endif
1768   n = find_neighbour (handle, target);
1769   ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper));
1770   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
1771   ctw->notify = notify;
1772   ctw->notify_cls = notify_cls;
1773   ctw->th = th;
1774   th->handle = handle;
1775   th->target = *target;
1776   th->notify = &client_notify_wrapper;
1777   th->notify_cls = ctw;
1778   th->notify_size = size + sizeof (struct OutboundMessage);
1779   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1780   th->neighbour = n;
1781   if (NULL == n)
1782     {
1783 #if DEBUG_TRANSPORT
1784       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1785                   "Transmission request could not be satisfied (not yet connected), adding it to pending request list.\n");
1786 #endif
1787       pos = handle->connect_wait_head;
1788       while (pos != NULL)
1789         {
1790           GNUNET_assert (0 != memcmp (target,
1791                                       &pos->target,
1792                                       sizeof (struct GNUNET_PeerIdentity)));
1793           pos = pos->next;
1794         }
1795 #if DEBUG_TRANSPORT
1796       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1797                   "Will now try to connect to `%4s'.\n", GNUNET_i2s (target));
1798 #endif
1799       try_connect (th);
1800     }
1801   else
1802     {
1803 #if DEBUG_TRANSPORT
1804       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1805                   "Transmission request queued for transmission to transport service.\n");
1806 #endif
1807       GNUNET_assert (NULL == n->transmit_handle);
1808       n->transmit_handle = th;
1809       if (GNUNET_YES == n->received_ack)
1810         {
1811 #if DEBUG_TRANSPORT
1812           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1813                       "Peer `%4s' is connected, scheduling for delivery now.\n",
1814                       GNUNET_i2s (target));
1815 #endif
1816           schedule_request (th);
1817         }
1818       else
1819         {
1820 #if DEBUG_TRANSPORT
1821           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1822                       "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llums) only.\n",
1823                       GNUNET_i2s (target), timeout.value);
1824 #endif
1825           th->notify_delay_task
1826             = GNUNET_SCHEDULER_add_delayed (handle->sched,
1827                                             GNUNET_NO,
1828                                             GNUNET_SCHEDULER_PRIORITY_KEEP,
1829                                             GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1830                                             timeout, &transmit_timeout, th);
1831         }
1832     }
1833   return th;
1834 }
1835
1836
1837 /**
1838  * Cancel the specified transmission-ready
1839  * notification.
1840  */
1841 void
1842 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
1843                                                GNUNET_TRANSPORT_TransmitHandle
1844                                                *th)
1845 {
1846   struct GNUNET_TRANSPORT_Handle *h;
1847
1848   GNUNET_assert (th->notify == &client_notify_wrapper);
1849   remove_from_any_list (th);
1850   h = th->handle;
1851   if ((h->connect_ready_head == NULL) && (h->network_handle != NULL))
1852     {
1853       GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1854       h->network_handle = NULL;
1855       h->transmission_scheduled = GNUNET_NO;
1856     }
1857   GNUNET_free (th->notify_cls);
1858   GNUNET_free (th);
1859 }
1860
1861
1862 /* end of transport_api.c */