some documentation and bugfix in testcase
[oweals/gnunet.git] / src / transport / plugin_transport_tcp.c
1 /*
2      This file is part of GNUnet
3      (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_tcp.c
23  * @brief Implementation of the TCP transport service
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_hello_lib.h"
29 #include "gnunet_network_lib.h"
30 #include "gnunet_os_lib.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_resolver_service.h"
34 #include "gnunet_server_lib.h"
35 #include "gnunet_service_lib.h"
36 #include "gnunet_statistics_service.h"
37 #include "gnunet_transport_service.h"
38 #include "plugin_transport.h"
39 #include "transport.h"
40
41 #define DEBUG_TCP GNUNET_YES
42
43 /**
44  * After how long do we expire an address that we
45  * learned from another peer if it is not reconfirmed
46  * by anyone?
47  */
48 #define LEARNED_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 6)
49
50 /**
51  * How long until we give up on transmitting the welcome message?
52  */
53 #define WELCOME_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
54
55 /**
56  * How long until we give up on transmitting the welcome message?
57  */
58 #define HOSTNAME_RESOLVE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
59
60 /**
61  * For how many messages back to we keep transmission times?
62  */
63 #define ACK_LOG_SIZE 32
64
65 /**
66  * Initial handshake message for a session.  This header
67  * is followed by the address that the other peer used to
68  * connect to us (so that we may learn it) or the address
69  * that the other peer got from the accept call.
70  */
71 struct WelcomeMessage
72 {
73   struct GNUNET_MessageHeader header;
74
75   /**
76    * Identity of the node connecting (TCP client)
77    */
78   struct GNUNET_PeerIdentity clientIdentity;
79
80 };
81
82
83 /**
84  * Encapsulation for normal TCP traffic.
85  */
86 struct DataMessage
87 {
88   struct GNUNET_MessageHeader header;
89
90   /**
91    * For alignment.
92    */
93   uint32_t reserved GNUNET_PACKED;
94
95   /**
96    * Number of the last message that was received from the other peer.
97    */
98   uint64_t ack_in GNUNET_PACKED;
99
100   /**
101    * Number of this outgoing message.
102    */
103   uint64_t ack_out GNUNET_PACKED;
104
105   /**
106    * How long was sending this ack delayed by the other peer
107    * (estimate).  The receiver of this message can use the delay
108    * between sending his message number 'ack' and receiving this ack
109    * minus the delay as an estimate of the round-trip time.
110    */
111   struct GNUNET_TIME_RelativeNBO delay;
112
113 };
114
115
116 /**
117  * Encapsulation of all of the state of the plugin.
118  */
119 struct Plugin;
120
121
122 /**
123  * Information kept for each message that is yet to
124  * be transmitted.
125  */
126 struct PendingMessage
127 {
128
129   /**
130    * This is a linked list.
131    */
132   struct PendingMessage *next;
133
134   /**
135    * The pending message, pointer to the end
136    * of this struct, do not free!
137    */
138   struct GNUNET_MessageHeader *msg;
139
140
141   /**
142    * Continuation function to call once the message
143    * has been sent.  Can be  NULL if there is no
144    * continuation to call.
145    */
146   GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
147
148   /**
149    * Closure for transmit_cont.
150    */
151   void *transmit_cont_cls;
152
153   /**
154    * Timeout value for the pending message.
155    */
156   struct GNUNET_TIME_Absolute timeout;
157
158   /**
159    * GNUNET_YES if this is a welcome message;
160    * otherwise this should be a DATA message.
161    */
162   int is_welcome;
163
164 };
165
166
167 /**
168  * Session handle for TCP connections.
169  */
170 struct Session
171 {
172
173   /**
174    * Stored in a linked list.
175    */
176   struct Session *next;
177
178   /**
179    * Pointer to the global plugin struct.
180    */
181   struct Plugin *plugin;
182
183   /**
184    * The client (used to identify this connection)
185    */
186   struct GNUNET_SERVER_Client *client;
187
188   /**
189    * gnunet-service-transport context for this connection.
190    */
191   struct ReadyList *service_context;
192
193   /**
194    * Messages currently pending for transmission
195    * to this peer, if any.
196    */
197   struct PendingMessage *pending_messages;
198
199   /**
200    * Handle for pending transmission request.
201    */
202   struct GNUNET_NETWORK_TransmitHandle *transmit_handle;
203
204   /**
205    * To whom are we talking to (set to our identity
206    * if we are still waiting for the welcome message)
207    */
208   struct GNUNET_PeerIdentity target;
209
210   /**
211    * At what time did we reset last_received last?
212    */
213   struct GNUNET_TIME_Absolute last_quota_update;
214
215   /**
216    * Address of the other peer if WE initiated the connection
217    * (and hence can be sure what it is), otherwise NULL.
218    */
219   void *connect_addr;
220
221   /**
222    * How many bytes have we received since the "last_quota_update"
223    * timestamp?
224    */
225   uint64_t last_received;
226
227   /**
228    * Our current latency estimate (in ms).
229    */
230   double latency_estimate;
231
232   /**
233    * Time when we generated the last ACK_LOG_SIZE acks.
234    * (the "last" refers to the "out_msg_counter" here)
235    */
236   struct GNUNET_TIME_Absolute gen_time[ACK_LOG_SIZE];
237
238   /**
239    * Our current sequence number.
240    */
241   uint64_t out_msg_counter;
242
243   /**
244    * Highest received incoming sequence number.
245    */
246   uint64_t max_in_msg_counter;
247
248   /**
249    * Number of bytes per ms that this peer is allowed
250    * to send to us.
251    */
252   uint32_t quota_in;
253
254   /**
255    * Length of connect_addr, can be 0.
256    */
257   size_t connect_alen;
258
259   /**
260    * Are we still expecting the welcome message? (GNUNET_YES/GNUNET_NO)
261    */
262   int expecting_welcome;
263
264   /**
265    * Are we still trying to connect?
266    */
267   int still_connecting;
268
269 };
270
271
272 /**
273  * Encapsulation of all of the state of the plugin.
274  */
275 struct Plugin
276 {
277   /**
278    * Our environment.
279    */
280   struct GNUNET_TRANSPORT_PluginEnvironment *env;
281
282   /**
283    * The listen socket.
284    */
285   struct GNUNET_NETWORK_SocketHandle *lsock;
286
287   /**
288    * List of open TCP sessions.
289    */
290   struct Session *sessions;
291
292   /**
293    * Handle for the statistics service.
294    */
295   struct GNUNET_STATISTICS_Handle *statistics;
296
297   /**
298    * Handle to the network service.
299    */
300   struct GNUNET_SERVICE_Context *service;
301
302   /**
303    * Handle to the server for this service.
304    */
305   struct GNUNET_SERVER_Handle *server;
306
307   /**
308    * Copy of the handler array where the closures are
309    * set to this struct's instance.
310    */
311   struct GNUNET_SERVER_MessageHandler *handlers;
312
313   /**
314    * ID of task used to update our addresses when one expires.
315    */
316   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
317
318   /**
319    * Port that we are actually listening on.
320    */
321   uint16_t open_port;
322
323   /**
324    * Port that the user said we would have visible to the
325    * rest of the world.
326    */
327   uint16_t adv_port;
328
329 };
330
331
332 /**
333  * Find the session handle for the given peer.
334  */
335 static struct Session *
336 find_session_by_target (struct Plugin *plugin,
337                         const struct GNUNET_PeerIdentity *target)
338 {
339   struct Session *ret;
340
341   ret = plugin->sessions;
342   while ((ret != NULL) &&
343          (0 != memcmp (target,
344                        &ret->target, sizeof (struct GNUNET_PeerIdentity))))
345     ret = ret->next;
346   return ret;
347 }
348
349
350 /**
351  * Find the session handle for the given peer.
352  */
353 static struct Session *
354 find_session_by_client (struct Plugin *plugin,
355                         const struct GNUNET_SERVER_Client *client)
356 {
357   struct Session *ret;
358
359   ret = plugin->sessions;
360   while ((ret != NULL) && (client != ret->client))
361     ret = ret->next;
362   return ret;
363 }
364
365
366 /**
367  * Create a welcome message.
368  */
369 static struct PendingMessage *
370 create_welcome (size_t addrlen, const void *addr, struct Plugin *plugin)
371 {
372   struct PendingMessage *pm;
373   struct WelcomeMessage *welcome;
374
375   pm = GNUNET_malloc (sizeof (struct PendingMessage) +
376                       sizeof (struct WelcomeMessage) + addrlen);
377   pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
378   welcome = (struct WelcomeMessage *) &pm[1];
379   welcome->header.size = htons (sizeof (struct WelcomeMessage) + addrlen);
380   welcome->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME);
381   GNUNET_CRYPTO_hash (plugin->env->my_public_key,
382                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
383                       &welcome->clientIdentity.hashPubKey);
384   memcpy (&welcome[1], addr, addrlen);
385   pm->timeout = GNUNET_TIME_relative_to_absolute (WELCOME_TIMEOUT);
386   pm->is_welcome = GNUNET_YES;
387   return pm;
388 }
389
390
391 /**
392  * Create a new session using the specified address
393  * for the welcome message.
394  *
395  * @param plugin us
396  * @param target peer to connect to
397  * @param client client to use
398  * @param addrlen IPv4 or IPv6
399  * @param addr either struct sockaddr_in or struct sockaddr_in6
400  * @return NULL connection failed / invalid address
401  */
402 static struct Session *
403 create_session (struct Plugin *plugin,
404                 const struct GNUNET_PeerIdentity *target,
405                 struct GNUNET_SERVER_Client *client,
406                 const void *addr, size_t addrlen)
407 {
408   struct Session *ret;
409
410   ret = GNUNET_malloc (sizeof (struct Session));
411   ret->plugin = plugin;
412   ret->next = plugin->sessions;
413   plugin->sessions = ret;
414   ret->client = client;
415   ret->target = *target;
416   ret->last_quota_update = GNUNET_TIME_absolute_get ();
417   ret->quota_in = plugin->env->default_quota_in;
418   ret->expecting_welcome = GNUNET_YES;
419   ret->pending_messages = create_welcome (addrlen, addr, plugin);
420   return ret;
421 }
422
423
424 /**
425  * Create a new session connecting to the specified
426  * target at the specified address.
427  *
428  * @param plugin us
429  * @param target peer to connect to
430  * @param addrlen IPv4 or IPv6
431  * @param addr either struct sockaddr_in or struct sockaddr_in6
432  * @return NULL connection failed / invalid address
433  */
434 static struct Session *
435 connect_and_create_session (struct Plugin *plugin,
436                             const struct GNUNET_PeerIdentity *target,
437                             const void *addr, size_t addrlen)
438 {
439   struct GNUNET_SERVER_Client *client;
440   struct GNUNET_NETWORK_SocketHandle *conn;
441   struct Session *session;
442   int af;
443
444   session = plugin->sessions;
445   while (session != NULL)
446     {
447       if ((0 == memcmp (target,
448                         &session->target,
449                         sizeof (struct GNUNET_PeerIdentity))) &&
450           (session->connect_alen == addrlen) &&
451           (0 == memcmp (session->connect_addr, addr, addrlen)))
452         return session;         /* already exists! */
453       session = session->next;
454     }
455
456   if (addrlen == sizeof (struct sockaddr_in))
457     af = AF_INET;
458   else if (addrlen == sizeof (struct sockaddr_in6))
459     af = AF_INET6;    
460   else
461     {
462       GNUNET_break_op (0);
463       return NULL;              /* invalid address */
464     }
465   conn = GNUNET_NETWORK_socket_create_from_sockaddr (plugin->env->sched,
466                                                      af,
467                                                      addr,
468                                                      addrlen,
469                                                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
470   if (conn == NULL)
471     {
472 #if DEBUG_TCP
473       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
474                        "tcp",
475                        "Failed to create connection to peer at `%s'.\n",
476                        GNUNET_a2s(addr, addrlen));
477 #endif
478       return NULL;
479     }
480   client = GNUNET_SERVER_connect_socket (plugin->server, conn);
481   GNUNET_assert (client != NULL);
482   session = create_session (plugin, target, client, addr, addrlen);
483   session->connect_alen = addrlen;
484   session->connect_addr = GNUNET_malloc (addrlen);
485   memcpy (session->connect_addr, addr, addrlen);
486 #if DEBUG_TCP
487   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
488                    "tcp",
489                    "Creating new session %p with `%s' based on `%s' request.\n",
490                    session, 
491                    GNUNET_a2s(addr, addrlen),
492                    "send_to");
493 #endif
494   return session;
495 }
496
497
498 /**
499  * If we have pending messages, ask the server to
500  * transmit them (schedule the respective tasks, etc.)
501  *
502  * @param session for which session should we do this
503  */
504 static void process_pending_messages (struct Session *session);
505
506
507 /**
508  * Function called to notify a client about the socket
509  * begin ready to queue more data.  "buf" will be
510  * NULL and "size" zero if the socket was closed for
511  * writing in the meantime.
512  *
513  * @param cls closure
514  * @param size number of bytes available in buf
515  * @param buf where the callee should write the message
516  * @return number of bytes written to buf
517  */
518 static size_t
519 do_transmit (void *cls, size_t size, void *buf)
520 {
521   struct Session *session = cls;
522   struct PendingMessage *pm;
523   char *cbuf;
524   uint16_t msize;
525   size_t ret;
526   struct DataMessage *dm;
527
528   session->transmit_handle = NULL;
529   if (buf == NULL)
530     {
531 #if DEBUG_TCP
532       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
533                        "tcp", 
534                        "Timeout trying to transmit to peer `%4s', discarding message queue.\n",
535                        GNUNET_i2s(&session->target));
536 #endif
537       /* timeout */
538       while (NULL != (pm = session->pending_messages))
539         {
540           session->pending_messages = pm->next;
541           if (pm->transmit_cont != NULL)
542             pm->transmit_cont (pm->transmit_cont_cls,
543                                session->service_context,
544                                &session->target, GNUNET_SYSERR);
545           GNUNET_free (pm);
546         }
547       return 0;
548     }
549   ret = 0;
550   cbuf = buf;
551   while (NULL != (pm = session->pending_messages))
552     {
553       if (pm->is_welcome)
554         {
555           if (size < (msize = htons (pm->msg->size)))
556             break;
557           memcpy (cbuf, pm->msg, msize);
558           cbuf += msize;
559           ret += msize;
560           size -= msize;
561         }
562       else
563         {
564           if (size <
565               sizeof (struct DataMessage) + (msize = htons (pm->msg->size)))
566             break;
567           dm = (struct DataMessage *) cbuf;
568           dm->header.size = htons (sizeof (struct DataMessage) + msize);
569           dm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_DATA);
570           dm->ack_out = GNUNET_htonll (++session->out_msg_counter);
571           dm->ack_in = GNUNET_htonll (session->max_in_msg_counter);
572           cbuf += sizeof (struct DataMessage);
573           ret += sizeof (struct DataMessage);
574           size -= sizeof (struct DataMessage);
575           memcpy (cbuf, pm->msg, msize);
576           cbuf += msize;
577           ret += msize;
578           size -= msize;
579         }
580       session->pending_messages = pm->next;
581       if (pm->transmit_cont != NULL)
582         pm->transmit_cont (pm->transmit_cont_cls,
583                            session->service_context,
584                            &session->target, GNUNET_OK);
585       GNUNET_free (pm);
586       session->gen_time[session->out_msg_counter % ACK_LOG_SIZE]
587         = GNUNET_TIME_absolute_get ();
588     }
589   process_pending_messages (session);
590 #if DEBUG_TCP
591   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
592                    "tcp", "Transmitting %u bytes\n", ret);
593 #endif
594   return ret;
595 }
596
597
598 /**
599  * If we have pending messages, ask the server to
600  * transmit them (schedule the respective tasks, etc.)
601  *
602  * @param session for which session should we do this
603  */
604 static void
605 process_pending_messages (struct Session *session)
606 {
607   GNUNET_assert (session->client != NULL);
608   if (session->pending_messages == NULL)
609     return;
610   if (session->transmit_handle != NULL)
611     return;
612   session->transmit_handle
613     = GNUNET_SERVER_notify_transmit_ready (session->client,
614                                            htons (session->
615                                                   pending_messages->msg->
616                                                   size) +
617                                            (session->
618                                             pending_messages->is_welcome ? 0 :
619                                             sizeof (struct DataMessage)),
620                                            GNUNET_TIME_absolute_get_remaining
621                                            (session->
622                                             pending_messages[0].timeout),
623                                            &do_transmit, session);
624 }
625
626
627 /**
628  * Function that can be used by the transport service to transmit
629  * a message using the plugin using a fresh connection (even if
630  * we already have a connection to this peer, this function is
631  * required to establish a new one).
632  *
633  * @param cls closure
634  * @param target who should receive this message
635  * @param priority how important is the message
636  * @param msg1 first message to transmit
637  * @param msg2 second message to transmit (can be NULL)
638  * @param timeout how long should we try to transmit these?
639  * @param addrlen length of the address
640  * @param addr the address
641  * @return session if the transmission has been scheduled
642  *         NULL if the address format is invalid
643  */
644 static void *
645 tcp_plugin_send_to (void *cls,
646                     const struct GNUNET_PeerIdentity *target,
647                     unsigned int priority,
648                     const struct GNUNET_MessageHeader *msg1,
649                     const struct GNUNET_MessageHeader *msg2,
650                     struct GNUNET_TIME_Relative timeout,
651                     const void *addr, size_t addrlen)
652 {
653   struct Plugin *plugin = cls;
654   struct Session *session;
655   struct PendingMessage *pl;
656   struct PendingMessage *pm;
657
658   session = connect_and_create_session (plugin, target, addr, addrlen);
659   if (session == NULL)
660     {
661 #if DEBUG_TCP
662       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
663                        "tcp", "Failed to create fresh session.\n");
664 #endif
665       return NULL;
666     }
667   pl = NULL;
668   if (msg2 != NULL)
669     {
670       pm = GNUNET_malloc (sizeof (struct PendingMessage) +
671                           ntohs (msg2->size));
672       pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
673       memcpy (pm->msg, msg2, ntohs (msg2->size));
674       pm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
675       pm->is_welcome = GNUNET_NO;
676       pl = pm;
677     }
678   if (msg1 != NULL)
679     {
680       pm = GNUNET_malloc (sizeof (struct PendingMessage) +
681                           ntohs (msg1->size));
682       pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
683       memcpy (pm->msg, msg1, ntohs (msg1->size));
684       pm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
685       pm->is_welcome = GNUNET_NO;
686       pm->next = pl;
687       pl = pm;
688     }
689   /* append */
690   if (session->pending_messages != NULL)
691     {
692       pm = session->pending_messages;
693       while (pm->next != NULL)
694         pm = pm->next;
695       pm->next = pl;
696     }
697   else
698     {
699       session->pending_messages = pl;
700     }
701   process_pending_messages (session);
702   return session;
703 }
704
705
706 /**
707  * Functions with this signature are called whenever we need
708  * to close a session due to a disconnect or failure to
709  * establish a connection.
710  *
711  * @param session session to close down
712  */
713 static void
714 disconnect_session (struct Session *session)
715 {
716   struct Session *prev;
717   struct Session *pos;
718   struct PendingMessage *pm;
719
720 #if DEBUG_TCP
721   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
722                    "tcp",
723                    "Disconnecting from other peer (session %p).\n", session);
724 #endif
725   /* remove from session list */
726   prev = NULL;
727   pos = session->plugin->sessions;
728   while (pos != session)
729     {
730       prev = pos;
731       pos = pos->next;
732     }
733   if (prev == NULL)
734     session->plugin->sessions = session->next;
735   else
736     prev->next = session->next;
737   /* clean up state */
738   if (session->client != NULL)
739     {
740 #if DEBUG_TCP
741       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
742                   "Disconnecting from client address %p\n", session->client);
743 #endif
744       GNUNET_SERVER_client_drop (session->client);
745       session->client = NULL;
746     }
747   if (session->transmit_handle != NULL)
748     {
749       GNUNET_NETWORK_notify_transmit_ready_cancel (session->transmit_handle);
750       session->transmit_handle = NULL;
751     }
752   while (NULL != (pm = session->pending_messages))
753     {
754       session->pending_messages = pm->next;
755       if (NULL != pm->transmit_cont)
756         pm->transmit_cont (pm->transmit_cont_cls,
757                            session->service_context,
758                            &session->target, GNUNET_SYSERR);
759       GNUNET_free (pm);
760     }
761   /* notify transport service about disconnect */
762   session->plugin->env->receive (session->plugin->env->cls,
763                                  session,
764                                  session->service_context,
765                                  GNUNET_TIME_UNIT_ZERO,
766                                  &session->target, NULL);
767   GNUNET_free_non_null (session->connect_addr);
768   GNUNET_free (session);
769 }
770
771
772 /**
773  * Iterator callback to go over all addresses.  If we get
774  * a TCP address, increment the counter
775  *
776  * @param cls closure, points to the counter
777  * @param tname name of the transport
778  * @param expiration expiration time
779  * @param addr the address
780  * @param addrlen length of the address
781  * @return GNUNET_OK to keep the address,
782  *         GNUNET_NO to delete it from the HELLO
783  *         GNUNET_SYSERR to stop iterating (but keep current address)
784  */
785 static int
786 count_tcp_addresses (void *cls,
787                      const char *tname,
788                      struct GNUNET_TIME_Absolute expiration,
789                      const void *addr, size_t addrlen)
790 {
791   unsigned int *counter = cls;
792
793   if (0 != strcmp (tname, "tcp"))
794     return GNUNET_OK;           /* not one of ours */
795   (*counter)++;
796   return GNUNET_OK;             /* failed to connect */
797 }
798
799
800 struct ConnectContext
801 {
802   struct Plugin *plugin;
803
804   struct GNUNET_NETWORK_SocketHandle *sa;
805
806   struct PendingMessage *welcome;
807
808   unsigned int pos;
809 };
810
811
812 /**
813  * Iterator callback to go over all addresses.  If we get
814  * the "pos" TCP address, try to connect to it.
815  *
816  * @param cls closure
817  * @param tname name of the transport
818  * @param expiration expiration time
819  * @param addrlen length of the address
820  * @param addr the address
821  * @return GNUNET_OK to keep the address,
822  *         GNUNET_NO to delete it from the HELLO
823  *         GNUNET_SYSERR to stop iterating (but keep current address)
824  */
825 static int
826 try_connect_to_address (void *cls,
827                         const char *tname,
828                         struct GNUNET_TIME_Absolute expiration,
829                         const void *addr, size_t addrlen)
830 {
831   struct ConnectContext *cc = cls;
832   int af;
833
834   if (0 != strcmp (tname, "tcp"))
835     return GNUNET_OK;           /* not one of ours */
836   if (sizeof (struct sockaddr_in) == addrlen)
837     af = AF_INET;
838   else if (sizeof (struct sockaddr_in6) == addrlen)
839     af = AF_INET6;
840   else
841     {
842       /* not a valid address */
843       GNUNET_break (0);
844       return GNUNET_NO;
845     }
846   if (0 == cc->pos--)
847     {
848       cc->welcome = create_welcome (addrlen, addr, cc->plugin);
849       cc->sa =
850         GNUNET_NETWORK_socket_create_from_sockaddr (cc->plugin->env->sched,
851                                                     af, addr, addrlen,
852                                                     GNUNET_SERVER_MAX_MESSAGE_SIZE);
853 #if DEBUG_TCP
854       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
855                        "tcp", "Connected to other peer.\n");
856 #endif
857       return GNUNET_SYSERR;
858     }
859   return GNUNET_OK;             /* failed to connect */
860 }
861
862
863 /**
864  * Type of an iterator over the hosts.  Note that each
865  * host will be called with each available protocol.
866  *
867  * @param cls closure
868  * @param peer id of the peer, NULL for last call
869  * @param hello hello message for the peer (can be NULL)
870  * @param trust amount of trust we have in the peer
871  */
872 static void
873 session_try_connect (void *cls,
874                      const struct GNUNET_PeerIdentity *peer,
875                      const struct GNUNET_HELLO_Message *hello, uint32_t trust)
876 {
877   struct Session *session = cls;
878   unsigned int count;
879   struct ConnectContext cctx;
880   struct PendingMessage *pm;
881
882   if (peer == NULL)
883     {
884       /* last call, destroy session if we are still not
885          connected */
886       if (session->still_connecting == GNUNET_NO)
887         {
888 #if DEBUG_TCP
889           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
890                            "tcp",
891                            "Connected to other peer, now processing messages.\n");
892 #endif
893           process_pending_messages (session);
894         }
895       else
896         {
897 #if DEBUG_TCP
898           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
899                            "tcp",
900                            "Failed to connect to other peer, now closing session.\n");
901 #endif
902           disconnect_session (session);
903         }
904       return;
905     }
906   if ((hello == NULL) || (session->client != NULL))
907     {
908       GNUNET_break (0);         /* should this ever happen!? */
909       return;
910     }
911   count = 0;
912   GNUNET_HELLO_iterate_addresses (hello,
913                                   GNUNET_NO, &count_tcp_addresses, &count);
914   if (count == 0)
915     {
916 #if DEBUG_TCP
917       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
918                        "tcp",
919                        "Asked to connect, but have no addresses to try.\n");
920 #endif
921       return;
922     }
923   cctx.plugin = session->plugin;
924   cctx.sa = NULL;
925   cctx.welcome = NULL;
926   cctx.pos = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
927   GNUNET_HELLO_iterate_addresses (hello,
928                                   GNUNET_NO, &try_connect_to_address, &cctx);
929   if (cctx.sa == NULL)
930     {
931 #if DEBUG_TCP
932       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
933                        "tcp",
934                        "Asked to connect, but all addresses failed.\n");
935 #endif
936       GNUNET_free_non_null (cctx.welcome);
937       return;
938     }
939   session->client = GNUNET_SERVER_connect_socket (session->plugin->server,
940                                                   cctx.sa);
941 #if DEBUG_TCP
942   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
943               "Connected getting client address %p\n", session->client);
944 #endif
945   if (session->client == NULL)
946     {
947       GNUNET_break (0);         /* how could this happen? */
948       GNUNET_free_non_null (cctx.welcome);
949       return;
950     }
951   pm = cctx.welcome;
952   /* prepend (!) */
953   pm->next = session->pending_messages;
954   session->pending_messages = pm;
955   session->still_connecting = GNUNET_NO;
956 #if DEBUG_TCP
957   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
958                    "tcp",
959                    "Connected to other peer, now sending `%s' message.\n",
960                    "WELCOME");
961 #endif
962 }
963
964
965 /**
966  * Function that can be used by the transport service to transmit
967  * a message using the plugin.
968  *
969  * @param cls closure
970  * @param plugin_context value we were asked to pass to this plugin
971  *        to respond to the given peer (use is optional,
972  *        but may speed up processing), can be NULL
973  * @param service_context value passed to the transport-service
974  *        to identify the neighbour
975  * @param target who should receive this message
976  * @param priority how important is the message
977  * @param msg the message to transmit
978  * @param cont continuation to call once the message has
979  *        been transmitted (or if the transport is ready
980  *        for the next transmission call; or if the
981  *        peer disconnected...)
982  * @param cont_cls closure for cont
983  * @return plugin_context that should be used next time for
984  *         sending messages to the specified peer
985  */
986 static void *
987 tcp_plugin_send (void *cls,
988                  void *plugin_context,
989                  struct ReadyList *service_context,
990                  const struct GNUNET_PeerIdentity *target,   
991                  unsigned int priority,
992                  const struct GNUNET_MessageHeader *msg,
993                  struct GNUNET_TIME_Relative timeout,
994                  GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
995 {
996   struct Plugin *plugin = cls;
997   struct Session *session = plugin_context;
998   struct PendingMessage *pm;
999   struct PendingMessage *pme;
1000
1001   if (session == NULL)
1002     session = find_session_by_target (plugin, target);  
1003   pm = GNUNET_malloc (sizeof (struct PendingMessage) + ntohs (msg->size));
1004   pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
1005   memcpy (pm->msg, msg, ntohs (msg->size));
1006   pm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1007   pm->transmit_cont = cont;
1008   pm->transmit_cont_cls = cont_cls;
1009   if (session == NULL)
1010     {
1011       session = GNUNET_malloc (sizeof (struct Session));
1012 #if DEBUG_TCP
1013       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1014                        "tcp",
1015                        "Asked to transmit, creating fresh session %p.\n",
1016                        session);
1017 #endif
1018       session->next = plugin->sessions;
1019       plugin->sessions = session;
1020       session->plugin = plugin;
1021       session->target = *target;
1022       session->last_quota_update = GNUNET_TIME_absolute_get ();
1023       session->quota_in = plugin->env->default_quota_in;
1024       session->expecting_welcome = GNUNET_YES;
1025       session->still_connecting = GNUNET_YES;
1026       session->pending_messages = pm;
1027       session->service_context = service_context;
1028       GNUNET_PEERINFO_for_all (plugin->env->cfg,
1029                                plugin->env->sched,
1030                                target,
1031                                0, timeout, &session_try_connect, session);
1032       return session;
1033     }
1034   GNUNET_assert (session != NULL);
1035   GNUNET_assert (session->still_connecting == GNUNET_NO);
1036   session->service_context = service_context;
1037   /* append pm to pending_messages list */
1038   pme = session->pending_messages;
1039   if (pme == NULL)
1040     {
1041       session->pending_messages = pm;
1042     }
1043   else
1044     {
1045       while (NULL != pme->next)
1046         pme = pme->next;
1047       pme->next = pm;
1048     }
1049 #if DEBUG_TCP
1050   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1051                    "tcp", "Asked to transmit, added message to list.\n");
1052 #endif
1053   process_pending_messages (session);
1054   return session;
1055 }
1056
1057
1058
1059 /**
1060  * Function that can be called to force a disconnect from the
1061  * specified neighbour.  This should also cancel all previously
1062  * scheduled transmissions.  Obviously the transmission may have been
1063  * partially completed already, which is OK.  The plugin is supposed
1064  * to close the connection (if applicable) and no longer call the
1065  * transmit continuation(s).
1066  *
1067  * Finally, plugin MUST NOT call the services's receive function to
1068  * notify the service that the connection to the specified target was
1069  * closed after a getting this call.
1070  *
1071  * @param cls closure
1072  * @param plugin_context value we were asked to pass to this plugin
1073  *        to respond to the given peer (use is optional,
1074  *        but may speed up processing), can be NULL (if
1075  *        NULL was returned from the transmit function)
1076  * @param service_context must correspond to the service context
1077  *        of the corresponding Transmit call; the plugin should
1078  *        not cancel a send call made with a different service
1079  *        context pointer!  Never NULL.
1080  * @param target peer for which the last transmission is
1081  *        to be cancelled
1082  */
1083 static void
1084 tcp_plugin_cancel (void *cls,
1085                    void *plugin_context,
1086                    struct ReadyList *service_context,
1087                    const struct GNUNET_PeerIdentity *target)
1088 {
1089   struct Plugin *plugin = cls;
1090   struct PendingMessage *pm;
1091   struct Session *session;
1092   struct Session *next;
1093   
1094   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1095                    "tcp",
1096                    "Asked to close session with `%4s'\n",
1097                    GNUNET_i2s(target));
1098   session = plugin->sessions;
1099   while (session != NULL)
1100     {
1101       next = session->next;
1102       if (0 == memcmp (target,
1103                        &session->target, sizeof (struct GNUNET_PeerIdentity)))
1104         {
1105           pm = session->pending_messages;
1106           while (pm != NULL)
1107             {
1108               pm->transmit_cont = NULL;
1109               pm->transmit_cont_cls = NULL;
1110               pm = pm->next;
1111             }
1112           session->service_context = NULL;
1113           GNUNET_SERVER_client_disconnect (session->client);
1114           /* rest of the clean-up of the session will be done as part of
1115              disconnect_notify which should be triggered any time now */
1116         }
1117       session = next;
1118     }
1119 }
1120
1121
1122 struct PrettyPrinterContext
1123 {
1124   GNUNET_TRANSPORT_AddressStringCallback asc;
1125   void *asc_cls;
1126   uint16_t port;
1127 };
1128
1129
1130 /**
1131  * Append our port and forward the result.
1132  */
1133 static void
1134 append_port (void *cls, const char *hostname)
1135 {
1136   struct PrettyPrinterContext *ppc = cls;
1137   char *ret;
1138
1139   if (hostname == NULL)
1140     {
1141       ppc->asc (ppc->asc_cls, NULL);
1142       GNUNET_free (ppc);
1143       return;
1144     }
1145   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
1146   ppc->asc (ppc->asc_cls, ret);
1147   GNUNET_free (ret);
1148 }
1149
1150
1151 /**
1152  * Convert the transports address to a nice, human-readable
1153  * format.
1154  *
1155  * @param cls closure
1156  * @param name name of the transport that generated the address
1157  * @param addr one of the addresses of the host, NULL for the last address
1158  *        the specific address format depends on the transport
1159  * @param addrlen length of the address
1160  * @param numeric should (IP) addresses be displayed in numeric form?
1161  * @param timeout after how long should we give up?
1162  * @param asc function to call on each string
1163  * @param asc_cls closure for asc
1164  */
1165 static void
1166 tcp_plugin_address_pretty_printer (void *cls,
1167                                    const char *type,
1168                                    const void *addr,
1169                                    size_t addrlen,
1170                                    int numeric,
1171                                    struct GNUNET_TIME_Relative timeout,
1172                                    GNUNET_TRANSPORT_AddressStringCallback asc,
1173                                    void *asc_cls)
1174 {
1175   struct Plugin *plugin = cls;
1176   const struct sockaddr_in *v4;
1177   const struct sockaddr_in6 *v6;
1178   struct PrettyPrinterContext *ppc;
1179
1180   if ((addrlen != sizeof (struct sockaddr_in)) &&
1181       (addrlen != sizeof (struct sockaddr_in6)))
1182     {
1183       /* invalid address */
1184       GNUNET_break_op (0);
1185       asc (asc_cls, NULL);
1186       return;
1187     }
1188   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
1189   ppc->asc = asc;
1190   ppc->asc_cls = asc_cls;
1191   if (addrlen == sizeof (struct sockaddr_in))
1192     {
1193       v4 = (const struct sockaddr_in *) addr;
1194       ppc->port = ntohs (v4->sin_port);
1195     }
1196   else
1197     {
1198       v6 = (const struct sockaddr_in6 *) addr;
1199       ppc->port = ntohs (v6->sin6_port);
1200
1201     }
1202   GNUNET_RESOLVER_hostname_get (plugin->env->sched,
1203                                 plugin->env->cfg,
1204                                 addr,
1205                                 addrlen,
1206                                 !numeric, timeout, &append_port, ppc);
1207 }
1208
1209
1210 /**
1211  * Update the last-received and bandwidth quota values
1212  * for this session.
1213  *
1214  * @param session session to update
1215  * @param force set to GNUNET_YES if we should update even
1216  *        though the minimum refresh time has not yet expired
1217  */
1218 static void
1219 update_quota (struct Session *session, int force)
1220 {
1221   struct GNUNET_TIME_Absolute now;
1222   unsigned long long delta;
1223   unsigned long long total_allowed;
1224   unsigned long long total_remaining;
1225
1226   now = GNUNET_TIME_absolute_get ();
1227   delta = now.value - session->last_quota_update.value;
1228   if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force))
1229     return;                     /* too early, not enough data */
1230
1231   total_allowed = session->quota_in * delta;
1232   if (total_allowed > session->last_received)
1233     {
1234       /* got less than acceptable */
1235       total_remaining = total_allowed - session->last_received;
1236       session->last_received = 0;
1237       delta = total_remaining / session->quota_in;      /* bonus seconds */
1238       if (delta > MAX_BANDWIDTH_CARRY)
1239         delta = MAX_BANDWIDTH_CARRY;    /* limit amount of carry-over */
1240     }
1241   else
1242     {
1243       /* got more than acceptable */
1244       total_remaining = 0;
1245       session->last_received -= total_allowed;
1246       delta = 0;
1247     }
1248   session->last_quota_update.value = now.value - delta;
1249 }
1250
1251
1252 /**
1253  * Set a quota for receiving data from the given peer; this is a
1254  * per-transport limit.  The transport should limit its read/select
1255  * calls to stay below the quota (in terms of incoming data).
1256  *
1257  * @param cls closure
1258  * @param peer the peer for whom the quota is given
1259  * @param quota_in quota for receiving/sending data in bytes per ms
1260  */
1261 static void
1262 tcp_plugin_set_receive_quota (void *cls,
1263                               const struct GNUNET_PeerIdentity *target,
1264                               uint32_t quota_in)
1265 {
1266   struct Plugin *plugin = cls;
1267   struct Session *session;
1268
1269   session = find_session_by_target (plugin, target);
1270   if (session->quota_in != quota_in)
1271     {
1272       update_quota (session, GNUNET_YES);
1273       if (session->quota_in > quota_in)
1274         session->last_quota_update = GNUNET_TIME_absolute_get ();
1275       session->quota_in = quota_in;
1276     }
1277 }
1278
1279
1280 /**
1281  * Check if the given port is plausible (must be either
1282  * our listen port or our advertised port).  If it is
1283  * neither, we return one of these two ports at random.
1284  *
1285  * @return either in_port or a more plausible port
1286  */
1287 static uint16_t
1288 check_port (struct Plugin *plugin, uint16_t in_port)
1289 {
1290   if ((in_port == plugin->adv_port) || (in_port == plugin->open_port))
1291     return in_port;
1292   return (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1293                                     2) == 0)
1294     ? plugin->open_port : plugin->adv_port;
1295 }
1296
1297
1298 /**
1299  * Another peer has suggested an address for this
1300  * peer and transport plugin.  Check that this could be a valid
1301  * address.  If so, consider adding it to the list
1302  * of addresses.
1303  *
1304  * @param cls closure
1305  * @param addr pointer to the address
1306  * @param addrlen length of addr
1307  * @return GNUNET_OK if this is a plausible address for this peer
1308  *         and transport
1309  */
1310 static int
1311 tcp_plugin_address_suggested (void *cls, const void *addr, size_t addrlen)
1312 {
1313   struct Plugin *plugin = cls;
1314   char buf[sizeof (struct sockaddr_in6)];
1315   struct sockaddr_in *v4;
1316   struct sockaddr_in6 *v6;
1317
1318   if ((addrlen != sizeof (struct sockaddr_in)) &&
1319       (addrlen != sizeof (struct sockaddr_in6)))
1320     {
1321       GNUNET_break_op (0);
1322       return GNUNET_SYSERR;
1323     }
1324   memcpy (buf, addr, sizeof (struct sockaddr_in6));
1325   if (addrlen == sizeof (struct sockaddr_in))
1326     {
1327       v4 = (struct sockaddr_in *) buf;
1328       v4->sin_port = htons (check_port (plugin, ntohs (v4->sin_port)));
1329     }
1330   else
1331     {
1332       v6 = (struct sockaddr_in6 *) buf;
1333       v6->sin6_port = htons (check_port (plugin, ntohs (v6->sin6_port)));
1334     }
1335 #if DEBUG_TCP
1336   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1337                    "tcp",
1338                    "Informing transport service about my address `%s'.\n",
1339                    GNUNET_a2s(addr, addrlen));
1340 #endif
1341   plugin->env->notify_address (plugin->env->cls,
1342                                "tcp",
1343                                buf, addrlen, LEARNED_ADDRESS_EXPIRATION);
1344   return GNUNET_OK;
1345 }
1346
1347
1348 /**
1349  * We've received a welcome from this peer via TCP.
1350  * Possibly create a fresh client record and send back
1351  * our welcome.
1352  *
1353  * @param cls closure
1354  * @param client identification of the client
1355  * @param message the actual message
1356  */
1357 static void
1358 handle_tcp_welcome (void *cls,
1359                     struct GNUNET_SERVER_Client *client,
1360                     const struct GNUNET_MessageHeader *message)
1361 {
1362   struct Plugin *plugin = cls;
1363   struct Session *session_c;
1364   const struct WelcomeMessage *wm;
1365   uint16_t msize;
1366   uint32_t addrlen;
1367   size_t alen;
1368   void *vaddr;
1369   const struct sockaddr *addr;
1370
1371   msize = ntohs (message->size);
1372   if (msize < sizeof (struct WelcomeMessage))
1373     {
1374       GNUNET_break_op (0);
1375       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1376       return;
1377     }
1378   wm = (const struct WelcomeMessage *) message;
1379 #if DEBUG_TCP
1380   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1381                    "tcp",
1382                    "Received `%s' message from `%4s/%p'.\n", "WELCOME",
1383                    GNUNET_i2s(&wm->clientIdentity),
1384                    client);
1385 #endif
1386   session_c = find_session_by_client (plugin, client);
1387   if (session_c == NULL)
1388     {
1389       vaddr = NULL;
1390       GNUNET_SERVER_client_get_address (client, &vaddr, &alen);
1391       GNUNET_SERVER_client_keep (client);
1392       session_c = create_session (plugin,
1393                                   &wm->clientIdentity, client, vaddr, alen);
1394 #if DEBUG_TCP
1395       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1396                        "tcp",
1397                        "Creating new session %p for incoming `%s' message.\n",
1398                        session_c, "WELCOME");
1399 #endif
1400       GNUNET_free_non_null (vaddr);
1401       process_pending_messages (session_c);
1402     }
1403   session_c->expecting_welcome = GNUNET_NO;
1404   if (0 < (addrlen = msize - sizeof (struct WelcomeMessage)))
1405     {
1406       addr = (const struct sockaddr *) &wm[1];
1407       tcp_plugin_address_suggested (plugin, addr, addrlen);
1408     }
1409   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1410 }
1411
1412
1413 /**
1414  * Calculate how long we should delay reading from the TCP socket to
1415  * ensure that we stay within our bandwidth limits (push back).
1416  *
1417  * @param session for which client should this be calculated
1418  */
1419 static struct GNUNET_TIME_Relative
1420 calculate_throttle_delay (struct Session *session)
1421 {
1422   struct GNUNET_TIME_Relative ret;
1423   struct GNUNET_TIME_Absolute now;
1424   uint64_t del;
1425   uint64_t avail;
1426   uint64_t excess;
1427
1428   now = GNUNET_TIME_absolute_get ();
1429   del = now.value - session->last_quota_update.value;
1430   if (del > MAX_BANDWIDTH_CARRY)
1431     {
1432       update_quota (session, GNUNET_YES);
1433       del = now.value - session->last_quota_update.value;
1434       GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
1435     }
1436   if (session->quota_in == 0)
1437     session->quota_in = 1;      /* avoid divison by zero */
1438   avail = del * session->quota_in;
1439   if (avail > session->last_received)
1440     return GNUNET_TIME_UNIT_ZERO;       /* can receive right now */
1441   excess = session->last_received - avail;
1442   ret.value = excess / session->quota_in;
1443   return ret;
1444 }
1445
1446
1447 /**
1448  * Task to signal the server that we can continue
1449  * receiving from the TCP client now.
1450  */
1451 static void
1452 delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1453 {
1454   struct Session *session = cls;
1455   GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1456 }
1457
1458
1459 /**
1460  * We've received data for this peer via TCP.  Unbox,
1461  * compute latency and forward.
1462  *
1463  * @param cls closure
1464  * @param client identification of the client
1465  * @param message the actual message
1466  */
1467 static void
1468 handle_tcp_data (void *cls,
1469                  struct GNUNET_SERVER_Client *client,
1470                  const struct GNUNET_MessageHeader *message)
1471 {
1472   struct Plugin *plugin = cls;
1473   struct Session *session;
1474   const struct DataMessage *dm;
1475   uint16_t msize;
1476   const struct GNUNET_MessageHeader *msg;
1477   struct GNUNET_TIME_Relative latency;
1478   struct GNUNET_TIME_Absolute ttime;
1479   struct GNUNET_TIME_Absolute now;
1480   struct GNUNET_TIME_Relative delay;
1481   uint64_t ack_in;
1482
1483   msize = ntohs (message->size);
1484   if ((msize <
1485        sizeof (struct DataMessage) + sizeof (struct GNUNET_MessageHeader)))
1486     {
1487       GNUNET_break_op (0);
1488       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1489       return;
1490     }
1491   session = find_session_by_client (plugin, client);
1492   if ((NULL == session) || (GNUNET_YES == session->expecting_welcome))
1493     {
1494       GNUNET_break_op (0);
1495       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1496       return;
1497     }
1498 #if DEBUG_TCP
1499   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1500                    "tcp", "Receiving %u bytes from `%4s'.\n",
1501                    msize,
1502                    GNUNET_i2s(&session->target));
1503 #endif
1504   dm = (const struct DataMessage *) message;
1505   session->max_in_msg_counter = GNUNET_MAX (session->max_in_msg_counter,
1506                                             GNUNET_ntohll (dm->ack_out));
1507   msg = (const struct GNUNET_MessageHeader *) &dm[1];
1508   if (msize != sizeof (struct DataMessage) + ntohs (msg->size))
1509     {
1510       GNUNET_break_op (0);
1511       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1512       return;
1513     }
1514   /* estimate latency */
1515   ack_in = GNUNET_ntohll (dm->ack_in);
1516   if ((ack_in <= session->out_msg_counter) &&
1517       (session->out_msg_counter - ack_in < ACK_LOG_SIZE))
1518     {
1519       delay = GNUNET_TIME_relative_ntoh (dm->delay);
1520       ttime = session->gen_time[ack_in % ACK_LOG_SIZE];
1521       now = GNUNET_TIME_absolute_get ();
1522       if (delay.value > now.value - ttime.value)
1523         delay.value = 0;        /* not plausible */
1524       /* update (round-trip) latency using ageing; we
1525          use 7:1 so that we can reasonably quickly react
1526          to changes, but not so fast that latency is largely
1527          jitter... */
1528       session->latency_estimate
1529         = ((7 * session->latency_estimate) +
1530            (now.value - ttime.value - delay.value)) / 8;
1531     }
1532   latency.value = (uint64_t) session->latency_estimate;
1533   /* deliver on */
1534 #if DEBUG_TCP
1535   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1536                    "tcp",
1537                    "Forwarding data of type %u to transport service.\n",
1538                    ntohs (msg->type));
1539 #endif
1540   session->service_context
1541     = plugin->env->receive (plugin->env->cls,
1542                             session,
1543                             session->service_context,
1544                             latency, &session->target, msg);
1545   /* update bandwidth used */
1546   session->last_received += msize;
1547   update_quota (session, GNUNET_NO);
1548
1549   delay = calculate_throttle_delay (session);
1550   if (delay.value == 0)
1551     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1552   else
1553     GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
1554                                   GNUNET_NO,
1555                                   GNUNET_SCHEDULER_PRIORITY_HIGH,
1556                                   GNUNET_SCHEDULER_NO_PREREQUISITE_TASK,
1557                                   delay, &delayed_done, session);
1558 }
1559
1560
1561 /**
1562  * Handlers for the various TCP messages.
1563  */
1564 static struct GNUNET_SERVER_MessageHandler my_handlers[] = {
1565   {&handle_tcp_welcome, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME, 0},
1566   {&handle_tcp_data, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_DATA, 0},
1567   {NULL, NULL, 0, 0}
1568 };
1569
1570
1571 static void
1572 create_tcp_handlers (struct Plugin *plugin)
1573 {
1574   unsigned int i;
1575   plugin->handlers = GNUNET_malloc (sizeof (my_handlers));
1576   memcpy (plugin->handlers, my_handlers, sizeof (my_handlers));
1577   for (i = 0;
1578        i <
1579        sizeof (my_handlers) / sizeof (struct GNUNET_SERVER_MessageHandler);
1580        i++)
1581     plugin->handlers[i].callback_cls = plugin;
1582   GNUNET_SERVER_add_handlers (plugin->server, plugin->handlers);
1583 }
1584
1585
1586 /**
1587  * Functions with this signature are called whenever a peer
1588  * is disconnected on the network level.
1589  *
1590  * @param cls closure
1591  * @param client identification of the client
1592  */
1593 static void
1594 disconnect_notify (void *cls, struct GNUNET_SERVER_Client *client)
1595 {
1596   struct Plugin *plugin = cls;
1597   struct Session *session;
1598
1599 #if DEBUG_TCP
1600   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1601                    "tcp",
1602                    "Notified about network-level disconnect of client %p.\n",
1603                    client);
1604 #endif
1605   session = find_session_by_client (plugin, client);
1606   if (session == NULL)
1607     return;                     /* unknown, nothing to do */
1608 #if DEBUG_TCP
1609   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1610                    "tcp", "Will now destroy session %p.\n", session);
1611 #endif
1612   disconnect_session (session);
1613 }
1614
1615
1616 /**
1617  * Add the IP of our network interface to the list of
1618  * our external IP addresses.
1619  */
1620 static int
1621 process_interfaces (void *cls,
1622                     const char *name,
1623                     int isDefault,
1624                     const struct sockaddr *addr, socklen_t addrlen)
1625 {
1626   struct Plugin *plugin = cls;
1627   int af;
1628   struct sockaddr_in *v4;
1629   struct sockaddr_in6 *v6;
1630
1631   af = addr->sa_family;
1632   if (af == AF_INET)
1633     {
1634       v4 = (struct sockaddr_in *) addr;
1635       v4->sin_port = htons (plugin->adv_port);
1636     }
1637   else
1638     {
1639       GNUNET_assert (af == AF_INET6);
1640       v6 = (struct sockaddr_in6 *) addr;
1641       v6->sin6_port = htons (plugin->adv_port);
1642     }
1643   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO |
1644                    GNUNET_ERROR_TYPE_BULK,
1645                    "tcp", _("Found address `%s' (%s)\n"), 
1646                    GNUNET_a2s(addr, addrlen), name);
1647   plugin->env->notify_address (plugin->env->cls,
1648                                "tcp",
1649                                addr, addrlen, GNUNET_TIME_UNIT_FOREVER_REL);
1650   return GNUNET_OK;
1651 }
1652
1653
1654 /**
1655  * Function called by the resolver for each address obtained from DNS
1656  * for our own hostname.  Add the addresses to the list of our
1657  * external IP addresses.
1658  *
1659  * @param cls closure
1660  * @param addr one of the addresses of the host, NULL for the last address
1661  * @param addrlen length of the address
1662  */
1663 static void
1664 process_hostname_ips (void *cls,
1665                       const struct sockaddr *addr, socklen_t addrlen)
1666 {
1667   struct Plugin *plugin = cls;
1668
1669   if (addr == NULL)
1670     return;
1671   plugin->env->notify_address (plugin->env->cls,
1672                                "tcp",
1673                                addr, addrlen, GNUNET_TIME_UNIT_FOREVER_REL);
1674 }
1675
1676
1677 /**
1678  * Entry point for the plugin.
1679  */
1680 void *
1681 libgnunet_plugin_transport_tcp_init (void *cls)
1682 {
1683   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
1684   struct GNUNET_TRANSPORT_PluginFunctions *api;
1685   struct Plugin *plugin;
1686   struct GNUNET_SERVICE_Context *service;
1687   unsigned long long aport;
1688   unsigned long long bport;
1689
1690   service = GNUNET_SERVICE_start ("transport-tcp", env->sched, env->cfg);
1691   if (service == NULL)
1692     {
1693       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
1694                        "tcp",
1695                        _
1696                        ("Failed to start service for `%s' transport plugin.\n"),
1697                        "tcp");
1698       return NULL;
1699     }
1700   aport = 0;
1701   if ((GNUNET_OK !=
1702        GNUNET_CONFIGURATION_get_value_number (env->cfg,
1703                                               "transport-tcp",
1704                                               "PORT",
1705                                               &bport)) ||
1706       (bport > 65535) ||
1707       ((GNUNET_OK ==
1708         GNUNET_CONFIGURATION_get_value_number (env->cfg,
1709                                                "transport-tcp",
1710                                                "ADVERTISED-PORT",
1711                                                &aport)) && (aport > 65535)))
1712     {
1713       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
1714                        "tcp",
1715                        _
1716                        ("Require valid port number for service `%s' in configuration!\n"),
1717                        "transport-tcp");
1718       GNUNET_SERVICE_stop (service);
1719       return NULL;
1720     }
1721   if (aport == 0)
1722     aport = bport;
1723   plugin = GNUNET_malloc (sizeof (struct Plugin));
1724   plugin->open_port = bport;
1725   plugin->adv_port = aport;
1726   plugin->env = env;
1727   plugin->lsock = NULL;
1728   plugin->statistics = NULL;
1729   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
1730   api->cls = plugin;
1731   api->send_to = &tcp_plugin_send_to;
1732   api->send = &tcp_plugin_send;
1733   api->cancel = &tcp_plugin_cancel;
1734   api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
1735   api->set_receive_quota = &tcp_plugin_set_receive_quota;
1736   api->address_suggested = &tcp_plugin_address_suggested;
1737   api->cost_estimate = 42;      /* TODO: ATS */
1738   plugin->service = service;
1739   plugin->server = GNUNET_SERVICE_get_server (service);
1740   create_tcp_handlers (plugin);
1741   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1742                    "tcp", _("TCP transport listening on port %u\n"), bport);
1743   if (aport != bport)
1744     GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
1745                      "tcp",
1746                      _
1747                      ("TCP transport advertises itself as being on port %u\n"),
1748                      aport);
1749   GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify,
1750                                    plugin);
1751   GNUNET_OS_network_interfaces_list (&process_interfaces, plugin);
1752   GNUNET_RESOLVER_hostname_resolve (env->sched,
1753                                     env->cfg,
1754                                     AF_UNSPEC,
1755                                     HOSTNAME_RESOLVE_TIMEOUT,
1756                                     &process_hostname_ips, plugin);
1757   return api;
1758 }
1759
1760
1761 /**
1762  * Exit point from the plugin.
1763  */
1764 void *
1765 libgnunet_plugin_transport_tcp_done (void *cls)
1766 {
1767   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
1768   struct Plugin *plugin = api->cls;
1769
1770   GNUNET_SERVICE_stop (plugin->service);
1771   GNUNET_free (plugin->handlers);
1772   GNUNET_free (plugin);
1773   GNUNET_free (api);
1774   return NULL;
1775 }
1776
1777 /* end of plugin_transport_tcp.c */