fix: memory leak
[oweals/gnunet.git] / src / transport / plugin_transport_tcp.c
1 /*
2      This file is part of GNUnet
3      (C) 2002--2012 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 3, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20 /**
21  * @file transport/plugin_transport_tcp.c
22  * @brief Implementation of the TCP transport service
23  * @author Christian Grothoff
24  */
25 #include "platform.h"
26 #include "gnunet_hello_lib.h"
27 #include "gnunet_constants.h"
28 #include "gnunet_connection_lib.h"
29 #include "gnunet_container_lib.h"
30 #include "gnunet_nat_lib.h"
31 #include "gnunet_os_lib.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_resolver_service.h"
34 #include "gnunet_server_lib.h"
35 #include "gnunet_service_lib.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_statistics_service.h"
38 #include "gnunet_transport_service.h"
39 #include "gnunet_transport_plugin.h"
40 #include "transport.h"
41
42 #define LOG(kind,...) GNUNET_log_from (kind, "transport-tcp",__VA_ARGS__)
43
44 #define PLUGIN_NAME "tcp"
45
46 /**
47  * How long until we give up on establishing an NAT connection?
48  * Must be > 4 RTT
49  */
50 #define NAT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
51
52
53 GNUNET_NETWORK_STRUCT_BEGIN
54
55 /**
56  * Address options
57  */
58 static uint32_t myoptions;
59
60 /**
61  * Initial handshake message for a session.
62  */
63 struct WelcomeMessage
64 {
65   /**
66    * Type is GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME.
67    */
68   struct GNUNET_MessageHeader header;
69
70   /**
71    * Identity of the node connecting (TCP client)
72    */
73   struct GNUNET_PeerIdentity clientIdentity;
74
75 };
76
77
78 /**
79  * Basically a WELCOME message, but with the purpose
80  * of giving the waiting peer a client handle to use
81  */
82 struct TCP_NAT_ProbeMessage
83 {
84   /**
85    * Type is GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_NAT_PROBE.
86    */
87   struct GNUNET_MessageHeader header;
88
89   /**
90    * Identity of the sender of the message.
91    */
92   struct GNUNET_PeerIdentity clientIdentity;
93
94 };
95 GNUNET_NETWORK_STRUCT_END
96
97 /**
98  * Context for sending a NAT probe via TCP.
99  */
100 struct TCPProbeContext
101 {
102
103   /**
104    * Active probes are kept in a DLL.
105    */
106   struct TCPProbeContext *next;
107
108   /**
109    * Active probes are kept in a DLL.
110    */
111   struct TCPProbeContext *prev;
112
113   /**
114    * Probe connection.
115    */
116   struct GNUNET_CONNECTION_Handle *sock;
117
118   /**
119    * Message to be sent.
120    */
121   struct TCP_NAT_ProbeMessage message;
122
123   /**
124    * Handle to the transmission.
125    */
126   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
127
128   /**
129    * Transport plugin handle.
130    */
131   struct Plugin *plugin;
132 };
133
134
135 GNUNET_NETWORK_STRUCT_BEGIN
136
137 /**
138  * Network format for IPv4 addresses.
139  */
140 struct IPv4TcpAddress
141 {
142         /**
143          * Optional options and flags for this address
144          */
145         uint32_t options;
146
147   /**
148    * IPv4 address, in network byte order.
149    */
150   uint32_t ipv4_addr GNUNET_PACKED;
151
152   /**
153    * Port number, in network byte order.
154    */
155   uint16_t t4_port GNUNET_PACKED;
156
157 };
158
159
160 /**
161  * Network format for IPv6 addresses.
162  */
163 struct IPv6TcpAddress
164 {
165         /**
166          * Optional flags for this address
167          */
168         uint32_t options;
169
170   /**
171    * IPv6 address.
172    */
173   struct in6_addr ipv6_addr GNUNET_PACKED;
174
175   /**
176    * Port number, in network byte order.
177    */
178   uint16_t t6_port GNUNET_PACKED;
179
180 };
181 GNUNET_NETWORK_STRUCT_END
182
183 /**
184  * Encapsulation of all of the state of the plugin.
185  */
186 struct Plugin;
187
188
189 /**
190  * Information kept for each message that is yet to
191  * be transmitted.
192  */
193 struct PendingMessage
194 {
195
196   /**
197    * This is a doubly-linked list.
198    */
199   struct PendingMessage *next;
200
201   /**
202    * This is a doubly-linked list.
203    */
204   struct PendingMessage *prev;
205
206   /**
207    * The pending message
208    */
209   const char *msg;
210
211   /**
212    * Continuation function to call once the message
213    * has been sent.  Can be NULL if there is no
214    * continuation to call.
215    */
216   GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
217
218   /**
219    * Closure for transmit_cont.
220    */
221   void *transmit_cont_cls;
222
223   /**
224    * Timeout value for the pending message.
225    */
226   struct GNUNET_TIME_Absolute timeout;
227
228   /**
229    * So that the gnunet-service-transport can group messages together,
230    * these pending messages need to accept a message buffer and size
231    * instead of just a GNUNET_MessageHeader.
232    */
233   size_t message_size;
234
235 };
236
237
238 /**
239  * Session handle for TCP connections.
240  */
241 struct Session
242 {
243   /**
244    * To whom are we talking to (set to our identity
245    * if we are still waiting for the welcome message)
246    */
247   struct GNUNET_PeerIdentity target;
248
249   /**
250    * API requirement.
251    */
252   struct SessionHeader header;
253
254   /**
255    * Pointer to the global plugin struct.
256    */
257   struct Plugin *plugin;
258
259   /**
260    * The client (used to identify this connection)
261    */
262   struct GNUNET_SERVER_Client *client;
263
264   /**
265    * Task cleaning up a NAT client connection establishment attempt;
266    */
267   GNUNET_SCHEDULER_TaskIdentifier nat_connection_timeout;
268
269   /**
270    * Messages currently pending for transmission
271    * to this peer, if any.
272    */
273   struct PendingMessage *pending_messages_head;
274
275   /**
276    * Messages currently pending for transmission
277    * to this peer, if any.
278    */
279   struct PendingMessage *pending_messages_tail;
280
281   /**
282    * Handle for pending transmission request.
283    */
284   struct GNUNET_SERVER_TransmitHandle *transmit_handle;
285
286   /**
287    * ID of task used to delay receiving more to throttle sender.
288    */
289   GNUNET_SCHEDULER_TaskIdentifier receive_delay_task;
290
291   /**
292    * Session timeout task
293    */
294   GNUNET_SCHEDULER_TaskIdentifier timeout_task;
295
296   /**
297    * Address of the other peer (either based on our 'connect'
298    * call or on our 'accept' call).
299    *
300    * struct IPv4TcpAddress or struct IPv6TcpAddress
301    *
302    */
303   void *addr;
304
305   /**
306    * Length of connect_addr.
307    */
308   size_t addrlen;
309
310   /**
311    * Last activity on this connection.  Used to select preferred
312    * connection.
313    */
314   struct GNUNET_TIME_Absolute last_activity;
315
316   /**
317    * Are we still expecting the welcome message? (GNUNET_YES/GNUNET_NO)
318    */
319   int expecting_welcome;
320
321   /**
322    * Was this a connection that was inbound (we accepted)? (GNUNET_YES/GNUNET_NO)
323    */
324   int inbound;
325
326   /**
327    * Was this session created using NAT traversal?
328    */
329   int is_nat;
330
331   /**
332    * ATS network type in NBO
333    */
334   uint32_t ats_address_network_type;
335 };
336
337
338 /**
339  * Encapsulation of all of the state of the plugin.
340  */
341 struct Plugin
342 {
343   /**
344    * Our environment.
345    */
346   struct GNUNET_TRANSPORT_PluginEnvironment *env;
347
348   /**
349    * The listen socket.
350    */
351   struct GNUNET_CONNECTION_Handle *lsock;
352
353   /**
354    * Our handle to the NAT module.
355    */
356   struct GNUNET_NAT_Handle *nat;
357
358   /**
359    * Map from peer identities to sessions for the given peer.
360    */
361   struct GNUNET_CONTAINER_MultiHashMap *sessionmap;
362
363   /**
364    * Handle to the network service.
365    */
366   struct GNUNET_SERVICE_Context *service;
367
368   /**
369    * Handle to the server for this service.
370    */
371   struct GNUNET_SERVER_Handle *server;
372
373   /**
374    * Copy of the handler array where the closures are
375    * set to this struct's instance.
376    */
377   struct GNUNET_SERVER_MessageHandler *handlers;
378
379   /**
380    * Map of peers we have tried to contact behind a NAT
381    */
382   struct GNUNET_CONTAINER_MultiHashMap *nat_wait_conns;
383
384   /**
385    * List of active TCP probes.
386    */
387   struct TCPProbeContext *probe_head;
388
389   /**
390    * List of active TCP probes.
391    */
392   struct TCPProbeContext *probe_tail;
393
394   /**
395    * Handle for (DYN)DNS lookup of our external IP.
396    */
397   struct GNUNET_RESOLVER_RequestHandle *ext_dns;
398
399   /**
400    * How many more TCP sessions are we allowed to open right now?
401    */
402   unsigned long long max_connections;
403
404   /**
405    * How many more TCP sessions do we have right now?
406    */
407   unsigned long long cur_connections;
408
409   /**
410    * ID of task used to update our addresses when one expires.
411    */
412   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
413
414   /**
415    * Port that we are actually listening on.
416    */
417   uint16_t open_port;
418
419   /**
420    * Port that the user said we would have visible to the
421    * rest of the world.
422    */
423   uint16_t adv_port;
424
425 };
426
427
428 /**
429  * Start session timeout
430  */
431 static void
432 start_session_timeout (struct Session *s);
433
434
435 /**
436  * Increment session timeout due to activity
437  */
438 static void
439 reschedule_session_timeout (struct Session *s);
440
441
442 /**
443  * Cancel timeout
444  */
445 static void
446 stop_session_timeout (struct Session *s);
447
448
449 /* DEBUG CODE */
450 static const char *
451 tcp_address_to_string (void *cls, const void *addr, size_t addrlen);
452
453
454 static unsigned int sessions;
455
456
457 static void 
458 inc_sessions (struct Plugin *plugin, struct Session *session, int line)
459 {
460   sessions++;
461   unsigned int size = GNUNET_CONTAINER_multihashmap_size(plugin->sessionmap);
462   if (sessions != size)
463     LOG (GNUNET_ERROR_TYPE_DEBUG, "Inconsistent sessions %u <-> session map size: %u\n",
464         sessions, size);
465   LOG (GNUNET_ERROR_TYPE_DEBUG, "%4i Session increased to %u (session map size: %u): `%s' `%s'\n",
466       line,
467       sessions,
468       size,
469       GNUNET_i2s (&session->target),
470       tcp_address_to_string (NULL, session->addr, session->addrlen));
471 }
472
473
474 static void 
475 dec_sessions (struct Plugin *plugin, struct Session *session, int line)
476 {
477   GNUNET_assert (sessions > 0);
478   unsigned int size = GNUNET_CONTAINER_multihashmap_size(plugin->sessionmap);
479   sessions--;
480   if (sessions != size)
481     LOG (GNUNET_ERROR_TYPE_DEBUG, "Inconsistent sessions %u <-> session map size: %u\n",
482       sessions, size);
483   LOG (GNUNET_ERROR_TYPE_DEBUG, "%4i Session decreased to %u (session map size: %u): `%s' `%s'\n",
484       line,
485       sessions,
486       size,
487       GNUNET_i2s (&session->target),
488       tcp_address_to_string (NULL, session->addr, session->addrlen));
489 }
490 /* DEBUG CODE */
491
492
493 /**
494  * Function to check if an inbound connection is acceptable.
495  * Mostly used to limit the total number of open connections
496  * we can have.
497  *
498  * @param cls the 'struct Plugin'
499  * @param ucred credentials, if available, otherwise NULL
500  * @param addr address
501  * @param addrlen length of address
502  * @return GNUNET_YES to allow, GNUNET_NO to deny, GNUNET_SYSERR
503  *   for unknown address family (will be denied).
504  */
505 static int
506 plugin_tcp_access_check (void *cls,
507                          const struct GNUNET_CONNECTION_Credentials *ucred,
508                          const struct sockaddr *addr, socklen_t addrlen)
509 {
510   struct Plugin *plugin = cls;
511   LOG (GNUNET_ERROR_TYPE_DEBUG,
512        "Accepting new incoming TCP connection from `%s'\n",
513        GNUNET_a2s (addr, addrlen));
514   if (plugin->cur_connections >= plugin->max_connections)
515     return GNUNET_NO;
516   plugin->cur_connections ++;
517   return GNUNET_YES;
518 }
519
520
521 /**
522  * Our external IP address/port mapping has changed.
523  *
524  * @param cls closure, the 'struct LocalAddrList'
525  * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean
526  *     the previous (now invalid) one
527  * @param addr either the previous or the new public IP address
528  * @param addrlen actual lenght of the address
529  */
530 static void
531 tcp_nat_port_map_callback (void *cls, int add_remove,
532                            const struct sockaddr *addr, socklen_t addrlen)
533 {
534   struct Plugin *plugin = cls;
535   struct IPv4TcpAddress t4;
536   struct IPv6TcpAddress t6;
537   void *arg;
538   size_t args;
539
540   LOG (GNUNET_ERROR_TYPE_INFO,
541        "NAT notification to %s address `%s'\n",
542        (GNUNET_YES == add_remove) ? "add" : "remove",
543        GNUNET_a2s (addr, addrlen));
544   /* convert 'addr' to our internal format */
545   switch (addr->sa_family)
546   {
547   case AF_INET:
548     GNUNET_assert (addrlen == sizeof (struct sockaddr_in));
549     memset (&t4,0, sizeof (t4));
550     t4.options = htonl (myoptions);
551     t4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr;
552     t4.t4_port = ((struct sockaddr_in *) addr)->sin_port;
553     arg = &t4;
554     args = sizeof (t4);
555     break;
556   case AF_INET6:
557     GNUNET_assert (addrlen == sizeof (struct sockaddr_in6));
558     memset (&t6, 0, sizeof (t6));
559     memcpy (&t6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr,
560             sizeof (struct in6_addr));
561     t6.options = htonl (myoptions);
562     t6.t6_port = ((struct sockaddr_in6 *) addr)->sin6_port;
563     arg = &t6;
564     args = sizeof (t6);
565     break;
566   default:
567     GNUNET_break (0);
568     return;
569   }
570   /* modify our published address list */
571   plugin->env->notify_address (plugin->env->cls, add_remove, arg, args, "tcp");
572 }
573
574
575 /**
576  * Function called for a quick conversion of the binary address to
577  * a numeric address.  Note that the caller must not free the
578  * address and that the next call to this function is allowed
579  * to override the address again.
580  *
581  * @param cls closure ('struct Plugin*')
582  * @param addr binary address
583  * @param addrlen length of the address
584  * @return string representing the same address
585  */
586 static const char *
587 tcp_address_to_string (void *cls, const void *addr, size_t addrlen)
588 {
589   static char rbuf[INET6_ADDRSTRLEN + 12];
590   char buf[INET6_ADDRSTRLEN];
591   const void *sb;
592   struct in_addr a4;
593   struct in6_addr a6;
594   const struct IPv4TcpAddress *t4;
595   const struct IPv6TcpAddress *t6;
596   int af;
597   uint16_t port;
598   uint32_t options;
599
600   options = 0;
601   switch (addrlen)
602   {
603   case sizeof (struct IPv6TcpAddress):
604     t6 = addr;
605     af = AF_INET6;
606     port = ntohs (t6->t6_port);
607     options = ntohl (t6->options);
608     memcpy (&a6, &t6->ipv6_addr, sizeof (a6));
609     sb = &a6;
610     break;
611   case sizeof (struct IPv4TcpAddress): 
612     t4 = addr;
613     af = AF_INET;
614     port = ntohs (t4->t4_port);
615     options = ntohl (t4->options);
616     memcpy (&a4, &t4->ipv4_addr, sizeof (a4));
617     sb = &a4;
618     break;
619   default:
620     LOG (GNUNET_ERROR_TYPE_ERROR, 
621          _("Unexpected address length: %u bytes\n"),
622          (unsigned int) addrlen);
623     GNUNET_break (0);
624     return NULL;
625   }
626   if (NULL == inet_ntop (af, sb, buf, INET6_ADDRSTRLEN))
627   {
628     GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "inet_ntop");
629     return NULL;
630   }
631   GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "%s.%u.[%s]:%u" : "%s.%u.%s:%u",
632                    PLUGIN_NAME, options, buf, port);
633   return rbuf;
634 }
635
636
637 /**
638  * Function called to convert a string address to
639  * a binary address.
640  *
641  * @param cls closure ('struct Plugin*')
642  * @param addr string address
643  * @param addrlen length of the address
644  * @param buf location to store the buffer
645  * @param added location to store the number of bytes in the buffer.
646  *        If the function returns GNUNET_SYSERR, its contents are undefined.
647  * @return GNUNET_OK on success, GNUNET_SYSERR on failure
648  */
649 static int
650 tcp_string_to_address (void *cls, const char *addr, uint16_t addrlen,
651     void **buf, size_t *added)
652 {
653   struct sockaddr_storage socket_address;
654   char *address;
655   char *plugin;
656   char *optionstr;
657   uint32_t options;
658
659   /* Format tcp.options.address:port */
660   address = NULL;
661   plugin = NULL;
662   optionstr = NULL;
663   options = 0;
664   if ((NULL == addr) || (addrlen == 0))
665   {
666     GNUNET_break (0);
667     return GNUNET_SYSERR;
668   }
669   if ('\0' != addr[addrlen - 1])
670   {
671     GNUNET_break (0);
672     return GNUNET_SYSERR;
673   }
674   if (strlen (addr) != addrlen - 1)
675   {
676     GNUNET_break (0);
677     return GNUNET_SYSERR;
678   }
679   plugin = GNUNET_strdup (addr);
680   optionstr = strchr (plugin, '.');
681   if (NULL == optionstr)
682   {
683     GNUNET_break (0);
684     GNUNET_free (plugin);
685     return GNUNET_SYSERR;
686   }
687   optionstr[0] = '\0';
688   optionstr ++;
689   options = atol (optionstr);
690   address = strchr (optionstr, '.');
691   if (NULL == address)
692   {
693     GNUNET_break (0);
694     GNUNET_free (plugin);
695     return GNUNET_SYSERR;
696   }
697   address[0] = '\0';
698   address ++;
699
700   if (GNUNET_OK !=
701       GNUNET_STRINGS_to_address_ip (address, strlen (address),
702                                     &socket_address))
703   {
704     GNUNET_break (0);
705     GNUNET_free (plugin);
706     return GNUNET_SYSERR;
707   }
708
709   GNUNET_free (plugin);
710   switch (socket_address.ss_family)
711   {
712   case AF_INET:
713     {
714       struct IPv4TcpAddress *t4;
715       struct sockaddr_in *in4 = (struct sockaddr_in *) &socket_address;
716       t4 = GNUNET_malloc (sizeof (struct IPv4TcpAddress));
717       t4->options =  htonl (options);
718       t4->ipv4_addr = in4->sin_addr.s_addr;
719       t4->t4_port = in4->sin_port;
720       *buf = t4;
721       *added = sizeof (struct IPv4TcpAddress);
722       return GNUNET_OK;
723     }
724   case AF_INET6:  
725     {
726       struct IPv6TcpAddress *t6;
727       struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &socket_address;
728       t6 = GNUNET_malloc (sizeof (struct IPv6TcpAddress));
729       t6->options = htonl (options);
730       t6->ipv6_addr = in6->sin6_addr;
731       t6->t6_port = in6->sin6_port;
732       *buf = t6;
733       *added = sizeof (struct IPv6TcpAddress);
734       return GNUNET_OK;
735     }
736   default:
737     return GNUNET_SYSERR;
738   }
739 }
740
741
742 struct SessionClientCtx
743 {
744   const struct GNUNET_SERVER_Client *client;
745   struct Session *ret;
746 };
747
748
749 static int 
750 session_lookup_by_client_it (void *cls,
751                              const struct GNUNET_HashCode * key,
752                              void *value)
753 {
754   struct SessionClientCtx *sc_ctx = cls;
755   struct Session *s = value;
756
757   if (s->client == sc_ctx->client)
758   {
759     sc_ctx->ret = s;
760     return GNUNET_NO;
761   }
762   return GNUNET_YES;
763 }
764
765
766 /**
767  * Find the session handle for the given client.
768  *
769  * @param plugin the plugin
770  * @param client which client to find the session handle for
771  * @return NULL if no matching session exists
772  */
773 static struct Session *
774 lookup_session_by_client (struct Plugin *plugin,
775                           const struct GNUNET_SERVER_Client *client)
776 {
777   struct SessionClientCtx sc_ctx;
778
779   sc_ctx.client = client;
780   sc_ctx.ret = NULL;
781   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessionmap, &session_lookup_by_client_it, &sc_ctx);
782   return sc_ctx.ret;
783 }
784
785
786 /**
787  * Create a new session.  Also queues a welcome message.
788  *
789  * @param plugin the plugin
790  * @param target peer to connect to
791  * @param client client to use, reference counter must have already been increased
792  * @param is_nat this a NAT session, we should wait for a client to
793  *               connect to us from an address, then assign that to
794  *               the session
795  * @return new session object
796  */
797 static struct Session *
798 create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target,
799                 struct GNUNET_SERVER_Client *client, int is_nat)
800 {
801   struct Session *session;
802   struct PendingMessage *pm;
803   struct WelcomeMessage welcome;
804
805   if (GNUNET_YES != is_nat)
806     GNUNET_assert (NULL != client);
807   else
808     GNUNET_assert (NULL == client);
809
810   LOG (GNUNET_ERROR_TYPE_DEBUG, 
811        "Creating new session for peer `%4s'\n",
812        GNUNET_i2s (target));
813   session = GNUNET_malloc (sizeof (struct Session));
814   session->last_activity = GNUNET_TIME_absolute_get ();
815   session->plugin = plugin;
816   session->is_nat = is_nat;
817   session->client = client;
818   session->target = *target;
819   session->expecting_welcome = GNUNET_YES;
820   session->ats_address_network_type = htonl (GNUNET_ATS_NET_UNSPECIFIED);
821   pm = GNUNET_malloc (sizeof (struct PendingMessage) +
822                       sizeof (struct WelcomeMessage));
823   pm->msg = (const char *) &pm[1];
824   pm->message_size = sizeof (struct WelcomeMessage);
825   welcome.header.size = htons (sizeof (struct WelcomeMessage));
826   welcome.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME);
827   welcome.clientIdentity = *plugin->env->my_identity;
828   memcpy (&pm[1], &welcome, sizeof (welcome));
829   pm->timeout = GNUNET_TIME_UNIT_FOREVER_ABS;
830   GNUNET_STATISTICS_update (plugin->env->stats,
831                             gettext_noop ("# bytes currently in TCP buffers"),
832                             pm->message_size, GNUNET_NO);
833   GNUNET_CONTAINER_DLL_insert (session->pending_messages_head,
834                                session->pending_messages_tail, pm);
835   if (GNUNET_YES != is_nat)
836   {
837     GNUNET_STATISTICS_update (plugin->env->stats,
838                               gettext_noop ("# TCP sessions active"), 1,
839                               GNUNET_NO);
840   }
841   start_session_timeout (session);
842
843   return session;
844 }
845
846
847 /**
848  * If we have pending messages, ask the server to
849  * transmit them (schedule the respective tasks, etc.)
850  *
851  * @param session for which session should we do this
852  */
853 static void
854 process_pending_messages (struct Session *session);
855
856
857 /**
858  * Function called to notify a client about the socket
859  * being ready to queue more data.  "buf" will be
860  * NULL and "size" zero if the socket was closed for
861  * writing in the meantime.
862  *
863  * @param cls closure
864  * @param size number of bytes available in buf
865  * @param buf where the callee should write the message
866  * @return number of bytes written to buf
867  */
868 static size_t
869 do_transmit (void *cls, size_t size, void *buf)
870 {
871   struct Session *session = cls;
872   struct GNUNET_PeerIdentity pid;
873   struct Plugin *plugin;
874   struct PendingMessage *pos;
875   struct PendingMessage *hd;
876   struct PendingMessage *tl;
877   struct GNUNET_TIME_Absolute now;
878   char *cbuf;
879   size_t ret;
880
881   GNUNET_assert (NULL != session);
882   session->transmit_handle = NULL;
883   plugin = session->plugin;
884   if (NULL == buf)
885   {
886     LOG (GNUNET_ERROR_TYPE_DEBUG, 
887          "Timeout trying to transmit to peer `%4s', discarding message queue.\n",
888          GNUNET_i2s (&session->target));
889     /* timeout; cancel all messages that have already expired */
890     hd = NULL;
891     tl = NULL;
892     ret = 0;
893     now = GNUNET_TIME_absolute_get ();
894     while ((NULL != (pos = session->pending_messages_head)) &&
895            (pos->timeout.abs_value <= now.abs_value))
896     {
897       GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
898                                    session->pending_messages_tail, pos);
899       LOG (GNUNET_ERROR_TYPE_DEBUG,
900            "Failed to transmit %u byte message to `%4s'.\n",
901            pos->message_size, GNUNET_i2s (&session->target));
902       ret += pos->message_size;
903       GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos);
904     }
905     /* do this call before callbacks (so that if callbacks destroy
906      * session, they have a chance to cancel actions done by this
907      * call) */
908     process_pending_messages (session);
909     pid = session->target;
910     /* no do callbacks and do not use session again since
911      * the callbacks may abort the session */
912     while (NULL != (pos = hd))
913     {
914       GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
915       if (pos->transmit_cont != NULL)
916         pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_SYSERR, pos->message_size, 0);
917       GNUNET_free (pos);
918     }
919     GNUNET_STATISTICS_update (plugin->env->stats,
920                               gettext_noop ("# bytes currently in TCP buffers"),
921                               -(int64_t) ret, GNUNET_NO);
922     GNUNET_STATISTICS_update (plugin->env->stats,
923                               gettext_noop
924                               ("# bytes discarded by TCP (timeout)"), ret,
925                               GNUNET_NO);
926     return 0;
927   }
928   /* copy all pending messages that would fit */
929   ret = 0;
930   cbuf = buf;
931   hd = NULL;
932   tl = NULL;
933   while (NULL != (pos = session->pending_messages_head))
934   {
935     if (ret + pos->message_size > size)
936       break;
937     GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
938                                  session->pending_messages_tail, pos);
939     GNUNET_assert (size >= pos->message_size);
940     LOG (GNUNET_ERROR_TYPE_DEBUG, 
941          "Transmitting message of type %u\n",
942          ntohs (((struct GNUNET_MessageHeader *) pos->msg)->type));
943     /* FIXME: this memcpy can be up to 7% of our total runtime */
944     memcpy (cbuf, pos->msg, pos->message_size);
945     cbuf += pos->message_size;
946     ret += pos->message_size;
947     size -= pos->message_size;
948     GNUNET_CONTAINER_DLL_insert_tail (hd, tl, pos);
949   }
950   /* schedule 'continuation' before callbacks so that callbacks that
951    * cancel everything don't cause us to use a session that no longer
952    * exists... */
953   process_pending_messages (session);
954   session->last_activity = GNUNET_TIME_absolute_get ();
955   pid = session->target;
956   /* we'll now call callbacks that may cancel the session; hence
957    * we should not use 'session' after this point */
958   while (NULL != (pos = hd))
959   {
960     GNUNET_CONTAINER_DLL_remove (hd, tl, pos);
961     if (pos->transmit_cont != NULL)
962       pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_OK, pos->message_size, pos->message_size); /* FIXME: include TCP overhead */
963     GNUNET_free (pos);
964   }
965   GNUNET_assert (hd == NULL);
966   GNUNET_assert (tl == NULL);
967   LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes\n",
968                    ret);
969   GNUNET_STATISTICS_update (plugin->env->stats,
970                             gettext_noop ("# bytes currently in TCP buffers"),
971                             -(int64_t) ret, GNUNET_NO);
972   GNUNET_STATISTICS_update (plugin->env->stats,
973                             gettext_noop ("# bytes transmitted via TCP"), ret,
974                             GNUNET_NO);
975   return ret;
976 }
977
978
979 /**
980  * If we have pending messages, ask the server to
981  * transmit them (schedule the respective tasks, etc.)
982  *
983  * @param session for which session should we do this
984  */
985 static void
986 process_pending_messages (struct Session *session)
987 {
988   struct PendingMessage *pm;
989
990   GNUNET_assert (session->client != NULL);
991   if (session->transmit_handle != NULL)
992     return;
993   if (NULL == (pm = session->pending_messages_head))
994     return;
995
996   session->transmit_handle =
997       GNUNET_SERVER_notify_transmit_ready (session->client, pm->message_size,
998                                            GNUNET_TIME_absolute_get_remaining
999                                            (pm->timeout), &do_transmit,
1000                                            session);
1001 }
1002
1003
1004 /**
1005  * Functions with this signature are called whenever we need
1006  * to close a session due to a disconnect or failure to
1007  * establish a connection.
1008  *
1009  * @param session session to close down
1010  */
1011 static void
1012 disconnect_session (struct Session *session)
1013 {
1014   struct PendingMessage *pm;
1015   struct Plugin * plugin = session->plugin;
1016
1017   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1018        "Disconnecting session of peer `%s' address `%s'\n",
1019        GNUNET_i2s (&session->target),
1020        tcp_address_to_string (NULL, session->addr, session->addrlen));
1021
1022   stop_session_timeout (session);
1023
1024   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (plugin->sessionmap, &session->target.hashPubKey, session))
1025   {
1026     GNUNET_STATISTICS_update (session->plugin->env->stats,
1027                               gettext_noop ("# TCP sessions active"), -1,
1028                               GNUNET_NO);
1029     dec_sessions (plugin, session, __LINE__);
1030   }
1031   else GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (plugin->nat_wait_conns, &session->target.hashPubKey, session));
1032
1033   /* clean up state */
1034   if (session->transmit_handle != NULL)
1035   {
1036     GNUNET_SERVER_notify_transmit_ready_cancel (session->transmit_handle);
1037     session->transmit_handle = NULL;
1038   }
1039   session->plugin->env->session_end (session->plugin->env->cls,
1040                                      &session->target, session);
1041
1042   if (GNUNET_SCHEDULER_NO_TASK != session->nat_connection_timeout)
1043   {
1044     GNUNET_SCHEDULER_cancel (session->nat_connection_timeout);
1045     session->nat_connection_timeout = GNUNET_SCHEDULER_NO_TASK;
1046   }
1047
1048   while (NULL != (pm = session->pending_messages_head))
1049   {
1050     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1051          pm->transmit_cont !=
1052          NULL ? "Could not deliver message to `%4s'.\n" :
1053          "Could not deliver message to `%4s', notifying.\n",
1054          GNUNET_i2s (&session->target));
1055     GNUNET_STATISTICS_update (session->plugin->env->stats,
1056                               gettext_noop ("# bytes currently in TCP buffers"),
1057                               -(int64_t) pm->message_size, GNUNET_NO);
1058     GNUNET_STATISTICS_update (session->plugin->env->stats,
1059                               gettext_noop
1060                               ("# bytes discarded by TCP (disconnect)"),
1061                               pm->message_size, GNUNET_NO);
1062     GNUNET_CONTAINER_DLL_remove (session->pending_messages_head,
1063                                  session->pending_messages_tail, pm);
1064     if (NULL != pm->transmit_cont)
1065       pm->transmit_cont (pm->transmit_cont_cls, &session->target,
1066                          GNUNET_SYSERR, pm->message_size, 0);
1067     GNUNET_free (pm);
1068   }
1069   if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK)
1070   {
1071     GNUNET_SCHEDULER_cancel (session->receive_delay_task);
1072     if (NULL != session->client)
1073       GNUNET_SERVER_receive_done (session->client, GNUNET_SYSERR);
1074   }
1075   if (NULL != session->client)
1076   {
1077     GNUNET_SERVER_client_disconnect (session->client);
1078     GNUNET_SERVER_client_drop (session->client);
1079     session->client = NULL;
1080   }
1081   GNUNET_free_non_null (session->addr);
1082   GNUNET_assert (NULL == session->transmit_handle);
1083   GNUNET_free (session);
1084 }
1085
1086
1087 struct FindSessionContext
1088 {
1089   struct Session *s;
1090   int res;
1091 };
1092
1093 int session_it (void *cls,
1094                const struct GNUNET_HashCode * key,
1095                void *value)
1096 {
1097   struct FindSessionContext *res = cls;
1098   if (res->s == value)
1099   {
1100     res->res = GNUNET_OK;
1101     return GNUNET_NO;
1102   }
1103   else
1104     return GNUNET_YES;
1105 }
1106
1107 int find_session (struct Plugin *plugin, struct Session *session)
1108 {
1109   struct FindSessionContext session_map_res;
1110   struct FindSessionContext nat_map_res;
1111
1112   session_map_res.s = session;
1113   session_map_res.res = GNUNET_SYSERR;
1114   GNUNET_CONTAINER_multihashmap_iterate (plugin->sessionmap, &session_it, &session_map_res);
1115
1116   nat_map_res.s = session;
1117   nat_map_res.res = GNUNET_SYSERR;
1118   GNUNET_CONTAINER_multihashmap_iterate (plugin->nat_wait_conns, &session_it, &nat_map_res);
1119
1120   if ((session_map_res.res == GNUNET_SYSERR) && (nat_map_res.res == GNUNET_SYSERR))
1121   {
1122     GNUNET_break (0);
1123     return GNUNET_SYSERR;
1124   }
1125   return GNUNET_OK;
1126 }
1127
1128
1129 /**
1130  * Function that can be used by the transport service to transmit
1131  * a message using the plugin.   Note that in the case of a
1132  * peer disconnecting, the continuation MUST be called
1133  * prior to the disconnect notification itself.  This function
1134  * will be called with this peer's HELLO message to initiate
1135  * a fresh connection to another peer.
1136  *
1137  * @param cls closure
1138  * @param session which session must be used
1139  * @param msgbuf the message to transmit
1140  * @param msgbuf_size number of bytes in 'msgbuf'
1141  * @param priority how important is the message (most plugins will
1142  *                 ignore message priority and just FIFO)
1143  * @param to how long to wait at most for the transmission (does not
1144  *                require plugins to discard the message after the timeout,
1145  *                just advisory for the desired delay; most plugins will ignore
1146  *                this as well)
1147  * @param cont continuation to call once the message has
1148  *        been transmitted (or if the transport is ready
1149  *        for the next transmission call; or if the
1150  *        peer disconnected...); can be NULL
1151  * @param cont_cls closure for cont
1152  * @return number of bytes used (on the physical network, with overheads);
1153  *         -1 on hard errors (i.e. address invalid); 0 is a legal value
1154  *         and does NOT mean that the message was not transmitted (DV)
1155  */
1156 static ssize_t
1157 tcp_plugin_send (void *cls,
1158     struct Session *session,
1159     const char *msgbuf, size_t msgbuf_size,
1160     unsigned int priority,
1161     struct GNUNET_TIME_Relative to,
1162     GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1163 {
1164   struct Plugin * plugin = cls;
1165   struct PendingMessage *pm;
1166
1167   GNUNET_assert (NULL != plugin);
1168   GNUNET_assert (NULL != session);
1169
1170   if (GNUNET_SYSERR == find_session(plugin, session))
1171   {
1172       LOG (GNUNET_ERROR_TYPE_ERROR,
1173            _("Trying to send with invalid session %p\n"));
1174       return GNUNET_SYSERR;
1175   }
1176
1177   /* create new message entry */
1178   pm = GNUNET_malloc (sizeof (struct PendingMessage) + msgbuf_size);
1179   pm->msg = (const char *) &pm[1];
1180   memcpy (&pm[1], msgbuf, msgbuf_size);
1181   pm->message_size = msgbuf_size;
1182   pm->timeout = GNUNET_TIME_relative_to_absolute (to);
1183   pm->transmit_cont = cont;
1184   pm->transmit_cont_cls = cont_cls;
1185
1186   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1187        "Asked to transmit %u bytes to `%s', added message to list.\n",
1188        msgbuf_size, GNUNET_i2s (&session->target));
1189
1190   if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value (plugin->sessionmap, 
1191                                                                   &session->target.hashPubKey, 
1192                                                                   session))
1193   {
1194     GNUNET_assert (session->client != NULL);
1195     reschedule_session_timeout (session);
1196     GNUNET_SERVER_client_set_timeout (session->client,
1197                                       GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
1198     GNUNET_STATISTICS_update (plugin->env->stats,
1199                               gettext_noop ("# bytes currently in TCP buffers"),
1200                               msgbuf_size, GNUNET_NO);
1201
1202     /* append pm to pending_messages list */
1203     GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
1204                                       session->pending_messages_tail, pm);
1205
1206     process_pending_messages (session);
1207     return msgbuf_size;
1208   }
1209   else if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains_value(plugin->nat_wait_conns, &session->target.hashPubKey, session))
1210   {
1211     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1212          "This NAT WAIT session for peer `%s' is not yet ready!\n",
1213          GNUNET_i2s (&session->target));
1214     reschedule_session_timeout (session);
1215     GNUNET_STATISTICS_update (plugin->env->stats,
1216                               gettext_noop ("# bytes currently in TCP buffers"),
1217                               msgbuf_size, GNUNET_NO);
1218
1219     /* append pm to pending_messages list */
1220     GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head,
1221                                       session->pending_messages_tail, pm);
1222     return msgbuf_size;
1223   }
1224   else
1225   {
1226     LOG (GNUNET_ERROR_TYPE_ERROR,
1227          "Invalid session %p\n", session);
1228     if (NULL != cont)
1229       cont (cont_cls, &session->target, GNUNET_SYSERR, pm->message_size, 0);
1230     GNUNET_break (0);
1231     GNUNET_free (pm);
1232     return GNUNET_SYSERR; /* session does not exist here */
1233   }
1234 }
1235
1236
1237 struct SessionItCtx
1238 {
1239   void *addr;
1240   size_t addrlen;
1241   struct Session *result;
1242 };
1243
1244
1245 static int 
1246 session_lookup_it (void *cls,
1247                    const struct GNUNET_HashCode *key,
1248                    void *value)
1249 {
1250   struct SessionItCtx * si_ctx = cls;
1251   struct Session * session = value;
1252 #if 0
1253   char * a1 = strdup (tcp_address_to_string(NULL, session->addr, session->addrlen));
1254   char * a2 = strdup (tcp_address_to_string(NULL, si_ctx->addr, si_ctx->addrlen));
1255   LOG (GNUNET_ERROR_TYPE_DEBUG,
1256        "Comparing: %s %u <-> %s %u\n",
1257        a1,
1258        session->addrlen,
1259        a2,
1260        si_ctx->addrlen);
1261   GNUNET_free (a1);
1262   GNUNET_free (a2);
1263 #endif
1264   if (session->addrlen != si_ctx->addrlen)
1265   {
1266     return GNUNET_YES;
1267   }
1268   if (0 != memcmp (session->addr, si_ctx->addr, si_ctx->addrlen))
1269   {
1270     return GNUNET_YES;
1271   }
1272 #if 0
1273   a1 = strdup (tcp_address_to_string(NULL, session->addr, session->addrlen));
1274   a2 = strdup (tcp_address_to_string(NULL, si_ctx->addr, si_ctx->addrlen));
1275   LOG (GNUNET_ERROR_TYPE_DEBUG,
1276        "Comparing: %s %u <-> %s %u , OK!\n",
1277        a1,
1278        session->addrlen,
1279        a2,
1280        si_ctx->addrlen);
1281   GNUNET_free (a1);
1282   GNUNET_free (a2);
1283 #endif
1284   /* Found existing session */
1285   si_ctx->result = session;
1286   return GNUNET_NO;
1287 }
1288
1289
1290 /**
1291  * Task cleaning up a NAT connection attempt after timeout
1292  */
1293 static void
1294 nat_connect_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1295 {
1296   struct Session *session = cls;
1297
1298   session->nat_connection_timeout = GNUNET_SCHEDULER_NO_TASK;
1299   LOG (GNUNET_ERROR_TYPE_DEBUG,
1300        "NAT WAIT connection to `%4s' at `%s' could not be established, removing session\n",
1301        GNUNET_i2s (&session->target), tcp_address_to_string(NULL, session->addr, session->addrlen));
1302   disconnect_session (session);
1303 }
1304
1305
1306 /**
1307  * Create a new session to transmit data to the target
1308  * This session will used to send data to this peer and the plugin will
1309  * notify us by calling the env->session_end function
1310  *
1311  * @param cls closure
1312  * @param address pointer to the GNUNET_HELLO_Address
1313  * @return the session if the address is valid, NULL otherwise
1314  */
1315 static struct Session *
1316 tcp_plugin_get_session (void *cls,
1317                         const struct GNUNET_HELLO_Address *address)
1318 {
1319   struct Plugin *plugin = cls;
1320   struct Session *session = NULL;
1321   int af;
1322   const void *sb;
1323   size_t sbs;
1324   struct GNUNET_CONNECTION_Handle *sa;
1325   struct sockaddr_in a4;
1326   struct sockaddr_in6 a6;
1327   const struct IPv4TcpAddress *t4;
1328   const struct IPv6TcpAddress *t6;
1329   struct GNUNET_ATS_Information ats;
1330   unsigned int is_natd = GNUNET_NO;
1331   size_t addrlen;
1332
1333   GNUNET_assert (plugin != NULL);
1334   GNUNET_assert (address != NULL);
1335   addrlen = address->address_length;
1336   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1337        "Trying to get session for `%s' address of peer `%s'\n",
1338        tcp_address_to_string(NULL, address->address, address->address_length),
1339        GNUNET_i2s (&address->peer));
1340
1341   /* look for existing session */
1342   if (GNUNET_YES == 
1343       GNUNET_CONTAINER_multihashmap_contains (plugin->sessionmap, 
1344                                               &address->peer.hashPubKey))
1345   {
1346     struct SessionItCtx si_ctx;
1347
1348     si_ctx.addr = (void *) address->address;
1349     si_ctx.addrlen = address->address_length;
1350
1351     si_ctx.result = NULL;
1352
1353     GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessionmap, 
1354                                                 &address->peer.hashPubKey, 
1355                                                 &session_lookup_it, &si_ctx);
1356     if (si_ctx.result != NULL)
1357     {
1358       session = si_ctx.result;
1359       LOG (GNUNET_ERROR_TYPE_DEBUG, 
1360            "Found existing session for `%s' address `%s' session %p\n",
1361            GNUNET_i2s (&address->peer),
1362            tcp_address_to_string(NULL, address->address, address->address_length),
1363            session);
1364       return session;
1365     }
1366     LOG (GNUNET_ERROR_TYPE_DEBUG,
1367          "Existing sessions did not match address `%s' or peer `%s'\n",
1368          tcp_address_to_string(NULL, address->address, address->address_length),
1369          GNUNET_i2s (&address->peer));
1370   }
1371
1372   if (addrlen == sizeof (struct IPv6TcpAddress))
1373   {
1374     GNUNET_assert (NULL != address->address);     /* make static analysis happy */
1375     t6 = address->address;
1376     af = AF_INET6;
1377     memset (&a6, 0, sizeof (a6));
1378 #if HAVE_SOCKADDR_IN_SIN_LEN
1379     a6.sin6_len = sizeof (a6);
1380 #endif
1381     a6.sin6_family = AF_INET6;
1382     a6.sin6_port = t6->t6_port;
1383     if (t6->t6_port == 0)
1384       is_natd = GNUNET_YES;
1385     memcpy (&a6.sin6_addr, &t6->ipv6_addr, sizeof (struct in6_addr));
1386     sb = &a6;
1387     sbs = sizeof (a6);
1388   }
1389   else if (addrlen == sizeof (struct IPv4TcpAddress))
1390   {
1391     GNUNET_assert (NULL != address->address);     /* make static analysis happy */
1392     t4 = address->address;
1393     af = AF_INET;
1394     memset (&a4, 0, sizeof (a4));
1395 #if HAVE_SOCKADDR_IN_SIN_LEN
1396     a4.sin_len = sizeof (a4);
1397 #endif
1398     a4.sin_family = AF_INET;
1399     a4.sin_port = t4->t4_port;
1400     if (t4->t4_port == 0)
1401       is_natd = GNUNET_YES;
1402     a4.sin_addr.s_addr = t4->ipv4_addr;
1403     sb = &a4;
1404     sbs = sizeof (a4);
1405   }
1406   else
1407   {
1408     LOG (GNUNET_ERROR_TYPE_WARNING,
1409         _("Trying to create session for address of unexpected length %u (should be %u or %u)\n"),
1410                   addrlen, sizeof (struct IPv4TcpAddress), sizeof (struct IPv6TcpAddress));
1411     return NULL;
1412   }
1413
1414   ats = plugin->env->get_address_type (plugin->env->cls, sb ,sbs);
1415
1416   if ((is_natd == GNUNET_YES) && (addrlen == sizeof (struct IPv6TcpAddress)))
1417   {
1418     /* NAT client only works with IPv4 addresses */
1419     return NULL;
1420   }
1421
1422   if (plugin->cur_connections >= plugin->max_connections)
1423   {
1424     /* saturated */
1425     return NULL;
1426   }
1427
1428   if ((is_natd == GNUNET_YES) &&
1429       (GNUNET_YES ==
1430        GNUNET_CONTAINER_multihashmap_contains (plugin->nat_wait_conns,
1431                                                &address->peer.hashPubKey)))
1432   {
1433     /* Only do one NAT punch attempt per peer identity */
1434      return NULL;
1435   }
1436
1437   if ((is_natd == GNUNET_YES) && (NULL != plugin->nat) &&
1438       (GNUNET_NO ==
1439        GNUNET_CONTAINER_multihashmap_contains (plugin->nat_wait_conns,
1440                                                &address->peer.hashPubKey)))
1441   {
1442     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1443          "Found valid IPv4 NAT address (creating session)!\n") ;
1444     session = create_session (plugin, &address->peer, NULL, GNUNET_YES);
1445     session->addrlen = 0;
1446     session->addr = NULL;
1447     session->ats_address_network_type = ats.value;
1448     session->nat_connection_timeout = GNUNET_SCHEDULER_add_delayed (NAT_TIMEOUT,
1449                                                                     &nat_connect_timeout,
1450                                                                     session);
1451     GNUNET_assert (session != NULL);
1452     GNUNET_assert (GNUNET_OK ==
1453                    GNUNET_CONTAINER_multihashmap_put (plugin->nat_wait_conns, 
1454                                                       &session->target.hashPubKey, 
1455                                                       session,
1456                                                       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
1457
1458     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1459          "Created NAT WAIT connection to `%4s' at `%s'\n",
1460          GNUNET_i2s (&session->target), GNUNET_a2s (sb, sbs));
1461
1462     if (GNUNET_OK == GNUNET_NAT_run_client (plugin->nat, &a4))
1463       return session;
1464     else
1465     {
1466       LOG (GNUNET_ERROR_TYPE_DEBUG, 
1467            "Running NAT client for `%4s' at `%s' failed\n",
1468            GNUNET_i2s (&session->target), GNUNET_a2s (sb, sbs));
1469       disconnect_session (session);
1470       return NULL;
1471     }
1472   }
1473
1474   /* create new outbound session */
1475   GNUNET_assert (plugin->cur_connections <= plugin->max_connections);
1476   sa = GNUNET_CONNECTION_create_from_sockaddr (af, sb, sbs);
1477   if (sa == NULL)
1478   {
1479     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1480          "Failed to create connection to `%4s' at `%s'\n",
1481          GNUNET_i2s (&address->peer), GNUNET_a2s (sb, sbs));
1482     return NULL;
1483   }
1484   plugin->cur_connections++;
1485   if (plugin->cur_connections == plugin->max_connections)
1486         GNUNET_SERVER_suspend (plugin->server); /* Maximum number of connections rechead */
1487
1488   LOG (GNUNET_ERROR_TYPE_DEBUG,
1489        "Asked to transmit to `%4s', creating fresh session using address `%s'.\n",
1490        GNUNET_i2s (&address->peer), GNUNET_a2s (sb, sbs));
1491
1492   session = create_session (plugin,
1493                             &address->peer,
1494                             GNUNET_SERVER_connect_socket (plugin->server, sa),
1495                             GNUNET_NO);
1496   session->addr = GNUNET_malloc (addrlen);
1497   memcpy (session->addr, address->address, addrlen);
1498   session->addrlen = addrlen;
1499   session->ats_address_network_type = ats.value;
1500
1501   GNUNET_CONTAINER_multihashmap_put (plugin->sessionmap, 
1502                                      &session->target.hashPubKey, 
1503                                      session, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
1504   inc_sessions (plugin, session, __LINE__);
1505   LOG (GNUNET_ERROR_TYPE_DEBUG,
1506        "Creating new session for `%s' address `%s' session %p\n",
1507        GNUNET_i2s (&address->peer),
1508        tcp_address_to_string(NULL, address->address, address->address_length),
1509        session);
1510   /* Send TCP Welcome */
1511   process_pending_messages (session);
1512
1513   return session;
1514 }
1515
1516
1517 static int 
1518 session_disconnect_it (void *cls,
1519                        const struct GNUNET_HashCode * key,
1520                        void *value)
1521 {
1522   struct Session *session = value;
1523
1524   GNUNET_STATISTICS_update (session->plugin->env->stats,
1525                             gettext_noop
1526                             ("# transport-service disconnect requests for TCP"),
1527                             1, GNUNET_NO);
1528   disconnect_session (session);
1529   return GNUNET_YES;
1530 }
1531
1532
1533 /**
1534  * Function that can be called to force a disconnect from the
1535  * specified neighbour.  This should also cancel all previously
1536  * scheduled transmissions.  Obviously the transmission may have been
1537  * partially completed already, which is OK.  The plugin is supposed
1538  * to close the connection (if applicable) and no longer call the
1539  * transmit continuation(s).
1540  *
1541  * Finally, plugin MUST NOT call the services's receive function to
1542  * notify the service that the connection to the specified target was
1543  * closed after a getting this call.
1544  *
1545  * @param cls closure
1546  * @param target peer for which the last transmission is
1547  *        to be cancelled
1548  */
1549 static void
1550 tcp_plugin_disconnect (void *cls, const struct GNUNET_PeerIdentity *target)
1551 {
1552   struct Plugin *plugin = cls;
1553
1554   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1555        "Disconnecting peer `%4s'\n", GNUNET_i2s (target));
1556   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->sessionmap, &target->hashPubKey, &session_disconnect_it, plugin);
1557   GNUNET_CONTAINER_multihashmap_get_multiple (plugin->nat_wait_conns, &target->hashPubKey, &session_disconnect_it, plugin);
1558 }
1559
1560
1561 /**
1562  * Context for address to string conversion.
1563  */
1564 struct PrettyPrinterContext
1565 {
1566   /**
1567    * Function to call with the result.
1568    */
1569   GNUNET_TRANSPORT_AddressStringCallback asc;
1570
1571   /**
1572    * Clsoure for 'asc'.
1573    */
1574   void *asc_cls;
1575
1576   /**
1577    * Port to add after the IP address.
1578    */
1579   uint16_t port;
1580
1581   /**
1582    * IPv6 address
1583    */
1584   int ipv6;
1585
1586   /**
1587    * Options
1588    */
1589   uint32_t options;
1590 };
1591
1592
1593 /**
1594  * Append our port and forward the result.
1595  *
1596  * @param cls the 'struct PrettyPrinterContext*'
1597  * @param hostname hostname part of the address
1598  */
1599 static void
1600 append_port (void *cls, const char *hostname)
1601 {
1602   struct PrettyPrinterContext *ppc = cls;
1603   char *ret;
1604
1605   if (hostname == NULL)
1606   {
1607     ppc->asc (ppc->asc_cls, NULL);
1608     GNUNET_free (ppc);
1609     return;
1610   }
1611   if (GNUNET_YES == ppc->ipv6)
1612     GNUNET_asprintf (&ret, "%s.%u.[%s]:%d", PLUGIN_NAME, ppc->options, hostname, ppc->port);
1613   else
1614     GNUNET_asprintf (&ret, "%s.%u.%s:%d", PLUGIN_NAME, ppc->options, hostname, ppc->port);
1615   ppc->asc (ppc->asc_cls, ret);
1616   GNUNET_free (ret);
1617 }
1618
1619
1620 /**
1621  * Convert the transports address to a nice, human-readable
1622  * format.
1623  *
1624  * @param cls closure
1625  * @param type name of the transport that generated the address
1626  * @param addr one of the addresses of the host, NULL for the last address
1627  *        the specific address format depends on the transport
1628  * @param addrlen length of the address
1629  * @param numeric should (IP) addresses be displayed in numeric form?
1630  * @param timeout after how long should we give up?
1631  * @param asc function to call on each string
1632  * @param asc_cls closure for asc
1633  */
1634 static void
1635 tcp_plugin_address_pretty_printer (void *cls, const char *type,
1636                                    const void *addr, size_t addrlen,
1637                                    int numeric,
1638                                    struct GNUNET_TIME_Relative timeout,
1639                                    GNUNET_TRANSPORT_AddressStringCallback asc,
1640                                    void *asc_cls)
1641 {
1642   struct PrettyPrinterContext *ppc;
1643   const void *sb;
1644   size_t sbs;
1645   struct sockaddr_in a4;
1646   struct sockaddr_in6 a6;
1647   const struct IPv4TcpAddress *t4;
1648   const struct IPv6TcpAddress *t6;
1649   uint16_t port;
1650   uint32_t options;
1651
1652
1653   options = 0;
1654   if (addrlen == sizeof (struct IPv6TcpAddress))
1655   {
1656     t6 = addr;
1657     memset (&a6, 0, sizeof (a6));
1658     a6.sin6_family = AF_INET6;
1659     a6.sin6_port = t6->t6_port;
1660     memcpy (&a6.sin6_addr, &t6->ipv6_addr, sizeof (struct in6_addr));
1661     port = ntohs (t6->t6_port);
1662     options = ntohl (t6->options);
1663     sb = &a6;
1664     sbs = sizeof (a6);
1665   }
1666   else if (addrlen == sizeof (struct IPv4TcpAddress))
1667   {
1668     t4 = addr;
1669     memset (&a4, 0, sizeof (a4));
1670     a4.sin_family = AF_INET;
1671     a4.sin_port = t4->t4_port;
1672     a4.sin_addr.s_addr = t4->ipv4_addr;
1673     port = ntohs (t4->t4_port);
1674     options = ntohl (t4->options);
1675     sb = &a4;
1676     sbs = sizeof (a4);
1677   }
1678   else if (0 == addrlen)
1679   {
1680     asc (asc_cls, "<inbound connection>");
1681     asc (asc_cls, NULL);
1682     return;
1683   }
1684   else
1685   {
1686     /* invalid address */
1687     GNUNET_break_op (0);
1688     asc (asc_cls, NULL);
1689     return;
1690   }
1691   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
1692   if (addrlen == sizeof (struct IPv6TcpAddress))
1693     ppc->ipv6 = GNUNET_YES;
1694   else
1695     ppc->ipv6 = GNUNET_NO;
1696   ppc->asc = asc;
1697   ppc->asc_cls = asc_cls;
1698   ppc->port = port;
1699   ppc->options = options;
1700   GNUNET_RESOLVER_hostname_get (sb, sbs, !numeric, timeout, &append_port, ppc);
1701 }
1702
1703
1704 /**
1705  * Check if the given port is plausible (must be either our listen
1706  * port or our advertised port), or any port if we are behind NAT
1707  * and do not have a port open.  If it is neither, we return
1708  * GNUNET_SYSERR.
1709  *
1710  * @param plugin global variables
1711  * @param in_port port number to check
1712  * @return GNUNET_OK if port is either open_port or adv_port
1713  */
1714 static int
1715 check_port (struct Plugin *plugin, uint16_t in_port)
1716 {
1717   if ((in_port == plugin->adv_port) || (in_port == plugin->open_port))
1718     return GNUNET_OK;
1719   return GNUNET_SYSERR;
1720 }
1721
1722
1723 /**
1724  * Function that will be called to check if a binary address for this
1725  * plugin is well-formed and corresponds to an address for THIS peer
1726  * (as per our configuration).  Naturally, if absolutely necessary,
1727  * plugins can be a bit conservative in their answer, but in general
1728  * plugins should make sure that the address does not redirect
1729  * traffic to a 3rd party that might try to man-in-the-middle our
1730  * traffic.
1731  *
1732  * @param cls closure, our 'struct Plugin*'
1733  * @param addr pointer to the address
1734  * @param addrlen length of addr
1735  * @return GNUNET_OK if this is a plausible address for this peer
1736  *         and transport, GNUNET_SYSERR if not
1737  */
1738 static int
1739 tcp_plugin_check_address (void *cls, const void *addr, size_t addrlen)
1740 {
1741   struct Plugin *plugin = cls;
1742   struct IPv4TcpAddress *v4;
1743   struct IPv6TcpAddress *v6;
1744
1745   if ((addrlen != sizeof (struct IPv4TcpAddress)) &&
1746       (addrlen != sizeof (struct IPv6TcpAddress)))
1747   {
1748     GNUNET_break_op (0);
1749     return GNUNET_SYSERR;
1750   }
1751
1752
1753   if (addrlen == sizeof (struct IPv4TcpAddress))
1754   {
1755     v4 = (struct IPv4TcpAddress *) addr;
1756     if (0 != memcmp (&v4->options, &myoptions, sizeof (myoptions)))
1757     {
1758         GNUNET_break (0);
1759         return GNUNET_SYSERR;
1760     }
1761     if (GNUNET_OK != check_port (plugin, ntohs (v4->t4_port)))
1762       return GNUNET_SYSERR;
1763     if (GNUNET_OK !=
1764         GNUNET_NAT_test_address (plugin->nat, &v4->ipv4_addr,
1765                                  sizeof (struct in_addr)))
1766       return GNUNET_SYSERR;
1767   }
1768   else
1769   {
1770     v6 = (struct IPv6TcpAddress *) addr;
1771     if (IN6_IS_ADDR_LINKLOCAL (&v6->ipv6_addr))
1772     {
1773       GNUNET_break_op (0);
1774       return GNUNET_SYSERR;
1775     }
1776     if (0 != memcmp (&v6->options, &myoptions, sizeof (myoptions)))
1777     {
1778         GNUNET_break (0);
1779         return GNUNET_SYSERR;
1780     }
1781     if (GNUNET_OK != check_port (plugin, ntohs (v6->t6_port)))
1782       return GNUNET_SYSERR;
1783     if (GNUNET_OK !=
1784         GNUNET_NAT_test_address (plugin->nat, &v6->ipv6_addr,
1785                                  sizeof (struct in6_addr)))
1786       return GNUNET_SYSERR;
1787   }
1788   return GNUNET_OK;
1789 }
1790
1791
1792 /**
1793  * We've received a nat probe from this peer via TCP.  Finish
1794  * creating the client session and resume sending of queued
1795  * messages.
1796  *
1797  * @param cls closure
1798  * @param client identification of the client
1799  * @param message the actual message
1800  */
1801 static void
1802 handle_tcp_nat_probe (void *cls, struct GNUNET_SERVER_Client *client,
1803                       const struct GNUNET_MessageHeader *message)
1804 {
1805   struct Plugin *plugin = cls;
1806   struct Session *session;
1807   const struct TCP_NAT_ProbeMessage *tcp_nat_probe;
1808   size_t alen;
1809   void *vaddr;
1810   struct IPv4TcpAddress *t4;
1811   struct IPv6TcpAddress *t6;
1812   const struct sockaddr_in *s4;
1813   const struct sockaddr_in6 *s6;
1814
1815   LOG (GNUNET_ERROR_TYPE_DEBUG, "Received NAT probe\n");
1816
1817   /* We have received a TCP NAT probe, meaning we (hopefully) initiated
1818    * a connection to this peer by running gnunet-nat-client.  This peer
1819    * received the punch message and now wants us to use the new connection
1820    * as the default for that peer.  Do so and then send a WELCOME message
1821    * so we can really be connected!
1822    */
1823   if (ntohs (message->size) != sizeof (struct TCP_NAT_ProbeMessage))
1824   {
1825     GNUNET_break_op (0);
1826     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1827     return;
1828   }
1829
1830   tcp_nat_probe = (const struct TCP_NAT_ProbeMessage *) message;
1831   if (0 ==
1832       memcmp (&tcp_nat_probe->clientIdentity, plugin->env->my_identity,
1833               sizeof (struct GNUNET_PeerIdentity)))
1834   {
1835     /* refuse connections from ourselves */
1836     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1837     return;
1838   }
1839
1840   session =
1841       GNUNET_CONTAINER_multihashmap_get (plugin->nat_wait_conns,
1842                                          &tcp_nat_probe->
1843                                          clientIdentity.hashPubKey);
1844   if (session == NULL)
1845   {
1846     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1847          "Did NOT find session for NAT probe!\n");
1848     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1849     return;
1850   }
1851   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1852        "Found session for NAT probe!\n");
1853
1854   if (session->nat_connection_timeout != GNUNET_SCHEDULER_NO_TASK)
1855   {
1856     GNUNET_SCHEDULER_cancel (session->nat_connection_timeout);
1857     session->nat_connection_timeout = GNUNET_SCHEDULER_NO_TASK;
1858   }
1859
1860   if (GNUNET_OK != GNUNET_SERVER_client_get_address (client, &vaddr, &alen))
1861   {
1862     GNUNET_break (0);
1863     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1864     disconnect_session (session);
1865     return;
1866   }
1867   GNUNET_assert (GNUNET_CONTAINER_multihashmap_remove
1868                  (plugin->nat_wait_conns,
1869                   &tcp_nat_probe->clientIdentity.hashPubKey,
1870                   session) == GNUNET_YES);
1871   GNUNET_CONTAINER_multihashmap_put (plugin->sessionmap,
1872                                      &session->target.hashPubKey, session, 
1873                                      GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);  
1874   session->last_activity = GNUNET_TIME_absolute_get ();
1875   session->inbound = GNUNET_NO;
1876   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1877        "Found address `%s' for incoming connection\n",
1878        GNUNET_a2s (vaddr, alen));
1879   switch (((const struct sockaddr *) vaddr)->sa_family)
1880   {
1881   case AF_INET:
1882     s4 = vaddr;
1883     t4 = GNUNET_malloc (sizeof (struct IPv4TcpAddress));
1884     t4->options = 0;
1885     t4->t4_port = s4->sin_port;
1886     t4->ipv4_addr = s4->sin_addr.s_addr;
1887     session->addr = t4;
1888     session->addrlen = sizeof (struct IPv4TcpAddress);
1889     break;
1890   case AF_INET6:
1891     s6 = vaddr;
1892     t6 = GNUNET_malloc (sizeof (struct IPv6TcpAddress));
1893     t6->options = 0;
1894     t6->t6_port = s6->sin6_port;
1895     memcpy (&t6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr));
1896     session->addr = t6;
1897     session->addrlen = sizeof (struct IPv6TcpAddress);
1898     break;
1899   default:
1900     GNUNET_break_op (0);
1901     LOG (GNUNET_ERROR_TYPE_DEBUG, 
1902          "Bad address for incoming connection!\n");
1903     GNUNET_free (vaddr);
1904     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1905     disconnect_session (session);
1906     return;
1907   }
1908   GNUNET_free (vaddr);
1909   GNUNET_break (NULL == session->client);
1910   GNUNET_SERVER_client_keep (client);
1911   session->client = client;
1912   inc_sessions (plugin, session, __LINE__);
1913   GNUNET_STATISTICS_update (plugin->env->stats,
1914                             gettext_noop ("# TCP sessions active"), 1,
1915                             GNUNET_NO);
1916   process_pending_messages (session);
1917   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1918 }
1919
1920
1921 /**
1922  * We've received a welcome from this peer via TCP.  Possibly create a
1923  * fresh client record and send back our welcome.
1924  *
1925  * @param cls closure
1926  * @param client identification of the client
1927  * @param message the actual message
1928  */
1929 static void
1930 handle_tcp_welcome (void *cls, struct GNUNET_SERVER_Client *client,
1931                     const struct GNUNET_MessageHeader *message)
1932 {
1933   struct Plugin *plugin = cls;
1934   const struct WelcomeMessage *wm = (const struct WelcomeMessage *) message;
1935   struct Session *session;
1936   size_t alen;
1937   void *vaddr;
1938   struct IPv4TcpAddress *t4;
1939   struct IPv6TcpAddress *t6;
1940   const struct sockaddr_in *s4;
1941   const struct sockaddr_in6 *s6;
1942
1943   if (0 ==
1944       memcmp (&wm->clientIdentity, plugin->env->my_identity,
1945               sizeof (struct GNUNET_PeerIdentity)))
1946   {
1947     /* refuse connections from ourselves */
1948     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1949     return;
1950   }
1951   LOG (GNUNET_ERROR_TYPE_DEBUG, 
1952        "Received %s message from `%4s'\n", "WELCOME",
1953        GNUNET_i2s (&wm->clientIdentity));
1954   GNUNET_STATISTICS_update (plugin->env->stats,
1955                             gettext_noop ("# TCP WELCOME messages received"), 1,
1956                             GNUNET_NO);
1957   session = lookup_session_by_client (plugin, client);
1958   if (session != NULL)
1959   {
1960     if (GNUNET_OK == GNUNET_SERVER_client_get_address (client, &vaddr, &alen))
1961     {
1962       LOG (GNUNET_ERROR_TYPE_DEBUG, 
1963            "Found existing session %p for peer `%s'\n",
1964            session,
1965            GNUNET_a2s (vaddr, alen));
1966       GNUNET_free (vaddr);
1967     }
1968   }
1969   else
1970   {
1971     GNUNET_SERVER_client_keep (client);
1972     if (plugin->service != NULL) /* Otherwise value is incremented in tcp_access_check */
1973         plugin->cur_connections++;
1974     if (plugin->cur_connections == plugin->max_connections)
1975         GNUNET_SERVER_suspend (plugin->server); /* Maximum number of connections rechead */
1976
1977     session = create_session (plugin, &wm->clientIdentity, client, GNUNET_NO);
1978     session->inbound = GNUNET_YES;
1979     if (GNUNET_OK == GNUNET_SERVER_client_get_address (client, &vaddr, &alen))
1980     {
1981       if (alen == sizeof (struct sockaddr_in))
1982       {
1983         s4 = vaddr;
1984         t4 = GNUNET_malloc (sizeof (struct IPv4TcpAddress));
1985         t4->options = htonl (0);
1986         t4->t4_port = s4->sin_port;
1987         t4->ipv4_addr = s4->sin_addr.s_addr;
1988         session->addr = t4;
1989         session->addrlen = sizeof (struct IPv4TcpAddress);
1990       }
1991       else if (alen == sizeof (struct sockaddr_in6))
1992       {
1993         s6 = vaddr;
1994         t6 = GNUNET_malloc (sizeof (struct IPv6TcpAddress));
1995         t6->options = htonl (0);
1996         t6->t6_port = s6->sin6_port;
1997         memcpy (&t6->ipv6_addr, &s6->sin6_addr, sizeof (struct in6_addr));
1998         session->addr = t6;
1999         session->addrlen = sizeof (struct IPv6TcpAddress);
2000       }
2001
2002       struct GNUNET_ATS_Information ats;
2003       ats = plugin->env->get_address_type (plugin->env->cls, vaddr ,alen);
2004       session->ats_address_network_type = ats.value;
2005
2006       GNUNET_free (vaddr);
2007     }
2008     else
2009     {
2010       LOG (GNUNET_ERROR_TYPE_DEBUG, 
2011            "Did not obtain TCP socket address for incoming connection\n");
2012     }
2013     GNUNET_CONTAINER_multihashmap_put (plugin->sessionmap, 
2014                                        &session->target.hashPubKey, 
2015                                        session, 
2016                                        GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
2017     inc_sessions (plugin, session, __LINE__);
2018   }
2019
2020   if (session->expecting_welcome != GNUNET_YES)
2021   {
2022     GNUNET_break_op (0);
2023     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2024     return;
2025   }
2026   session->last_activity = GNUNET_TIME_absolute_get ();
2027   session->expecting_welcome = GNUNET_NO;
2028
2029
2030   process_pending_messages (session);
2031
2032   GNUNET_SERVER_client_set_timeout (client,
2033                                     GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT);
2034   GNUNET_SERVER_receive_done (client, GNUNET_OK);
2035 }
2036
2037
2038 /**
2039  * Task to signal the server that we can continue
2040  * receiving from the TCP client now.
2041  *
2042  * @param cls the 'struct Session*'
2043  * @param tc task context (unused)
2044  */
2045 static void
2046 delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2047 {
2048   struct Session *session = cls;
2049
2050   session->receive_delay_task = GNUNET_SCHEDULER_NO_TASK;
2051   reschedule_session_timeout (session);
2052
2053   GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
2054 }
2055
2056
2057 /**
2058  * We've received data for this peer via TCP.  Unbox,
2059  * compute latency and forward.
2060  *
2061  * @param cls closure
2062  * @param client identification of the client
2063  * @param message the actual message
2064  */
2065 static void
2066 handle_tcp_data (void *cls, struct GNUNET_SERVER_Client *client,
2067                  const struct GNUNET_MessageHeader *message)
2068 {
2069   struct Plugin *plugin = cls;
2070   struct Session *session;
2071   struct GNUNET_TIME_Relative delay;
2072   uint16_t type;
2073
2074   type = ntohs (message->type);
2075   if ((GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME == type) ||
2076       (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_NAT_PROBE == type))
2077   {
2078     /* We don't want to propagate WELCOME and NAT Probe messages up! */
2079     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2080     return;
2081   }
2082   session = lookup_session_by_client (plugin, client);
2083   if (NULL == session)
2084   {
2085     /* No inbound session found */
2086     void *vaddr;
2087     size_t alen;
2088     
2089     GNUNET_SERVER_client_get_address (client, &vaddr, &alen);
2090     LOG (GNUNET_ERROR_TYPE_ERROR, 
2091          "Received unexpected %u bytes of type %u from `%s'\n",
2092          (unsigned int) ntohs (message->size),
2093          (unsigned int) ntohs (message->type),
2094          GNUNET_a2s(vaddr, alen));
2095     GNUNET_break_op (0);
2096     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2097     GNUNET_free_non_null(vaddr);
2098     return;
2099   }
2100   else if (GNUNET_YES == session->expecting_welcome)
2101   {
2102     /* Session is expecting WELCOME message */
2103     void *vaddr;
2104     size_t alen;
2105
2106     GNUNET_SERVER_client_get_address (client, &vaddr, &alen);
2107     LOG (GNUNET_ERROR_TYPE_ERROR, 
2108          "Received unexpected %u bytes of type %u from `%s'\n",
2109          (unsigned int) ntohs (message->size),
2110          (unsigned int) ntohs (message->type),
2111          GNUNET_a2s(vaddr, alen));
2112     GNUNET_break_op (0);
2113     GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
2114     GNUNET_free_non_null(vaddr);
2115     return;
2116   }
2117
2118   session->last_activity = GNUNET_TIME_absolute_get ();
2119   LOG (GNUNET_ERROR_TYPE_DEBUG, 
2120                    "Passing %u bytes of type %u from `%4s' to transport service.\n",
2121                    (unsigned int) ntohs (message->size),
2122                    (unsigned int) ntohs (message->type),
2123                    GNUNET_i2s (&session->target));
2124
2125   GNUNET_STATISTICS_update (plugin->env->stats,
2126                             gettext_noop ("# bytes received via TCP"),
2127                             ntohs (message->size), GNUNET_NO);
2128   struct GNUNET_ATS_Information distance;
2129
2130   distance.type = htonl (GNUNET_ATS_NETWORK_TYPE);
2131   distance.value = session->ats_address_network_type;
2132   GNUNET_break (ntohl(session->ats_address_network_type) != GNUNET_ATS_NET_UNSPECIFIED);
2133
2134   GNUNET_assert (GNUNET_CONTAINER_multihashmap_contains_value (plugin->sessionmap,
2135       &session->target.hashPubKey,
2136       session));
2137
2138   delay = plugin->env->receive (plugin->env->cls,
2139                                 &session->target,
2140                                 message,
2141                                 session,
2142                                 (GNUNET_YES == session->inbound) ? NULL : session->addr,
2143                                 (GNUNET_YES == session->inbound) ? 0 : session->addrlen);
2144   plugin->env->update_address_metrics (plugin->env->cls,
2145                 &session->target,
2146                 (GNUNET_YES == session->inbound) ? NULL : session->addr,
2147                                        (GNUNET_YES == session->inbound) ? 0 : session->addrlen,
2148                                        session,
2149                                        &distance,
2150                                        1);
2151
2152   reschedule_session_timeout (session);
2153
2154   if (delay.rel_value == 0)
2155   {
2156     GNUNET_SERVER_receive_done (client, GNUNET_OK);
2157   }
2158   else
2159   {
2160     LOG (GNUNET_ERROR_TYPE_DEBUG, 
2161          "Throttling receiving from `%s' for %llu ms\n",
2162          GNUNET_i2s (&session->target),
2163          (unsigned long long) delay.rel_value);
2164     GNUNET_SERVER_disable_receive_done_warning (client);
2165     session->receive_delay_task =
2166         GNUNET_SCHEDULER_add_delayed (delay, &delayed_done, session);
2167   }
2168 }
2169
2170
2171 /**
2172  * Functions with this signature are called whenever a peer
2173  * is disconnected on the network level.
2174  *
2175  * @param cls closure
2176  * @param client identification of the client
2177  */
2178 static void
2179 disconnect_notify (void *cls, struct GNUNET_SERVER_Client *client)
2180 {
2181   struct Plugin *plugin = cls;
2182   struct Session *session;
2183
2184   if (client == NULL)
2185     return;
2186   session = lookup_session_by_client (plugin, client);
2187   if (session == NULL)
2188     return;                     /* unknown, nothing to do */
2189   LOG (GNUNET_ERROR_TYPE_DEBUG,
2190        "Destroying session of `%4s' with %s due to network-level disconnect.\n",
2191        GNUNET_i2s (&session->target),
2192        (session->addr !=
2193         NULL) ? tcp_address_to_string (session->plugin,
2194                                        session->addr,
2195                                        session->addrlen) :
2196        "*");
2197
2198   if (plugin->cur_connections == plugin->max_connections)
2199         GNUNET_SERVER_resume (plugin->server); /* Resume server  */
2200
2201   if (plugin->cur_connections < 1)
2202         GNUNET_break (0);
2203   else
2204         plugin->cur_connections--;
2205
2206   GNUNET_STATISTICS_update (session->plugin->env->stats,
2207                             gettext_noop
2208                             ("# network-level TCP disconnect events"), 1,
2209                             GNUNET_NO);
2210   disconnect_session (session);
2211 }
2212
2213
2214 /**
2215  * We can now send a probe message, copy into buffer to really send.
2216  *
2217  * @param cls closure, a struct TCPProbeContext
2218  * @param size max size to copy
2219  * @param buf buffer to copy message to
2220  * @return number of bytes copied into buf
2221  */
2222 static size_t
2223 notify_send_probe (void *cls, size_t size, void *buf)
2224 {
2225   struct TCPProbeContext *tcp_probe_ctx = cls;
2226   struct Plugin *plugin = tcp_probe_ctx->plugin;
2227   size_t ret;
2228
2229   tcp_probe_ctx->transmit_handle = NULL;
2230   GNUNET_CONTAINER_DLL_remove (plugin->probe_head, plugin->probe_tail,
2231                                tcp_probe_ctx);
2232   if (buf == NULL)
2233   {
2234     GNUNET_CONNECTION_destroy (tcp_probe_ctx->sock);
2235     GNUNET_free (tcp_probe_ctx);
2236     return 0;
2237   }
2238   GNUNET_assert (size >= sizeof (tcp_probe_ctx->message));
2239   memcpy (buf, &tcp_probe_ctx->message, sizeof (tcp_probe_ctx->message));
2240   GNUNET_SERVER_connect_socket (tcp_probe_ctx->plugin->server,
2241                                 tcp_probe_ctx->sock);
2242   ret = sizeof (tcp_probe_ctx->message);
2243   GNUNET_free (tcp_probe_ctx);
2244   return ret;
2245 }
2246
2247
2248 /**
2249  * Function called by the NAT subsystem suggesting another peer wants
2250  * to connect to us via connection reversal.  Try to connect back to the
2251  * given IP.
2252  *
2253  * @param cls closure
2254  * @param addr address to try
2255  * @param addrlen number of bytes in addr
2256  */
2257 static void
2258 try_connection_reversal (void *cls, const struct sockaddr *addr,
2259                          socklen_t addrlen)
2260 {
2261   struct Plugin *plugin = cls;
2262   struct GNUNET_CONNECTION_Handle *sock;
2263   struct TCPProbeContext *tcp_probe_ctx;
2264
2265   /**
2266    * We have received an ICMP response, ostensibly from a peer
2267    * that wants to connect to us! Send a message to establish a connection.
2268    */
2269   sock = GNUNET_CONNECTION_create_from_sockaddr (AF_INET, addr, addrlen);
2270   if (sock == NULL)
2271   {
2272     /* failed for some odd reason (out of sockets?); ignore attempt */
2273     return;
2274   }
2275
2276   /* FIXME: do we need to track these probe context objects so that
2277    * we can clean them up on plugin unload? */
2278   tcp_probe_ctx = GNUNET_malloc (sizeof (struct TCPProbeContext));
2279   tcp_probe_ctx->message.header.size =
2280       htons (sizeof (struct TCP_NAT_ProbeMessage));
2281   tcp_probe_ctx->message.header.type =
2282       htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_NAT_PROBE);
2283   memcpy (&tcp_probe_ctx->message.clientIdentity, plugin->env->my_identity,
2284           sizeof (struct GNUNET_PeerIdentity));
2285   tcp_probe_ctx->plugin = plugin;
2286   tcp_probe_ctx->sock = sock;
2287   GNUNET_CONTAINER_DLL_insert (plugin->probe_head, plugin->probe_tail,
2288                                tcp_probe_ctx);
2289   tcp_probe_ctx->transmit_handle =
2290       GNUNET_CONNECTION_notify_transmit_ready (sock,
2291                                                ntohs (tcp_probe_ctx->
2292                                                       message.header.size),
2293                                                GNUNET_TIME_UNIT_FOREVER_REL,
2294                                                &notify_send_probe,
2295                                                tcp_probe_ctx);
2296
2297 }
2298
2299
2300 /**
2301  * Session was idle, so disconnect it
2302  */
2303 static void
2304 session_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
2305 {
2306   GNUNET_assert (NULL != cls);
2307   struct Session *s = cls;
2308
2309   s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
2310   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2311               "Session %p was idle for %llu ms, disconnecting\n",
2312               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2313   /* call session destroy function */
2314   disconnect_session(s);
2315 }
2316
2317
2318 /**
2319  * Start session timeout
2320  */
2321 static void
2322 start_session_timeout (struct Session *s)
2323 {
2324   GNUNET_assert (NULL != s);
2325   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == s->timeout_task);
2326   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2327                                                    &session_timeout,
2328                                                    s);
2329   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2330               "Timeout for session %p set to %llu ms\n",
2331               s,  (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2332 }
2333
2334
2335 /**
2336  * Increment session timeout due to activity
2337  */
2338 static void
2339 reschedule_session_timeout (struct Session *s)
2340 {
2341   GNUNET_assert (NULL != s);
2342   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != s->timeout_task);
2343
2344   GNUNET_SCHEDULER_cancel (s->timeout_task);
2345   s->timeout_task =  GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT,
2346                                                    &session_timeout,
2347                                                    s);
2348   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2349               "Timeout rescheduled for session %p set to %llu ms\n",
2350               s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2351 }
2352
2353
2354 /**
2355  * Cancel timeout
2356  */
2357 static void
2358 stop_session_timeout (struct Session *s)
2359 {
2360   GNUNET_assert (NULL != s);
2361
2362   if (GNUNET_SCHEDULER_NO_TASK != s->timeout_task)
2363   {
2364     GNUNET_SCHEDULER_cancel (s->timeout_task);
2365     s->timeout_task = GNUNET_SCHEDULER_NO_TASK;
2366     GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
2367                 "Timeout stopped for session %p canceled\n",
2368                 s, (unsigned long long) GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT.rel_value);
2369   }
2370 }
2371
2372
2373 /**
2374  * Entry point for the plugin.
2375  *
2376  * @param cls closure, the 'struct GNUNET_TRANSPORT_PluginEnvironment*'
2377  * @return the 'struct GNUNET_TRANSPORT_PluginFunctions*' or NULL on error
2378  */
2379 void *
2380 libgnunet_plugin_transport_tcp_init (void *cls)
2381 {
2382   static const struct GNUNET_SERVER_MessageHandler my_handlers[] = {
2383     {&handle_tcp_welcome, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME,
2384      sizeof (struct WelcomeMessage)},
2385     {&handle_tcp_nat_probe, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_NAT_PROBE,
2386      sizeof (struct TCP_NAT_ProbeMessage)},
2387     {&handle_tcp_data, NULL, GNUNET_MESSAGE_TYPE_ALL, 0},
2388     {NULL, NULL, 0, 0}
2389   };
2390   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2391   struct GNUNET_TRANSPORT_PluginFunctions *api;
2392   struct Plugin *plugin;
2393   struct GNUNET_SERVICE_Context *service;
2394   unsigned long long aport;
2395   unsigned long long bport;
2396   unsigned long long max_connections;
2397   unsigned int i;
2398   struct GNUNET_TIME_Relative idle_timeout;
2399   int ret;
2400   int ret_s;
2401   struct sockaddr **addrs;
2402   socklen_t *addrlens;
2403
2404
2405   if (NULL == env->receive)
2406   {
2407     /* run in 'stub' mode (i.e. as part of gnunet-peerinfo), don't fully
2408        initialze the plugin or the API */
2409     api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2410     api->cls = NULL;
2411     api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
2412     api->address_to_string = &tcp_address_to_string;
2413     api->string_to_address = &tcp_string_to_address;
2414     return api;
2415   }
2416
2417   GNUNET_assert (NULL != env->cfg);
2418   if (GNUNET_OK !=
2419       GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-tcp",
2420                                              "MAX_CONNECTIONS",
2421                                              &max_connections))
2422     max_connections = 128;
2423
2424   aport = 0;
2425   if ((GNUNET_OK !=
2426        GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-tcp", "PORT",
2427                                               &bport)) || (bport > 65535) ||
2428       ((GNUNET_OK ==
2429         GNUNET_CONFIGURATION_get_value_number (env->cfg, "transport-tcp",
2430                                                "ADVERTISED-PORT", &aport)) &&
2431        (aport > 65535)))
2432   {
2433     LOG (GNUNET_ERROR_TYPE_ERROR, 
2434          _
2435          ("Require valid port number for service `%s' in configuration!\n"),
2436          "transport-tcp");
2437     return NULL;
2438   }
2439   if (aport == 0)
2440     aport = bport;
2441   if (bport == 0)
2442     aport = 0;
2443   if (bport != 0)
2444   {
2445     service = GNUNET_SERVICE_start ("transport-tcp", env->cfg, GNUNET_SERVICE_OPTION_NONE);
2446     if (service == NULL)
2447     {
2448       LOG (GNUNET_ERROR_TYPE_WARNING,
2449            _("Failed to start service.\n"));
2450       return NULL;
2451     }
2452   }
2453   else
2454     service = NULL;
2455
2456   /* Initialize my flags */
2457   myoptions = 0;
2458
2459   plugin = GNUNET_malloc (sizeof (struct Plugin));
2460   plugin->sessionmap = GNUNET_CONTAINER_multihashmap_create (max_connections, GNUNET_YES);
2461   plugin->max_connections = max_connections;
2462   plugin->cur_connections = 0;
2463   plugin->open_port = bport;
2464   plugin->adv_port = aport;
2465   plugin->env = env;
2466   plugin->lsock = NULL;
2467   if ((service != NULL) &&
2468       (GNUNET_SYSERR !=
2469        (ret_s =
2470         GNUNET_SERVICE_get_server_addresses ("transport-tcp", env->cfg, &addrs,
2471                                              &addrlens))))
2472   {
2473     for (ret = ret_s-1; ret >= 0; ret--)
2474       LOG (GNUNET_ERROR_TYPE_INFO,
2475            "Binding to address `%s'\n", 
2476            GNUNET_a2s (addrs[ret], addrlens[ret]));
2477     plugin->nat =
2478         GNUNET_NAT_register (env->cfg, GNUNET_YES, aport, (unsigned int) ret_s,
2479                              (const struct sockaddr **) addrs, addrlens,
2480                              &tcp_nat_port_map_callback,
2481                              &try_connection_reversal, plugin);
2482     for (ret = ret_s -1; ret >= 0; ret--)
2483     {
2484       GNUNET_assert (addrs[ret] != NULL);
2485       GNUNET_free (addrs[ret]);
2486     }
2487     GNUNET_free_non_null (addrs);
2488     GNUNET_free_non_null (addrlens);
2489   }
2490   else
2491   {
2492     plugin->nat = GNUNET_NAT_register (plugin->env->cfg,
2493                                        GNUNET_YES, 0, 0, NULL, NULL, NULL,
2494                                        &try_connection_reversal, plugin);
2495   }
2496   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2497   api->cls = plugin;
2498   api->send = &tcp_plugin_send;
2499   api->get_session = &tcp_plugin_get_session;
2500
2501   api->disconnect = &tcp_plugin_disconnect;
2502   api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
2503   api->check_address = &tcp_plugin_check_address;
2504   api->address_to_string = &tcp_address_to_string;
2505   api->string_to_address = &tcp_string_to_address;
2506   plugin->service = service;
2507   if (service != NULL)
2508   {
2509     plugin->server = GNUNET_SERVICE_get_server (service);
2510   }
2511   else
2512   {
2513     if (GNUNET_OK !=
2514         GNUNET_CONFIGURATION_get_value_time (env->cfg, "transport-tcp",
2515                                              "TIMEOUT", &idle_timeout))
2516     {
2517       GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
2518                                  "transport-tcp", "TIMEOUT");
2519       if (plugin->nat != NULL)
2520         GNUNET_NAT_unregister (plugin->nat);
2521       GNUNET_free (plugin);
2522       GNUNET_free (api);
2523       return NULL;
2524     }
2525     plugin->server =
2526         GNUNET_SERVER_create_with_sockets (&plugin_tcp_access_check, plugin,
2527                                            NULL, idle_timeout, GNUNET_YES);
2528   }
2529   plugin->handlers = GNUNET_malloc (sizeof (my_handlers));
2530   memcpy (plugin->handlers, my_handlers, sizeof (my_handlers));
2531   for (i = 0;
2532        i < sizeof (my_handlers) / sizeof (struct GNUNET_SERVER_MessageHandler);
2533        i++)
2534     plugin->handlers[i].callback_cls = plugin;
2535
2536   GNUNET_SERVER_add_handlers (plugin->server, plugin->handlers);
2537   GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify, plugin);
2538   plugin->nat_wait_conns = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
2539   if (bport != 0)
2540     LOG (GNUNET_ERROR_TYPE_INFO, 
2541          _("TCP transport listening on port %llu\n"), bport);
2542   else
2543     LOG (GNUNET_ERROR_TYPE_INFO, 
2544          _
2545          ("TCP transport not listening on any port (client only)\n"));
2546   if (aport != bport)
2547     LOG (GNUNET_ERROR_TYPE_INFO, 
2548                      _
2549                      ("TCP transport advertises itself as being on port %llu\n"),
2550                      aport);
2551   /* Initially set connections to 0 */
2552   GNUNET_assert (NULL != plugin->env->stats);
2553   GNUNET_STATISTICS_set (plugin->env->stats,
2554                         gettext_noop ("# TCP sessions active"), 0,
2555                         GNUNET_NO);
2556   return api;
2557 }
2558
2559
2560 /**
2561  * Exit point from the plugin.
2562  */
2563 void *
2564 libgnunet_plugin_transport_tcp_done (void *cls)
2565 {
2566   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2567   struct Plugin *plugin = api->cls;
2568   struct TCPProbeContext *tcp_probe;
2569
2570   if (NULL == plugin)
2571   {
2572     GNUNET_free (api);
2573     return NULL;
2574   }
2575   LOG (GNUNET_ERROR_TYPE_DEBUG, "Shutting down TCP plugin\n");
2576
2577   /* Removing leftover sessions */
2578   GNUNET_CONTAINER_multihashmap_iterate(plugin->sessionmap, &session_disconnect_it, NULL);
2579   /* Removing leftover NAT sessions */
2580   GNUNET_CONTAINER_multihashmap_iterate(plugin->nat_wait_conns, &session_disconnect_it, NULL);
2581
2582   if (plugin->service != NULL)
2583     GNUNET_SERVICE_stop (plugin->service);
2584   else
2585     GNUNET_SERVER_destroy (plugin->server);
2586   GNUNET_free (plugin->handlers);
2587   if (plugin->nat != NULL)
2588     GNUNET_NAT_unregister (plugin->nat);
2589   while (NULL != (tcp_probe = plugin->probe_head))
2590   {
2591     GNUNET_CONTAINER_DLL_remove (plugin->probe_head, plugin->probe_tail,
2592                                  tcp_probe);
2593     GNUNET_CONNECTION_destroy (tcp_probe->sock);
2594     GNUNET_free (tcp_probe);
2595   }
2596   GNUNET_CONTAINER_multihashmap_destroy (plugin->nat_wait_conns);
2597   GNUNET_CONTAINER_multihashmap_destroy (plugin->sessionmap);
2598   GNUNET_free (plugin);
2599   GNUNET_free (api);
2600   return NULL;
2601 }
2602
2603 /* end of plugin_transport_tcp.c */