minor style fixes
[oweals/gnunet.git] / src / transport / transport_api.c
1 /*
2      This file is part of GNUnet.
3      (C) 2009, 2010 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/transport_api.c
23  * @brief library to access the low-level P2P IO service
24  * @author Christian Grothoff
25  */
26 #include "platform.h"
27 #include "gnunet_bandwidth_lib.h"
28 #include "gnunet_client_lib.h"
29 #include "gnunet_constants.h"
30 #include "gnunet_container_lib.h"
31 #include "gnunet_arm_service.h"
32 #include "gnunet_hello_lib.h"
33 #include "gnunet_protocols.h"
34 #include "gnunet_server_lib.h"
35 #include "gnunet_time_lib.h"
36 #include "gnunet_transport_service.h"
37 #include "transport.h"
38
39 /**
40  * After how long do we give up on transmitting a HELLO
41  * to the service?
42  */
43 #define OFFER_HELLO_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
44
45 /**
46  * After how long do we automatically retry an unsuccessful
47  * CONNECT request?
48  */
49 #define CONNECT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 750)
50
51 /**
52  * How long should ARM wait when starting up the
53  * transport service before reporting back?
54  */
55 #define START_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
56
57 /**
58  * How long should ARM wait when stopping the
59  * transport service before reporting back?
60  */
61 #define STOP_SERVICE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
62
63 /**
64  * How large to start with for the hashmap of neighbours.
65  */
66 #define STARTING_NEIGHBOURS_SIZE 10
67
68
69 /**
70  * What stage are we in for transmission processing?
71  */
72 enum TransmitStage
73   {
74     /**
75      * No active message.
76      */
77     TS_NEW = 0,
78
79     /**
80      * Message in local queue, not given to service.
81      */
82     TS_QUEUED = 1,
83
84     /**
85      * Message given to service, not confirmed (no SEND_OK).
86      */
87     TS_TRANSMITTED = 2,
88
89     /**
90      * One message was given to service and before it was confirmed,
91      * another one was already queued (waiting for SEND_OK to pass on
92      * to service).
93      */
94     TS_TRANSMITTED_QUEUED = 3
95   };
96
97
98 /**
99  * Handle for a transmission-ready request.
100  */
101 struct GNUNET_TRANSPORT_TransmitHandle
102 {
103
104   /**
105    * Neighbour for this handle, NULL for control-traffic.
106    */
107   struct NeighbourList *neighbour;
108
109   /**
110    * Function to call when notify_size bytes are available
111    * for transmission.
112    */
113   GNUNET_CONNECTION_TransmitReadyNotify notify;
114
115   /**
116    * Closure for notify.
117    */
118   void *notify_cls;
119
120   /**
121    * transmit_ready task Id.  The task is used to introduce the
122    * artificial delay that may be required to maintain the bandwidth
123    * limits.  Later, this will be the ID of the "transmit_timeout"
124    * task which is used to signal a timeout if the transmission could
125    * not be done in a timely fashion.
126    */
127   GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
128
129   /**
130    * Timeout for this request.
131    */
132   struct GNUNET_TIME_Absolute timeout;
133
134   /**
135    * How many bytes is our notify callback waiting for?
136    */
137   size_t notify_size;
138
139   /**
140    * How important is this message?
141    */
142   unsigned int priority;
143
144 };
145
146
147 /**
148  * Handle for a control message queue entry.
149  */
150 struct ControlMessage
151 {
152
153   /**
154    * This is a doubly-linked list.
155    */
156   struct ControlMessage *next;
157
158   /**
159    * This is a doubly-linked list.
160    */
161   struct ControlMessage *prev;
162
163   /**
164    * Overall transport handle.
165    */
166   struct GNUNET_TRANSPORT_Handle *h;
167
168   /**
169    * Function to call when notify_size bytes are available
170    * for transmission.
171    */
172   GNUNET_CONNECTION_TransmitReadyNotify notify;
173
174   /**
175    * Closure for notify.
176    */
177   void *notify_cls;
178
179   /**
180    * transmit_ready task Id.  The task is used to introduce the
181    * artificial delay that may be required to maintain the bandwidth
182    * limits.  Later, this will be the ID of the "transmit_timeout"
183    * task which is used to signal a timeout if the transmission could
184    * not be done in a timely fashion.
185    */
186   GNUNET_SCHEDULER_TaskIdentifier notify_delay_task;
187
188   /**
189    * How many bytes is our notify callback waiting for?
190    */
191   size_t notify_size;
192
193 };
194
195 /**
196  * Context for storing information about attempted next transmission.
197  */
198 struct TryTransmitContext
199 {
200
201   /**
202    * Main transport handle.
203    */
204   struct GNUNET_TRANSPORT_Handle *h;
205
206   /**
207    * Returned transmission handle.
208    */
209   struct GNUNET_TRANSPORT_TransmitHandle *ret;
210
211   /**
212    * Time to retry the send task.
213    */
214   struct GNUNET_TIME_Relative retry_time;
215 };
216
217 /**
218  * Entry in hash table of all of our current neighbours.
219  */
220 struct NeighbourList
221 {
222   /**
223    * Overall transport handle.
224    */
225   struct GNUNET_TRANSPORT_Handle *h;
226
227   /**
228    * Active transmit handle; available if 'transmit_forbidden'
229    * is GNUNET_NO.
230    */
231   struct GNUNET_TRANSPORT_TransmitHandle transmit_handle;
232
233   /**
234    * Identity of this neighbour.
235    */
236   struct GNUNET_PeerIdentity id;
237
238   /**
239    * Outbound bandwidh tracker.
240    */
241   struct GNUNET_BANDWIDTH_Tracker out_tracker;
242
243   /**
244    * Set to GNUNET_NO if we are currently allowed to accept a
245    * message to the transport service for this peer, GNUNET_YES
246    * if we have one and are waiting for transmission, GNUNET_SYSERR
247    * if we are waiting for confirmation AND have already accepted
248    * yet another message.
249    */
250   enum TransmitStage transmit_stage;
251
252   /**
253    * Have we received a notification that this peer is connected
254    * to us right now?
255    */
256   int is_connected;
257
258   /**
259    * Are we in the middle of disconnecting the peer already?
260    */
261   unsigned int in_disconnect;
262
263 };
264
265
266 /**
267  * Linked list of requests from clients for our HELLO that were
268  * deferred.
269  */
270 struct HelloWaitList
271 {
272
273   /**
274    * This is a linked list.
275    */
276   struct HelloWaitList *next;
277
278   /**
279    * Reference back to our transport handle.
280    */
281   struct GNUNET_TRANSPORT_Handle *handle;
282
283   /**
284    * Callback to call once we got our HELLO.
285    */
286   GNUNET_TRANSPORT_HelloUpdateCallback rec;
287
288   /**
289    * Closure for rec.
290    */
291   void *rec_cls;
292
293 };
294
295
296 /**
297  * Handle for the transport service (includes all of the
298  * state for the transport service).
299  */
300 struct GNUNET_TRANSPORT_Handle
301 {
302
303   /**
304    * Closure for the callbacks.
305    */
306   void *cls;
307
308   /**
309    * Function to call for received data.
310    */
311   GNUNET_TRANSPORT_ReceiveCallback rec;
312
313   /**
314    * function to call on connect events
315    */
316   GNUNET_TRANSPORT_NotifyConnect nc_cb;
317
318   /**
319    * function to call on disconnect events
320    */
321   GNUNET_TRANSPORT_NotifyDisconnect nd_cb;
322
323   /**
324    * Head of DLL of control messages.
325    */
326   struct ControlMessage *control_head;
327
328   /**
329    * Tail of DLL of control messages.
330    */
331   struct ControlMessage *control_tail;
332
333   /**
334    * The current HELLO message for this peer.  Updated
335    * whenever transports change their addresses.
336    */
337   struct GNUNET_HELLO_Message *my_hello;
338
339   /**
340    * My client connection to the transport service.
341    */
342   struct GNUNET_CLIENT_Connection *client;
343
344   /**
345    * Handle to our registration with the client for notification.
346    */
347   struct GNUNET_CLIENT_TransmitHandle *network_handle;
348
349   /**
350    * Linked list of pending requests for our HELLO.
351    */
352   struct HelloWaitList *hwl_head;
353
354   /**
355    * My configuration.
356    */
357   const struct GNUNET_CONFIGURATION_Handle *cfg;
358
359   /**
360    * Linked list of the current neighbours of this peer.
361    */
362   struct GNUNET_CONTAINER_MultiHashMap *neighbours;
363
364   /**
365    * Peer identity as assumed by this process, or all zeros.
366    */
367   struct GNUNET_PeerIdentity self;
368
369   /**
370    * ID of the task trying to reconnect to the service.
371    */
372   GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
373
374   /**
375    * ID of the task trying to trigger transmission for a peer
376    * while maintaining bandwidth quotas.
377    */
378   GNUNET_SCHEDULER_TaskIdentifier quota_task;
379
380   /**
381    * Delay until we try to reconnect.
382    */
383   struct GNUNET_TIME_Relative reconnect_delay;
384
385   /**
386    * Set once we are in the process of disconnecting from the
387    * service.
388    */
389   int in_disconnect;
390
391   /**
392    * Should we check that 'self' matches what the service thinks?
393    * (if GNUNET_NO, then 'self' is all zeros!).
394    */
395   int check_self;
396 };
397
398
399 /**
400  * Get the neighbour list entry for the given peer
401  *
402  * @param h our context
403  * @param peer peer to look up
404  * @return NULL if no such peer entry exists
405  */
406 static struct NeighbourList *
407 neighbour_find (struct GNUNET_TRANSPORT_Handle *h,
408                 const struct GNUNET_PeerIdentity *peer)
409 {
410   return GNUNET_CONTAINER_multihashmap_get(h->neighbours, &peer->hashPubKey);
411 }
412
413
414 /**
415  * Schedule the task to send one message, either from the control
416  * list or the peer message queues  to the service.
417  */
418 static void schedule_transmission (struct GNUNET_TRANSPORT_Handle *h);
419
420
421 /**
422  * Function called by the scheduler when the timeout for bandwidth
423  * availablility for the target neighbour is reached.
424  *
425  * @param cls the 'struct GNUNET_TRANSPORT_Handle*'
426  * @param tc scheduler context
427  */
428 static void
429 quota_transmit_ready (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
430 {
431   struct GNUNET_TRANSPORT_Handle *h = cls;
432
433   h->quota_task = GNUNET_SCHEDULER_NO_TASK;
434   schedule_transmission (h);
435 }
436
437
438 /**
439  * Iterator over hash map entries, attempt to schedule
440  * a transmission to entries in the neighbour hashmap.
441  *
442  * @param cls closure a TryTransmitContext
443  * @param key current key code
444  * @param value value in the hash map, the neighbour entry to consider
445  * @return GNUNET_YES if we should continue to
446  *         iterate,
447  *         GNUNET_NO if not.
448  */
449 static int
450 try_schedule_transmission (void *cls,
451                            const GNUNET_HashCode * key,
452                            void *value)
453 {
454   struct NeighbourList *n = value;
455   struct TryTransmitContext *try_transmit_ctx = cls;
456   struct GNUNET_TIME_Relative duration;
457   GNUNET_CONNECTION_TransmitReadyNotify notify;
458   struct GNUNET_TRANSPORT_TransmitHandle *th;
459   struct GNUNET_TIME_Absolute duration_abs;
460
461   if (n->transmit_stage != TS_QUEUED)
462     return GNUNET_YES; /* not eligible, keep iterating */
463   if (n->is_connected != GNUNET_YES)
464     return GNUNET_YES; /* keep iterating */
465
466   th = &n->transmit_handle;
467   GNUNET_break (n == th->neighbour);
468   /* check outgoing quota */
469   duration = GNUNET_BANDWIDTH_tracker_get_delay (&n->out_tracker,
470                                                  th->notify_size - sizeof (struct OutboundMessage));
471   duration_abs = GNUNET_TIME_relative_to_absolute (duration);
472   if (th->timeout.abs_value < duration_abs.abs_value)
473     {
474       /* signal timeout! */
475 #if DEBUG_TRANSPORT
476       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
477                   "Would need %llu ms before bandwidth is available for delivery to `%4s', that is too long.  Signaling timeout.\n",
478                   duration.rel_value,
479                   GNUNET_i2s (&n->id));
480 #endif
481       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
482         {
483           GNUNET_SCHEDULER_cancel (th->notify_delay_task);
484           th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
485         }
486       n->transmit_stage = TS_NEW;
487       if (NULL != (notify = th->notify))
488         {
489           th->notify = NULL;
490           GNUNET_assert (0 == notify (th->notify_cls, 0, NULL));
491         }
492       return GNUNET_YES; /* keep iterating */
493     }
494   if (duration.rel_value > 0)
495     {
496 #if DEBUG_TRANSPORT
497       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
498                   "Need more bandwidth (%u b/s allowed, %u b needed), delaying delivery to `%4s' by %llu ms\n",
499                   (unsigned int) n->out_tracker.available_bytes_per_s__,
500                   (unsigned int) th->notify_size - sizeof (struct OutboundMessage),
501                   GNUNET_i2s (&n->id),
502                   (unsigned long long) duration.rel_value);
503 #endif
504       try_transmit_ctx->retry_time = GNUNET_TIME_relative_min (try_transmit_ctx->retry_time,
505                                                                duration);
506       return GNUNET_YES; /* keep iterating */
507     }
508 #if DEBUG_TRANSPORT
509   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
510               "Have %u bytes of bandwidth available for transmission to `%4s' right now\n",
511               th->notify_size - sizeof (struct OutboundMessage),
512               GNUNET_i2s (&n->id));
513 #endif
514
515   if ( (try_transmit_ctx->ret == NULL) ||
516        (try_transmit_ctx->ret->priority < th->priority) )
517     try_transmit_ctx->ret = th;
518   return GNUNET_YES;
519 }
520
521
522 /**
523  * Figure out which transmission to a peer can be done right now.
524  * If none can, schedule a task to call 'schedule_transmission'
525  * whenever a peer transmission can be done in the future and
526  * return NULL.  Otherwise return the next transmission to be
527  * performed.
528  *
529  * @param h handle to transport
530  * @return NULL to wait longer before doing any peer transmissions
531  */
532 static struct GNUNET_TRANSPORT_TransmitHandle *
533 schedule_peer_transmission (struct GNUNET_TRANSPORT_Handle *h)
534 {
535   struct TryTransmitContext try_transmit_ctx;
536
537   if (h->quota_task != GNUNET_SCHEDULER_NO_TASK)
538     {
539       GNUNET_SCHEDULER_cancel (h->quota_task);
540       h->quota_task = GNUNET_SCHEDULER_NO_TASK;
541     }
542   memset(&try_transmit_ctx, 0, sizeof(struct TryTransmitContext));
543   try_transmit_ctx.retry_time = GNUNET_TIME_UNIT_FOREVER_REL;
544   GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, 
545                                         &try_schedule_transmission, 
546                                         &try_transmit_ctx);
547   if (try_transmit_ctx.ret == NULL)
548     h->quota_task = GNUNET_SCHEDULER_add_delayed (try_transmit_ctx.retry_time,
549                                                   &quota_transmit_ready,
550                                                   h);
551   return try_transmit_ctx.ret;
552 }
553
554
555 /**
556  * Transmit message(s) to service.
557  *
558  * @param cls handle to transport
559  * @param size number of bytes available in buf
560  * @param buf where to copy the message
561  * @return number of bytes copied to buf
562  */
563 static size_t
564 transport_notify_ready (void *cls, size_t size, void *buf)
565 {
566   struct GNUNET_TRANSPORT_Handle *h = cls;
567   struct ControlMessage *cm;
568   struct GNUNET_TRANSPORT_TransmitHandle *th;
569   struct NeighbourList *n;
570   struct OutboundMessage obm;
571   GNUNET_CONNECTION_TransmitReadyNotify notify;
572   size_t ret;
573   size_t mret;
574   size_t nret;
575   char *cbuf;
576
577   h->network_handle = NULL;
578   if (buf == NULL)
579     {
580       schedule_transmission (h);
581       return 0;
582     }
583 #if DEBUG_TRANSPORT
584   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
585               "Ready to transmit %u bytes to transport service\n", size);
586 #endif
587   cbuf = buf;
588   ret = 0;
589   while ( (NULL != (cm = h->control_head)) &&
590           (cm->notify_size <= size) )
591     {
592       if (cm->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
593         {
594           GNUNET_SCHEDULER_cancel (cm->notify_delay_task);
595           cm->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
596         }
597       GNUNET_CONTAINER_DLL_remove (h->control_head,
598                                    h->control_tail,
599                                    cm);
600       nret = cm->notify (cm->notify_cls, size, &cbuf[ret]);
601 #if DEBUG_TRANSPORT
602       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
603                   "Added %u bytes of control message at %u\n",
604                   nret,
605                   ret);
606 #endif
607       GNUNET_free (cm);
608       ret += nret;
609       size -= nret;
610     }
611   while ( (NULL != (th = schedule_peer_transmission (h))) &&
612           (th->notify_size <= size) )
613     {
614       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
615         {
616           GNUNET_SCHEDULER_cancel (th->notify_delay_task);
617           th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
618         }
619       n = th->neighbour;
620       switch (n->transmit_stage)
621         {
622         case TS_NEW:
623           GNUNET_break (0);
624           break;
625         case TS_QUEUED:
626           n->transmit_stage = TS_TRANSMITTED;
627           break;
628         case TS_TRANSMITTED:
629           GNUNET_break (0);
630           break;
631         case TS_TRANSMITTED_QUEUED:
632           GNUNET_break (0);
633           break;
634         default:
635           GNUNET_break (0);
636         }
637       GNUNET_assert (size >= sizeof (struct OutboundMessage));
638       notify = th->notify;
639       th->notify = NULL;
640       mret = notify (th->notify_cls,
641                      size - sizeof (struct OutboundMessage),
642                      &cbuf[ret + sizeof (struct OutboundMessage)]);
643       GNUNET_assert (mret <= size - sizeof (struct OutboundMessage));
644 #if DEBUG_TRANSPORT
645       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
646                   "Message of %u bytes with timeout %llums constructed for `%4s'\n",
647                   (unsigned int) mret,
648                   (unsigned long long) GNUNET_TIME_absolute_get_remaining (th->timeout).rel_value,
649                   GNUNET_i2s (&n->id));
650 #endif
651       if (mret != 0)    
652         {
653           GNUNET_assert (mret + sizeof (struct OutboundMessage) < GNUNET_SERVER_MAX_MESSAGE_SIZE);
654           obm.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SEND);
655           obm.header.size = htons (mret + sizeof (struct OutboundMessage));
656           obm.priority = htonl (th->priority);
657           obm.timeout = GNUNET_TIME_relative_hton (GNUNET_TIME_absolute_get_remaining (th->timeout));
658           obm.peer = n->id;
659           memcpy (&cbuf[ret], &obm, sizeof (struct OutboundMessage));
660           ret += (mret + sizeof (struct OutboundMessage));
661           size -= (mret + sizeof (struct OutboundMessage));
662           GNUNET_BANDWIDTH_tracker_consume (&n->out_tracker, mret);
663         }
664       else
665         {
666           switch (n->transmit_stage)
667             {
668             case TS_NEW:
669               GNUNET_break (0);
670               break;
671             case TS_QUEUED:
672               GNUNET_break (0);
673               break;
674             case TS_TRANSMITTED:
675               n->transmit_stage = TS_NEW;
676               break;
677             case TS_TRANSMITTED_QUEUED:
678               n->transmit_stage = TS_QUEUED;
679               continue;
680             default:
681               GNUNET_break (0);
682             }
683         }
684     }
685   schedule_transmission (h);
686 #if DEBUG_TRANSPORT
687   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
688               "Transmitting %u bytes to transport service\n", ret);
689 #endif
690   return ret;
691 }
692
693
694 /**
695  * Schedule the task to send one message, either from the control
696  * list or the peer message queues  to the service.
697  */
698 static void
699 schedule_transmission (struct GNUNET_TRANSPORT_Handle *h)
700 {
701   size_t size;
702   struct GNUNET_TIME_Relative timeout;
703   struct GNUNET_TRANSPORT_TransmitHandle *th;
704
705   if (NULL != h->network_handle)
706     return;
707   if (h->client == NULL)
708     {
709       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
710                   _("Could not yet schedule transmission: we are not yet connected to the transport service!\n"));
711       return;                   /* not yet connected */
712     }
713   if (NULL != h->control_head)
714     {
715       size = h->control_head->notify_size;
716       timeout = GNUNET_TIME_UNIT_FOREVER_REL;
717     }
718   else
719     {
720       th = schedule_peer_transmission (h);
721       if (th == NULL)
722         {
723           /* no transmission ready right now */
724 #if DEBUG_TRANSPORT
725           GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
726                       "Could not yet schedule transmission: none ready\n");
727 #endif
728           return;
729         }
730       size = th->notify_size;
731       timeout = GNUNET_TIME_absolute_get_remaining (th->timeout);
732     }
733 #if DEBUG_TRANSPORT
734     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
735                 "Calling notify_transmit_ready\n");
736 #endif
737   h->network_handle =
738     GNUNET_CLIENT_notify_transmit_ready (h->client,
739                                          size,
740                                          timeout,
741                                          GNUNET_NO,
742                                          &transport_notify_ready,
743                                          h);
744   GNUNET_assert (NULL != h->network_handle);
745 }
746
747
748 /**
749  * Called when our transmit request timed out before any transport
750  * reported success connecting to the desired peer or before the
751  * transport was ready to receive.  Signal error and free
752  * TransmitHandle.
753  */
754 static void
755 control_transmit_timeout (void *cls,
756                           const struct GNUNET_SCHEDULER_TaskContext *tc)
757 {
758   struct ControlMessage *th = cls;
759
760   th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
761   if (NULL != th->notify)    
762     th->notify (th->notify_cls, 0, NULL);    
763   GNUNET_CONTAINER_DLL_remove (th->h->control_head,
764                                th->h->control_tail,
765                                th);
766   GNUNET_free (th);
767 }
768
769
770 /**
771  * Queue control request for transmission to the transport
772  * service.
773  *
774  * @param h handle to the transport service
775  * @param size number of bytes to be transmitted
776  * @param at_head request must be added to the head of the queue
777  *        (otherwise request will be appended)
778  * @param timeout how long this transmission can wait (at most)
779  * @param notify function to call to get the content
780  * @param notify_cls closure for notify
781  */
782 static void
783 schedule_control_transmit (struct GNUNET_TRANSPORT_Handle *h,
784                            size_t size,
785                            int at_head,
786                            struct GNUNET_TIME_Relative timeout,
787                            GNUNET_CONNECTION_TransmitReadyNotify notify,
788                            void *notify_cls)
789 {
790   struct ControlMessage *cm;
791
792 #if DEBUG_TRANSPORT
793   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
794               "Control transmit of %u bytes within %llums requested\n",
795               size, (unsigned long long) timeout.rel_value);
796 #endif
797   cm = GNUNET_malloc (sizeof (struct ControlMessage));
798   cm->h = h;
799   cm->notify = notify;
800   cm->notify_cls = notify_cls;
801   cm->notify_size = size;
802   cm->notify_delay_task
803     = GNUNET_SCHEDULER_add_delayed (timeout, &control_transmit_timeout, cm);
804   if (at_head)
805     GNUNET_CONTAINER_DLL_insert (h->control_head,
806                                  h->control_tail,
807                                  cm);
808   else
809     GNUNET_CONTAINER_DLL_insert_after (h->control_head,
810                                        h->control_tail,
811                                        h->control_tail,
812                                        cm);
813   schedule_transmission (h);
814 }
815
816
817 struct SetQuotaContext
818 {
819   struct GNUNET_TRANSPORT_Handle *handle;
820
821   struct GNUNET_PeerIdentity target;
822
823   GNUNET_SCHEDULER_Task cont;
824
825   void *cont_cls;
826
827   struct GNUNET_TIME_Absolute timeout;
828
829   struct GNUNET_BANDWIDTH_Value32NBO quota_in;
830 };
831
832
833 /**
834  * Send SET_QUOTA message to the service.
835  *
836  * @param cls the 'struct SetQuotaContext'
837  * @param size number of bytes available in buf
838  * @param buf where to copy the message
839  * @return number of bytes copied to buf
840  */
841 static size_t
842 send_set_quota (void *cls, size_t size, void *buf)
843 {
844   struct SetQuotaContext *sqc = cls;
845   struct QuotaSetMessage *msg;
846
847   if (buf == NULL)
848     {
849       if (sqc->cont != NULL)
850         GNUNET_SCHEDULER_add_continuation (sqc->cont,
851                                            sqc->cont_cls,
852                                            GNUNET_SCHEDULER_REASON_TIMEOUT);
853       GNUNET_free (sqc);
854       return 0;
855     }
856 #if DEBUG_TRANSPORT
857   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
858               "Transmitting `%s' request with respect to `%4s'.\n",
859               "SET_QUOTA",
860               GNUNET_i2s (&sqc->target));
861 #endif
862   GNUNET_assert (size >= sizeof (struct QuotaSetMessage));
863   msg = buf;
864   msg->header.size = htons (sizeof (struct QuotaSetMessage));
865   msg->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA);
866   msg->quota = sqc->quota_in;
867   memcpy (&msg->peer, &sqc->target, sizeof (struct GNUNET_PeerIdentity));
868   if (sqc->cont != NULL)
869     GNUNET_SCHEDULER_add_continuation (sqc->cont,
870                                        sqc->cont_cls,
871                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
872   GNUNET_free (sqc);
873   return sizeof (struct QuotaSetMessage);
874 }
875
876
877 /**
878  * Set the share of incoming bandwidth for the given
879  * peer to the specified amount.
880  *
881  * @param handle connection to transport service
882  * @param target who's bandwidth quota is being changed
883  * @param quota_in incoming bandwidth quota in bytes per ms
884  * @param quota_out outgoing bandwidth quota in bytes per ms
885  * @param timeout how long to wait until signaling failure if
886  *        we can not communicate the quota change
887  * @param cont continuation to call when done, will be called
888  *        either with reason "TIMEOUT" or with reason "PREREQ_DONE"
889  * @param cont_cls closure for continuation
890  */
891 void
892 GNUNET_TRANSPORT_set_quota (struct GNUNET_TRANSPORT_Handle *handle,
893                             const struct GNUNET_PeerIdentity *target,
894                             struct GNUNET_BANDWIDTH_Value32NBO quota_in,
895                             struct GNUNET_BANDWIDTH_Value32NBO quota_out,
896                             struct GNUNET_TIME_Relative timeout,
897                             GNUNET_SCHEDULER_Task cont, void *cont_cls)
898 {
899   struct NeighbourList *n;
900   struct SetQuotaContext *sqc;
901
902   n = neighbour_find (handle, target);
903   if (n != NULL)
904     {
905 #if DEBUG_TRANSPORT
906       if (ntohl (quota_out.value__) != n->out_tracker.available_bytes_per_s__)
907         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
908                     "Quota changed from %u to %u for peer `%s'\n",
909                     (unsigned int) n->out_tracker.available_bytes_per_s__,
910                     (unsigned int) ntohl (quota_out.value__),
911                     GNUNET_i2s (target));
912       else
913         GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
914                     "Quota remains at %u for peer `%s'\n",
915                     (unsigned int) n->out_tracker.available_bytes_per_s__,
916                     GNUNET_i2s (target));
917 #endif
918       GNUNET_BANDWIDTH_tracker_update_quota (&n->out_tracker,
919                                              quota_out);
920     }
921   else
922     {
923 #if DEBUG_TRANSPORT
924       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
925                   "Quota changed to %u for peer `%s', but I have no such neighbour!\n",
926                   (unsigned int) ntohl (quota_out.value__),
927                   GNUNET_i2s (target));
928 #endif
929     }
930   sqc = GNUNET_malloc (sizeof (struct SetQuotaContext));
931   sqc->handle = handle;
932   sqc->target = *target;
933   sqc->cont = cont;
934   sqc->cont_cls = cont_cls;
935   sqc->timeout = GNUNET_TIME_relative_to_absolute (timeout);
936   sqc->quota_in = quota_in;
937   schedule_control_transmit (handle,
938                              sizeof (struct QuotaSetMessage),
939                              GNUNET_NO, timeout, &send_set_quota, sqc);
940 }
941
942
943 /**
944  * Obtain the HELLO message for this peer.
945  *
946  * @param handle connection to transport service
947  * @param rec function to call with the HELLO, sender will be our peer
948  *            identity; message and sender will be NULL on timeout
949  *            (handshake with transport service pending/failed).
950  *             cost estimate will be 0.
951  * @param rec_cls closure for rec
952  */
953 void
954 GNUNET_TRANSPORT_get_hello (struct GNUNET_TRANSPORT_Handle *handle,
955                             GNUNET_TRANSPORT_HelloUpdateCallback rec,
956                             void *rec_cls)
957 {
958   struct HelloWaitList *hwl;
959
960   hwl = GNUNET_malloc (sizeof (struct HelloWaitList));
961   hwl->next = handle->hwl_head;
962   handle->hwl_head = hwl;
963   hwl->handle = handle;
964   hwl->rec = rec;
965   hwl->rec_cls = rec_cls;
966   if (handle->my_hello == NULL)
967     return;
968   rec (rec_cls, (const struct GNUNET_MessageHeader *) handle->my_hello);
969 }
970
971
972
973 /**
974  * Stop receiving updates about changes to our HELLO message.
975  *
976  * @param handle connection to transport service
977  * @param rec function previously registered to be called with the HELLOs
978  * @param rec_cls closure for rec
979  */
980 void
981 GNUNET_TRANSPORT_get_hello_cancel (struct GNUNET_TRANSPORT_Handle *handle,
982                                    GNUNET_TRANSPORT_HelloUpdateCallback rec,
983                                    void *rec_cls)
984 {
985   struct HelloWaitList *pos;
986   struct HelloWaitList *prev;
987
988   prev = NULL;
989   pos = handle->hwl_head;
990   while (pos != NULL)
991     {
992       if ( (pos->rec == rec) &&
993            (pos->rec_cls == rec_cls) )
994         break;
995       prev = pos;
996       pos = pos->next;
997     }
998   GNUNET_break (pos != NULL);
999   if (pos == NULL)
1000     return;
1001   if (prev == NULL)
1002     handle->hwl_head = pos->next;
1003   else
1004     prev->next = pos->next;
1005   GNUNET_free (pos);
1006 }
1007
1008
1009 /**
1010  * Send HELLO message to the service.
1011  *
1012  * @param cls the HELLO message to send
1013  * @param size number of bytes available in buf
1014  * @param buf where to copy the message
1015  * @return number of bytes copied to buf
1016  */
1017 static size_t
1018 send_hello (void *cls, size_t size, void *buf)
1019 {
1020   struct GNUNET_MessageHeader *hello = cls;
1021   uint16_t msize;
1022
1023   if (buf == NULL)
1024     {
1025 #if DEBUG_TRANSPORT_TIMEOUT
1026       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1027                   "Timeout while trying to transmit `%s' request.\n",
1028                   "HELLO");
1029 #endif
1030       GNUNET_free (hello);
1031       return 0;
1032     }
1033 #if DEBUG_TRANSPORT
1034   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1035               "Transmitting `%s' request.\n", "HELLO");
1036 #endif
1037   msize = ntohs (hello->size);
1038   GNUNET_assert (size >= msize);
1039   memcpy (buf, hello, msize);
1040   GNUNET_free (hello);
1041   return msize;
1042 }
1043
1044
1045 /**
1046  * Offer the transport service the HELLO of another peer.  Note that
1047  * the transport service may just ignore this message if the HELLO is
1048  * malformed or useless due to our local configuration.
1049  *
1050  * @param handle connection to transport service
1051  * @param hello the hello message
1052  */
1053 void
1054 GNUNET_TRANSPORT_offer_hello (struct GNUNET_TRANSPORT_Handle *handle,
1055                               const struct GNUNET_MessageHeader *hello)
1056 {
1057   struct GNUNET_MessageHeader *hc;
1058   uint16_t size;
1059   struct GNUNET_PeerIdentity peer;
1060
1061   GNUNET_break (ntohs (hello->type) == GNUNET_MESSAGE_TYPE_HELLO);
1062   size = ntohs (hello->size);
1063   GNUNET_break (size >= sizeof (struct GNUNET_MessageHeader));
1064   if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message*) hello,
1065                                         &peer))
1066     {
1067       GNUNET_break (0);
1068       return;
1069     }
1070 #if DEBUG_TRANSPORT
1071   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1072               "Offering `%s' message of `%4s' to transport for validation.\n",
1073               "HELLO",
1074               GNUNET_i2s (&peer));
1075 #endif
1076   hc = GNUNET_malloc (size);
1077   memcpy (hc, hello, size);
1078   schedule_control_transmit (handle,
1079                              size,
1080                              GNUNET_NO, OFFER_HELLO_TIMEOUT, &send_hello, hc);
1081 }
1082
1083
1084 /**
1085  * Transmit START message to service.
1086  *
1087  * @param cls unused
1088  * @param size number of bytes available in buf
1089  * @param buf where to copy the message
1090  * @return number of bytes copied to buf
1091  */
1092 static size_t
1093 send_start (void *cls, size_t size, void *buf)
1094 {
1095   struct GNUNET_TRANSPORT_Handle *h = cls;
1096   struct StartMessage s;
1097
1098   if (buf == NULL)
1099     {
1100       /* Can only be shutdown, just give up */
1101 #if DEBUG_TRANSPORT
1102       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1103                   "Shutdown while trying to transmit `%s' request.\n",
1104                   "START");
1105 #endif
1106       return 0;
1107     }
1108 #if DEBUG_TRANSPORT
1109   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1110               "Transmitting `%s' request.\n", "START");
1111 #endif
1112   GNUNET_assert (size >= sizeof (struct StartMessage));
1113   s.header.size = htons (sizeof (struct StartMessage));
1114   s.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_START);
1115   s.do_check = htonl (h->check_self);
1116   s.self = h->self;
1117   memcpy (buf, &s, sizeof (struct StartMessage));
1118   return sizeof (struct StartMessage);
1119 }
1120
1121
1122 /**
1123  * Free neighbour.
1124  *
1125  * @param n the entry to free
1126  */
1127 static void
1128 neighbour_free (struct NeighbourList *n)
1129 {
1130   struct GNUNET_TRANSPORT_Handle *h;
1131
1132   /* Added so task gets canceled when a disconnect is received! */
1133   /* Method 1
1134   if (n->transmit_handle.notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1135     {
1136       GNUNET_SCHEDULER_cancel(n->transmit_handle.notify_delay_task);
1137       n->transmit_handle.notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1138       n->transmit_handle.notify = NULL;
1139     }
1140   */
1141
1142   GNUNET_assert (n->transmit_handle.notify == NULL);
1143   h = n->h;
1144 #if DEBUG_TRANSPORT
1145   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1146               "Removing neighbour `%s' from list of connected peers.\n",
1147               GNUNET_i2s (&n->id));
1148 #endif
1149   GNUNET_break (n->is_connected == GNUNET_NO);
1150   GNUNET_break (n->transmit_stage == TS_NEW);
1151
1152   GNUNET_assert(GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(h->neighbours, &n->id.hashPubKey, n));
1153
1154   GNUNET_free (n);
1155 }
1156
1157
1158 /**
1159  * Mark neighbour as disconnected.
1160  *
1161  * @param n the entry to mark as disconnected
1162  */
1163 static void
1164 neighbour_disconnect (struct NeighbourList *n)
1165 {
1166   struct GNUNET_TRANSPORT_Handle *h = n->h;
1167 #if DEBUG_TRANSPORT
1168   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1169               "Removing neighbour `%s' from list of connected peers.\n",
1170               GNUNET_i2s (&n->id));
1171 #endif
1172   GNUNET_break (n->is_connected == GNUNET_YES);
1173   n->is_connected = GNUNET_NO;
1174   n->in_disconnect = GNUNET_YES;
1175   if (h->nd_cb != NULL)
1176     h->nd_cb (h->cls, &n->id);
1177   if (n->transmit_stage == TS_NEW)    
1178     neighbour_free (n);
1179     
1180 }
1181
1182
1183 /**
1184  * Function we use for handling incoming messages.
1185  *
1186  * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
1187  * @param msg message received, NULL on timeout or fatal error
1188  */
1189 static void demultiplexer (void *cls,
1190                            const struct GNUNET_MessageHeader *msg);
1191
1192
1193 /**
1194  * Iterator over hash map entries, for getting rid of a neighbor
1195  * upon a reconnect call.
1196  *
1197  * @param cls closure (NULL)
1198  * @param key current key code
1199  * @param value value in the hash map, the neighbour entry to forget
1200  * @return GNUNET_YES if we should continue to
1201  *         iterate,
1202  *         GNUNET_NO if not.
1203  */
1204 static int
1205 forget_neighbours (void *cls,
1206                    const GNUNET_HashCode * key,
1207                    void *value)
1208 {
1209   struct NeighbourList *n = value;
1210 #if DEBUG_TRANSPORT_DISCONNECT
1211   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1212               "Disconnecting due to reconnect being called\n");
1213 #endif
1214   if (n->is_connected)
1215     neighbour_disconnect (n);
1216
1217   return GNUNET_YES;
1218 }
1219
1220 /**
1221  * Try again to connect to transport service.
1222  *
1223  * @param cls the handle to the transport service
1224  * @param tc scheduler context
1225  */
1226 static void
1227 reconnect (void *cls,
1228            const struct GNUNET_SCHEDULER_TaskContext *tc)
1229 {
1230   struct GNUNET_TRANSPORT_Handle *h = cls;
1231   struct ControlMessage *pos;
1232
1233   h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1234   if ( (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0)
1235     {
1236       /* shutdown, just give up */
1237       return;
1238     }
1239   /* Forget about all neighbours that we used to be connected to */
1240   GNUNET_CONTAINER_multihashmap_iterate(h->neighbours, &forget_neighbours, NULL);
1241
1242 #if DEBUG_TRANSPORT
1243   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1244               "Connecting to transport service.\n");
1245 #endif
1246   GNUNET_assert (h->client == NULL);
1247   h->client = GNUNET_CLIENT_connect ("transport", h->cfg);
1248   GNUNET_assert (h->client != NULL);
1249   /* make sure we don't send "START" twice, remove existing entry from
1250      queue (if present) */
1251   pos = h->control_head;
1252   while (pos != NULL)
1253     {
1254       if (pos->notify == &send_start)
1255         {
1256           GNUNET_CONTAINER_DLL_remove (h->control_head,
1257                                        h->control_tail,
1258                                        pos);
1259           if (GNUNET_SCHEDULER_NO_TASK != pos->notify_delay_task)
1260             {
1261               GNUNET_SCHEDULER_cancel (pos->notify_delay_task);
1262               pos->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1263             }
1264           GNUNET_free (pos);
1265           break;
1266         }
1267       pos = pos->next;
1268     }
1269   schedule_control_transmit (h,
1270                              sizeof (struct StartMessage),
1271                              GNUNET_YES,
1272                              GNUNET_TIME_UNIT_FOREVER_REL, &send_start, h);
1273   GNUNET_CLIENT_receive (h->client,
1274                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1275 }
1276
1277
1278 /**
1279  * Function that will schedule the job that will try
1280  * to connect us again to the client.
1281  *
1282  * @param h transport service to reconnect
1283  */
1284 static void
1285 schedule_reconnect (struct GNUNET_TRANSPORT_Handle *h)
1286 {
1287 #if DEBUG_TRANSPORT
1288   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1289               "Scheduling task to reconnect to transport service in %llu ms.\n",
1290               h->reconnect_delay.rel_value);
1291 #endif
1292   GNUNET_assert (h->client == NULL);
1293   GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
1294   h->reconnect_task
1295     = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, &reconnect, h);
1296   if (h->reconnect_delay.rel_value == 0)
1297     {
1298       h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS;
1299     }
1300   else
1301     {
1302       h->reconnect_delay = GNUNET_TIME_relative_multiply (h->reconnect_delay, 2);
1303       h->reconnect_delay = GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS,
1304                                                      h->reconnect_delay);
1305     }
1306 }
1307
1308
1309 /**
1310  * Send request connect message to the service.
1311  *
1312  * @param cls the TransportRequestConnectMessage
1313  * @param size number of bytes available in buf
1314  * @param buf where to copy the message
1315  * @return number of bytes copied to buf
1316  */
1317 static size_t
1318 send_transport_request_connect (void *cls, size_t size, void *buf)
1319 {
1320   struct TransportRequestConnectMessage *trcm = cls;
1321
1322   if (buf == NULL)
1323     {
1324 #if DEBUG_TRANSPORT
1325       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1326                   "Buffer null for %s\n",
1327                   "REQUEST_CONNECT");
1328 #endif
1329       GNUNET_free (trcm);
1330       return 0;
1331     }
1332 #if DEBUG_TRANSPORT
1333   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1334               "Transmitting `%s' request for `%4s'.\n",
1335               "REQUEST_CONNECT",
1336               GNUNET_i2s (&trcm->peer));
1337 #endif
1338   GNUNET_assert (size >= sizeof (struct TransportRequestConnectMessage));
1339   memcpy(buf, trcm, sizeof(struct TransportRequestConnectMessage));
1340   return sizeof(struct TransportRequestConnectMessage);
1341 }
1342
1343 /**
1344  * Create and send a request connect message to
1345  * the transport service for a particular peer.
1346  *
1347  * @param h handle to the transport service
1348  * @param n the neighbor to send the request connect message about
1349  *
1350  */
1351 static void 
1352 send_request_connect_message(struct GNUNET_TRANSPORT_Handle *h, struct NeighbourList *n)
1353 {
1354   struct TransportRequestConnectMessage *trcm;
1355
1356   trcm = GNUNET_malloc(sizeof(struct TransportRequestConnectMessage));
1357   trcm->header.type = htons(GNUNET_MESSAGE_TYPE_TRANSPORT_REQUEST_CONNECT);
1358   trcm->header.size = htons(sizeof(struct TransportRequestConnectMessage));
1359   memcpy(&trcm->peer, &n->id, sizeof(struct GNUNET_PeerIdentity));
1360   schedule_control_transmit (h,
1361                              sizeof (struct TransportRequestConnectMessage),
1362                              GNUNET_NO,
1363                              GNUNET_TIME_UNIT_FOREVER_REL, &send_transport_request_connect, trcm);
1364 }
1365
1366 /**
1367  * Add neighbour to our list
1368  *
1369  * @return NULL if this API is currently disconnecting from the service
1370  */
1371 static struct NeighbourList *
1372 neighbour_add (struct GNUNET_TRANSPORT_Handle *h,
1373                const struct GNUNET_PeerIdentity *pid)
1374 {
1375   struct NeighbourList *n;
1376
1377   if (GNUNET_YES == h->in_disconnect)
1378     return NULL;
1379   /* check for duplicates */
1380   if (NULL != (n = neighbour_find (h, pid)))
1381     {
1382       GNUNET_break (0);
1383       return n;
1384     }
1385 #if DEBUG_TRANSPORT
1386   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1387               "Creating entry for neighbour `%4s'.\n",
1388               GNUNET_i2s (pid));
1389 #endif
1390   n = GNUNET_malloc (sizeof (struct NeighbourList));
1391   n->id = *pid;
1392   n->h = h;
1393   GNUNET_BANDWIDTH_tracker_init (&n->out_tracker,
1394                                  GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT,
1395                                  MAX_BANDWIDTH_CARRY_S);
1396   GNUNET_CONTAINER_multihashmap_put (h->neighbours,
1397                                      &pid->hashPubKey,
1398                                      n,
1399                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
1400
1401   return n;
1402 }
1403
1404 /**
1405  * Iterator over hash map entries, for deleting state of a neighbor.
1406  *
1407  * @param cls closure (NULL)
1408  * @param key current key code
1409  * @param value value in the hash map, the neighbour entry to delete
1410  * @return GNUNET_YES if we should continue to
1411  *         iterate,
1412  *         GNUNET_NO if not.
1413  */
1414 static int
1415 delete_neighbours (void *cls,
1416                    const GNUNET_HashCode * key,
1417                    void *value)
1418 {
1419   struct NeighbourList *n = value;
1420   struct GNUNET_TRANSPORT_TransmitHandle *th;
1421
1422   switch (n->transmit_stage)
1423     {
1424     case TS_NEW:
1425     case TS_TRANSMITTED:
1426       /* nothing to do */
1427       break;
1428     case TS_QUEUED:
1429     case TS_TRANSMITTED_QUEUED:
1430       th = &n->transmit_handle;
1431       if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1432         {
1433           GNUNET_SCHEDULER_cancel (th->notify_delay_task);
1434           th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1435         }
1436       GNUNET_assert (0 == th->notify (th->notify_cls, 0, NULL));
1437       break;
1438     default:
1439       GNUNET_break (0);
1440     }
1441   GNUNET_free (n);
1442   return GNUNET_YES;
1443 }
1444
1445
1446 /**
1447  * Connect to the transport service.  Note that the connection may
1448  * complete (or fail) asynchronously.
1449  *
1450  * @param cfg configuration to use
1451  * @param self our own identity (API should check that it matches
1452  *             the identity found by transport), or NULL (no check)
1453  * @param cls closure for the callbacks
1454  * @param rec receive function to call
1455  * @param nc function to call on connect events
1456  * @param nd function to call on disconnect events
1457  */
1458 struct GNUNET_TRANSPORT_Handle *
1459 GNUNET_TRANSPORT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
1460                           const struct GNUNET_PeerIdentity *self,
1461                           void *cls,
1462                           GNUNET_TRANSPORT_ReceiveCallback rec,
1463                           GNUNET_TRANSPORT_NotifyConnect nc,
1464                           GNUNET_TRANSPORT_NotifyDisconnect nd)
1465 {
1466   struct GNUNET_TRANSPORT_Handle *ret;
1467
1468   ret = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_Handle));
1469   if (self != NULL)
1470     {
1471       ret->self = *self;
1472       ret->check_self = GNUNET_YES;
1473     }
1474   ret->cfg = cfg;
1475   ret->cls = cls;
1476   ret->rec = rec;
1477   ret->nc_cb = nc;
1478   ret->nd_cb = nd;
1479   ret->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
1480   ret->neighbours = GNUNET_CONTAINER_multihashmap_create(STARTING_NEIGHBOURS_SIZE);
1481   schedule_reconnect (ret);
1482   return ret;
1483 }
1484
1485
1486 /**
1487  * Disconnect from the transport service.
1488  */
1489 void
1490 GNUNET_TRANSPORT_disconnect (struct GNUNET_TRANSPORT_Handle *handle)
1491 {
1492   struct HelloWaitList *hwl;
1493   struct GNUNET_CLIENT_Connection *client;
1494   struct ControlMessage *cm;
1495
1496 #if DEBUG_TRANSPORT
1497   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transport disconnect called!\n");
1498 #endif
1499   handle->in_disconnect = GNUNET_YES;
1500
1501   GNUNET_assert(GNUNET_SYSERR !=
1502                 GNUNET_CONTAINER_multihashmap_iterate(handle->neighbours,
1503                                                       &delete_neighbours,
1504                                                       handle));
1505   GNUNET_CONTAINER_multihashmap_destroy(handle->neighbours);
1506
1507   while (NULL != (hwl = handle->hwl_head))
1508     {
1509       handle->hwl_head = hwl->next;
1510       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1511                   _
1512                   ("Disconnect while notification for `%s' still registered.\n"),
1513                   "HELLO");
1514       if (hwl->rec != NULL)
1515         hwl->rec (hwl->rec_cls, NULL);
1516       GNUNET_free (hwl);
1517     }
1518
1519   /* Check for still scheduled control messages, cancel delay tasks if so */
1520   /* Added because somehow a notify_delay_task is remaining scheduled and is ever so annoying */
1521   while ( (NULL != (cm = handle->control_head)))
1522     {
1523 #if DEBUG_TRANSPORT
1524       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1525                   "Disconnect before control message sent!\n");
1526 #endif
1527       if (cm->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1528         {
1529           GNUNET_SCHEDULER_cancel (cm->notify_delay_task);
1530           cm->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1531         }
1532       GNUNET_CONTAINER_DLL_remove (handle->control_head,
1533                                    handle->control_tail,
1534                                    cm);
1535       GNUNET_free (cm);
1536     }
1537   /* end check */
1538
1539   if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
1540     {
1541       GNUNET_SCHEDULER_cancel (handle->reconnect_task);
1542       handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
1543     }
1544   if (handle->quota_task != GNUNET_SCHEDULER_NO_TASK)
1545     {
1546       GNUNET_SCHEDULER_cancel (handle->quota_task);
1547       handle->quota_task = GNUNET_SCHEDULER_NO_TASK;
1548     }
1549   GNUNET_free_non_null (handle->my_hello);
1550   handle->my_hello = NULL;
1551
1552   if (NULL != handle->network_handle)
1553     {
1554       GNUNET_CLIENT_notify_transmit_ready_cancel (handle->network_handle);
1555       handle->network_handle = NULL;
1556     }
1557   if (NULL != (client = handle->client))
1558     {
1559 #if DEBUG_TRANSPORT
1560       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1561                   "Disconnecting from transport service for good.\n");
1562 #endif
1563       handle->client = NULL;
1564       GNUNET_CLIENT_disconnect (client, GNUNET_NO);
1565     }
1566   GNUNET_free (handle);
1567 }
1568
1569
1570 /**
1571  * Function we use for handling incoming messages.
1572  *
1573  * @param cls closure (struct GNUNET_TRANSPORT_Handle *)
1574  * @param msg message received, NULL on timeout or fatal error
1575  */
1576 static void
1577 demultiplexer (void *cls, const struct GNUNET_MessageHeader *msg)
1578 {
1579   struct GNUNET_TRANSPORT_Handle *h = cls;
1580   const struct DisconnectInfoMessage *dim;
1581   const struct ConnectInfoMessage *cim;
1582   const struct InboundMessage *im;
1583   const struct GNUNET_MessageHeader *imm;
1584   const struct SendOkMessage *okm;
1585   struct HelloWaitList *hwl;
1586   struct HelloWaitList *next_hwl;
1587   struct NeighbourList *n;
1588   struct GNUNET_PeerIdentity me;
1589   uint16_t size;
1590
1591   if (h->client == NULL)
1592     {
1593       /* shutdown initiated from 'GNUNET_TRANSPORT_disconnect',
1594          finish clean up work! */
1595       GNUNET_free (h);
1596       return;
1597     }
1598   if (msg == NULL)
1599     {
1600 #if DEBUG_TRANSPORT
1601       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
1602                   "Error receiving from transport service, disconnecting temporarily.\n");
1603 #endif
1604       if (h->network_handle != NULL)
1605         {
1606           GNUNET_CLIENT_notify_transmit_ready_cancel (h->network_handle);
1607           h->network_handle = NULL;
1608         }
1609       GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
1610       h->client = NULL;
1611       schedule_reconnect (h);
1612       return;
1613     }
1614   GNUNET_CLIENT_receive (h->client,
1615                          &demultiplexer, h, GNUNET_TIME_UNIT_FOREVER_REL);
1616   size = ntohs (msg->size);
1617   switch (ntohs (msg->type))
1618     {
1619     case GNUNET_MESSAGE_TYPE_HELLO:
1620       if (GNUNET_OK !=
1621           GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) msg,
1622                                &me))
1623         {
1624           GNUNET_break (0);
1625           break;
1626         }
1627 #if DEBUG_TRANSPORT
1628       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1629                   "Receiving (my own) `%s' message, I am `%4s'.\n",
1630                   "HELLO", GNUNET_i2s (&me));
1631 #endif
1632       GNUNET_free_non_null (h->my_hello);
1633       h->my_hello = NULL;
1634       if (size < sizeof (struct GNUNET_MessageHeader))
1635         {
1636           GNUNET_break (0);
1637           break;
1638         }
1639       h->my_hello = GNUNET_malloc (size);
1640       memcpy (h->my_hello, msg, size);
1641       hwl = h->hwl_head;
1642       while (NULL != hwl)
1643         {
1644           next_hwl = hwl->next;
1645           hwl->rec (hwl->rec_cls,
1646                     (const struct GNUNET_MessageHeader *) h->my_hello);
1647           hwl = next_hwl;
1648         }
1649       break;
1650     case GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT:
1651       if (size != sizeof (struct ConnectInfoMessage))
1652         {
1653           GNUNET_break (0);
1654           break;
1655         }
1656       cim = (const struct ConnectInfoMessage *) msg;
1657 #if DEBUG_TRANSPORT
1658       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1659                   "Receiving `%s' message for `%4s'.\n",
1660                   "CONNECT", GNUNET_i2s (&cim->id));
1661 #endif
1662       n = neighbour_find (h, &cim->id);
1663       if (n == NULL)
1664         n = neighbour_add (h,
1665                            &cim->id);
1666       if (n == NULL)
1667         {
1668           GNUNET_break (0);
1669           return;
1670         }
1671       GNUNET_break (n->is_connected == GNUNET_NO);
1672       if (ntohl ((&cim->ats)[ntohl (cim->ats_count)].type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR)
1673         {
1674           GNUNET_break (0);
1675           return;
1676         }
1677       fprintf(stderr,"transport_api GNUNET_MESSAGE_TYPE_TRANSPORT_CONNECT ats_count %u\n",ntohl (cim->ats_count));
1678      n->is_connected = GNUNET_YES;
1679       if (h->nc_cb != NULL)
1680                   h->nc_cb (h->cls, &n->id,
1681                     NULL,
1682                     0);
1683      /*  if (h->nc_cb != NULL)
1684           h->nc_cb (h->cls, &n->id,
1685                     &(cim->ats), 
1686                     ntohl (cim->ats_count));*/
1687       break;
1688     case GNUNET_MESSAGE_TYPE_TRANSPORT_DISCONNECT:
1689       if (size != sizeof (struct DisconnectInfoMessage))
1690         {
1691           GNUNET_break (0);
1692           break;
1693         }
1694       dim = (const struct DisconnectInfoMessage *) msg;
1695       GNUNET_break (ntohl (dim->reserved) == 0);
1696 #if DEBUG_TRANSPORT_DISCONNECT
1697       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1698                   "Receiving `%s' message for `%4s'.\n",
1699                   "DISCONNECT",
1700                   GNUNET_i2s (&dim->peer));
1701 #endif
1702       n = neighbour_find (h, &dim->peer);
1703       GNUNET_break (n != NULL);
1704       if (n != NULL)
1705         neighbour_disconnect (n);       
1706       break;
1707     case GNUNET_MESSAGE_TYPE_TRANSPORT_SEND_OK:
1708       if (size != sizeof (struct SendOkMessage))
1709         {
1710           GNUNET_break (0);
1711           break;
1712         }
1713       okm = (const struct SendOkMessage *) msg;
1714 #if DEBUG_TRANSPORT
1715       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1716                   "Receiving `%s' message, transmission %s.\n", "SEND_OK",
1717                   ntohl (okm->success) == GNUNET_OK ? "succeeded" : "failed");
1718 #endif
1719       n = neighbour_find (h, &okm->peer);
1720       GNUNET_assert (n != NULL);
1721       switch (n->transmit_stage)
1722         {
1723         case TS_NEW:
1724           GNUNET_break (0);
1725           break;
1726         case TS_QUEUED:
1727           GNUNET_break (0);
1728           break;
1729         case TS_TRANSMITTED:
1730           n->transmit_stage = TS_NEW;
1731           break;
1732         case TS_TRANSMITTED_QUEUED:
1733           n->transmit_stage = TS_QUEUED;
1734           schedule_transmission (h);
1735           break;
1736         default:
1737           GNUNET_break (0);
1738         }
1739       break;
1740     case GNUNET_MESSAGE_TYPE_TRANSPORT_RECV:
1741 #if DEBUG_TRANSPORT
1742       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1743                   "Receiving `%s' message.\n", "RECV");
1744 #endif
1745       if (size <
1746           sizeof (struct InboundMessage) +
1747           sizeof (struct GNUNET_MessageHeader))
1748         {
1749           GNUNET_break (0);
1750           break;
1751         }
1752       im = (const struct InboundMessage *) msg;
1753       GNUNET_break (0 == ntohl (im->reserved));
1754       GNUNET_assert(sizeof (struct InboundMessage) + ntohl(im->ats_count) * sizeof(struct GNUNET_TRANSPORT_ATS_Information) + sizeof (struct GNUNET_MessageHeader) <= size);
1755       imm = (const struct GNUNET_MessageHeader *) &((&im->ats)[ntohl(im->ats_count)+1]);
1756       if (ntohs (imm->size) + sizeof (struct InboundMessage) + ntohl(im->ats_count) * sizeof(struct GNUNET_TRANSPORT_ATS_Information) != size)
1757         {
1758           GNUNET_break (0);
1759           break;
1760         }
1761 #if DEBUG_TRANSPORT
1762       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1763                   "Received message of type %u from `%4s'.\n",
1764                   ntohs (imm->type), GNUNET_i2s (&im->peer));
1765 #endif
1766       n = neighbour_find (h, &im->peer);
1767       if (n == NULL)
1768         {
1769           GNUNET_break (0);
1770           break;
1771         }
1772       if (n->is_connected != GNUNET_YES)
1773         {
1774           GNUNET_break (0);
1775           break;
1776         }
1777       if (ntohl ((&im->ats)[ntohl(im->ats_count)].type) != GNUNET_TRANSPORT_ATS_ARRAY_TERMINATOR)
1778         {
1779           GNUNET_break (0);
1780           return;
1781         }
1782       fprintf(stderr,"transport_api GNUNET_MESSAGE_TYPE_TRANSPORT_RECV ats_count %u\n",ntohl (im->ats_count));
1783     if (h->rec != NULL)
1784                 h->rec (h->cls, &im->peer,
1785                         imm,
1786                         NULL,
1787                         0);
1788
1789         /*h->rec (h->cls, &im->peer,
1790                 imm, 
1791                 &im->ats, 
1792                 ntohl (im->ats_count));*/
1793       break;
1794     default:
1795       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
1796                   _
1797                   ("Received unexpected message of type %u in %s:%u\n"),
1798                   ntohs (msg->type), __FILE__, __LINE__);
1799       GNUNET_break (0);
1800       break;
1801     }
1802 }
1803
1804
1805 /**
1806  * Called when our transmit request timed out before any transport
1807  * reported success connecting to the desired peer or before the
1808  * transport was ready to receive.  Signal error and free
1809  * TransmitHandle.
1810  *
1811  * @param cls the 'struct GNUNET_TRANSPORT_TransmitHandle*' that is timing out
1812  * @param tc scheduler context
1813  */
1814 static void
1815 peer_transmit_timeout (void *cls,
1816                        const struct GNUNET_SCHEDULER_TaskContext *tc)
1817 {
1818   struct GNUNET_TRANSPORT_TransmitHandle *th = cls;
1819   struct NeighbourList *n;
1820   GNUNET_CONNECTION_TransmitReadyNotify notify;
1821   void *notify_cls;
1822
1823   th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1824   n = th->neighbour;
1825 #if DEBUG_TRANSPORT
1826   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1827               "Triggering timeout for request to transmit to `%4s' (%d)\n",
1828               GNUNET_i2s (&n->id),
1829               n->transmit_stage);
1830 #endif
1831   notify = th->notify;
1832   th->notify = NULL;
1833   notify_cls = th->notify_cls;
1834   switch (n->transmit_stage)
1835     {
1836     case TS_NEW:
1837       GNUNET_break (0);
1838       break;
1839     case TS_QUEUED:
1840       n->transmit_stage = TS_NEW;
1841       if (n->is_connected == GNUNET_NO)
1842         neighbour_free (n);
1843       break;
1844     case TS_TRANSMITTED:
1845       GNUNET_break (0);
1846       break;
1847     case TS_TRANSMITTED_QUEUED:
1848       n->transmit_stage = TS_TRANSMITTED;
1849       break;
1850     default:
1851       GNUNET_break (0);
1852     }
1853   if (NULL != notify)
1854     notify (notify_cls, 0, NULL);
1855 }
1856
1857
1858 /**
1859  * Check if we could queue a message of the given size for
1860  * transmission.  The transport service will take both its
1861  * internal buffers and bandwidth limits imposed by the
1862  * other peer into consideration when answering this query.
1863  *
1864  * @param handle connection to transport service
1865  * @param target who should receive the message
1866  * @param size how big is the message we want to transmit?
1867  * @param priority how important is the message?
1868  * @param timeout after how long should we give up (and call
1869  *        notify with buf NULL and size 0)?
1870  * @param notify function to call when we are ready to
1871  *        send such a message
1872  * @param notify_cls closure for notify
1873  * @return NULL if someone else is already waiting to be notified
1874  *         non-NULL if the notify callback was queued (can be used to cancel
1875  *         using GNUNET_TRANSPORT_notify_transmit_ready_cancel)
1876  */
1877 struct GNUNET_TRANSPORT_TransmitHandle *
1878 GNUNET_TRANSPORT_notify_transmit_ready (struct GNUNET_TRANSPORT_Handle
1879                                         *handle,
1880                                         const struct GNUNET_PeerIdentity
1881                                         *target, size_t size,
1882                                         unsigned int priority,
1883                                         struct GNUNET_TIME_Relative timeout,
1884                                         GNUNET_CONNECTION_TransmitReadyNotify
1885                                         notify, void *notify_cls)
1886 {
1887   struct GNUNET_TRANSPORT_TransmitHandle *th;
1888   struct NeighbourList *n;
1889
1890   if (size + sizeof (struct OutboundMessage) >=
1891       GNUNET_SERVER_MAX_MESSAGE_SIZE)
1892     {
1893 #if DEBUG_TRANSPORT
1894       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1895                   "Message size is %d, max allowed is %d.\n",
1896                   size + sizeof (struct OutboundMessage), GNUNET_SERVER_MAX_MESSAGE_SIZE - 1);
1897 #endif
1898       GNUNET_break (0);
1899       return NULL;
1900     }
1901 #if DEBUG_TRANSPORT
1902   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1903               "Asking transport service for transmission of %u bytes to peer `%4s' within %llu ms.\n",
1904               size, GNUNET_i2s (target),
1905               (unsigned long long) timeout.rel_value);
1906 #endif
1907   n = neighbour_find (handle, target);
1908   if (n == NULL)
1909     {
1910       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1911                   "Created neighbour entry for peer `%s'\n",
1912                   GNUNET_i2s (target));
1913       n = neighbour_add (handle, target);
1914
1915     }
1916   if (n == NULL)
1917     {
1918       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1919                   "Could not create neighbour entry for peer `%s'\n",
1920                   GNUNET_i2s (target));
1921       return NULL;
1922     }
1923
1924   /**
1925    *  Send a request connect message if not connected,
1926    *  otherwise we will never send anything to
1927    *  transport service
1928    */
1929   if (n->is_connected == GNUNET_NO)
1930     {
1931       send_request_connect_message(handle, n);
1932     }
1933
1934   switch (n->transmit_stage)
1935     {
1936     case TS_NEW:
1937       n->transmit_stage = TS_QUEUED;
1938       break;
1939     case TS_QUEUED:
1940       GNUNET_break (0);
1941       return NULL;
1942     case TS_TRANSMITTED:
1943       n->transmit_stage = TS_TRANSMITTED_QUEUED;
1944       break;
1945     case TS_TRANSMITTED_QUEUED:
1946       GNUNET_break (0);
1947       return NULL;
1948     default:
1949       GNUNET_break (0);
1950       return NULL;
1951     }
1952   th = &n->transmit_handle;
1953   th->neighbour = n;
1954   th->notify = notify;
1955   th->notify_cls = notify_cls;
1956   th->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1957   th->notify_size = size + sizeof (struct OutboundMessage);
1958   th->priority = priority;
1959   th->notify_delay_task
1960     = GNUNET_SCHEDULER_add_delayed (timeout,
1961                                     &peer_transmit_timeout, th);
1962   schedule_transmission (handle);
1963   return th;
1964 }
1965
1966
1967 /**
1968  * Cancel the specified transmission-ready notification.
1969  */
1970 void
1971 GNUNET_TRANSPORT_notify_transmit_ready_cancel (struct
1972                                                GNUNET_TRANSPORT_TransmitHandle
1973                                                *th)
1974 {
1975   struct NeighbourList *n;
1976
1977   th->notify = NULL;
1978   n = th->neighbour;
1979 #if DEBUG_TRANSPORT
1980   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1981               "Transmission request of %u bytes to `%4s' was canceled.\n",
1982               th->notify_size - sizeof (struct OutboundMessage),
1983               GNUNET_i2s (&n->id));
1984 #endif
1985   if (th->notify_delay_task != GNUNET_SCHEDULER_NO_TASK)
1986     {
1987       GNUNET_SCHEDULER_cancel (th->notify_delay_task);
1988       th->notify_delay_task = GNUNET_SCHEDULER_NO_TASK;
1989     }
1990   switch (n->transmit_stage)
1991     {
1992     case TS_NEW:
1993       GNUNET_break (0);
1994       break;
1995     case TS_QUEUED:
1996       n->transmit_stage = TS_NEW;
1997       if (n->in_disconnect == GNUNET_NO)
1998         neighbour_free (n);
1999       break;
2000     case TS_TRANSMITTED:
2001       GNUNET_break (0);
2002       break;
2003     case TS_TRANSMITTED_QUEUED:
2004       n->transmit_stage = TS_TRANSMITTED;
2005       break;
2006     default:
2007       GNUNET_break (0);
2008     }
2009 }
2010
2011
2012 /* end of transport_api.c */