nicer log messages
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - set_quota with low bandwidth should cause peer
28  *   disconnects (currently never does that) (MINOR)
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_server_lib.h"
36 #include "gnunet_time_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "transport.h"
39
40 /**
41  * After how long do we give up on transmitting a HELLO
42  * to the service?
43  */
44 #define OFFER_HELLO_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
45
46 /**
47  * How long should ARM wait when starting up the
48  * transport service before reporting back?
49  */
50 #define START_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
51
52 /**
53  * How long should ARM wait when stopping the
54  * transport service before reporting back?
55  */
56 #define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
57
58 /**
59  * Entry in linked list of all of our current neighbours.
60  */
61 struct NeighbourList
62 {
63
64   /**
65    * This is a linked list.
66    */
67   struct NeighbourList *next;
68
69   /**
70    * Active transmit handle, can be NULL.  Used to move
71    * from ready to wait list on disconnect and to block
72    * two transmissions to the same peer from being scheduled
73    * at the same time.
74    */
75   struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle;
76
77
78   /**
79    * Identity of this neighbour.
80    */
81   struct GNUNET_PeerIdentity id;
82
83   /**
84    * At what time did we reset last_sent last?
85    */
86   struct GNUNET_TIME_Absolute last_quota_update;
87
88   /**
89    * How many bytes have we sent since the "last_quota_update"
90    * timestamp?
91    */
92   uint64_t last_sent;
93
94   /**
95    * Global quota for outbound traffic to the neighbour in bytes/ms.
96    */
97   uint32_t quota_out;
98
99   /**
100    * Set to GNUNET_YES if we are currently allowed to
101    * transmit a message to the transport service for this
102    * peer, GNUNET_NO otherwise.
103    */
104   int transmit_ok;
105
106   /**
107    * Set to GNUNET_YES if we have received an ACK for the
108    * given peer.  Peers that receive our HELLO always respond
109    * with an ACK to let us know that we are successfully
110    * communicating.  Note that a PING can not be used for this
111    * since PINGs are only send if a HELLO address requires
112    * confirmation (and also, PINGs are not passed to the
113    * transport API itself).
114    */
115   int received_ack;
116
117 };
118
119
120 /**
121  * Linked list of requests from clients for our HELLO
122  * that were deferred.
123  */
124 struct HelloWaitList
125 {
126
127   /**
128    * This is a linked list.
129    */
130   struct HelloWaitList *next;
131
132   /**
133    * Reference back to our transport handle.
134    */
135   struct GNUNET_TRANSPORT_Handle *handle;
136
137   /**
138    * Callback to call once we got our HELLO.
139    */
140   GNUNET_TRANSPORT_ReceiveCallback rec;
141
142   /**
143    * Closure for rec.
144    */
145   void *rec_cls;
146
147   /**
148    * When to time out (call rec with NULL).
149    */
150   struct GNUNET_TIME_Absolute timeout;
151
152   /**
153    * Timeout task (used to trigger timeout,
154    * cancel if we get the HELLO in time).
155    */
156   GNUNET_SCHEDULER_TaskIdentifier task;
157
158
159 };
160
161
162 /**
163  * Opaque handle for a transmission-ready request.
164  */
165 struct GNUNET_TRANSPORT_TransmitHandle
166 {
167
168   /**
169    * We keep the transmit handles that are waiting for
170    * a transport-level connection in a doubly linked list.
171    */
172   struct GNUNET_TRANSPORT_TransmitHandle *next;
173
174   /**
175    * We keep the transmit handles that are waiting for
176    * a transport-level connection in a doubly linked list.
177    */
178   struct GNUNET_TRANSPORT_TransmitHandle *prev;
179
180   /**
181    * Handle of the main transport data structure.
182    */
183   struct GNUNET_TRANSPORT_Handle *handle;
184
185   /**
186    * Neighbour for this handle, can be NULL if the service
187    * is not yet connected to the target.
188    */
189   struct NeighbourList *neighbour;
190
191   /**
192    * Which peer is this transmission going to be for?  All
193    * zeros if it is control-traffic to the service.
194    */
195   struct GNUNET_PeerIdentity target;
196
197   /**
198    * Function to call when notify_size bytes are available
199    * for transmission.
200    */
201   GNUNET_NETWORK_TransmitReadyNotify notify;
202
203   /**
204    * Closure for notify.
205    */
206   void *notify_cls;
207
208   /**
209    * transmit_ready task Id.  The task is used to introduce
210    * the artificial delay that may be required to maintain
211    * the bandwidth limits.
212    */
213   GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
214
215   /**
216    * Timeout for this request.
217    */
218   struct GNUNET_TIME_Absolute timeout;
219
220   /**
221    * How many bytes is our notify callback waiting for?
222    */
223   size_t notify_size;
224
225 };
226
227
228 /**
229  * Handle for the transport service (includes all of the
230  * state for the transport service).
231  */
232 struct GNUNET_TRANSPORT_Handle
233 {
234
235   /**
236    * Closure for the callbacks.
237    */
238   void *cls;
239
240   /**
241    * Function to call for received data.
242    */
243   GNUNET_TRANSPORT_ReceiveCallback rec;
244
245   /**
246    * function to call on connect events
247    */
248   GNUNET_TRANSPORT_NotifyConnect nc_cb;
249
250   /**
251    * function to call on disconnect events
252    */
253   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
254
255   /**
256    * The current HELLO message for this peer.  Updated
257    * whenever transports change their addresses.
258    */
259   struct GNUNET_HELLO_Message *my_hello;
260
261   /**
262    * My client connection to the transport service.
263    */
264   struct GNUNET_CLIENT_Connection *client;
265
266   /**
267    * Handle to our registration with the client for notification.
268    */
269   struct GNUNET_NETWORK_TransmitHandle *network_handle;
270
271   /**
272    * Linked list of transmit handles that are waiting for the
273    * transport to connect to the respective peer.  When we
274    * receive notification that the transport connected to a
275    * peer, we go over this list and check if someone has already
276    * requested a transmission to the new peer; if so, we trigger
277    * the next step.
278    */
279   struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head;
280
281   /**
282    * Linked list of transmit handles that are waiting for the
283    * transport to be ready for transmission to the respective
284    * peer.  When we
285    * receive notification that the transport disconnected from
286    * a peer, we go over this list and move the entry back to
287    * the connect_wait list.
288    */
289   struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head;
290
291   /**
292    * Linked list of pending requests for our HELLO.
293    */
294   struct HelloWaitList *hwl_head;
295
296   /**
297    * My scheduler.
298    */
299   struct GNUNET_SCHEDULER_Handle *sched;
300
301   /**
302    * My configuration.
303    */
304   struct GNUNET_CONFIGURATION_Handle *cfg;
305
306   /**
307    * Linked list of the current neighbours of this peer.
308    */
309   struct NeighbourList *neighbours;
310
311   /**
312    * ID of the task trying to reconnect to the
313    * service.
314    */
315   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
316
317   /**
318    * Delay until we try to reconnect.
319    */
320   struct GNUNET_TIME_Relative reconnect_delay;
321
322   /**
323    * Do we currently have a transmission pending?
324    * (schedule transmission was called but has not
325    * yet succeeded)?
326    */
327   int transmission_scheduled;
328 };
329
330
331 static struct NeighbourList *
332 find_neighbour (struct GNUNET_TRANSPORT_Handle *h,
333                 const struct GNUNET_PeerIdentity *peer)
334 {
335   struct NeighbourList *pos;
336
337   pos = h->neighbours;
338   while ((pos != NULL) &&
339          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
340     pos = pos->next;
341   return pos;
342 }
343
344
345 /**
346  * Schedule the task to send one message from the
347  * connect_ready list to the service.
348  */
349 static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
350
351
352 /**
353  * Transmit message to client...
354  */
355 static size_t
356 transport_notify_ready (void *cls, size_t size, void *buf)
357 {
358   struct GNUNET_TRANSPORT_Handle *h = cls;
359   struct GNUNET_TRANSPORT_TransmitHandle *th;
360   struct NeighbourList *n;
361   size_t ret;
362   char *cbuf;
363
364   h->network_handle = NULL;
365   h->transmission_scheduled = GNUNET_NO;
366   if (buf == NULL)
367     {
368 #if DEBUG_TRANSPORT
369       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
370                   "Could not transmit to transport service, cancelling pending requests\n");
371 #endif
372       th = h->connect_ready_head;
373       if (th->next != NULL)
374         th->next->prev = NULL;
375       h->connect_ready_head = th->next;
376       if (NULL != (n = th->neighbour))
377         {
378           GNUNET_assert (n->transmit_handle == th);
379           n->transmit_handle = NULL;
380         }
381       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
382       GNUNET_free (th);
383       return 0;
384     } 
385 #if DEBUG_TRANSPORT
386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
387               "Ready to transmit %u bytes to transport service\n", size);
388 #endif
389   cbuf = buf;
390   ret = 0;
391   h->network_handle = NULL;
392   h->transmission_scheduled = GNUNET_NO;
393   do
394     {
395       th = h->connect_ready_head;
396       GNUNET_assert (th->notify_size <= size);
397       if (th->next != NULL)
398         th->next->prev = NULL;
399       h->connect_ready_head = th->next;
400       if (NULL != (n = th->neighbour))
401         {
402           GNUNET_assert (n->transmit_handle == th);
403           n->transmit_handle = NULL;
404         }
405       ret += th->notify (th->notify_cls, size, &cbuf[ret]);
406       GNUNET_free (th);
407       if (n != NULL)
408         n->last_sent += ret;
409       size -= ret;
410     }
411   while ((h->connect_ready_head != NULL) &&
412          (h->connect_ready_head->notify_size <= size));
413   if (h->connect_ready_head != NULL)
414     schedule_transmission (h);
415 #if DEBUG_TRANSPORT
416   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
417               "Transmitting %u bytes to transport service\n", ret);
418 #endif
419   return ret;
420 }
421
422
423 /**
424  * Schedule the task to send one message from the
425  * connect_ready list to the service.
426  */
427 static void
428 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
429 {
430   struct GNUNET_TRANSPORT_TransmitHandle *th;
431
432   GNUNET_assert (NULL == h->network_handle);
433   if (h->client == NULL)
434     return; /* not yet connected */
435   th = h->connect_ready_head;
436   if (th == NULL)
437     return; /* no request pending */
438   h->transmission_scheduled = GNUNET_YES;
439   h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client,
440                                                            th->notify_size,
441                                                            GNUNET_TIME_absolute_get_remaining
442                                                            (th->timeout),
443                                                            &transport_notify_ready,
444                                                            h);
445   GNUNET_assert (NULL != h->network_handle);
446 }
447
448
449 /**
450  * Insert the given transmit handle in the given sorted
451  * doubly linked list based on timeout.
452  *
453  * @param head pointer to the head of the linked list
454  * @param th element to insert into the list
455  */
456 static void
457 insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head,
458                         struct GNUNET_TRANSPORT_TransmitHandle *th)
459 {
460   struct GNUNET_TRANSPORT_TransmitHandle *pos;
461   struct GNUNET_TRANSPORT_TransmitHandle *prev;
462
463   pos = *head;
464   prev = NULL;
465   while ((pos != NULL) && (pos->timeout.value < th->timeout.value))
466     {
467       prev = pos;
468       pos = pos->next;
469     }
470   if (prev == NULL)
471     {
472       th->next = *head;
473       if (th->next != NULL)
474         th->next->prev = th;
475       *head = th;
476     }
477   else
478     {
479       th->next = pos;
480       th->prev = prev;
481       prev->next = th;
482       if (pos != NULL)
483         pos->prev = th;
484     }
485 }
486
487
488 /**
489  * Queue control request for transmission to the transport
490  * service.
491  *
492  * @param size number of bytes to be transmitted
493  * @param at_head request must be added to the head of the queue
494  *        (otherwise request will be appended)
495  * @param timeout how long this transmission can wait (at most)
496  * @param notify function to call to get the content
497  * @param notify_cls closure for notify
498  */
499 static void
500 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
501                            size_t size,
502                            int at_head,
503                            struct GNUNET_TIME_Relative timeout,
504                            GNUNET_NETWORK_TransmitReadyNotify notify,
505                            void *notify_cls)
506 {
507   struct GNUNET_TRANSPORT_TransmitHandle *th;
508
509   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
510   th->handle = h;
511   th->notify = notify;
512   th->notify_cls = notify_cls;
513   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
514   th->notify_size = size;
515   if (at_head)
516     {
517       th->next = h->connect_ready_head;
518       h->connect_ready_head = th;
519       if (th->next != NULL)
520         th->next->prev = th;
521     }
522   else
523     {
524       insert_transmit_handle (&h->connect_ready_head, th);
525     }
526   if (GNUNET_NO == h->transmission_scheduled)
527     schedule_transmission (h);
528 }
529
530
531 /**
532  * Update the quota values for the given neighbour now.
533  */
534 static void
535 update_quota (struct NeighbourList *n)
536 {
537   struct GNUNET_TIME_Relative delta;
538   uint64_t allowed;
539   uint64_t remaining;
540
541   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
542   allowed = delta.value * n->quota_out;
543   if (n->last_sent < allowed)
544     {
545       remaining = allowed - n->last_sent;
546       if (n->quota_out > 0)
547         remaining /= n->quota_out;
548       else
549         remaining = 0;
550       if (remaining > MAX_BANDWIDTH_CARRY)
551         remaining = MAX_BANDWIDTH_CARRY;
552       n->last_sent = 0;
553       n->last_quota_update = GNUNET_TIME_absolute_get ();
554       n->last_quota_update.value -= remaining;
555     }
556   else
557     {
558       n->last_sent -= allowed;
559       n->last_quota_update = GNUNET_TIME_absolute_get ();
560     }
561 }
562
563
564 struct SetQuotaContext
565 {
566   struct GNUNET_TRANSPORT_Handle *handle;
567
568   struct GNUNET_PeerIdentity target;
569
570   GNUNET_SCHEDULER_Task cont;
571
572   void *cont_cls;
573
574   struct GNUNET_TIME_Absolute timeout;
575
576   uint32_t quota_in;
577 };
578
579
580 static size_t
581 send_set_quota (void *cls, size_t size, void *buf)
582 {
583   struct SetQuotaContext *sqc = cls;
584   struct QuotaSetMessage *msg;
585
586   if (buf == NULL)
587     {
588       GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
589                                          GNUNET_NO,
590                                          sqc->cont,
591                                          sqc->cont_cls,
592                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
593       GNUNET_free (sqc);
594       return 0;
595     }
596 #if DEBUG_TRANSPORT
597   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
598               "Transmitting `%s' request with respect to `%4s'.\n",
599               "SET_QUOTA", GNUNET_i2s (&sqc->target));
600 #endif
601   GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
602   msg = buf;
603   msg->header.size = htons (sizeof (struct QuotaSetMessage));
604   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
605   msg->quota_in = htonl (sqc->quota_in);
606   memcpy (&msg->peer, &sqc->target, sizeof (struct GNUNET_PeerIdentity));
607   if (sqc->cont != NULL)
608     GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
609                                        GNUNET_NO,
610                                        sqc->cont,
611                                        sqc->cont_cls,
612                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
613   GNUNET_free (sqc);
614   return sizeof (struct QuotaSetMessage);
615 }
616
617
618 /**
619  * Set the share of incoming bandwidth for the given
620  * peer to the specified amount.
621  *
622  * @param handle connection to transport service
623  * @param target who's bandwidth quota is being changed
624  * @param quota_in incoming bandwidth quota in bytes per ms; 0 can
625  *        be used to force all traffic to be discarded
626  * @param quota_out outgoing bandwidth quota in bytes per ms; 0 can
627  *        be used to force all traffic to be discarded
628  * @param timeout how long to wait until signaling failure if
629  *        we can not communicate the quota change
630  * @param cont continuation to call when done, will be called
631  *        either with reason "TIMEOUT" or with reason "PREREQ_DONE"
632  * @param cont_cls closure for continuation
633  */
634 void
635 GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
636                             const struct GNUNET_PeerIdentity *target,
637                             uint32_t quota_in,
638                             uint32_t quota_out,
639                             struct GNUNET_TIME_Relative timeout,
640                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
641 {
642   struct NeighbourList *n;
643   struct SetQuotaContext *sqc;
644
645   n = find_neighbour (handle, target);
646   if (n != NULL)
647     {
648       update_quota (n);
649       if (n->quota_out < quota_out)
650         n->last_quota_update = GNUNET_TIME_absolute_get ();
651       n->quota_out = quota_out;
652     }
653   sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
654   sqc->handle = handle;
655   sqc->target = *target;
656   sqc->cont = cont;
657   sqc->cont_cls = cont_cls;
658   sqc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
659   sqc->quota_in = quota_in;
660   schedule_control_transmit (handle,
661                              sizeof (struct QuotaSetMessage),
662                              GNUNET_NO, timeout, &send_set_quota, sqc);
663 }
664
665
666 /**
667  * A "get_hello" request has timed out.  Signal the client
668  * and clean up.
669  */
670 static void
671 hello_wait_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
672 {
673   struct HelloWaitList *hwl = cls;
674   struct HelloWaitList *pos;
675   struct HelloWaitList *prev;
676
677   prev = NULL;
678   pos = hwl->handle->hwl_head;
679   while (pos != hwl)
680     {
681       GNUNET_assert (pos != NULL);
682       prev = pos;
683       pos = pos->next;
684     }
685   if (prev == NULL)
686     hwl->handle->hwl_head = hwl->next;
687   else
688     prev->next = hwl->next;
689   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
690               _("Timeout trying to obtain `%s' from transport service.\n"),
691               "HELLO");
692   /* signal timeout */
693   if (hwl->rec != NULL)
694     hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
695   GNUNET_free (hwl);
696 }
697
698
699 /**
700  * Obtain the HELLO message for this peer.
701  *
702  * @param handle connection to transport service
703  * @param timeout how long to wait for the HELLO
704  * @param rec function to call with the HELLO, sender will be our peer
705  *            identity; message and sender will be NULL on timeout
706  *            (handshake with transport service pending/failed).
707  *             cost estimate will be 0.
708  * @param rec_cls closure for rec
709  */
710 void
711 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
712                             struct GNUNET_TIME_Relative timeout,
713                             GNUNET_TRANSPORT_ReceiveCallback rec,
714                             void *rec_cls)
715 {
716   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
717   struct GNUNET_PeerIdentity me;
718   struct HelloWaitList *hwl;
719
720   if (handle->my_hello == NULL)
721     {
722       hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
723       hwl->next = handle->hwl_head;
724       handle->hwl_head = hwl;
725       hwl->handle = handle;
726       hwl->rec = rec;
727       hwl->rec_cls = rec_cls;
728       hwl->timeout = GNUNET_TIME_relative_to_absolute (timeout);
729       hwl->task = GNUNET_SCHEDULER_add_delayed (handle->sched,
730                                                 GNUNET_YES,
731                                                 GNUNET_SCHEDULER_PRIORITY_KEEP,
732                                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
733                                                 timeout,
734                                                 &hello_wait_timeout, hwl);
735       return;
736     }
737   GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_key (handle->my_hello, &pk));
738   GNUNET_CRYPTO_hash (&pk,
739                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
740                       &me.hashPubKey);
741
742   rec (rec_cls,
743        GNUNET_TIME_UNIT_ZERO,
744        &me, (const struct GNUNET_MessageHeader *) handle->my_hello);
745 }
746
747
748 static size_t
749 send_hello (void *cls, size_t size, void *buf)
750 {
751   struct GNUNET_MessageHeader *hello = cls;
752   uint16_t msize;
753
754   if (buf == NULL)
755     {
756 #if DEBUG_TRANSPORT
757       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
758                   "Timeout while trying to transmit `%s' request.\n",
759                   "HELLO");
760 #endif
761       GNUNET_free (hello);
762       return 0;
763     }
764 #if DEBUG_TRANSPORT
765   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
766               "Transmitting `%s' request.\n", "HELLO");
767 #endif
768   msize = ntohs (hello->size);
769   GNUNET_assert (size >= msize);
770   memcpy (buf, hello, msize);
771   GNUNET_free (hello);
772   return msize;
773 }
774
775
776 /**
777  * Offer the transport service the HELLO of another peer.  Note that
778  * the transport service may just ignore this message if the HELLO is
779  * malformed or useless due to our local configuration.
780  *
781  * @param handle connection to transport service
782  * @param hello the hello message
783  */
784 void
785 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
786                               const struct GNUNET_MessageHeader *hello)
787 {
788   struct GNUNET_MessageHeader *hc;
789   uint16_t size;
790
791   if (handle->client == NULL)
792     {
793 #if DEBUG_TRANSPORT
794       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
795                   "Not connected to transport service, dropping offered HELLO\n");
796 #endif
797       return;
798     }
799   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
800   size = ntohs (hello->size);
801   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
802   hc = GNUNET_malloc (size);
803   memcpy (hc, hello, size);
804   schedule_control_transmit (handle,
805                              size,
806                              GNUNET_NO, OFFER_HELLO_TIMEOUT, &send_hello, hc);
807 }
808
809
810 /**
811  * Function we use for handling incoming messages.
812  */
813 static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg);
814
815
816 static size_t
817 send_start (void *cls, size_t size, void *buf)
818 {
819   struct GNUNET_MessageHeader *s = buf;
820
821   if (buf == NULL)
822     {
823 #if DEBUG_TRANSPORT
824       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
825                   "Timeout while trying to transmit `%s' request.\n",
826                   "START");
827 #endif
828       return 0;
829     }
830 #if DEBUG_TRANSPORT
831   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
832               "Transmitting `%s' request.\n", "START");
833 #endif
834   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
835   s->size = htons (sizeof (struct GNUNET_MessageHeader));
836   s->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
837   return sizeof (struct GNUNET_MessageHeader);
838 }
839
840
841 /**
842  * Try again to connect to transport service.
843  */
844 static void
845 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
846 {
847   struct GNUNET_TRANSPORT_Handle *h = cls;
848   struct GNUNET_TRANSPORT_TransmitHandle *pos;
849   struct NeighbourList *n;
850
851   while (NULL != (n = h->neighbours))
852     {
853       h->neighbours = n->next;
854       pos = n->transmit_handle;
855       if (pos != NULL)
856         {
857           pos->neighbour = NULL;
858           pos->next = h->connect_wait_head;
859           h->connect_wait_head = pos;
860           if (pos->next != NULL)
861             pos->next->prev = pos;
862           pos->prev = NULL;
863         }
864       GNUNET_free (n);
865     }
866   h->connect_ready_head = NULL;
867 #if DEBUG_TRANSPORT
868   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
869 #endif
870   GNUNET_assert (h->client == NULL);
871   h->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
872   h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
873   GNUNET_assert (h->client != NULL);
874   /* make sure we don't send "START" twice,
875      remove existing entry from queue (if present) */
876   pos = h->connect_ready_head;
877   while (pos != NULL)
878     {
879       if (pos->notify == &send_start)
880         {
881           if (pos->prev == NULL)
882             h->connect_ready_head = pos->next;
883           else
884             pos->prev->next = pos->next;
885           if (pos->next != NULL)
886             pos->next->prev = pos->prev;
887           GNUNET_assert (pos->neighbour == NULL);
888           GNUNET_free (pos);
889           break;
890         }
891       pos = pos->next;
892     }
893   schedule_control_transmit (h,
894                              sizeof (struct GNUNET_MessageHeader),
895                              GNUNET_YES,
896                              GNUNET_TIME_UNIT_FOREVER_REL, &send_start, NULL);
897   GNUNET_CLIENT_receive (h->client,
898                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
899 }
900
901
902 /**
903  * Function that will schedule the job that will try
904  * to connect us again to the client.
905  */
906 static void
907 schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
908 {
909 #if DEBUG_TRANSPORT
910   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
911               "Scheduling task to reconnect to transport service in %llu ms.\n",
912               h->reconnect_delay.value);
913 #endif
914   GNUNET_assert (h->client == NULL);
915   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
916   h->reconnect_task
917     = GNUNET_SCHEDULER_add_delayed (h->sched,
918                                     GNUNET_NO,
919                                     GNUNET_SCHEDULER_PRIORITY_DEFAULT,
920                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
921                                     h->reconnect_delay, &reconnect, h);
922   h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS;
923 }
924
925
926 /**
927  * Remove the given transmit handle from the wait list.  Does NOT free
928  * it.
929  */
930 static void
931 remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
932 {
933   if (th->prev == NULL)
934     th->handle->connect_wait_head = th->next;
935   else
936     th->prev->next = th->next;
937   if (th->next != NULL)
938     th->next->prev = th->prev;
939 }
940
941
942 /**
943  * We are connected to the respective peer, check the
944  * bandwidth limits and schedule the transmission.
945  */
946 static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th);
947
948
949 /**
950  * Function called by the scheduler when the timeout
951  * for bandwidth availablility for the target
952  * neighbour is reached.
953  */
954 static void
955 transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
956 {
957   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
958
959   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
960   schedule_request (th);
961 }
962
963
964 /**
965  * Called when our transmit request timed out before any transport
966  * reported success connecting to the desired peer or before the
967  * transport was ready to receive.  Signal error and free
968  * TransmitHandle.
969  */
970 static void
971 transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
972 {
973   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
974
975   if (th->neighbour != NULL)
976     th->neighbour->transmit_handle = NULL;
977 #if DEBUG_TRANSPORT
978   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
979               "Request for transmission to peer `%s' timed out.\n",
980               GNUNET_i2s(&th->target));
981 #endif
982   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
983   remove_from_wait_list (th);
984   th->notify (th->notify_cls, 0, NULL);
985   GNUNET_free (th);
986 }
987
988
989 /**
990  * We are connected to the respective peer, check the
991  * bandwidth limits and schedule the transmission.
992  */
993 static void
994 schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th)
995 {
996   struct GNUNET_TRANSPORT_Handle *h;
997   struct GNUNET_TIME_Relative duration;
998   struct NeighbourList *n;
999   uint64_t available;
1000
1001   h = th->handle;
1002   n = th->neighbour;
1003   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1004     {
1005       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
1006       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1007     }
1008   /* check outgoing quota */
1009   duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1010   if (duration.value > MIN_QUOTA_REFRESH_TIME)
1011     {
1012       update_quota (n);
1013       duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1014     }
1015   available = duration.value * n->quota_out;
1016   if (available < n->last_sent + th->notify_size)
1017     {
1018       /* calculate how much bandwidth we'd still need to
1019          accumulate and based on that how long we'll have
1020          to wait... */
1021       available = n->last_sent + th->notify_size - available;
1022       duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1023                                                 available / n->quota_out);
1024       if (th->timeout.value <
1025           GNUNET_TIME_relative_to_absolute (duration).value)
1026         {
1027           /* signal timeout! */
1028 #if DEBUG_TRANSPORT
1029           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1030                       "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long.  Signaling timeout.\n",
1031                       duration.value,
1032                       GNUNET_i2s(&th->target));
1033 #endif
1034           remove_from_wait_list (th);
1035           th->notify (th->notify_cls, 0, NULL);
1036           GNUNET_free (th);
1037           return;
1038         }
1039 #if DEBUG_TRANSPORT
1040       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1041                   "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
1042                   GNUNET_i2s(&th->target),
1043                   duration.value);
1044 #endif
1045       th->notify_delay_task
1046         = GNUNET_SCHEDULER_add_delayed (h->sched,
1047                                         GNUNET_NO,
1048                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1049                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1050                                         duration, &transmit_ready, th);
1051       return;
1052     }
1053 #if DEBUG_TRANSPORT
1054   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1055               "Bandwidth available for transmission to `%4s'\n",
1056               GNUNET_i2s (&n->id));
1057 #endif
1058   if (GNUNET_NO == n->transmit_ok)
1059     {
1060       /* we may be ready, but transport service is not;
1061          wait for SendOkMessage or timeout */
1062 #if DEBUG_TRANSPORT
1063       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1064                   "Need to wait for transport service `%s' message\n",
1065                   "SEND_OK");
1066 #endif
1067       th->notify_delay_task
1068         = GNUNET_SCHEDULER_add_delayed (h->sched,
1069                                         GNUNET_NO,
1070                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1071                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1072                                         GNUNET_TIME_absolute_get_remaining
1073                                         (th->timeout), &transmit_timeout, th);
1074       return;
1075     }
1076   n->transmit_ok = GNUNET_NO;
1077   remove_from_wait_list (th);
1078 #if DEBUG_TRANSPORT
1079   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving message for `%4s' to ready list\n",
1080               GNUNET_i2s(&n->id));
1081 #endif
1082   insert_transmit_handle (&h->connect_ready_head, th);
1083   if (GNUNET_NO == h->transmission_scheduled)
1084     schedule_transmission (h);
1085 }
1086
1087
1088 /**
1089  * Add neighbour to our list
1090  */
1091 static void
1092 add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1093                uint32_t quota_out,
1094                struct GNUNET_TIME_Relative latency,
1095                const struct GNUNET_PeerIdentity *pid)
1096 {
1097   struct NeighbourList *n;
1098   struct GNUNET_TRANSPORT_TransmitHandle *prev;
1099   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1100   struct GNUNET_TRANSPORT_TransmitHandle *next;
1101
1102   /* check for duplicates */
1103   if (NULL != find_neighbour (h, pid))
1104     {
1105       GNUNET_break (0);
1106       return;
1107     }
1108 #if DEBUG_TRANSPORT
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110               "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid));
1111 #endif
1112   n = GNUNET_malloc (sizeof (struct NeighbourList));
1113   n->id = *pid;
1114   n->last_quota_update = GNUNET_TIME_absolute_get ();
1115   n->quota_out = quota_out;
1116   n->next = h->neighbours;
1117   n->transmit_ok = GNUNET_YES;
1118   h->neighbours = n;
1119   if (h->nc_cb != NULL)
1120     h->nc_cb (h->cls, &n->id, latency);
1121   prev = NULL;
1122   pos = h->connect_wait_head;
1123   while (pos != NULL)
1124     {
1125       next = pos->next;
1126       if (0 == memcmp (pid,
1127                        &pos->target, sizeof (struct GNUNET_PeerIdentity)))
1128         {
1129           pos->neighbour = n;
1130           GNUNET_assert (NULL == n->transmit_handle);
1131           n->transmit_handle = pos;
1132           if (prev == NULL)
1133             h->connect_wait_head = next;
1134           else
1135             prev->next = next;
1136           if (GNUNET_YES == n->received_ack)
1137             {
1138 #if DEBUG_TRANSPORT
1139               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1140                           "Found pending request for `%4s' will trigger it now.\n",
1141                           GNUNET_i2s (&pos->target));
1142 #endif
1143               if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1144                 {
1145                   GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
1146                   pos->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1147                 }
1148               schedule_request (pos);
1149             }
1150           else
1151             {
1152 #if DEBUG_TRANSPORT
1153               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1154                           "Found pending request for `%4s' but still need `%s' before proceeding.\n",
1155                           GNUNET_i2s (&pos->target),
1156                           "ACK");
1157 #endif
1158             }
1159           break;
1160         }
1161       prev = pos;
1162       pos = next;
1163     }
1164 }
1165
1166
1167 /**
1168  * Connect to the transport service.  Note that the connection may
1169  * complete (or fail) asynchronously.
1170  *
1171
1172  * @param sched scheduler to use
1173  * @param cfg configuration to use
1174  * @param cls closure for the callbacks
1175  * @param rec receive function to call
1176  * @param nc function to call on connect events
1177  * @param dc function to call on disconnect events
1178  */
1179 struct GNUNET_TRANSPORT_Handle *
1180 GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched,
1181                           struct GNUNET_CONFIGURATION_Handle *cfg,
1182                           void *cls,
1183                           GNUNET_TRANSPORT_ReceiveCallback rec,
1184                           GNUNET_TRANSPORT_NotifyConnect nc,
1185                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1186 {
1187   struct GNUNET_TRANSPORT_Handle *ret;
1188
1189   GNUNET_ARM_start_service ("peerinfo",
1190                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1191   GNUNET_ARM_start_service ("transport",
1192                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1193   ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
1194   ret->sched = sched;
1195   ret->cfg = cfg;
1196   ret->cls = cls;
1197   ret->rec = rec;
1198   ret->nc_cb = nc;
1199   ret->nd_cb = nd;
1200   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1201   schedule_reconnect (ret);
1202   return ret;
1203 }
1204
1205
1206 /**
1207  * These stop activities must be run in a fresh
1208  * scheduler that is NOT in shutdown mode.
1209  */
1210 static void
1211 stop_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1212 {
1213   struct GNUNET_TRANSPORT_Handle *handle = cls;
1214   GNUNET_ARM_stop_service ("transport",
1215                            handle->cfg,
1216                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1217   GNUNET_ARM_stop_service ("peerinfo",
1218                            handle->cfg,
1219                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1220 }
1221
1222
1223 /**
1224  * Disconnect from the transport service.
1225  */
1226 void
1227 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1228 {
1229   struct GNUNET_TRANSPORT_TransmitHandle *th;
1230   struct NeighbourList *n;
1231   struct HelloWaitList *hwl;
1232   struct GNUNET_CLIENT_Connection *client;
1233
1234 #if DEBUG_TRANSPORT
1235   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
1236 #endif
1237   while (NULL != (th = handle->connect_ready_head))
1238     {
1239       handle->connect_ready_head = th->next;
1240       th->notify (th->notify_cls, 0, NULL);
1241       GNUNET_free (th);
1242     }
1243
1244   while (NULL != (th = handle->connect_wait_head))
1245     {
1246       handle->connect_wait_head = th->next;
1247       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1248         {
1249           GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
1250           th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1251         }
1252       th->notify (th->notify_cls, 0, NULL);
1253       GNUNET_free (th);
1254     }
1255   while (NULL != (n = handle->neighbours))
1256     {
1257       handle->neighbours = n->next;
1258       GNUNET_free (n);
1259     }
1260   while (NULL != (hwl = handle->hwl_head))
1261     {
1262       handle->hwl_head = hwl->next;
1263       GNUNET_SCHEDULER_cancel (handle->sched, hwl->task);
1264       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1265                   _("Disconnect while trying to obtain `%s' from transport service.\n"),
1266                   "HELLO");
1267       if (hwl->rec != NULL)
1268         hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
1269       GNUNET_free (hwl);
1270     }
1271   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1272     {
1273       GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
1274       handle->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1275     }
1276   GNUNET_free_non_null (handle->my_hello);
1277   handle->my_hello = NULL;
1278   GNUNET_SCHEDULER_run (&stop_task, handle);
1279   if (NULL != (client = handle->client))
1280     {
1281 #if DEBUG_TRANSPORT
1282       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1283                   "Disconnecting from transport service for good.\n");
1284 #endif
1285       handle->client = NULL;
1286       GNUNET_CLIENT_disconnect (client);
1287     }
1288   if (client == NULL)
1289     GNUNET_free (handle);
1290 }
1291
1292
1293 /**
1294  * We're ready to transmit the request that the transport service
1295  * should connect to a new peer.  In addition to sending the
1296  * request, schedule the next phase for the transmission processing
1297  * that caused the connect request in the first place.
1298  */
1299 static size_t
1300 request_connect (void *cls, size_t size, void *buf)
1301 {
1302   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1303   struct TryConnectMessage *tcm;
1304   struct GNUNET_TRANSPORT_Handle *h;
1305
1306   h = th->handle;
1307   if (buf == NULL)
1308     {
1309 #if DEBUG_TRANSPORT
1310       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1311                   "Failed to transmit `%s' request for `%4s' to service.\n",
1312                   "TRY_CONNECT",
1313                   GNUNET_i2s(&th->target));
1314 #endif
1315       th->notify (th->notify_cls, 0, NULL);
1316       GNUNET_free (th);
1317       return 0;
1318     }
1319 #if DEBUG_TRANSPORT
1320   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1321               "Transmitting `%s' message for `%4s'.\n",
1322               "TRY_CONNECT", GNUNET_i2s (&th->target));
1323 #endif
1324   GNUNET_assert (size >= sizeof (struct TryConnectMessage));
1325   tcm = buf;
1326   tcm->header.size = htons (sizeof (struct TryConnectMessage));
1327   tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
1328   tcm->reserved = htonl (0);
1329   memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
1330   th->notify_delay_task
1331     = GNUNET_SCHEDULER_add_delayed (h->sched,
1332                                     GNUNET_NO,
1333                                     GNUNET_SCHEDULER_PRIORITY_KEEP,
1334                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1335                                     GNUNET_TIME_absolute_get_remaining
1336                                     (th->timeout), &transmit_timeout, th);
1337   insert_transmit_handle (&h->connect_wait_head, th);
1338   return sizeof (struct TryConnectMessage);
1339 }
1340
1341
1342 /**
1343  * Schedule a request to connect to the given
1344  * neighbour (and if successful, add the specified
1345  * handle to the wait list).
1346  */
1347 static void
1348 try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
1349 {
1350   schedule_control_transmit (th->handle,
1351                              sizeof (struct TryConnectMessage),
1352                              GNUNET_NO,
1353                              GNUNET_TIME_absolute_get_remaining (th->timeout),
1354                              &request_connect, th);
1355 }
1356
1357
1358 /**
1359  * Cancel a pending notify transmit task
1360  * and also remove the given transmit handle
1361  * from whatever list is on.
1362  */
1363 static void
1364 remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
1365 {
1366   struct GNUNET_TRANSPORT_Handle *h;
1367
1368   h = th->handle;
1369   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1370     {
1371       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
1372       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1373     }
1374   if (th->prev == NULL)
1375     {
1376       if (th == h->connect_wait_head)
1377         h->connect_wait_head = th->next;
1378       else
1379         h->connect_ready_head = th->next;
1380     }
1381   else
1382     th->prev->next = th->next;
1383   if (th->next != NULL)
1384     th->next->prev = th->prev;
1385 }
1386
1387
1388 /**
1389  * Remove neighbour from our list
1390  */
1391 static void
1392 remove_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1393                   const struct GNUNET_PeerIdentity *peer)
1394 {
1395   struct NeighbourList *prev;
1396   struct NeighbourList *pos;
1397   struct GNUNET_TRANSPORT_TransmitHandle *th;
1398
1399   prev = NULL;
1400   pos = h->neighbours;
1401   while ((pos != NULL) &&
1402          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
1403     {
1404       prev = pos;
1405       pos = pos->next;
1406     }
1407   if (pos == NULL)
1408     {
1409       GNUNET_break (0);
1410       return;
1411     }
1412   if (prev == NULL)
1413     h->neighbours = pos->next;
1414   else
1415     prev->next = pos->next;
1416   if (NULL != (th = pos->transmit_handle))
1417     {
1418       pos->transmit_handle = NULL;
1419       th->neighbour = NULL;
1420       remove_from_any_list (th);
1421       try_connect (th);
1422     }
1423   if (h->nc_cb != NULL)
1424     h->nd_cb (h->cls, peer);
1425   GNUNET_free (pos);
1426 }
1427
1428
1429 /**
1430  * Type of a function to call when we receive a message
1431  * from the service.
1432  *
1433  * @param cls closure
1434  * @param msg message received, NULL on timeout or fatal error
1435  */
1436 static void
1437 demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1438 {
1439   struct GNUNET_TRANSPORT_Handle *h = cls;
1440   const struct DisconnectInfoMessage *dim;
1441   const struct ConnectInfoMessage *cim;
1442   const struct InboundMessage *im;
1443   const struct GNUNET_MessageHeader *imm;
1444   const struct SendOkMessage *okm;
1445   struct HelloWaitList *hwl;
1446   struct NeighbourList *n;
1447   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pkey;
1448   struct GNUNET_PeerIdentity me;
1449   uint16_t size;
1450
1451   if ((msg == NULL) || (h->client == NULL))
1452     {
1453       if (h->client != NULL)
1454         {
1455 #if DEBUG_TRANSPORT
1456           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1457                       "Error receiving from transport service, disconnecting temporarily.\n");
1458 #endif
1459           if (h->network_handle != NULL)
1460             {
1461               GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1462               h->network_handle = NULL;
1463               h->transmission_scheduled = GNUNET_NO;
1464             }
1465           GNUNET_CLIENT_disconnect (h->client);
1466           h->client = NULL;
1467           schedule_reconnect (h);
1468         }
1469       else
1470         {
1471           /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
1472              finish clean up work! */
1473           GNUNET_free (h);
1474         }
1475       return;
1476     }
1477   GNUNET_CLIENT_receive (h->client,
1478                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1479   size = ntohs (msg->size);
1480   switch (ntohs (msg->type))
1481     {
1482     case GNUNET_MESSAGE_TYPE_HELLO:
1483       if (GNUNET_OK !=
1484           GNUNET_HELLO_get_key ((const struct GNUNET_HELLO_Message *) msg,
1485                                 &pkey))
1486         {
1487           GNUNET_break (0);
1488           break;
1489         }
1490       GNUNET_CRYPTO_hash (&pkey,
1491                           sizeof (struct
1492                                   GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1493                           &me.hashPubKey);
1494 #if DEBUG_TRANSPORT
1495       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1496                   "Receiving (my own) `%s' message, I am `%4s'.\n",
1497                   "HELLO", GNUNET_i2s (&me));
1498 #endif
1499       GNUNET_free_non_null (h->my_hello);
1500       h->my_hello = NULL;
1501       if (size < sizeof (struct GNUNET_MessageHeader))
1502         {
1503           GNUNET_break (0);
1504           break;
1505         }
1506       h->my_hello = GNUNET_malloc (size);
1507       memcpy (h->my_hello, msg, size);
1508       while (NULL != (hwl = h->hwl_head))
1509         {
1510           h->hwl_head = hwl->next;
1511           GNUNET_SCHEDULER_cancel (h->sched, hwl->task);
1512           GNUNET_TRANSPORT_get_hello (h,
1513                                       GNUNET_TIME_UNIT_ZERO,
1514                                       hwl->rec, hwl->rec_cls);
1515           GNUNET_free (hwl);
1516         }
1517       break;
1518     case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
1519       if (size != sizeof (struct ConnectInfoMessage))
1520         {
1521           GNUNET_break (0);
1522           break;
1523         }
1524       cim = (const struct ConnectInfoMessage *) msg;
1525 #if DEBUG_TRANSPORT
1526       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1527                   "Receiving `%s' message for `%4s'.\n",
1528                   "CONNECT", GNUNET_i2s (&cim->id));
1529 #endif
1530       add_neighbour (h,
1531                      ntohl (cim->quota_out),
1532                      GNUNET_TIME_relative_ntoh (cim->latency), &cim->id);
1533       break;
1534     case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
1535       if (size != sizeof (struct DisconnectInfoMessage))
1536         {
1537           GNUNET_break (0);
1538           break;
1539         }
1540       dim = (const struct DisconnectInfoMessage *) msg;
1541 #if DEBUG_TRANSPORT
1542       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1543                   "Receiving `%s' message for `%4s'.\n",
1544                   "DISCONNECT", GNUNET_i2s (&dim->peer));
1545 #endif
1546       remove_neighbour (h, &dim->peer);
1547       break;
1548     case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
1549       if (size != sizeof (struct SendOkMessage))
1550         {
1551           GNUNET_break (0);
1552           break;
1553         }
1554       okm = (const struct SendOkMessage *) msg;
1555 #if DEBUG_TRANSPORT
1556       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1557                   "Receiving `%s' message, transmission %s.\n", "SEND_OK",
1558                   ntohl(okm->success) == GNUNET_OK ? "succeeded" : "failed");
1559 #endif
1560       n = find_neighbour (h, &okm->peer);
1561       GNUNET_assert (n != NULL);
1562       n->transmit_ok = GNUNET_YES;
1563       if (n->transmit_handle != NULL)
1564         {
1565 #if DEBUG_TRANSPORT
1566           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1567                       "Processing pending message\n");
1568 #endif
1569           GNUNET_SCHEDULER_cancel (h->sched,
1570                                    n->transmit_handle->notify_delay_task);
1571           n->transmit_handle->notify_delay_task =
1572             GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1573           GNUNET_assert (GNUNET_YES == n->received_ack);
1574           schedule_request (n->transmit_handle);
1575         }
1576       break;
1577     case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
1578 #if DEBUG_TRANSPORT
1579       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1580                   "Receiving `%s' message.\n", "RECV");
1581 #endif
1582       if (size <
1583           sizeof (struct InboundMessage) +
1584           sizeof (struct GNUNET_MessageHeader))
1585         {
1586           GNUNET_break (0);
1587           break;
1588         }
1589       im = (const struct InboundMessage *) msg;
1590       imm = (const struct GNUNET_MessageHeader *) &im[1];
1591       if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
1592         {
1593           GNUNET_break (0);
1594           break;
1595         }
1596       switch (ntohs (imm->type))
1597         {
1598         case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
1599 #if DEBUG_TRANSPORT
1600           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1601                       "Receiving `%s' message from `%4s'.\n",
1602                       "ACK", GNUNET_i2s (&im->peer));
1603 #endif
1604           n = find_neighbour (h, &im->peer);
1605           if (n == NULL)
1606             {
1607               GNUNET_break (0);
1608               break;
1609             }
1610           if (n->received_ack == GNUNET_NO)
1611             {
1612               n->received_ack = GNUNET_YES;
1613               if (NULL != n->transmit_handle)
1614                 {
1615 #if DEBUG_TRANSPORT
1616                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1617                               "Peer connected, scheduling delayed message for deliverery now.\n");
1618 #endif
1619                   schedule_request (n->transmit_handle);
1620                 }
1621             }
1622           break;
1623         default:
1624 #if DEBUG_TRANSPORT
1625           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1626                       "Received message of type %u from `%4s'.\n",
1627                       ntohs (imm->type), GNUNET_i2s (&im->peer));
1628 #endif
1629           if (h->rec != NULL)
1630             h->rec (h->cls,
1631                     GNUNET_TIME_relative_ntoh (im->latency), &im->peer, imm);
1632           break;
1633         }
1634       break;
1635     default:
1636       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1637                   _
1638                   ("Received unexpected message of type %u in %s:%u\n"),
1639                   ntohs (msg->type), __FILE__, __LINE__);
1640       GNUNET_break (0);
1641       break;
1642     }
1643 }
1644
1645
1646 struct ClientTransmitWrapper
1647 {
1648   GNUNET_NETWORK_TransmitReadyNotify notify;
1649   void *notify_cls;
1650   struct GNUNET_TRANSPORT_TransmitHandle *th;
1651 };
1652
1653
1654 /**
1655  * Transmit message of a client destined for another
1656  * peer to the service.
1657  */
1658 static size_t
1659 client_notify_wrapper (void *cls, size_t size, void *buf)
1660 {
1661   struct ClientTransmitWrapper *ctw = cls;
1662   struct OutboundMessage *obm;
1663   struct GNUNET_MessageHeader *hdr;
1664   size_t ret;
1665
1666   if (size == 0)
1667     {
1668 #if DEBUG_TRANSPORT
1669       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1670                   "Transmission request could not be satisfied.\n");
1671 #endif
1672       ret = ctw->notify (ctw->notify_cls, 0, NULL);
1673       GNUNET_assert (ret == 0);
1674       GNUNET_free (ctw);
1675       return 0;
1676     }
1677   GNUNET_assert (size >= sizeof (struct OutboundMessage));
1678   obm = buf;
1679   ret = ctw->notify (ctw->notify_cls,
1680                      size - sizeof (struct OutboundMessage),
1681                      (void *) &obm[1]);
1682   if (ret == 0)
1683     {
1684       /* Need to reset flag, no SEND means no SEND_OK! */
1685       ctw->th->neighbour->transmit_ok = GNUNET_YES;
1686       GNUNET_free (ctw);
1687       return 0;
1688     }
1689   GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
1690   hdr = (struct GNUNET_MessageHeader *) &obm[1];
1691   GNUNET_assert (ntohs (hdr->size) == ret);
1692   GNUNET_assert (ret + sizeof (struct OutboundMessage) <
1693                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1694 #if DEBUG_TRANSPORT
1695   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1696               "Transmitting `%s' message with data for `%4s'\n",
1697               "SEND", GNUNET_i2s (&ctw->th->target));
1698 #endif
1699   ret += sizeof (struct OutboundMessage);
1700   obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
1701   obm->header.size = htons (ret);
1702   obm->reserved = htonl (0);
1703   obm->peer = ctw->th->target;
1704   GNUNET_free (ctw);
1705   return ret;
1706 }
1707
1708
1709
1710 /**
1711  * Check if we could queue a message of the given size for
1712  * transmission.  The transport service will take both its
1713  * internal buffers and bandwidth limits imposed by the
1714  * other peer into consideration when answering this query.
1715  *
1716  * @param handle connection to transport service
1717  * @param target who should receive the message
1718  * @param size how big is the message we want to transmit?
1719  * @param timeout after how long should we give up (and call
1720  *        notify with buf NULL and size 0)?
1721  * @param notify function to call when we are ready to
1722  *        send such a message
1723  * @param notify_cls closure for notify
1724  * @return NULL if someone else is already waiting to be notified
1725  *         non-NULL if the notify callback was queued (can be used to cancel
1726  *         using GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1727  */
1728 struct GNUNET_TRANSPORT_TransmitHandle *
1729 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
1730                                         *handle,
1731                                         const struct GNUNET_PeerIdentity
1732                                         *target, size_t size,
1733                                         struct GNUNET_TIME_Relative timeout,
1734                                         GNUNET_NETWORK_TransmitReadyNotify
1735                                         notify, void *notify_cls)
1736 {
1737   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1738   struct GNUNET_TRANSPORT_TransmitHandle *th;
1739   struct NeighbourList *n;
1740   struct ClientTransmitWrapper *ctw;
1741
1742   if (size + sizeof (struct OutboundMessage) >=
1743       GNUNET_SERVER_MAX_MESSAGE_SIZE)
1744     {
1745       GNUNET_break (0);
1746       return NULL;
1747     }
1748 #if DEBUG_TRANSPORT
1749   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1750               "Asking transport service for transmission of %u bytes to peer `%4s'.\n",
1751               size, GNUNET_i2s (target));
1752 #endif
1753   n = find_neighbour (handle, target);
1754   ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper));
1755   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
1756   ctw->notify = notify;
1757   ctw->notify_cls = notify_cls;
1758   ctw->th = th;
1759   th->handle = handle;
1760   th->target = *target;
1761   th->notify = &client_notify_wrapper;
1762   th->notify_cls = ctw;
1763   th->notify_size = size + sizeof (struct OutboundMessage);
1764   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1765   th->neighbour = n;
1766   if (NULL == n)
1767     {
1768       pos = handle->connect_wait_head;
1769       while (pos != NULL)
1770         {
1771           GNUNET_assert (0 != memcmp (target,
1772                                       &pos->target,
1773                                       sizeof (struct GNUNET_PeerIdentity)));
1774           pos = pos->next;
1775         }
1776 #if DEBUG_TRANSPORT
1777       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1778                   "Will now try to connect to `%4s'.\n", GNUNET_i2s (target));
1779 #endif
1780       try_connect (th);
1781       return th;
1782     }
1783
1784 #if DEBUG_TRANSPORT
1785   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1786               "Transmission request queued for transmission to transport service.\n");
1787 #endif
1788   GNUNET_assert (NULL == n->transmit_handle);
1789   n->transmit_handle = th;
1790   if (GNUNET_YES != n->received_ack)
1791     {
1792 #if DEBUG_TRANSPORT
1793       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1794                   "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n",
1795                   GNUNET_i2s (target), timeout.value);
1796 #endif
1797       th->notify_delay_task
1798         = GNUNET_SCHEDULER_add_delayed (handle->sched,
1799                                         GNUNET_NO,
1800                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1801                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1802                                         timeout, &transmit_timeout, th);
1803       return th;
1804     }
1805   
1806 #if DEBUG_TRANSPORT
1807   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1808               "Peer `%4s' is ready to receive, scheduling message for delivery now.\n",
1809               GNUNET_i2s (target));
1810 #endif
1811   schedule_request (th);
1812   return th;
1813 }
1814
1815
1816 /**
1817  * Cancel the specified transmission-ready
1818  * notification.
1819  */
1820 void
1821 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
1822                                                GNUNET_TRANSPORT_TransmitHandle
1823                                                *th)
1824 {
1825   struct GNUNET_TRANSPORT_Handle *h;
1826
1827 #if DEBUG_TRANSPORT
1828   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1829               "Transmission request of %u bytes to `%4s' was cancelled.\n",
1830               th->notify_size - sizeof(struct OutboundMessage),
1831               GNUNET_i2s (&th->target));
1832 #endif
1833   GNUNET_assert (th->notify == &client_notify_wrapper);
1834   remove_from_any_list (th);
1835   h = th->handle;
1836   if ((h->connect_ready_head == NULL) && (h->network_handle != NULL))
1837     {
1838       GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1839       h->network_handle = NULL;
1840       h->transmission_scheduled = GNUNET_NO;
1841     }
1842   GNUNET_free (th->notify_cls);
1843   GNUNET_free (th);
1844 }
1845
1846
1847 /* end of transport_api.c */