debugging
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  *
26  * TODO:
27  * - set_quota with low bandwidth should cause peer
28  *   disconnects (currently never does that) (MINOR)
29  */
30 #include "platform.h"
31 #include "gnunet_client_lib.h"
32 #include "gnunet_arm_service.h"
33 #include "gnunet_hello_lib.h"
34 #include "gnunet_protocols.h"
35 #include "gnunet_server_lib.h"
36 #include "gnunet_time_lib.h"
37 #include "gnunet_transport_service.h"
38 #include "transport.h"
39
/**
 * After how long do we give up on transmitting a HELLO
 * to the service?  Used as the timeout of the control
 * request queued by GNUNET_TRANSPORT_offer_hello.
 */
#define OFFER_HELLO_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)

/**
 * After how long do we automatically retry an unsuccessful
 * CONNECT request?
 */
#define CONNECT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 750)

/**
 * How long should ARM wait when starting up the
 * transport service before reporting back?
 */
#define START_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)

/**
 * How long should ARM wait when stopping the
 * transport service before reporting back?
 */
#define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
63
/**
 * Entry in the singly linked list of all of our current neighbours
 * (anchored at the "neighbours" field of the transport handle).
 */
struct NeighbourList
{

  /**
   * Next entry in the linked list.
   */
  struct NeighbourList *next;

  /**
   * Active transmit handle, can be NULL.  Used to move
   * from ready to wait list on disconnect and to block
   * two transmissions to the same peer from being scheduled
   * at the same time.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *transmit_handle;


  /**
   * Identity of this neighbour (the key compared by find_neighbour).
   */
  struct GNUNET_PeerIdentity id;

  /**
   * At what time did we reset last_sent last?
   */
  struct GNUNET_TIME_Absolute last_quota_update;

  /**
   * How many bytes have we sent since the "last_quota_update"
   * timestamp?  Increased when a message for this neighbour is
   * handed to the service, reduced again by update_quota.
   */
  uint64_t last_sent;

  /**
   * Quota for outbound traffic to the neighbour in bytes/ms.
   */
  uint32_t quota_out;

  /**
   * Set to GNUNET_YES if we are currently allowed to
   * transmit a message to the transport service for this
   * peer, GNUNET_NO otherwise.
   */
  int transmit_ok;

  /**
   * Set to GNUNET_YES if we have received an ACK for the
   * given peer.  Peers that receive our HELLO always respond
   * with an ACK to let us know that we are successfully
   * communicating.  Note that a PING can not be used for this
   * since PINGs are only sent if a HELLO address requires
   * confirmation (and also, PINGs are not passed to the
   * transport API itself).
   */
  int received_ack;

};
124
125
/**
 * Linked list of requests from clients for our HELLO
 * that were deferred because the HELLO was not yet known
 * when the request was made.
 */
struct HelloWaitList
{

  /**
   * Next entry in the linked list.
   */
  struct HelloWaitList *next;

  /**
   * Reference back to our transport handle (whose
   * "hwl_head" anchors this list).
   */
  struct GNUNET_TRANSPORT_Handle *handle;

  /**
   * Callback to call once we got our HELLO.
   */
  GNUNET_TRANSPORT_ReceiveCallback rec;

  /**
   * Closure for rec.
   */
  void *rec_cls;

  /**
   * When to time out (call rec with NULL).
   */
  struct GNUNET_TIME_Absolute timeout;

  /**
   * Timeout task (used to trigger timeout,
   * cancel if we get the HELLO in time).
   */
  GNUNET_SCHEDULER_TaskIdentifier task;


};
166
167
/**
 * Opaque handle for a transmission-ready request.  The same
 * structure is used for control requests to the service, which
 * are queued on the connect_ready list of the transport handle.
 */
struct GNUNET_TRANSPORT_TransmitHandle
{

  /**
   * Next entry in the doubly linked list (connect_wait or
   * connect_ready) this handle is currently on.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *next;

  /**
   * Previous entry in the doubly linked list (connect_wait or
   * connect_ready) this handle is currently on; NULL for the
   * head of a list.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *prev;

  /**
   * Handle of the main transport data structure.
   */
  struct GNUNET_TRANSPORT_Handle *handle;

  /**
   * Neighbour for this handle, can be NULL if the service
   * is not yet connected to the target.
   */
  struct NeighbourList *neighbour;

  /**
   * Which peer is this transmission going to be for?  All
   * zeros if it is control-traffic to the service.
   */
  struct GNUNET_PeerIdentity target;

  /**
   * Function to call when notify_size bytes are available
   * for transmission.  Called with a NULL buffer to signal
   * that the request failed or timed out.
   */
  GNUNET_NETWORK_TransmitReadyNotify notify;

  /**
   * Closure for notify.
   */
  void *notify_cls;

  /**
   * transmit_ready task Id.  The task is used to introduce the
   * artificial delay that may be required to maintain the bandwidth
   * limits.  Later, this will be the ID of the "transmit_timeout"
   * task which is used to signal a timeout if the transmission could
   * not be done in a timely fashion.
   * GNUNET_SCHEDULER_NO_PREREQUISITE_TASK if no task is pending.
   */
  GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;

  /**
   * Timeout for this request (also the sort key used
   * by insert_transmit_handle).
   */
  struct GNUNET_TIME_Absolute timeout;

  /**
   * How many bytes is our notify callback waiting for?
   */
  size_t notify_size;

  /**
   * How important is this message?
   */
  unsigned int priority;

};
239
240
/**
 * Handle for the transport service (includes all of the
 * state for the transport service).
 */
struct GNUNET_TRANSPORT_Handle
{

  /**
   * Closure for the callbacks.
   */
  void *cls;

  /**
   * Function to call for received data.
   */
  GNUNET_TRANSPORT_ReceiveCallback rec;

  /**
   * Function to call on connect events.
   */
  GNUNET_TRANSPORT_NotifyConnect nc_cb;

  /**
   * Function to call on disconnect events.
   */
  GNUNET_TRANSPORT_NotifyDisconnect nd_cb;

  /**
   * The current HELLO message for this peer.  Updated
   * whenever transports change their addresses.  NULL
   * while we do not know our HELLO yet.
   */
  struct GNUNET_HELLO_Message *my_hello;

  /**
   * My client connection to the transport service,
   * NULL while we are not connected.
   */
  struct GNUNET_CLIENT_Connection *client;

  /**
   * Handle to our registration with the client for notification;
   * NULL if no transmit-ready notification is pending.
   */
  struct GNUNET_NETWORK_TransmitHandle *network_handle;

  /**
   * Linked list of transmit handles that are waiting for the
   * transport to connect to the respective peer.  When we
   * receive notification that the transport connected to a
   * peer, we go over this list and check if someone has already
   * requested a transmission to the new peer; if so, we trigger
   * the next step.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *connect_wait_head;

  /**
   * Linked list of transmit handles that are waiting for the
   * transport to be ready for transmission to the respective
   * peer.  When we receive notification that the transport
   * disconnected from a peer, we go over this list and move
   * the entry back to the connect_wait list.  Control requests
   * to the service are queued on this list as well.
   */
  struct GNUNET_TRANSPORT_TransmitHandle *connect_ready_head;

  /**
   * Linked list of pending requests for our HELLO.
   */
  struct HelloWaitList *hwl_head;

  /**
   * My scheduler.
   */
  struct GNUNET_SCHEDULER_Handle *sched;

  /**
   * My configuration.
   */
  struct GNUNET_CONFIGURATION_Handle *cfg;

  /**
   * Linked list of the current neighbours of this peer.
   */
  struct NeighbourList *neighbours;

  /**
   * ID of the task trying to reconnect to the
   * service.
   */
  GNUNET_SCHEDULER_TaskIdentifier reconnect_task;

  /**
   * Delay until we try to reconnect.
   */
  struct GNUNET_TIME_Relative reconnect_delay;

  /**
   * Do we currently have a transmission pending?
   * (schedule transmission was called but has not
   * yet succeeded)?
   */
  int transmission_scheduled;
};
342
343
344 static struct NeighbourList *
345 find_neighbour (struct GNUNET_TRANSPORT_Handle *h,
346                 const struct GNUNET_PeerIdentity *peer)
347 {
348   struct NeighbourList *pos;
349
350   pos = h->neighbours;
351   while ((pos != NULL) &&
352          (0 != memcmp (peer, &pos->id, sizeof (struct GNUNET_PeerIdentity))))
353     pos = pos->next;
354   return pos;
355 }
356
357
358 /**
359  * Schedule the task to send one message from the
360  * connect_ready list to the service.
361  */
362 static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
363
364
365 /**
366  * Transmit message to client...
367  */
368 static size_t
369 transport_notify_ready (void *cls, size_t size, void *buf)
370 {
371   struct GNUNET_TRANSPORT_Handle *h = cls;
372   struct GNUNET_TRANSPORT_TransmitHandle *th;
373   struct NeighbourList *n;
374   size_t ret;
375   char *cbuf;
376
377   h->network_handle = NULL;
378   h->transmission_scheduled = GNUNET_NO;
379   if (buf == NULL)
380     {
381 #if DEBUG_TRANSPORT
382       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
383                   "Could not transmit to transport service, cancelling pending requests\n");
384 #endif
385       th = h->connect_ready_head;
386       if (th->next != NULL)
387         th->next->prev = NULL;
388       h->connect_ready_head = th->next;
389       if (NULL != (n = th->neighbour))
390         {
391           GNUNET_assert (n->transmit_handle == th);
392           n->transmit_handle = NULL;
393         }
394       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
395       GNUNET_free (th);
396       return 0;
397     } 
398 #if DEBUG_TRANSPORT
399   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
400               "Ready to transmit %u bytes to transport service\n", size);
401 #endif
402   cbuf = buf;
403   ret = 0;
404   h->network_handle = NULL;
405   h->transmission_scheduled = GNUNET_NO;
406   do
407     {
408       th = h->connect_ready_head;
409       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
410         {
411           /* remove existing time out task (only applies if
412              this is not the first iteration of the loop) */
413           GNUNET_SCHEDULER_cancel (h->sched,
414                                    th->notify_delay_task);
415           th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
416         }
417       GNUNET_assert (th->notify_size <= size);
418       if (th->next != NULL)
419         th->next->prev = NULL;
420       h->connect_ready_head = th->next;
421       if (NULL != (n = th->neighbour))
422         {
423           GNUNET_assert (n->transmit_handle == th);
424           n->transmit_handle = NULL;
425         }
426       ret += th->notify (th->notify_cls, size, &cbuf[ret]);
427       GNUNET_free (th);
428       if (n != NULL)
429         n->last_sent += ret;
430       size -= ret;
431     }
432   while ((h->connect_ready_head != NULL) &&
433          (h->connect_ready_head->notify_size <= size));
434   if (h->connect_ready_head != NULL)
435     schedule_transmission (h);
436 #if DEBUG_TRANSPORT
437   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
438               "Transmitting %u bytes to transport service\n", ret);
439 #endif
440   return ret;
441 }
442
443
444 /**
445  * Schedule the task to send one message from the
446  * connect_ready list to the service.
447  */
448 static void
449 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
450 {
451   struct GNUNET_TRANSPORT_TransmitHandle *th;
452
453   GNUNET_assert (NULL == h->network_handle);
454   if (h->client == NULL)
455     {
456       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
457                   "Could not yet schedule transmission: we are not yet connected to the transport service!\n");
458       return; /* not yet connected */
459     }
460   th = h->connect_ready_head;
461   if (th == NULL)
462     return; /* no request pending */
463   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
464     {
465       /* remove existing time out task, will be integrated
466          with transmit_ready notification! */
467       GNUNET_SCHEDULER_cancel (h->sched,
468                                th->notify_delay_task);
469       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
470     }
471   h->transmission_scheduled = GNUNET_YES;
472   h->network_handle = GNUNET_CLIENT_notify_transmit_ready (h->client,
473                                                            th->notify_size,
474                                                            GNUNET_TIME_absolute_get_remaining
475                                                            (th->timeout),
476                                                            &transport_notify_ready,
477                                                            h);
478   GNUNET_assert (NULL != h->network_handle);
479 }
480
481
482 /**
483  * Insert the given transmit handle in the given sorted
484  * doubly linked list based on timeout.
485  *
486  * @param head pointer to the head of the linked list
487  * @param th element to insert into the list
488  */
489 static void
490 insert_transmit_handle (struct GNUNET_TRANSPORT_TransmitHandle **head,
491                         struct GNUNET_TRANSPORT_TransmitHandle *th)
492 {
493   struct GNUNET_TRANSPORT_TransmitHandle *pos;
494   struct GNUNET_TRANSPORT_TransmitHandle *prev;
495
496   pos = *head;
497   prev = NULL;
498   while ((pos != NULL) && (pos->timeout.value < th->timeout.value))
499     {
500       prev = pos;
501       pos = pos->next;
502     }
503   if (prev == NULL)
504     {
505       th->next = *head;
506       if (th->next != NULL)
507         th->next->prev = th;
508       *head = th;
509     }
510   else
511     {
512       th->next = pos;
513       th->prev = prev;
514       prev->next = th;
515       if (pos != NULL)
516         pos->prev = th;
517     }
518 }
519
520
521 /**
522  * Cancel a pending notify delay task (if pending) and also remove the
523  * given transmit handle from whatever list is on.
524  *
525  * @param th handle for the transmission request to manipulate
526  */
527 static void
528 remove_from_any_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
529 {
530   struct GNUNET_TRANSPORT_Handle *h;
531
532   h = th->handle;
533   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
534     {
535       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
536       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
537     }
538   if (th->prev == NULL)
539     {
540       if (th == h->connect_wait_head)
541         h->connect_wait_head = th->next;
542       else
543         h->connect_ready_head = th->next;
544     }
545   else
546     {
547       th->prev->next = th->next;
548     }
549   if (th->next != NULL)
550     th->next->prev = th->prev;
551 }
552
553
554 /**
555  * Schedule a request to connect to the given
556  * neighbour (and if successful, add the specified
557  * handle to the wait list).
558  *
559  * @param th handle for a request to transmit once we
560  *        have connected
561  */
562 static void
563 try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th);
564
565
566 /**
567  * Called when our transmit request timed out before any transport
568  * reported success connecting to the desired peer or before the
569  * transport was ready to receive.  Signal error and free
570  * TransmitHandle.
571  */
572 static void
573 transmit_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
574 {
575   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
576
577   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
578   if (th->neighbour != NULL)
579     th->neighbour->transmit_handle = NULL;
580 #if DEBUG_TRANSPORT
581   GNUNET_log (GNUNET_ERROR_TYPE_INFO, 
582               "Request for transmission to peer `%s' timed out.\n",
583               GNUNET_i2s(&th->target));
584 #endif
585   remove_from_any_list (th);
586   th->notify (th->notify_cls, 0, NULL);
587   GNUNET_free (th);
588 }
589
590
591
592
593 /**
594  * Queue control request for transmission to the transport
595  * service.
596  *
597  * @param size number of bytes to be transmitted
598  * @param at_head request must be added to the head of the queue
599  *        (otherwise request will be appended)
600  * @param timeout how long this transmission can wait (at most)
601  * @param notify function to call to get the content
602  * @param notify_cls closure for notify
603  */
604 static void
605 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
606                            size_t size,
607                            int at_head,
608                            struct GNUNET_TIME_Relative timeout,
609                            GNUNET_NETWORK_TransmitReadyNotify notify,
610                            void *notify_cls)
611 {
612   struct GNUNET_TRANSPORT_TransmitHandle *th;
613
614   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
615   th->handle = h;
616   th->notify = notify;
617   th->notify_cls = notify_cls;
618   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
619   th->notify_size = size;
620   th->notify_delay_task 
621     = GNUNET_SCHEDULER_add_delayed (h->sched,
622                                     GNUNET_NO,
623                                     GNUNET_SCHEDULER_PRIORITY_KEEP,
624                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
625                                     timeout,
626                                     &transmit_timeout, th);    
627   if (at_head)
628     {
629       th->next = h->connect_ready_head;
630       h->connect_ready_head = th;
631       if (th->next != NULL)
632         th->next->prev = th;
633     }
634   else
635     {
636       insert_transmit_handle (&h->connect_ready_head, th);
637     }
638   if (GNUNET_NO == h->transmission_scheduled)
639     schedule_transmission (h);
640 }
641
642
643 /**
644  * Update the quota values for the given neighbour now.
645  */
646 static void
647 update_quota (struct NeighbourList *n)
648 {
649   struct GNUNET_TIME_Relative delta;
650   uint64_t allowed;
651   uint64_t remaining;
652
653   delta = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
654   allowed = delta.value * n->quota_out;
655   if (n->last_sent < allowed)
656     {
657       remaining = allowed - n->last_sent;
658       if (n->quota_out > 0)
659         remaining /= n->quota_out;
660       else
661         remaining = 0;
662       if (remaining > MAX_BANDWIDTH_CARRY)
663         remaining = MAX_BANDWIDTH_CARRY;
664       n->last_sent = 0;
665       n->last_quota_update = GNUNET_TIME_absolute_get ();
666       n->last_quota_update.value -= remaining;
667     }
668   else
669     {
670       n->last_sent -= allowed;
671       n->last_quota_update = GNUNET_TIME_absolute_get ();
672     }
673 }
674
675
/**
 * Closure for send_set_quota: everything needed to build the
 * SET_QUOTA message and to notify the caller afterwards.
 * Allocated by GNUNET_TRANSPORT_set_quota, freed by send_set_quota.
 */
struct SetQuotaContext
{
  /**
   * Handle to the transport service (provides the scheduler
   * used to run the continuation).
   */
  struct GNUNET_TRANSPORT_Handle *handle;

  /**
   * Peer whose quota is being changed.
   */
  struct GNUNET_PeerIdentity target;

  /**
   * Continuation to run once the request was transmitted or
   * failed; the success path of send_set_quota treats NULL as
   * "no continuation".
   */
  GNUNET_SCHEDULER_Task cont;

  /**
   * Closure for cont.
   */
  void *cont_cls;

  /**
   * Absolute deadline derived from the timeout given to
   * GNUNET_TRANSPORT_set_quota.
   */
  struct GNUNET_TIME_Absolute timeout;

  /**
   * New inbound quota, copied into the message in network byte order.
   */
  uint32_t quota_in;
};
690
691
692 static size_t
693 send_set_quota (void *cls, size_t size, void *buf)
694 {
695   struct SetQuotaContext *sqc = cls;
696   struct QuotaSetMessage *msg;
697
698   if (buf == NULL)
699     {
700       GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
701                                          GNUNET_NO,
702                                          sqc->cont,
703                                          sqc->cont_cls,
704                                          GNUNET_SCHEDULER_REASON_TIMEOUT);
705       GNUNET_free (sqc);
706       return 0;
707     }
708 #if DEBUG_TRANSPORT
709   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710               "Transmitting `%s' request with respect to `%4s'.\n",
711               "SET_QUOTA", GNUNET_i2s (&sqc->target));
712 #endif
713   GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
714   msg = buf;
715   msg->header.size = htons (sizeof (struct QuotaSetMessage));
716   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
717   msg->quota_in = htonl (sqc->quota_in);
718   memcpy (&msg->peer, &sqc->target, sizeof (struct GNUNET_PeerIdentity));
719   if (sqc->cont != NULL)
720     GNUNET_SCHEDULER_add_continuation (sqc->handle->sched,
721                                        GNUNET_NO,
722                                        sqc->cont,
723                                        sqc->cont_cls,
724                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
725   GNUNET_free (sqc);
726   return sizeof (struct QuotaSetMessage);
727 }
728
729
730 /**
731  * Set the share of incoming bandwidth for the given
732  * peer to the specified amount.
733  *
734  * @param handle connection to transport service
735  * @param target who's bandwidth quota is being changed
736  * @param quota_in incoming bandwidth quota in bytes per ms; 0 can
737  *        be used to force all traffic to be discarded
738  * @param quota_out outgoing bandwidth quota in bytes per ms; 0 can
739  *        be used to force all traffic to be discarded
740  * @param timeout how long to wait until signaling failure if
741  *        we can not communicate the quota change
742  * @param cont continuation to call when done, will be called
743  *        either with reason "TIMEOUT" or with reason "PREREQ_DONE"
744  * @param cont_cls closure for continuation
745  */
746 void
747 GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
748                             const struct GNUNET_PeerIdentity *target,
749                             uint32_t quota_in,
750                             uint32_t quota_out,
751                             struct GNUNET_TIME_Relative timeout,
752                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
753 {
754   struct NeighbourList *n;
755   struct SetQuotaContext *sqc;
756
757   n = find_neighbour (handle, target);
758   if (n != NULL)
759     {
760       update_quota (n);
761       if (n->quota_out < quota_out)
762         n->last_quota_update = GNUNET_TIME_absolute_get ();
763       n->quota_out = quota_out;
764     }
765   sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
766   sqc->handle = handle;
767   sqc->target = *target;
768   sqc->cont = cont;
769   sqc->cont_cls = cont_cls;
770   sqc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
771   sqc->quota_in = quota_in;
772   schedule_control_transmit (handle,
773                              sizeof (struct QuotaSetMessage),
774                              GNUNET_NO, timeout, &send_set_quota, sqc);
775 }
776
777
778 /**
779  * A "get_hello" request has timed out.  Signal the client
780  * and clean up.
781  */
782 static void
783 hello_wait_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
784 {
785   struct HelloWaitList *hwl = cls;
786   struct HelloWaitList *pos;
787   struct HelloWaitList *prev;
788
789   prev = NULL;
790   pos = hwl->handle->hwl_head;
791   while (pos != hwl)
792     {
793       GNUNET_assert (pos != NULL);
794       prev = pos;
795       pos = pos->next;
796     }
797   if (prev == NULL)
798     hwl->handle->hwl_head = hwl->next;
799   else
800     prev->next = hwl->next;
801   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
802               _("Timeout trying to obtain `%s' from transport service.\n"),
803               "HELLO");
804   /* signal timeout */
805   if (hwl->rec != NULL)
806     hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
807   GNUNET_free (hwl);
808 }
809
810
811 /**
812  * Obtain the HELLO message for this peer.
813  *
814  * @param handle connection to transport service
815  * @param timeout how long to wait for the HELLO
816  * @param rec function to call with the HELLO, sender will be our peer
817  *            identity; message and sender will be NULL on timeout
818  *            (handshake with transport service pending/failed).
819  *             cost estimate will be 0.
820  * @param rec_cls closure for rec
821  */
822 void
823 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
824                             struct GNUNET_TIME_Relative timeout,
825                             GNUNET_TRANSPORT_ReceiveCallback rec,
826                             void *rec_cls)
827 {
828   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded pk;
829   struct GNUNET_PeerIdentity me;
830   struct HelloWaitList *hwl;
831
832   if (handle->my_hello == NULL)
833     {
834       hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
835       hwl->next = handle->hwl_head;
836       handle->hwl_head = hwl;
837       hwl->handle = handle;
838       hwl->rec = rec;
839       hwl->rec_cls = rec_cls;
840       hwl->timeout = GNUNET_TIME_relative_to_absolute (timeout);
841       hwl->task = GNUNET_SCHEDULER_add_delayed (handle->sched,
842                                                 GNUNET_YES,
843                                                 GNUNET_SCHEDULER_PRIORITY_KEEP,
844                                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
845                                                 timeout,
846                                                 &hello_wait_timeout, hwl);
847       return;
848     }
849   GNUNET_assert (GNUNET_OK == GNUNET_HELLO_get_key (handle->my_hello, &pk));
850   GNUNET_CRYPTO_hash (&pk,
851                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
852                       &me.hashPubKey);
853
854   rec (rec_cls,
855        GNUNET_TIME_UNIT_ZERO,
856        &me, (const struct GNUNET_MessageHeader *) handle->my_hello);
857 }
858
859
860 static size_t
861 send_hello (void *cls, size_t size, void *buf)
862 {
863   struct GNUNET_MessageHeader *hello = cls;
864   uint16_t msize;
865
866   if (buf == NULL)
867     {
868 #if DEBUG_TRANSPORT
869       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
870                   "Timeout while trying to transmit `%s' request.\n",
871                   "HELLO");
872 #endif
873       GNUNET_free (hello);
874       return 0;
875     }
876 #if DEBUG_TRANSPORT
877   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
878               "Transmitting `%s' request.\n", "HELLO");
879 #endif
880   msize = ntohs (hello->size);
881   GNUNET_assert (size >= msize);
882   memcpy (buf, hello, msize);
883   GNUNET_free (hello);
884   return msize;
885 }
886
887
888 /**
889  * Offer the transport service the HELLO of another peer.  Note that
890  * the transport service may just ignore this message if the HELLO is
891  * malformed or useless due to our local configuration.
892  *
893  * @param handle connection to transport service
894  * @param hello the hello message
895  */
896 void
897 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
898                               const struct GNUNET_MessageHeader *hello)
899 {
900   struct GNUNET_MessageHeader *hc;
901   uint16_t size;
902
903   if (handle->client == NULL)
904     {
905 #if DEBUG_TRANSPORT
906       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
907                   "Not connected to transport service, dropping offered HELLO\n");
908 #endif
909       return;
910     }
911   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
912   size = ntohs (hello->size);
913   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
914   hc = GNUNET_malloc (size);
915   memcpy (hc, hello, size);
916   schedule_control_transmit (handle,
917                              size,
918                              GNUNET_NO, OFFER_HELLO_TIMEOUT, &send_hello, hc);
919 }
920
921
922 /**
923  * Function we use for handling incoming messages.
924  */
925 static void demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg);
926
927
928 static size_t
929 send_start (void *cls, size_t size, void *buf)
930 {
931   struct GNUNET_MessageHeader *s = buf;
932
933   if (buf == NULL)
934     {
935 #if DEBUG_TRANSPORT
936       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
937                   "Timeout while trying to transmit `%s' request.\n",
938                   "START");
939 #endif
940       return 0;
941     }
942 #if DEBUG_TRANSPORT
943   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
944               "Transmitting `%s' request.\n", "START");
945 #endif
946   GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
947   s->size = htons (sizeof (struct GNUNET_MessageHeader));
948   s->type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
949   return sizeof (struct GNUNET_MessageHeader);
950 }
951
952
953 /**
954  * We're ready to transmit the request that the transport service
955  * should connect to a new peer.  In addition to sending the
956  * request, schedule the next phase for the transmission processing
957  * that caused the connect request in the first place.
958  */
959 static size_t
960 request_connect (void *cls, size_t size, void *buf)
961 {
962   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
963   struct TryConnectMessage *tcm;
964   struct GNUNET_TRANSPORT_Handle *h;
965
966   GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
967   h = th->handle;
968   if (buf == NULL)
969     {
970 #if DEBUG_TRANSPORT
971       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
972                   "Failed to transmit `%s' request for `%4s' to service.\n",
973                   "TRY_CONNECT",
974                   GNUNET_i2s(&th->target));
975 #endif
976       th->notify (th->notify_cls, 0, NULL);
977       GNUNET_free (th);
978       return 0;
979     }
980 #if DEBUG_TRANSPORT
981   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
982               "Transmitting `%s' message for `%4s'.\n",
983               "TRY_CONNECT", GNUNET_i2s (&th->target));
984 #endif
985   GNUNET_assert (size >= sizeof (struct TryConnectMessage));
986   tcm = buf;
987   tcm->header.size = htons (sizeof (struct TryConnectMessage));
988   tcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TRY_CONNECT);
989   tcm->reserved = htonl (0);
990   memcpy (&tcm->peer, &th->target, sizeof (struct GNUNET_PeerIdentity));
991   th->notify_delay_task
992     = GNUNET_SCHEDULER_add_delayed (h->sched,
993                                     GNUNET_NO,
994                                     GNUNET_SCHEDULER_PRIORITY_KEEP,
995                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
996                                     GNUNET_TIME_absolute_get_remaining
997                                     (th->timeout),
998                                     &transmit_timeout, th);
999   insert_transmit_handle (&h->connect_wait_head, th);
1000   return sizeof (struct TryConnectMessage);
1001 }
1002
1003
1004 /**
1005  * Schedule a request to connect to the given
1006  * neighbour (and if successful, add the specified
1007  * handle to the wait list).
1008  *
1009  * @param th handle for a request to transmit once we
1010  *        have connected
1011  */
1012 static void
1013 try_connect (struct GNUNET_TRANSPORT_TransmitHandle *th)
1014 {
1015   GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);  
1016   schedule_control_transmit (th->handle,
1017                              sizeof (struct TryConnectMessage),
1018                              GNUNET_NO,
1019                              GNUNET_TIME_absolute_get_remaining (th->timeout),
1020                              &request_connect, th);
1021 }
1022
1023
1024 /**
1025  * Task for delayed attempts to reconnect to a peer.
1026  *
1027  * @param cls must be a transmit handle that determines the peer
1028  *        to which we will try to connect
1029  * @param tc scheduler information about why we were triggered (not used)
1030  */
1031 static void
1032 try_connect_task (void *cls,
1033                   const struct GNUNET_SCHEDULER_TaskContext *tc)
1034 {
1035   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;  
1036
1037   th->notify_delay_task
1038     = GNUNET_SCHEDULER_add_delayed (th->handle->sched,
1039                                     GNUNET_NO,
1040                                     GNUNET_SCHEDULER_PRIORITY_KEEP,
1041                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1042                                     GNUNET_TIME_absolute_get_remaining
1043                                     (th->timeout), &transmit_timeout, th);
1044   try_connect (th);
1045 }
1046
1047
1048 /**
1049  * Remove neighbour from our list.  Will automatically
1050  * trigger a re-connect attempt if we have messages pending
1051  * for this peer.
1052  * 
1053  * @param h our state
1054  * @param peer the peer to remove
1055  */
1056 static void
1057 remove_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1058                   const struct GNUNET_PeerIdentity *peer)
1059 {
1060   struct NeighbourList *prev;
1061   struct NeighbourList *pos;
1062   struct GNUNET_TRANSPORT_TransmitHandle *th;
1063
1064   prev = NULL;
1065   pos = h->neighbours;
1066   while ((pos != NULL) &&
1067          (0 != memcmp (peer, 
1068                        &pos->id, 
1069                        sizeof (struct GNUNET_PeerIdentity))))
1070     {
1071       prev = pos;
1072       pos = pos->next;
1073     }
1074   if (pos == NULL)
1075     {
1076       GNUNET_break (0);
1077       return;
1078     }
1079   if (prev == NULL)
1080     h->neighbours = pos->next;
1081   else
1082     prev->next = pos->next;
1083   if (NULL != (th = pos->transmit_handle))
1084     {
1085       pos->transmit_handle = NULL;
1086       th->neighbour = NULL;
1087       remove_from_any_list (th);
1088       if (GNUNET_TIME_absolute_get_remaining (th->timeout).value > CONNECT_RETRY_TIMEOUT.value)
1089         {
1090           /* signal error */
1091           GNUNET_SCHEDULER_cancel (h->sched,
1092                                    th->notify_delay_task);
1093           transmit_timeout (th, NULL);    
1094         }
1095       else
1096         {
1097           /* try again in a bit */
1098           GNUNET_SCHEDULER_cancel (h->sched,
1099                                    th->notify_delay_task);
1100           th->notify_delay_task 
1101             = GNUNET_SCHEDULER_add_delayed (h->sched,
1102                                             GNUNET_NO,
1103                                             GNUNET_SCHEDULER_PRIORITY_KEEP,
1104                                             GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1105                                             CONNECT_RETRY_TIMEOUT,
1106                                             &try_connect_task,
1107                                             th);
1108         }
1109     }
1110   if (h->nc_cb != NULL)
1111     h->nd_cb (h->cls, peer);
1112   GNUNET_free (pos);
1113 }
1114
1115
1116 /**
1117  * Try again to connect to transport service.
1118  */
1119 static void
1120 reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1121 {
1122   struct GNUNET_TRANSPORT_Handle *h = cls;
1123   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1124   struct NeighbourList *n;
1125
1126   /* Forget about all neighbours that we used to be connected
1127      to */
1128   while (NULL != (n = h->neighbours))
1129     remove_neighbour (h, &n->id);
1130 #if DEBUG_TRANSPORT
1131   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connecting to transport service.\n");
1132 #endif
1133   GNUNET_assert (h->client == NULL);
1134   h->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1135   h->client = GNUNET_CLIENT_connect (h->sched, "transport", h->cfg);
1136   GNUNET_assert (h->client != NULL);
1137   /* make sure we don't send "START" twice,
1138      remove existing entry from queue (if present) */
1139   pos = h->connect_ready_head;
1140   while (pos != NULL)
1141     {
1142       if (pos->notify == &send_start)
1143         {
1144           if (pos->prev == NULL)
1145             h->connect_ready_head = pos->next;
1146           else
1147             pos->prev->next = pos->next;
1148           if (pos->next != NULL)
1149             pos->next->prev = pos->prev;
1150           GNUNET_assert (pos->neighbour == NULL);
1151           GNUNET_free (pos);
1152           break;
1153         }
1154       pos = pos->next;
1155     }
1156   schedule_control_transmit (h,
1157                              sizeof (struct GNUNET_MessageHeader),
1158                              GNUNET_YES,
1159                              GNUNET_TIME_UNIT_FOREVER_REL, &send_start, NULL);
1160   GNUNET_CLIENT_receive (h->client,
1161                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1162 }
1163
1164
1165 /**
1166  * Function that will schedule the job that will try
1167  * to connect us again to the client.
1168  */
1169 static void
1170 schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1171 {
1172 #if DEBUG_TRANSPORT
1173   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1174               "Scheduling task to reconnect to transport service in %llu ms.\n",
1175               h->reconnect_delay.value);
1176 #endif
1177   GNUNET_assert (h->client == NULL);
1178   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1179   h->reconnect_task
1180     = GNUNET_SCHEDULER_add_delayed (h->sched,
1181                                     GNUNET_NO,
1182                                     GNUNET_SCHEDULER_PRIORITY_DEFAULT,
1183                                     GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1184                                     h->reconnect_delay, &reconnect, h);
1185   h->reconnect_delay = GNUNET_TIME_UNIT_SECONDS;
1186 }
1187
1188
1189 /**
1190  * We are connected to the respective peer, check the
1191  * bandwidth limits and schedule the transmission.
1192  */
1193 static void schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th);
1194
1195
1196 /**
1197  * Function called by the scheduler when the timeout
1198  * for bandwidth availablility for the target
1199  * neighbour is reached.
1200  */
1201 static void
1202 transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1203 {
1204   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1205
1206   th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1207   schedule_request (th);
1208 }
1209
1210
1211 /**
1212  * Remove the given transmit handle from the wait list.  Does NOT free
1213  * it.
1214  */
1215 static void
1216 remove_from_wait_list (struct GNUNET_TRANSPORT_TransmitHandle *th)
1217 {
1218   if (th->prev == NULL)
1219     th->handle->connect_wait_head = th->next;
1220   else
1221     th->prev->next = th->next;
1222   if (th->next != NULL)
1223     th->next->prev = th->prev;
1224 }
1225
1226
1227 /**
1228  * We are connected to the respective peer, check the
1229  * bandwidth limits and schedule the transmission.
1230  */
1231 static void
1232 schedule_request (struct GNUNET_TRANSPORT_TransmitHandle *th)
1233 {
1234   struct GNUNET_TRANSPORT_Handle *h;
1235   struct GNUNET_TIME_Relative duration;
1236   struct NeighbourList *n;
1237   uint64_t available;
1238
1239   h = th->handle;
1240   n = th->neighbour;
1241   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1242     {
1243       GNUNET_SCHEDULER_cancel (h->sched, th->notify_delay_task);
1244       th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1245     }
1246   /* check outgoing quota */
1247   duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1248   if (duration.value > MIN_QUOTA_REFRESH_TIME)
1249     {
1250       update_quota (n);
1251       duration = GNUNET_TIME_absolute_get_duration (n->last_quota_update);
1252     }
1253   available = duration.value * n->quota_out;
1254   if (available < n->last_sent + th->notify_size)
1255     {
1256       /* calculate how much bandwidth we'd still need to
1257          accumulate and based on that how long we'll have
1258          to wait... */
1259       available = n->last_sent + th->notify_size - available;
1260       duration = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
1261                                                 available / n->quota_out);
1262       if (th->timeout.value <
1263           GNUNET_TIME_relative_to_absolute (duration).value)
1264         {
1265           /* signal timeout! */
1266 #if DEBUG_TRANSPORT
1267           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1268                       "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long.  Signaling timeout.\n",
1269                       duration.value,
1270                       GNUNET_i2s(&th->target));
1271 #endif
1272           remove_from_wait_list (th);
1273           th->notify (th->notify_cls, 0, NULL);
1274           GNUNET_free (th);
1275           return;
1276         }
1277 #if DEBUG_TRANSPORT
1278       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1279                   "Need more bandwidth, delaying delivery to `%4s' by %llu ms\n",
1280                   GNUNET_i2s(&th->target),
1281                   duration.value);
1282 #endif
1283       th->notify_delay_task
1284         = GNUNET_SCHEDULER_add_delayed (h->sched,
1285                                         GNUNET_NO,
1286                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1287                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1288                                         duration, &transmit_ready, th);
1289       return;
1290     }
1291 #if DEBUG_TRANSPORT
1292   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1293               "Bandwidth available for transmission to `%4s'\n",
1294               GNUNET_i2s (&n->id));
1295 #endif
1296   if (GNUNET_NO == n->transmit_ok)
1297     {
1298       /* we may be ready, but transport service is not;
1299          wait for SendOkMessage or timeout */
1300 #if DEBUG_TRANSPORT
1301       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1302                   "Need to wait for transport service `%s' message\n",
1303                   "SEND_OK");
1304 #endif
1305       th->notify_delay_task
1306         = GNUNET_SCHEDULER_add_delayed (h->sched,
1307                                         GNUNET_NO,
1308                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1309                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1310                                         GNUNET_TIME_absolute_get_remaining
1311                                         (th->timeout), &transmit_timeout, th);
1312       return;
1313     }
1314   n->transmit_ok = GNUNET_NO;
1315   remove_from_wait_list (th);
1316 #if DEBUG_TRANSPORT
1317   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Moving message for `%4s' to ready list\n",
1318               GNUNET_i2s(&n->id));
1319 #endif
1320   insert_transmit_handle (&h->connect_ready_head, th);
1321   if (GNUNET_NO == h->transmission_scheduled)
1322     schedule_transmission (h);
1323 }
1324
1325
1326 /**
1327  * Add neighbour to our list
1328  */
1329 static void
1330 add_neighbour (struct GNUNET_TRANSPORT_Handle *h,
1331                uint32_t quota_out,
1332                struct GNUNET_TIME_Relative latency,
1333                const struct GNUNET_PeerIdentity *pid)
1334 {
1335   struct NeighbourList *n;
1336   struct GNUNET_TRANSPORT_TransmitHandle *prev;
1337   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1338   struct GNUNET_TRANSPORT_TransmitHandle *next;
1339
1340   /* check for duplicates */
1341   if (NULL != find_neighbour (h, pid))
1342     {
1343       GNUNET_break (0);
1344       return;
1345     }
1346 #if DEBUG_TRANSPORT
1347   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1348               "Creating entry for new neighbour `%4s'.\n", GNUNET_i2s (pid));
1349 #endif
1350   n = GNUNET_malloc (sizeof (struct NeighbourList));
1351   n->id = *pid;
1352   n->last_quota_update = GNUNET_TIME_absolute_get ();
1353   n->quota_out = quota_out;
1354   n->next = h->neighbours;
1355   n->transmit_ok = GNUNET_YES;
1356   h->neighbours = n;
1357   if (h->nc_cb != NULL)
1358     h->nc_cb (h->cls, &n->id, latency);
1359   prev = NULL;
1360   pos = h->connect_wait_head;
1361   while (pos != NULL)
1362     {
1363       next = pos->next;
1364       if (0 == memcmp (pid,
1365                        &pos->target, sizeof (struct GNUNET_PeerIdentity)))
1366         {
1367           pos->neighbour = n;
1368           GNUNET_assert (NULL == n->transmit_handle);
1369           n->transmit_handle = pos;
1370           if (prev == NULL)
1371             h->connect_wait_head = next;
1372           else
1373             prev->next = next;
1374           if (GNUNET_YES == n->received_ack)
1375             {
1376 #if DEBUG_TRANSPORT
1377               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1378                           "Found pending request for `%4s' will trigger it now.\n",
1379                           GNUNET_i2s (&pos->target));
1380 #endif
1381               if (pos->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1382                 {
1383                   GNUNET_SCHEDULER_cancel (h->sched, pos->notify_delay_task);
1384                   pos->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1385                 }
1386               schedule_request (pos);
1387             }
1388           else
1389             {
1390 #if DEBUG_TRANSPORT
1391               GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1392                           "Found pending request for `%4s' but still need `%s' before proceeding.\n",
1393                           GNUNET_i2s (&pos->target),
1394                           "ACK");
1395 #endif
1396             }
1397           break;
1398         }
1399       prev = pos;
1400       pos = next;
1401     }
1402 }
1403
1404
1405 /**
1406  * Connect to the transport service.  Note that the connection may
1407  * complete (or fail) asynchronously.
1408  *
1409
1410  * @param sched scheduler to use
1411  * @param cfg configuration to use
1412  * @param cls closure for the callbacks
1413  * @param rec receive function to call
1414  * @param nc function to call on connect events
1415  * @param dc function to call on disconnect events
1416  */
1417 struct GNUNET_TRANSPORT_Handle *
1418 GNUNET_TRANSPORT_connect (struct GNUNET_SCHEDULER_Handle *sched,
1419                           struct GNUNET_CONFIGURATION_Handle *cfg,
1420                           void *cls,
1421                           GNUNET_TRANSPORT_ReceiveCallback rec,
1422                           GNUNET_TRANSPORT_NotifyConnect nc,
1423                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1424 {
1425   struct GNUNET_TRANSPORT_Handle *ret;
1426
1427   GNUNET_ARM_start_service ("peerinfo",
1428                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1429   GNUNET_ARM_start_service ("transport",
1430                             cfg, sched, START_SERVICE_TIMEOUT, NULL, NULL);
1431   ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
1432   ret->sched = sched;
1433   ret->cfg = cfg;
1434   ret->cls = cls;
1435   ret->rec = rec;
1436   ret->nc_cb = nc;
1437   ret->nd_cb = nd;
1438   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1439   schedule_reconnect (ret);
1440   return ret;
1441 }
1442
1443
1444 /**
1445  * These stop activities must be run in a fresh
1446  * scheduler that is NOT in shutdown mode.
1447  */
1448 static void
1449 stop_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1450 {
1451   struct GNUNET_TRANSPORT_Handle *handle = cls;
1452   GNUNET_ARM_stop_service ("transport",
1453                            handle->cfg,
1454                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1455   GNUNET_ARM_stop_service ("peerinfo",
1456                            handle->cfg,
1457                            tc->sched, STOP_SERVICE_TIMEOUT, NULL, NULL);
1458 }
1459
1460
1461 /**
1462  * Disconnect from the transport service.
1463  */
1464 void
1465 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1466 {
1467   struct GNUNET_TRANSPORT_TransmitHandle *th;
1468   struct NeighbourList *n;
1469   struct HelloWaitList *hwl;
1470   struct GNUNET_CLIENT_Connection *client;
1471
1472 #if DEBUG_TRANSPORT
1473   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
1474 #endif
1475   while (NULL != (th = handle->connect_ready_head))
1476     {
1477       handle->connect_ready_head = th->next;
1478       th->notify (th->notify_cls, 0, NULL);
1479       GNUNET_free (th);
1480     }
1481   while (NULL != (th = handle->connect_wait_head))
1482     {
1483       handle->connect_wait_head = th->next;
1484       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1485         {
1486           GNUNET_SCHEDULER_cancel (handle->sched, th->notify_delay_task);
1487           th->notify_delay_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1488         }
1489       th->notify (th->notify_cls, 0, NULL);
1490       GNUNET_free (th);
1491     }
1492   while (NULL != (n = handle->neighbours))
1493     {
1494       handle->neighbours = n->next;
1495       GNUNET_free (n);
1496     }
1497   while (NULL != (hwl = handle->hwl_head))
1498     {
1499       handle->hwl_head = hwl->next;
1500       GNUNET_SCHEDULER_cancel (handle->sched, hwl->task);
1501       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1502                   _("Disconnect while trying to obtain `%s' from transport service.\n"),
1503                   "HELLO");
1504       if (hwl->rec != NULL)
1505         hwl->rec (hwl->rec_cls, GNUNET_TIME_UNIT_ZERO, NULL, NULL);
1506       GNUNET_free (hwl);
1507     }
1508   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_PREREQUISITE_TASK)
1509     {
1510       GNUNET_SCHEDULER_cancel (handle->sched, handle->reconnect_task);
1511       handle->reconnect_task = GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1512     }
1513   GNUNET_free_non_null (handle->my_hello);
1514   handle->my_hello = NULL;
1515   GNUNET_SCHEDULER_run (&stop_task, handle);
1516   if (NULL != (client = handle->client))
1517     {
1518 #if DEBUG_TRANSPORT
1519       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1520                   "Disconnecting from transport service for good.\n");
1521 #endif
1522       handle->client = NULL;
1523       GNUNET_CLIENT_disconnect (client);
1524     }
1525   if (client == NULL)
1526     GNUNET_free (handle);
1527 }
1528
1529
1530 /**
1531  * Type of a function to call when we receive a message
1532  * from the service.
1533  *
1534  * @param cls closure
1535  * @param msg message received, NULL on timeout or fatal error
1536  */
1537 static void
1538 demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1539 {
1540   struct GNUNET_TRANSPORT_Handle *h = cls;
1541   const struct DisconnectInfoMessage *dim;
1542   const struct ConnectInfoMessage *cim;
1543   const struct InboundMessage *im;
1544   const struct GNUNET_MessageHeader *imm;
1545   const struct SendOkMessage *okm;
1546   struct HelloWaitList *hwl;
1547   struct NeighbourList *n;
1548   struct GNUNET_PeerIdentity me;
1549   struct GNUNET_TRANSPORT_TransmitHandle *th;
1550   uint16_t size;
1551
1552   if ((msg == NULL) || (h->client == NULL))
1553     {
1554       if (h->client != NULL)
1555         {
1556 #if DEBUG_TRANSPORT
1557           GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1558                       "Error receiving from transport service, disconnecting temporarily.\n");
1559 #endif
1560           if (h->network_handle != NULL)
1561             {
1562               GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1563               h->network_handle = NULL;
1564               h->transmission_scheduled = GNUNET_NO;
1565               th = h->connect_ready_head;
1566               /* add timeout again, we cancelled the transmit_ready task! */
1567               GNUNET_assert (th->notify_delay_task == GNUNET_SCHEDULER_NO_PREREQUISITE_TASK);
1568               th->notify_delay_task 
1569                 = GNUNET_SCHEDULER_add_delayed (h->sched,
1570                                                 GNUNET_NO,
1571                                                 GNUNET_SCHEDULER_PRIORITY_KEEP,
1572                                                 GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1573                                                 GNUNET_TIME_absolute_get_remaining(th->timeout),
1574                                                 &transmit_timeout, 
1575                                                 th);    
1576             }
1577           GNUNET_CLIENT_disconnect (h->client);
1578           h->client = NULL;
1579           schedule_reconnect (h);
1580         }
1581       else
1582         {
1583           /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
1584              finish clean up work! */
1585           GNUNET_free (h);
1586         }
1587       return;
1588     }
1589   GNUNET_CLIENT_receive (h->client,
1590                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1591   size = ntohs (msg->size);
1592   switch (ntohs (msg->type))
1593     {
1594     case GNUNET_MESSAGE_TYPE_HELLO:
1595       if (GNUNET_OK !=
1596           GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
1597                                &me))
1598         {
1599           GNUNET_break (0);
1600           break;
1601         }
1602 #if DEBUG_TRANSPORT
1603       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1604                   "Receiving (my own) `%s' message, I am `%4s'.\n",
1605                   "HELLO", GNUNET_i2s (&me));
1606 #endif
1607       GNUNET_free_non_null (h->my_hello);
1608       h->my_hello = NULL;
1609       if (size < sizeof (struct GNUNET_MessageHeader))
1610         {
1611           GNUNET_break (0);
1612           break;
1613         }
1614       h->my_hello = GNUNET_malloc (size);
1615       memcpy (h->my_hello, msg, size);
1616       while (NULL != (hwl = h->hwl_head))
1617         {
1618           h->hwl_head = hwl->next;
1619           GNUNET_SCHEDULER_cancel (h->sched, hwl->task);
1620           GNUNET_TRANSPORT_get_hello (h,
1621                                       GNUNET_TIME_UNIT_ZERO,
1622                                       hwl->rec, hwl->rec_cls);
1623           GNUNET_free (hwl);
1624         }
1625       break;
1626     case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
1627       if (size != sizeof (struct ConnectInfoMessage))
1628         {
1629           GNUNET_break (0);
1630           break;
1631         }
1632       cim = (const struct ConnectInfoMessage *) msg;
1633 #if DEBUG_TRANSPORT
1634       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1635                   "Receiving `%s' message for `%4s'.\n",
1636                   "CONNECT", GNUNET_i2s (&cim->id));
1637 #endif
1638       add_neighbour (h,
1639                      ntohl (cim->quota_out),
1640                      GNUNET_TIME_relative_ntoh (cim->latency), &cim->id);
1641       break;
1642     case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
1643       if (size != sizeof (struct DisconnectInfoMessage))
1644         {
1645           GNUNET_break (0);
1646           break;
1647         }
1648       dim = (const struct DisconnectInfoMessage *) msg;
1649 #if DEBUG_TRANSPORT
1650       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1651                   "Receiving `%s' message for `%4s'.\n",
1652                   "DISCONNECT", GNUNET_i2s (&dim->peer));
1653 #endif
1654       remove_neighbour (h, &dim->peer);
1655       break;
1656     case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
1657       if (size != sizeof (struct SendOkMessage))
1658         {
1659           GNUNET_break (0);
1660           break;
1661         }
1662       okm = (const struct SendOkMessage *) msg;
1663 #if DEBUG_TRANSPORT
1664       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1665                   "Receiving `%s' message, transmission %s.\n", "SEND_OK",
1666                   ntohl(okm->success) == GNUNET_OK ? "succeeded" : "failed");
1667 #endif
1668       /* FIXME: need to check status code and change action accordingly,
1669          especially if the error was for CONNECT */
1670       n = find_neighbour (h, &okm->peer);
1671       GNUNET_assert (n != NULL);
1672       n->transmit_ok = GNUNET_YES;
1673       if (n->transmit_handle != NULL)
1674         {
1675 #if DEBUG_TRANSPORT
1676           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1677                       "Processing pending message\n");
1678 #endif
1679           GNUNET_SCHEDULER_cancel (h->sched,
1680                                    n->transmit_handle->notify_delay_task);
1681           n->transmit_handle->notify_delay_task =
1682             GNUNET_SCHEDULER_NO_PREREQUISITE_TASK;
1683           GNUNET_assert (GNUNET_YES == n->received_ack);
1684           schedule_request (n->transmit_handle);
1685         }
1686       break;
1687     case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
1688 #if DEBUG_TRANSPORT
1689       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1690                   "Receiving `%s' message.\n", "RECV");
1691 #endif
1692       if (size <
1693           sizeof (struct InboundMessage) +
1694           sizeof (struct GNUNET_MessageHeader))
1695         {
1696           GNUNET_break (0);
1697           break;
1698         }
1699       im = (const struct InboundMessage *) msg;
1700       imm = (const struct GNUNET_MessageHeader *) &im[1];
1701       if (ntohs (imm->size) + sizeof (struct InboundMessage) != size)
1702         {
1703           GNUNET_break (0);
1704           break;
1705         }
1706       switch (ntohs (imm->type))
1707         {
1708         case GNUNET_MESSAGE_TYPE_TRANSPORT_ACK:
1709 #if DEBUG_TRANSPORT
1710           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1711                       "Receiving `%s' message from `%4s'.\n",
1712                       "ACK", GNUNET_i2s (&im->peer));
1713 #endif
1714           n = find_neighbour (h, &im->peer);
1715           if (n == NULL)
1716             {
1717               GNUNET_break (0);
1718               break;
1719             }
1720           if (n->received_ack == GNUNET_NO)
1721             {
1722               n->received_ack = GNUNET_YES;
1723               if (NULL != n->transmit_handle)
1724                 {
1725 #if DEBUG_TRANSPORT
1726                   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1727                               "Peer connected, scheduling delayed message for deliverery now.\n");
1728 #endif
1729                   schedule_request (n->transmit_handle);
1730                 }
1731             }
1732           break;
1733         default:
1734 #if DEBUG_TRANSPORT
1735           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1736                       "Received message of type %u from `%4s'.\n",
1737                       ntohs (imm->type), GNUNET_i2s (&im->peer));
1738 #endif
1739           if (h->rec != NULL)
1740             h->rec (h->cls,
1741                     GNUNET_TIME_relative_ntoh (im->latency), &im->peer, imm);
1742           break;
1743         }
1744       break;
1745     default:
1746       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1747                   _
1748                   ("Received unexpected message of type %u in %s:%u\n"),
1749                   ntohs (msg->type), __FILE__, __LINE__);
1750       GNUNET_break (0);
1751       break;
1752     }
1753 }
1754
1755
1756 struct ClientTransmitWrapper
1757 {
1758   GNUNET_NETWORK_TransmitReadyNotify notify;
1759   void *notify_cls;
1760   struct GNUNET_TRANSPORT_TransmitHandle *th;
1761 };
1762
1763
1764 /**
1765  * Transmit message of a client destined for another
1766  * peer to the service.
1767  */
1768 static size_t
1769 client_notify_wrapper (void *cls, size_t size, void *buf)
1770 {
1771   struct ClientTransmitWrapper *ctw = cls;
1772   struct OutboundMessage *obm;
1773   struct GNUNET_MessageHeader *hdr;
1774   size_t ret;
1775
1776   if (size == 0)
1777     {
1778 #if DEBUG_TRANSPORT
1779       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1780                   "Transmission request could not be satisfied.\n");
1781 #endif
1782       ret = ctw->notify (ctw->notify_cls, 0, NULL);
1783       GNUNET_assert (ret == 0);
1784       GNUNET_free (ctw);
1785       return 0;
1786     }
1787   GNUNET_assert (size >= sizeof (struct OutboundMessage));
1788   obm = buf;
1789   ret = ctw->notify (ctw->notify_cls,
1790                      size - sizeof (struct OutboundMessage),
1791                      (void *) &obm[1]);
1792   if (ret == 0)
1793     {
1794       /* Need to reset flag, no SEND means no SEND_OK! */
1795       ctw->th->neighbour->transmit_ok = GNUNET_YES;
1796       GNUNET_free (ctw);
1797       return 0;
1798     }
1799   GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
1800   hdr = (struct GNUNET_MessageHeader *) &obm[1];
1801   GNUNET_assert (ntohs (hdr->size) == ret);
1802   GNUNET_assert (ret + sizeof (struct OutboundMessage) <
1803                  GNUNET_SERVER_MAX_MESSAGE_SIZE);
1804 #if DEBUG_TRANSPORT
1805   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1806               "Transmitting `%s' message with data for `%4s'\n",
1807               "SEND", GNUNET_i2s (&ctw->th->target));
1808 #endif
1809   ret += sizeof (struct OutboundMessage);
1810   obm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
1811   obm->header.size = htons (ret);
1812   obm->priority = htonl (ctw->th->priority);
1813   obm->peer = ctw->th->target;
1814   GNUNET_free (ctw);
1815   return ret;
1816 }
1817
1818
1819
1820 /**
1821  * Check if we could queue a message of the given size for
1822  * transmission.  The transport service will take both its
1823  * internal buffers and bandwidth limits imposed by the
1824  * other peer into consideration when answering this query.
1825  *
1826  * @param handle connection to transport service
1827  * @param target who should receive the message
1828  * @param size how big is the message we want to transmit?
1829  * @param priority how important is the message?
1830  * @param timeout after how long should we give up (and call
1831  *        notify with buf NULL and size 0)?
1832  * @param notify function to call when we are ready to
1833  *        send such a message
1834  * @param notify_cls closure for notify
1835  * @return NULL if someone else is already waiting to be notified
1836  *         non-NULL if the notify callback was queued (can be used to cancel
1837  *         using GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1838  */
1839 struct GNUNET_TRANSPORT_TransmitHandle *
1840 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
1841                                         *handle,
1842                                         const struct GNUNET_PeerIdentity
1843                                         *target, size_t size,
1844                                         unsigned int priority,
1845                                         struct GNUNET_TIME_Relative timeout,
1846                                         GNUNET_NETWORK_TransmitReadyNotify
1847                                         notify, void *notify_cls)
1848 {
1849   struct GNUNET_TRANSPORT_TransmitHandle *pos;
1850   struct GNUNET_TRANSPORT_TransmitHandle *th;
1851   struct NeighbourList *n;
1852   struct ClientTransmitWrapper *ctw;
1853
1854   if (size + sizeof (struct OutboundMessage) >=
1855       GNUNET_SERVER_MAX_MESSAGE_SIZE)
1856     {
1857       GNUNET_break (0);
1858       return NULL;
1859     }
1860 #if DEBUG_TRANSPORT
1861   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1862               "Asking transport service for transmission of %u bytes to peer `%4s'.\n",
1863               size, GNUNET_i2s (target));
1864 #endif
1865   n = find_neighbour (handle, target);
1866   if ( (n != NULL) &&
1867        (n->transmit_handle != NULL) )
1868     return NULL; /* already have a request pending for this peer! */
1869   ctw = GNUNET_malloc (sizeof (struct ClientTransmitWrapper));
1870   th = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_TransmitHandle));
1871   ctw->notify = notify;
1872   ctw->notify_cls = notify_cls;
1873   ctw->th = th;
1874   th->handle = handle;
1875   th->neighbour = n;
1876   th->target = *target;
1877   th->notify = &client_notify_wrapper;
1878   th->notify_cls = ctw;
1879   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1880   th->notify_size = size + sizeof (struct OutboundMessage);
1881   th->priority = priority;
1882   if (NULL == n)
1883     {
1884       pos = handle->connect_wait_head;
1885       while (pos != NULL)
1886         {
1887           GNUNET_assert (0 != memcmp (target,
1888                                       &pos->target,
1889                                       sizeof (struct GNUNET_PeerIdentity)));
1890           pos = pos->next;
1891         }
1892 #if DEBUG_TRANSPORT
1893       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1894                   "Will now try to connect to `%4s'.\n", GNUNET_i2s (target));
1895 #endif
1896       try_connect (th);
1897       return th;
1898     }
1899
1900 #if DEBUG_TRANSPORT
1901   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1902               "Transmission request queued for transmission to transport service.\n");
1903 #endif
1904   GNUNET_assert (NULL == n->transmit_handle);
1905   n->transmit_handle = th;
1906   if (GNUNET_YES != n->received_ack)
1907     {
1908 #if DEBUG_TRANSPORT
1909       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1910                   "Connection to `%4s' is not yet confirmed connected, scheduling timeout (%llu ms) only.\n",
1911                   GNUNET_i2s (target), timeout.value);
1912 #endif
1913       th->notify_delay_task
1914         = GNUNET_SCHEDULER_add_delayed (handle->sched,
1915                                         GNUNET_NO,
1916                                         GNUNET_SCHEDULER_PRIORITY_KEEP,
1917                                         GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1918                                         timeout, &transmit_timeout, th);
1919       return th;
1920     }
1921   
1922 #if DEBUG_TRANSPORT
1923   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1924               "Peer `%4s' is ready to receive, scheduling message for delivery now.\n",
1925               GNUNET_i2s (target));
1926 #endif
1927   schedule_request (th);
1928   return th;
1929 }
1930
1931
1932 /**
1933  * Cancel the specified transmission-ready
1934  * notification.
1935  */
1936 void
1937 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
1938                                                GNUNET_TRANSPORT_TransmitHandle
1939                                                *th)
1940 {
1941   struct GNUNET_TRANSPORT_Handle *h;
1942
1943 #if DEBUG_TRANSPORT
1944   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1945               "Transmission request of %u bytes to `%4s' was cancelled.\n",
1946               th->notify_size - sizeof(struct OutboundMessage),
1947               GNUNET_i2s (&th->target));
1948 #endif
1949   GNUNET_assert (th->notify == &client_notify_wrapper);
1950   remove_from_any_list (th);
1951   h = th->handle;
1952   if ((h->connect_ready_head == NULL) && (h->network_handle != NULL))
1953     {
1954       GNUNET_NETWORK_notify_transmit_ready_cancel (h->network_handle);
1955       h->network_handle = NULL;
1956       h->transmission_scheduled = GNUNET_NO;
1957     }
1958   GNUNET_free (th->notify_cls);
1959   GNUNET_free (th);
1960 }
1961
1962
1963 /* end of transport_api.c */