simplifying transport plugin API
[oweals/gnunet.git] / src / transport / plugin_transport_tcp.c
1 /*
2      This file is part of GNUnet
3      (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 Christian Grothoff (and other contributing authors)
4
5      GNUnet is free software; you can redistribute it and/or modify
6      it under the terms of the GNU General Public License as published
7      by the Free Software Foundation; either version 2, or (at your
8      option) any later version.
9
10      GNUnet is distributed in the hope that it will be useful, but
11      WITHOUT ANY WARRANTY; without even the implied warranty of
12      MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13      General Public License for more details.
14
15      You should have received a copy of the GNU General Public License
16      along with GNUnet; see the file COPYING.  If not, write to the
17      Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18      Boston, MA 02111-1307, USA.
19 */
20
21 /**
22  * @file transport/plugin_transport_tcp.c
23  * @brief Implementation of the TCP transport service
24  * @author Christian Grothoff
25  */
26
27 #include "platform.h"
28 #include "gnunet_hello_lib.h"
29 #include "gnunet_connection_lib.h"
30 #include "gnunet_os_lib.h"
31 #include "gnunet_peerinfo_service.h"
32 #include "gnunet_protocols.h"
33 #include "gnunet_resolver_service.h"
34 #include "gnunet_server_lib.h"
35 #include "gnunet_service_lib.h"
36 #include "gnunet_signatures.h"
37 #include "gnunet_statistics_service.h"
38 #include "gnunet_transport_service.h"
39 #include "plugin_transport.h"
40 #include "transport.h"
41
42 #define DEBUG_TCP GNUNET_NO
43
44 /**
45  * After how long do we expire an address that we
46  * learned from another peer if it is not reconfirmed
47  * by anyone?
48  */
49 #define LEARNED_ADDRESS_EXPIRATION GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 6)
50
51 /**
52  * How long until we give up on transmitting the welcome message?
53  */
54 #define WELCOME_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
55
56 /**
57  * How long until we give up on transmitting the welcome message?
58  */
59 #define HOSTNAME_RESOLVE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5)
60
61 /**
62  * For how many messages back to we keep transmission times?
63  */
64 #define ACK_LOG_SIZE 32
65
66
67
68 /**
69  * Message used to ask a peer to validate receipt (to check an address
70  * from a HELLO).  Followed by the address used.  Note that the
71  * recipients response does not affirm that he has this address,
72  * only that he got the challenge message.
73  */
74 struct ValidationChallengeMessage
75 {
76
77   /**
78    * Type will be GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_PING
79    */
80   struct GNUNET_MessageHeader header;
81
82   /**
83    * Random challenge number (in network byte order).
84    */
85   uint32_t challenge GNUNET_PACKED;
86
87   /**
88    * Who is the intended recipient?
89    */
90   struct GNUNET_PeerIdentity target;
91
92 };
93
94
95 /**
96  * Message used to validate a HELLO.  The challenge is included in the
97  * confirmation to make matching of replies to requests possible.  The
98  * signature signs the original challenge number, our public key, the
99  * sender's address (so that the sender can check that the address we
100  * saw is plausible for him and possibly detect a MiM attack) and a
101  * timestamp (to limit replay).<p>
102  *
103  * This message is followed by the address of the
104  * client that we are observing (which is part of what
105  * is being signed).
106  */
107 struct ValidationChallengeResponse
108 {
109
110   /**
111    * Type will be GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_PONG
112    */
113   struct GNUNET_MessageHeader header;
114
115   /**
116    * For padding, always zero.
117    */
118   uint32_t reserved GNUNET_PACKED;
119
120   /**
121    * Signature.
122    */
123   struct GNUNET_CRYPTO_RsaSignature signature;
124
125   /**
126    * What are we signing and why?
127    */
128   struct GNUNET_CRYPTO_RsaSignaturePurpose purpose;
129
130   /**
131    * Random challenge number (in network byte order).
132    */
133   uint32_t challenge GNUNET_PACKED;
134
135   /**
136    * Who signed this message?
137    */
138   struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded signer;
139
140 };
141
142
143
144 /**
145  * Initial handshake message for a session.  This header
146  * is followed by the address that the other peer used to
147  * connect to us (so that we may learn it) or the address
148  * that the other peer got from the accept call.
149  */
150 struct WelcomeMessage
151 {
152   struct GNUNET_MessageHeader header;
153
154   /**
155    * Identity of the node connecting (TCP client)
156    */
157   struct GNUNET_PeerIdentity clientIdentity;
158
159 };
160
161
162 /**
163  * Encapsulation for normal TCP traffic.
164  */
165 struct DataMessage
166 {
167   struct GNUNET_MessageHeader header;
168
169   /**
170    * For alignment.
171    */
172   uint32_t reserved GNUNET_PACKED;
173
174   /**
175    * Number of the last message that was received from the other peer.
176    */
177   uint64_t ack_in GNUNET_PACKED;
178
179   /**
180    * Number of this outgoing message.
181    */
182   uint64_t ack_out GNUNET_PACKED;
183
184   /**
185    * How long was sending this ack delayed by the other peer
186    * (estimate).  The receiver of this message can use the delay
187    * between sending his message number 'ack' and receiving this ack
188    * minus the delay as an estimate of the round-trip time.
189    */
190   struct GNUNET_TIME_RelativeNBO delay;
191
192 };
193
194
195 /**
196  * Encapsulation of all of the state of the plugin.
197  */
198 struct Plugin;
199
200
201 /**
202  * Information kept for each message that is yet to
203  * be transmitted.
204  */
205 struct PendingMessage
206 {
207
208   /**
209    * This is a linked list.
210    */
211   struct PendingMessage *next;
212
213   /**
214    * The pending message, pointer to the end
215    * of this struct, do not free!
216    */
217   struct GNUNET_MessageHeader *msg;
218
219   /**
220    * Continuation function to call once the message
221    * has been sent.  Can be  NULL if there is no
222    * continuation to call.
223    */
224   GNUNET_TRANSPORT_TransmitContinuation transmit_cont;
225
226   /**
227    * Closure for transmit_cont.
228    */
229   void *transmit_cont_cls;
230
231   /**
232    * Timeout value for the pending message.
233    */
234   struct GNUNET_TIME_Absolute timeout;
235
236   /**
237    * GNUNET_YES if this is a welcome message;
238    * otherwise this should be a DATA message.
239    */
240   int is_welcome;
241
242 };
243
244
245 /**
246  * Session handle for TCP connections.
247  */
248 struct Session
249 {
250
251   /**
252    * Stored in a linked list.
253    */
254   struct Session *next;
255
256   /**
257    * Pointer to the global plugin struct.
258    */
259   struct Plugin *plugin;
260
261   /**
262    * The client (used to identify this connection)
263    */
264   struct GNUNET_SERVER_Client *client;
265
266   /**
267    * gnunet-service-transport context for this connection.
268    */
269   struct ReadyList *service_context;
270
271   /**
272    * Messages currently pending for transmission
273    * to this peer, if any.
274    */
275   struct PendingMessage *pending_messages;
276
277   /**
278    * Handle for pending transmission request.
279    */
280   struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
281
282   /**
283    * To whom are we talking to (set to our identity
284    * if we are still waiting for the welcome message)
285    */
286   struct GNUNET_PeerIdentity target;
287
288   /**
289    * At what time did we reset last_received last?
290    */
291   struct GNUNET_TIME_Absolute last_quota_update;
292
293   /**
294    * Address of the other peer if WE initiated the connection
295    * (and hence can be sure what it is), otherwise NULL.
296    */
297   void *connect_addr;
298
299   /**
300    * How many bytes have we received since the "last_quota_update"
301    * timestamp?
302    */
303   uint64_t last_received;
304
305   /**
306    * Our current latency estimate (in ms).
307    */
308   double latency_estimate;
309
310   /**
311    * Time when we generated the last ACK_LOG_SIZE acks.
312    * (the "last" refers to the "out_msg_counter" here)
313    */
314   struct GNUNET_TIME_Absolute gen_time[ACK_LOG_SIZE];
315
316   /**
317    * Our current sequence number.
318    */
319   uint64_t out_msg_counter;
320
321   /**
322    * Highest received incoming sequence number.
323    */
324   uint64_t max_in_msg_counter;
325
326   /**
327    * Number of bytes per ms that this peer is allowed
328    * to send to us.
329    */
330   uint32_t quota_in;
331
332   /**
333    * Length of connect_addr, can be 0.
334    */
335   size_t connect_alen;
336
337   /**
338    * Are we still expecting the welcome message? (GNUNET_YES/GNUNET_NO)
339    * GNUNET_SYSERR is used to mark non-welcoming connections (HELLO
340    * validation only).
341    */
342   int expecting_welcome;
343
344 };
345
346
347 /**
348  * Encapsulation of all of the state of the plugin.
349  */
350 struct Plugin
351 {
352   /**
353    * Our environment.
354    */
355   struct GNUNET_TRANSPORT_PluginEnvironment *env;
356
357   /**
358    * The listen socket.
359    */
360   struct GNUNET_CONNECTION_Handle *lsock;
361
362   /**
363    * List of open TCP sessions.
364    */
365   struct Session *sessions;
366
367   /**
368    * Handle for the statistics service.
369    */
370   struct GNUNET_STATISTICS_Handle *statistics;
371
372   /**
373    * Handle to the network service.
374    */
375   struct GNUNET_SERVICE_Context *service;
376
377   /**
378    * Handle to the server for this service.
379    */
380   struct GNUNET_SERVER_Handle *server;
381
382   /**
383    * Copy of the handler array where the closures are
384    * set to this struct's instance.
385    */
386   struct GNUNET_SERVER_MessageHandler *handlers;
387
388   /**
389    * Handle for request of hostname resolution, non-NULL if pending.
390    */
391   struct GNUNET_RESOLVER_RequestHandle *hostname_dns;
392
393   /**
394    * ID of task used to update our addresses when one expires.
395    */
396   GNUNET_SCHEDULER_TaskIdentifier address_update_task;
397
398   /**
399    * Port that we are actually listening on.
400    */
401   uint16_t open_port;
402
403   /**
404    * Port that the user said we would have visible to the
405    * rest of the world.
406    */
407   uint16_t adv_port;
408
409 };
410
411
412 /**
413  * Find the session handle for the given peer.
414  */
415 static struct Session *
416 find_session_by_target (struct Plugin *plugin,
417                         const struct GNUNET_PeerIdentity *target)
418 {
419   struct Session *ret;
420
421   ret = plugin->sessions;
422   while ((ret != NULL) &&
423          ( (GNUNET_SYSERR == ret->expecting_welcome) ||
424            (0 != memcmp (target,
425                          &ret->target, sizeof (struct GNUNET_PeerIdentity)))))
426     ret = ret->next;
427   return ret;
428 }
429
430
431 /**
432  * Find the session handle for the given peer.
433  */
434 static struct Session *
435 find_session_by_client (struct Plugin *plugin,
436                         const struct GNUNET_SERVER_Client *client)
437 {
438   struct Session *ret;
439
440   ret = plugin->sessions;
441   while ((ret != NULL) && (client != ret->client))
442     ret = ret->next;
443   return ret;
444 }
445
446
447 /**
448  * Create a welcome message.
449  */
450 static struct PendingMessage *
451 create_welcome (size_t addrlen, const void *addr, struct Plugin *plugin)
452 {
453   struct PendingMessage *pm;
454   struct WelcomeMessage *welcome;
455
456   pm = GNUNET_malloc (sizeof (struct PendingMessage) +
457                       sizeof (struct WelcomeMessage) + addrlen);
458   pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
459   welcome = (struct WelcomeMessage *) &pm[1];
460   welcome->header.size = htons (sizeof (struct WelcomeMessage) + addrlen);
461   welcome->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME);
462   GNUNET_CRYPTO_hash (plugin->env->my_public_key,
463                       sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
464                       &welcome->clientIdentity.hashPubKey);
465   memcpy (&welcome[1], addr, addrlen);
466   pm->timeout = GNUNET_TIME_relative_to_absolute (WELCOME_TIMEOUT);
467   pm->is_welcome = GNUNET_YES;
468   return pm;
469 }
470
471
472 /**
473  * Create a new session using the specified address
474  * for the welcome message.
475  *
476  * @param plugin us
477  * @param target peer to connect to
478  * @param client client to use
479  * @param addrlen IPv4 or IPv6
480  * @param addr either struct sockaddr_in or struct sockaddr_in6
481  * @return NULL connection failed / invalid address
482  */
483 static struct Session *
484 create_session (struct Plugin *plugin,
485                 const struct GNUNET_PeerIdentity *target,
486                 struct GNUNET_SERVER_Client *client,
487                 const void *addr, size_t addrlen)
488 {
489   struct Session *ret;
490
491   ret = GNUNET_malloc (sizeof (struct Session));
492   ret->plugin = plugin;
493   ret->next = plugin->sessions;
494   plugin->sessions = ret;
495   ret->client = client;
496   ret->target = *target;
497   ret->last_quota_update = GNUNET_TIME_absolute_get ();
498   ret->quota_in = plugin->env->default_quota_in;
499   ret->expecting_welcome = GNUNET_YES;
500   ret->pending_messages = create_welcome (addrlen, addr, plugin);
501   return ret;
502 }
503
504
505 /**
506  * If we have pending messages, ask the server to
507  * transmit them (schedule the respective tasks, etc.)
508  *
509  * @param session for which session should we do this
510  */
511 static void process_pending_messages (struct Session *session);
512
513
514 /**
515  * Function called to notify a client about the socket
516  * begin ready to queue more data.  "buf" will be
517  * NULL and "size" zero if the socket was closed for
518  * writing in the meantime.
519  *
520  * @param cls closure
521  * @param size number of bytes available in buf
522  * @param buf where the callee should write the message
523  * @return number of bytes written to buf
524  */
525 static size_t
526 do_transmit (void *cls, size_t size, void *buf)
527 {
528   struct Session *session = cls;
529   struct PendingMessage *pm;
530   char *cbuf;
531   uint16_t msize;
532   size_t ret;
533   struct DataMessage *dm;
534
535   session->transmit_handle = NULL;
536   if (buf == NULL)
537     {
538 #if DEBUG_TCP
539       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
540                        "tcp", 
541                        "Timeout trying to transmit to peer `%4s', discarding message queue.\n",
542                        GNUNET_i2s(&session->target));
543 #endif
544       /* timeout */
545       while (NULL != (pm = session->pending_messages))
546         {
547           session->pending_messages = pm->next;
548 #if DEBUG_TCP
549           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
550                            "tcp", 
551                            "Failed to transmit message of type %u to `%4s'.\n",
552                            ntohs(pm->msg->type),
553                            GNUNET_i2s(&session->target));
554 #endif
555           if (pm->transmit_cont != NULL)
556             pm->transmit_cont (pm->transmit_cont_cls,
557                                session->service_context,
558                                &session->target, GNUNET_SYSERR);            
559           GNUNET_free (pm);
560         }
561       return 0;
562     }
563   ret = 0;
564   cbuf = buf;
565   while (NULL != (pm = session->pending_messages))
566     {
567       if (pm->is_welcome)
568         {
569           if (size < (msize = ntohs (pm->msg->size)))
570             break;
571           memcpy (cbuf, pm->msg, msize);
572           cbuf += msize;
573           ret += msize;
574           size -= msize;
575         }
576       else
577         {
578           if (size <
579               sizeof (struct DataMessage) + (msize = ntohs (pm->msg->size)))
580             break;
581           dm = (struct DataMessage *) cbuf;
582           dm->header.size = htons (sizeof (struct DataMessage) + msize);
583           dm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_DATA);
584           dm->ack_out = GNUNET_htonll (++session->out_msg_counter);
585           dm->ack_in = GNUNET_htonll (session->max_in_msg_counter);
586           cbuf += sizeof (struct DataMessage);
587           ret += sizeof (struct DataMessage);
588           size -= sizeof (struct DataMessage);
589           memcpy (cbuf, pm->msg, msize);
590           cbuf += msize;
591           ret += msize;
592           size -= msize;
593         }
594       session->pending_messages = pm->next;
595       if (pm->transmit_cont != NULL)
596         pm->transmit_cont (pm->transmit_cont_cls,
597                            session->service_context,
598                            &session->target, GNUNET_OK);
599       GNUNET_free (pm);
600       session->gen_time[session->out_msg_counter % ACK_LOG_SIZE]
601         = GNUNET_TIME_absolute_get ();
602     }
603   process_pending_messages (session);
604 #if DEBUG_TCP
605   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
606                    "tcp", "Transmitting %u bytes\n", ret);
607 #endif
608   return ret;
609 }
610
611
612 /**
613  * If we have pending messages, ask the server to
614  * transmit them (schedule the respective tasks, etc.)
615  *
616  * @param session for which session should we do this
617  */
618 static void
619 process_pending_messages (struct Session *session)
620 {
621   GNUNET_assert (session->client != NULL);
622   if (session->pending_messages == NULL)
623     return;
624   if (session->transmit_handle != NULL)
625     return;
626   session->transmit_handle
627     = GNUNET_SERVER_notify_transmit_ready (session->client,
628                                            ntohs (session->
629                                                   pending_messages->msg->
630                                                   size) +
631                                            (session->
632                                             pending_messages->is_welcome ? 0 :
633                                             sizeof (struct DataMessage)),
634                                            GNUNET_TIME_absolute_get_remaining
635                                            (session->
636                                             pending_messages[0].timeout),
637                                            &do_transmit, session);
638 }
639
640
641
642 /**
643  * Create a new session connecting to the specified
644  * target at the specified address.  The session will
645  * be used to verify an address in a HELLO and should
646  * not expect to receive a WELCOME.
647  *
648  * @param plugin us
649  * @param target peer to connect to
650  * @param addrlen IPv4 or IPv6
651  * @param addr either struct sockaddr_in or struct sockaddr_in6
652  * @return NULL connection failed / invalid address
653  */
654 static struct Session *
655 connect_and_create_validation_session (struct Plugin *plugin,
656                                        const struct GNUNET_PeerIdentity *target,
657                                        const void *addr, size_t addrlen)
658 {
659   struct GNUNET_SERVER_Client *client;
660   struct GNUNET_CONNECTION_Handle *conn;
661   struct Session *session;
662   int af;
663
664   if (addrlen == sizeof (struct sockaddr_in))
665     af = AF_INET;
666   else if (addrlen == sizeof (struct sockaddr_in6))
667     af = AF_INET6;    
668   else
669     {
670       GNUNET_break_op (0);
671       return NULL;              /* invalid address */
672     }
673   conn = GNUNET_CONNECTION_create_from_sockaddr (plugin->env->sched,
674                                                      af,
675                                                      addr,
676                                                      addrlen,
677                                                      GNUNET_SERVER_MAX_MESSAGE_SIZE);
678   if (conn == NULL)
679     {
680 #if DEBUG_TCP
681       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
682                        "tcp",
683                        "Failed to create connection to peer at `%s'.\n",
684                        GNUNET_a2s(addr, addrlen));
685 #endif
686       return NULL;
687     }
688   client = GNUNET_SERVER_connect_socket (plugin->server, conn);
689   GNUNET_assert (client != NULL);
690   session = create_session (plugin, target, client, addr, addrlen);
691   /* kill welcome */
692   GNUNET_free (session->pending_messages);
693   session->pending_messages = NULL;
694   session->connect_alen = addrlen;
695   session->connect_addr = GNUNET_malloc (addrlen);
696   session->expecting_welcome = GNUNET_SYSERR;  
697   memcpy (session->connect_addr, addr, addrlen);
698 #if DEBUG_TCP
699   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
700                    "tcp",
701                    "Creating new session %p with `%s' for `%4s' based on `%s' request.\n",
702                    session, 
703                    GNUNET_a2s (addr, addrlen),
704                    GNUNET_i2s (&session->target),
705                    "VALIDATE");
706 #endif
707   return session;
708 }
709
710
711 /**
712  * Function that can be used by the transport service to validate that
713  * another peer is reachable at a particular address (even if we
714  * already have a connection to this peer, this function is required
715  * to establish a new one).
716  *
717  * @param cls closure
718  * @param target who should receive this message
719  * @param challenge challenge code to use
720  * @param addrlen length of the address
721  * @param addr the address
722  * @param timeout how long should we try to transmit these?
723  * @return GNUNET_OK if the transmission has been scheduled
724  */
725 static int
726 tcp_plugin_validate (void *cls,
727                      const struct GNUNET_PeerIdentity *target,
728                      uint32_t challenge,
729                      struct GNUNET_TIME_Relative timeout,
730                      const void *addr, size_t addrlen)
731 {
732   struct Plugin *plugin = cls;
733   struct Session *session;
734   struct PendingMessage *pm;
735   struct ValidationChallengeMessage *vcm;
736
737   session = connect_and_create_validation_session (plugin, target, addr, addrlen);
738   if (session == NULL)
739     {
740 #if DEBUG_TCP
741       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
742                        "tcp", "Failed to create fresh session.\n");
743 #endif
744       return GNUNET_SYSERR;
745     }
746   pm = GNUNET_malloc (sizeof (struct PendingMessage) +
747                       sizeof (struct ValidationChallengeMessage) + addrlen);
748   pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
749   pm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
750   pm->is_welcome = GNUNET_YES;
751   vcm = (struct ValidationChallengeMessage*) &pm[1];
752   vcm->header.size =
753     htons (sizeof (struct ValidationChallengeMessage) + addrlen);
754   vcm->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_PING);
755   vcm->challenge = htonl(challenge);
756   vcm->target = *target;
757   memcpy (&vcm[1], addr, addrlen);
758   GNUNET_assert (session->pending_messages == NULL);
759   session->pending_messages = pm;
760   process_pending_messages (session);
761   return GNUNET_OK;
762 }
763
764
765 /**
766  * Functions with this signature are called whenever we need
767  * to close a session due to a disconnect or failure to
768  * establish a connection.
769  *
770  * @param session session to close down
771  */
772 static void
773 disconnect_session (struct Session *session)
774 {
775   struct Session *prev;
776   struct Session *pos;
777   struct PendingMessage *pm;
778
779 #if DEBUG_TCP
780   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
781                    "tcp",
782                    "Disconnecting from `%4s' at %s (session %p).\n", 
783                    GNUNET_i2s(&session->target),
784                    (session->connect_addr != NULL) ? 
785                    GNUNET_a2s(session->connect_addr,
786                               session->connect_alen) : "*",
787                    session);
788 #endif
789   /* remove from session list */
790   prev = NULL;
791   pos = session->plugin->sessions;
792   while (pos != session)
793     {
794       prev = pos;
795       pos = pos->next;
796     }
797   if (prev == NULL)
798     session->plugin->sessions = session->next;
799   else
800     prev->next = session->next;
801   /* clean up state */
802   if (session->transmit_handle != NULL)
803     {
804       GNUNET_CONNECTION_notify_transmit_ready_cancel (session->transmit_handle);
805       session->transmit_handle = NULL;
806     }
807   while (NULL != (pm = session->pending_messages))
808     {
809 #if DEBUG_TCP
810       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
811                        "tcp",
812                        pm->transmit_cont != NULL 
813                        ? "Could not deliver message of type %u to `%4s'.\n" 
814                        : "Could not deliver message of type %u to `%4s', notifying.\n",
815                        ntohs(pm->msg->type),                   
816                        GNUNET_i2s(&session->target));
817 #endif
818       session->pending_messages = pm->next;
819       if (NULL != pm->transmit_cont)
820         pm->transmit_cont (pm->transmit_cont_cls,
821                            session->service_context,
822                            &session->target, GNUNET_SYSERR);
823       GNUNET_free (pm);
824     }
825   if (GNUNET_NO == session->expecting_welcome)
826     {
827 #if DEBUG_TCP
828       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
829                        "tcp",
830                        "Notifying transport service about loss of data connection with `%4s'.\n",
831                        GNUNET_i2s(&session->target));
832 #endif
833       /* Data session that actually went past the 
834          initial handshake; transport service may
835          know about this one, so we need to 
836          notify transport service about disconnect */
837       session->plugin->env->receive (session->plugin->env->cls,
838                                      session->service_context,
839                                      GNUNET_TIME_UNIT_ZERO,
840                                      &session->target, NULL);
841     }
842   if (session->client != NULL)
843     {
844       GNUNET_SERVER_client_drop (session->client);
845       session->client = NULL;
846     }
847   GNUNET_free_non_null (session->connect_addr);
848   GNUNET_free (session);
849 }
850
851
852 /**
853  * Iterator callback to go over all addresses.  If we get
854  * a TCP address, increment the counter
855  *
856  * @param cls closure, points to the counter
857  * @param tname name of the transport
858  * @param expiration expiration time
859  * @param addr the address
860  * @param addrlen length of the address
861  * @return GNUNET_OK to keep the address,
862  *         GNUNET_NO to delete it from the HELLO
863  *         GNUNET_SYSERR to stop iterating (but keep current address)
864  */
865 static int
866 count_tcp_addresses (void *cls,
867                      const char *tname,
868                      struct GNUNET_TIME_Absolute expiration,
869                      const void *addr, size_t addrlen)
870 {
871   unsigned int *counter = cls;
872
873   if (0 != strcmp (tname, "tcp"))
874     return GNUNET_OK;           /* not one of ours */
875   (*counter)++;
876   return GNUNET_OK;             /* failed to connect */
877 }
878
879
880 struct ConnectContext
881 {
882   struct Plugin *plugin;
883
884   struct GNUNET_CONNECTION_Handle *sa;
885
886   struct PendingMessage *welcome;
887
888   unsigned int pos;
889 };
890
891
892 /**
893  * Iterator callback to go over all addresses.  If we get
894  * the "pos" TCP address, try to connect to it.
895  *
896  * @param cls closure
897  * @param tname name of the transport
898  * @param expiration expiration time
899  * @param addrlen length of the address
900  * @param addr the address
901  * @return GNUNET_OK to keep the address,
902  *         GNUNET_NO to delete it from the HELLO
903  *         GNUNET_SYSERR to stop iterating (but keep current address)
904  */
905 static int
906 try_connect_to_address (void *cls,
907                         const char *tname,
908                         struct GNUNET_TIME_Absolute expiration,
909                         const void *addr, size_t addrlen)
910 {
911   struct ConnectContext *cc = cls;
912   int af;
913
914   if (0 != strcmp (tname, "tcp"))
915     return GNUNET_OK;           /* not one of ours */
916   if (sizeof (struct sockaddr_in) == addrlen)
917     af = AF_INET;
918   else if (sizeof (struct sockaddr_in6) == addrlen)
919     af = AF_INET6;
920   else
921     {
922       /* not a valid address */
923       GNUNET_break (0);
924       return GNUNET_NO;
925     }
926   if (0 == cc->pos--)
927     {
928       cc->welcome = create_welcome (addrlen, addr, cc->plugin);
929       cc->sa =
930         GNUNET_CONNECTION_create_from_sockaddr (cc->plugin->env->sched,
931                                                     af, addr, addrlen,
932                                                     GNUNET_SERVER_MAX_MESSAGE_SIZE);
933 #if DEBUG_TCP
934       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
935                        "tcp", 
936                        "Connecting using address %s.\n",
937                        GNUNET_a2s(addr, addrlen));
938 #endif
939       return GNUNET_SYSERR;
940     }
941   return GNUNET_OK;             /* failed to connect */
942 }
943
944
945 /**
946  * Type of an iterator over the hosts.  Note that each
947  * host will be called with each available protocol.
948  *
949  * @param cls closure
950  * @param peer id of the peer, NULL for last call
951  * @param hello hello message for the peer (can be NULL)
952  * @param trust amount of trust we have in the peer
953  */
954 static void
955 session_try_connect (void *cls,
956                      const struct GNUNET_PeerIdentity *peer,
957                      const struct GNUNET_HELLO_Message *hello, uint32_t trust)
958 {
959   struct Session *session = cls;
960   unsigned int count;
961   struct ConnectContext cctx;
962   struct PendingMessage *pm;
963
964   if (peer == NULL)
965     {
966       /* last call, destroy session if we are still not
967          connected */
968       if (session->client != NULL)
969         {
970 #if DEBUG_TCP
971           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
972                            "tcp",
973                            "Now connected to `%4s', now processing messages.\n",
974                            GNUNET_i2s(&session->target));
975 #endif
976           process_pending_messages (session);
977         }
978       else
979         {
980 #if DEBUG_TCP
981           GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
982                            "tcp",
983                            "Failed to connect to `%4s' (no working `%s'), closing session.\n",
984                            GNUNET_i2s(&session->target),
985                            "HELLO");
986 #endif
987           disconnect_session (session);
988         }
989       return;
990     }
991   if ((hello == NULL) || (session->client != NULL))
992     {
993       GNUNET_break (0);         /* should this ever happen!? */
994       return;
995     }
996   count = 0;
997   GNUNET_HELLO_iterate_addresses (hello,
998                                   GNUNET_NO, &count_tcp_addresses, &count);
999   if (count == 0)
1000     {
1001 #if DEBUG_TCP
1002       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1003                        "tcp",
1004                        "Asked to connect to `%4s', but have no addresses to try.\n",
1005                        GNUNET_i2s(&session->target));
1006 #endif
1007       return;
1008     }
1009   cctx.plugin = session->plugin;
1010   cctx.sa = NULL;
1011   cctx.welcome = NULL;
1012   cctx.pos = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count);
1013   GNUNET_HELLO_iterate_addresses (hello,
1014                                   GNUNET_NO, &try_connect_to_address, &cctx);
1015   if (cctx.sa == NULL)
1016     {
1017 #if DEBUG_TCP
1018       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1019                        "tcp",
1020                        "Asked to connect, but all addresses failed.\n");
1021 #endif
1022       GNUNET_free_non_null (cctx.welcome);
1023       return;
1024     }
1025   session->client = GNUNET_SERVER_connect_socket (session->plugin->server,
1026                                                   cctx.sa);
1027 #if DEBUG_TCP
1028   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
1029               "Connected to `%4s' for session %p\n",
1030               GNUNET_i2s(&session->target),
1031               session->client);
1032 #endif
1033   if (session->client == NULL)
1034     {
1035       GNUNET_break (0);         /* how could this happen? */
1036       GNUNET_free_non_null (cctx.welcome);
1037       return;
1038     }
1039   pm = cctx.welcome;
1040   /* prepend (!) */
1041   pm->next = session->pending_messages;
1042   session->pending_messages = pm;
1043 #if DEBUG_TCP
1044   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1045                    "tcp",
1046                    "Connected to `%4s', now sending `%s' message.\n",
1047                    GNUNET_i2s(&session->target),
1048                    "WELCOME");
1049 #endif
1050 }
1051
1052
1053 /**
1054  * Function that can be used by the transport service to transmit
1055  * a message using the plugin.
1056  *
1057  * @param cls closure
1058  * @param service_context value passed to the transport-service
1059  *        to identify the neighbour
1060  * @param target who should receive this message
1061  * @param priority how important is the message
1062  * @param msg the message to transmit
1063  * @param timeout when should we time out (give up) if we can not transmit?
1064  * @param cont continuation to call once the message has
1065  *        been transmitted (or if the transport is ready
1066  *        for the next transmission call; or if the
1067  *        peer disconnected...)
1068  * @param cont_cls closure for cont
1069  */
1070 static void 
1071 tcp_plugin_send (void *cls,
1072                  struct ReadyList *service_context,
1073                  const struct GNUNET_PeerIdentity *target,   
1074                  unsigned int priority,
1075                  const struct GNUNET_MessageHeader *msg,
1076                  struct GNUNET_TIME_Relative timeout,
1077                  GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls)
1078 {
1079   struct Plugin *plugin = cls;
1080   struct Session *session;
1081   struct PendingMessage *pm;
1082   struct PendingMessage *pme;
1083
1084   session = find_session_by_target (plugin, target);  
1085   pm = GNUNET_malloc (sizeof (struct PendingMessage) + ntohs (msg->size));
1086   pm->msg = (struct GNUNET_MessageHeader *) &pm[1];
1087   memcpy (pm->msg, msg, ntohs (msg->size));
1088   pm->timeout = GNUNET_TIME_relative_to_absolute (timeout);
1089   pm->transmit_cont = cont;
1090   pm->transmit_cont_cls = cont_cls;
1091   if (session == NULL)
1092     {
1093       session = GNUNET_malloc (sizeof (struct Session));
1094 #if DEBUG_TCP
1095       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1096                        "tcp",
1097                        "Asked to transmit, creating fresh session %p.\n",
1098                        session);
1099 #endif
1100       session->next = plugin->sessions;
1101       plugin->sessions = session;
1102       session->plugin = plugin;
1103       session->target = *target;
1104       session->last_quota_update = GNUNET_TIME_absolute_get ();
1105       session->quota_in = plugin->env->default_quota_in;
1106       session->expecting_welcome = GNUNET_YES;
1107       session->pending_messages = pm;
1108       session->service_context = service_context;
1109       GNUNET_PEERINFO_for_all (plugin->env->cfg,
1110                                plugin->env->sched,
1111                                target,
1112                                0, timeout, &session_try_connect, session);
1113       return;
1114     }
1115   GNUNET_assert (session != NULL);
1116   GNUNET_assert (session->client != NULL);
1117   session->service_context = service_context;
1118   /* append pm to pending_messages list */
1119   pme = session->pending_messages;
1120   if (pme == NULL)
1121     {
1122       session->pending_messages = pm;
1123     }
1124   else
1125     {
1126       while (NULL != pme->next)
1127         pme = pme->next;
1128       pme->next = pm;
1129     }
1130 #if DEBUG_TCP
1131   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1132                    "tcp", "Asked to transmit, added message to list.\n");
1133 #endif
1134   process_pending_messages (session);
1135 }
1136
1137
1138
1139 /**
1140  * Function that can be called to force a disconnect from the
1141  * specified neighbour.  This should also cancel all previously
1142  * scheduled transmissions.  Obviously the transmission may have been
1143  * partially completed already, which is OK.  The plugin is supposed
1144  * to close the connection (if applicable) and no longer call the
1145  * transmit continuation(s).
1146  *
1147  * Finally, plugin MUST NOT call the services's receive function to
1148  * notify the service that the connection to the specified target was
1149  * closed after a getting this call.
1150  *
1151  * @param cls closure
1152  * @param service_context must correspond to the service context
1153  *        of the corresponding Transmit call; the plugin should
1154  *        not cancel a send call made with a different service
1155  *        context pointer!  Never NULL.
1156  * @param target peer for which the last transmission is
1157  *        to be cancelled
1158  */
1159 static void
1160 tcp_plugin_cancel (void *cls,
1161                    struct ReadyList *service_context,
1162                    const struct GNUNET_PeerIdentity *target)
1163 {
1164   struct Plugin *plugin = cls;
1165   struct Session *session;
1166   struct PendingMessage *pm;
1167   
1168   session = find_session_by_target (plugin, target);
1169   if (session == NULL)
1170     {
1171       GNUNET_break (0);
1172       return;
1173     }
1174 #if DEBUG_TCP
1175   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1176                    "tcp",
1177                    "Asked to cancel session %p with `%4s'\n",
1178                    plugin_context,
1179                    GNUNET_i2s(target));
1180 #endif
1181   pm = session->pending_messages;
1182   while (pm != NULL)
1183     {
1184       pm->transmit_cont = NULL;
1185       pm->transmit_cont_cls = NULL;
1186       pm = pm->next;
1187     }
1188   session->service_context = NULL;
1189   if (session->client != NULL)
1190     {
1191       GNUNET_SERVER_client_drop (session->client);
1192       session->client = NULL;
1193     }
1194   /* rest of the clean-up of the session will be done as part of
1195      disconnect_notify which should be triggered any time now 
1196      (or which may be triggering this call in the first place) */
1197 }
1198
1199
1200 struct PrettyPrinterContext
1201 {
1202   GNUNET_TRANSPORT_AddressStringCallback asc;
1203   void *asc_cls;
1204   uint16_t port;
1205 };
1206
1207
1208 /**
1209  * Append our port and forward the result.
1210  */
1211 static void
1212 append_port (void *cls, const char *hostname)
1213 {
1214   struct PrettyPrinterContext *ppc = cls;
1215   char *ret;
1216
1217   if (hostname == NULL)
1218     {
1219       ppc->asc (ppc->asc_cls, NULL);
1220       GNUNET_free (ppc);
1221       return;
1222     }
1223   GNUNET_asprintf (&ret, "%s:%d", hostname, ppc->port);
1224   ppc->asc (ppc->asc_cls, ret);
1225   GNUNET_free (ret);
1226 }
1227
1228
1229 /**
1230  * Convert the transports address to a nice, human-readable
1231  * format.
1232  *
1233  * @param cls closure
1234  * @param type name of the transport that generated the address
1235  * @param addr one of the addresses of the host, NULL for the last address
1236  *        the specific address format depends on the transport
1237  * @param addrlen length of the address
1238  * @param numeric should (IP) addresses be displayed in numeric form?
1239  * @param timeout after how long should we give up?
1240  * @param asc function to call on each string
1241  * @param asc_cls closure for asc
1242  */
1243 static void
1244 tcp_plugin_address_pretty_printer (void *cls,
1245                                    const char *type,
1246                                    const void *addr,
1247                                    size_t addrlen,
1248                                    int numeric,
1249                                    struct GNUNET_TIME_Relative timeout,
1250                                    GNUNET_TRANSPORT_AddressStringCallback asc,
1251                                    void *asc_cls)
1252 {
1253   struct Plugin *plugin = cls;
1254   const struct sockaddr_in *v4;
1255   const struct sockaddr_in6 *v6;
1256   struct PrettyPrinterContext *ppc;
1257
1258   if ((addrlen != sizeof (struct sockaddr_in)) &&
1259       (addrlen != sizeof (struct sockaddr_in6)))
1260     {
1261       /* invalid address */
1262       GNUNET_break_op (0);
1263       asc (asc_cls, NULL);
1264       return;
1265     }
1266   ppc = GNUNET_malloc (sizeof (struct PrettyPrinterContext));
1267   ppc->asc = asc;
1268   ppc->asc_cls = asc_cls;
1269   if (addrlen == sizeof (struct sockaddr_in))
1270     {
1271       v4 = (const struct sockaddr_in *) addr;
1272       ppc->port = ntohs (v4->sin_port);
1273     }
1274   else
1275     {
1276       v6 = (const struct sockaddr_in6 *) addr;
1277       ppc->port = ntohs (v6->sin6_port);
1278
1279     }
1280   GNUNET_RESOLVER_hostname_get (plugin->env->sched,
1281                                 plugin->env->cfg,
1282                                 addr,
1283                                 addrlen,
1284                                 !numeric, timeout, &append_port, ppc);
1285 }
1286
1287
1288 /**
1289  * Update the last-received and bandwidth quota values
1290  * for this session.
1291  *
1292  * @param session session to update
1293  * @param force set to GNUNET_YES if we should update even
1294  *        though the minimum refresh time has not yet expired
1295  */
1296 static void
1297 update_quota (struct Session *session, int force)
1298 {
1299   struct GNUNET_TIME_Absolute now;
1300   unsigned long long delta;
1301   unsigned long long total_allowed;
1302   unsigned long long total_remaining;
1303
1304   now = GNUNET_TIME_absolute_get ();
1305   delta = now.value - session->last_quota_update.value;
1306   if ((delta < MIN_QUOTA_REFRESH_TIME) && (!force))
1307     return;                     /* too early, not enough data */
1308
1309   total_allowed = session->quota_in * delta;
1310   if (total_allowed > session->last_received)
1311     {
1312       /* got less than acceptable */
1313       total_remaining = total_allowed - session->last_received;
1314       session->last_received = 0;
1315       delta = total_remaining / session->quota_in;      /* bonus seconds */
1316       if (delta > MAX_BANDWIDTH_CARRY)
1317         delta = MAX_BANDWIDTH_CARRY;    /* limit amount of carry-over */
1318     }
1319   else
1320     {
1321       /* got more than acceptable */
1322       session->last_received -= total_allowed;
1323       delta = 0;
1324     }
1325   session->last_quota_update.value = now.value - delta;
1326 }
1327
1328
1329 /**
1330  * Set a quota for receiving data from the given peer; this is a
1331  * per-transport limit.  The transport should limit its read/select
1332  * calls to stay below the quota (in terms of incoming data).
1333  *
1334  * @param cls closure
1335  * @param target the peer for whom the quota is given
1336  * @param quota_in quota for receiving/sending data in bytes per ms
1337  */
1338 static void
1339 tcp_plugin_set_receive_quota (void *cls,
1340                               const struct GNUNET_PeerIdentity *target,
1341                               uint32_t quota_in)
1342 {
1343   struct Plugin *plugin = cls;
1344   struct Session *session;
1345
1346   session = find_session_by_target (plugin, target);
1347   if (session == NULL)
1348     return; /* peer must have disconnected, ignore */
1349   if (session->quota_in != quota_in)
1350     {
1351       update_quota (session, GNUNET_YES);
1352       if (session->quota_in > quota_in)
1353         session->last_quota_update = GNUNET_TIME_absolute_get ();
1354       session->quota_in = quota_in;
1355     }
1356 }
1357
1358
1359 /**
1360  * Check if the given port is plausible (must be either
1361  * our listen port or our advertised port).  If it is
1362  * neither, we return one of these two ports at random.
1363  *
1364  * @return either in_port or a more plausible port
1365  */
1366 static uint16_t
1367 check_port (struct Plugin *plugin, uint16_t in_port)
1368 {
1369   if ((in_port == plugin->adv_port) || (in_port == plugin->open_port))
1370     return in_port;
1371   return (GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
1372                                     2) == 0)
1373     ? plugin->open_port : plugin->adv_port;
1374 }
1375
1376
1377 /**
1378  * Another peer has suggested an address for this
1379  * peer and transport plugin.  Check that this could be a valid
1380  * address.  If so, consider adding it to the list
1381  * of addresses.
1382  *
1383  * @param cls closure
1384  * @param addr pointer to the address
1385  * @param addrlen length of addr
1386  * @return GNUNET_OK if this is a plausible address for this peer
1387  *         and transport
1388  */
1389 static int
1390 tcp_plugin_address_suggested (void *cls, const void *addr, size_t addrlen)
1391 {
1392   struct Plugin *plugin = cls;
1393   char buf[sizeof (struct sockaddr_in6)];
1394   struct sockaddr_in *v4;
1395   struct sockaddr_in6 *v6;
1396
1397   if ((addrlen != sizeof (struct sockaddr_in)) &&
1398       (addrlen != sizeof (struct sockaddr_in6)))
1399     {
1400       GNUNET_break_op (0);
1401       return GNUNET_SYSERR;
1402     }
1403   memcpy (buf, addr, sizeof (struct sockaddr_in6));
1404   if (addrlen == sizeof (struct sockaddr_in))
1405     {
1406       v4 = (struct sockaddr_in *) buf;
1407       v4->sin_port = htons (check_port (plugin, ntohs (v4->sin_port)));
1408     }
1409   else
1410     {
1411       v6 = (struct sockaddr_in6 *) buf;
1412       v6->sin6_port = htons (check_port (plugin, ntohs (v6->sin6_port)));
1413     }
1414 #if DEBUG_TCP
1415   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1416                    "tcp",
1417                    "Informing transport service about my address `%s'.\n",
1418                    GNUNET_a2s(addr, addrlen));
1419 #endif
1420   plugin->env->notify_address (plugin->env->cls,
1421                                "tcp",
1422                                buf, addrlen, LEARNED_ADDRESS_EXPIRATION);
1423   return GNUNET_OK;
1424 }
1425
1426
1427 /**
1428  * Send a validation challenge response.
1429  */
1430 static size_t
1431 send_vcr (void *cls,
1432           size_t size,
1433           void *buf)
1434 {
1435   struct ValidationChallengeResponse *vcr = cls;
1436   uint16_t msize;
1437
1438   if (NULL == buf)
1439     {
1440       GNUNET_free (vcr);
1441       return 0;
1442     }
1443   msize = ntohs(vcr->header.size);
1444   GNUNET_assert (size >= msize);
1445   memcpy (buf, vcr, msize);
1446   GNUNET_free (vcr);
1447   return msize;
1448 }
1449
1450
1451 /**
1452  * We've received a PING from this peer via TCP.
1453  * Send back our PONG.
1454  *
1455  * @param cls closure
1456  * @param client identification of the client
1457  * @param message the actual message
1458  */
1459 static void
1460 handle_tcp_ping (void *cls,
1461                  struct GNUNET_SERVER_Client *client,
1462                  const struct GNUNET_MessageHeader *message)
1463 {
1464   struct Plugin *plugin = cls;
1465   const struct ValidationChallengeMessage *vcm;
1466   struct ValidationChallengeResponse *vcr;
1467   uint16_t msize;
1468   size_t addrlen;
1469   void *addr;
1470
1471 #if DEBUG_TRANSPORT
1472   if (GNUNET_OK ==
1473       GNUNET_SERVER_client_get_address (client,
1474                                         (void **) &addr,
1475                                         &addrlen))
1476     {
1477       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1478                   "Processing `%s' from `%s'\n",
1479                   "PING",
1480                   GNUNET_a2s (addr, addrlen));
1481       GNUNET_free (addr);
1482     }
1483 #endif
1484   msize = ntohs (message->size);
1485   if (msize < sizeof (struct ValidationChallengeMessage))
1486     {
1487       GNUNET_break_op (0);
1488       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1489       return;
1490     }
1491   vcm = (const struct ValidationChallengeMessage *) message;
1492   if (0 != memcmp (&vcm->target,
1493                    plugin->env->my_identity, sizeof (struct GNUNET_PeerIdentity)))
1494     {
1495       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
1496                   _("Received `%s' message not destined for me!\n"), "PING");
1497       /* TODO: call statistics */
1498       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1499       return;
1500     }
1501   msize -= sizeof (struct ValidationChallengeMessage);
1502   if (GNUNET_OK !=
1503       tcp_plugin_address_suggested (plugin, &vcm[1], msize))
1504     {
1505       GNUNET_break_op (0);
1506       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1507       return;
1508     }
1509   if (GNUNET_OK !=
1510       GNUNET_SERVER_client_get_address (client,
1511                                         &addr,
1512                                         &addrlen))
1513     {
1514       GNUNET_break (0);
1515       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1516       return;
1517     }
1518   vcr = GNUNET_malloc (sizeof (struct ValidationChallengeResponse) + addrlen);
1519   vcr->header.size = htons (sizeof (struct ValidationChallengeResponse) + addrlen);
1520   vcr->header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_PONG);
1521   vcr->purpose.size =
1522     htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1523            sizeof (uint32_t) +
1524            sizeof ( struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1525            addrlen);
1526   vcr->purpose.purpose = htonl (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_TCP_PING);
1527   vcr->challenge = vcm->challenge;
1528   vcr->signer = *plugin->env->my_public_key;
1529   memcpy (&vcr[1],
1530           addr,
1531           addrlen);
1532   GNUNET_assert (GNUNET_OK ==
1533                  GNUNET_CRYPTO_rsa_sign (plugin->env->my_private_key,
1534                                          &vcr->purpose,
1535                                          &vcr->signature));
1536 #if EXTRA_CHECKS
1537   GNUNET_assert (GNUNET_OK ==
1538                  GNUNET_CRYPTO_rsa_verify
1539                  (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_TCP_PING,
1540                   &vcr->purpose,
1541                   &vcr->signature,
1542                   plugin->env->my_public_key));
1543 #endif
1544   GNUNET_free (addr);
1545   if (NULL ==
1546       GNUNET_SERVER_notify_transmit_ready (client,
1547                                            sizeof (struct ValidationChallengeResponse) + addrlen,
1548                                            GNUNET_TIME_UNIT_SECONDS,
1549                                            &send_vcr,
1550                                            vcr))
1551     {
1552       GNUNET_break (0);
1553       GNUNET_free (vcr);
1554     }
1555   /* after a PING, we always close the connection */
1556   GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1557 }
1558
1559
1560 /**
1561  * Handle PONG-message.
1562  *
1563  * @param cls handle for this plugin
1564  * @param client from where did we receive the PONG
1565  * @param message the actual message
1566  */
1567 static void
1568 handle_tcp_pong (void *cls,
1569                  struct GNUNET_SERVER_Client *client,
1570                  const struct GNUNET_MessageHeader *message)
1571 {
1572   struct Plugin *plugin = cls;
1573   const struct ValidationChallengeResponse *vcr;
1574   struct GNUNET_PeerIdentity peer;
1575   char *sender_addr;
1576   size_t addrlen;
1577   const struct sockaddr *addr;
1578   struct sockaddr_in v4;
1579   struct sockaddr_in6 v6;
1580
1581 #if DEBUG_TRANSPORT
1582   struct sockaddr *claddr;
1583
1584   if (GNUNET_OK ==
1585       GNUNET_SERVER_client_get_address (client,
1586                                         (void**) &claddr,
1587                                         &addrlen))
1588     {
1589       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
1590                   "Processing `%s' from `%s'\n",
1591                   "PONG",
1592                   GNUNET_a2s (claddr, addrlen));
1593       GNUNET_free (claddr);
1594     }
1595 #endif
1596   if (ntohs(message->size) < sizeof(struct ValidationChallengeResponse))
1597     {
1598       GNUNET_break_op (0);
1599       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1600       return;
1601     }
1602   addrlen = ntohs(message->size) - sizeof(struct ValidationChallengeResponse);
1603   vcr = (const struct ValidationChallengeResponse *) message;
1604   if ( (ntohl(vcr->purpose.size) !=
1605         sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) +
1606         sizeof (uint32_t) +
1607         sizeof ( struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) +
1608         addrlen))
1609     {
1610       GNUNET_break_op (0);
1611       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1612       return;
1613     }
1614   if (GNUNET_OK !=
1615       GNUNET_CRYPTO_rsa_verify
1616       (GNUNET_SIGNATURE_PURPOSE_TRANSPORT_TCP_PING, 
1617        &vcr->purpose,
1618        &vcr->signature, 
1619        &vcr->signer))
1620     {
1621       GNUNET_break_op (0);
1622       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1623       return;
1624     }
1625   GNUNET_CRYPTO_hash (&vcr->signer,
1626                       sizeof(  struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded),
1627                       &peer.hashPubKey);
1628   addr = (const struct sockaddr*) &vcr[1];
1629   if (addrlen == sizeof (struct sockaddr_in))
1630     {
1631       memcpy (&v4, addr, sizeof (struct sockaddr_in));
1632       v4.sin_port = htons(check_port (plugin, ntohs (v4.sin_port)));
1633       sender_addr = GNUNET_strdup (GNUNET_a2s((const struct sockaddr*) &v4,
1634                                               addrlen));
1635     }
1636   else if (addrlen == sizeof (struct sockaddr_in6))
1637     {
1638       memcpy (&v6, addr, sizeof (struct sockaddr_in6));
1639       v6.sin6_port = htons(check_port (plugin, ntohs (v6.sin6_port)));
1640       sender_addr = GNUNET_strdup (GNUNET_a2s((const struct sockaddr*) &v6,
1641                                               addrlen));
1642     }
1643   else
1644     {
1645       GNUNET_break_op (0); 
1646       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1647       return;
1648     }
1649   plugin->env->notify_validation (plugin->env->cls,
1650                                   "tcp",
1651                                   &peer,
1652                                   ntohl(vcr->challenge),
1653                                   sender_addr);
1654   GNUNET_free (sender_addr);
1655   /* after a PONG, we always close the connection */
1656   GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);  
1657 }
1658
1659
1660 /**
1661  * We've received a welcome from this peer via TCP.
1662  * Possibly create a fresh client record and send back
1663  * our welcome.
1664  *
1665  * @param cls closure
1666  * @param client identification of the client
1667  * @param message the actual message
1668  */
1669 static void
1670 handle_tcp_welcome (void *cls,
1671                     struct GNUNET_SERVER_Client *client,
1672                     const struct GNUNET_MessageHeader *message)
1673 {
1674   struct Plugin *plugin = cls;
1675   struct Session *session_c;
1676   const struct WelcomeMessage *wm;
1677   uint16_t msize;
1678   uint32_t addrlen;
1679   size_t alen;
1680   void *vaddr;
1681   const struct sockaddr *addr;
1682
1683   msize = ntohs (message->size);
1684   if (msize < sizeof (struct WelcomeMessage))
1685     {
1686       GNUNET_break_op (0);
1687       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1688       return;
1689     }
1690   wm = (const struct WelcomeMessage *) message;
1691 #if DEBUG_TCP
1692   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1693                    "tcp",
1694                    "Received `%s' message from `%4s/%p'.\n", "WELCOME",
1695                    GNUNET_i2s(&wm->clientIdentity),
1696                    client);
1697 #endif
1698   session_c = find_session_by_client (plugin, client);
1699   if (session_c == NULL)
1700     {
1701       vaddr = NULL;
1702       GNUNET_SERVER_client_get_address (client, &vaddr, &alen);
1703       GNUNET_SERVER_client_keep (client);
1704       session_c = create_session (plugin,
1705                                   &wm->clientIdentity, client, vaddr, alen);
1706 #if DEBUG_TCP
1707       GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1708                        "tcp",
1709                        "Creating new session %p for incoming `%s' message.\n",
1710                        session_c, "WELCOME");
1711 #endif
1712       GNUNET_free_non_null (vaddr);
1713       process_pending_messages (session_c);
1714     }
1715   if (session_c->expecting_welcome != GNUNET_YES)
1716     {
1717       GNUNET_break_op (0);
1718       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1719       return;
1720     }
1721   session_c->expecting_welcome = GNUNET_NO;
1722   if (0 < (addrlen = msize - sizeof (struct WelcomeMessage)))
1723     {
1724       addr = (const struct sockaddr *) &wm[1];
1725       tcp_plugin_address_suggested (plugin, addr, addrlen);
1726     }
1727   GNUNET_SERVER_receive_done (client, GNUNET_OK);
1728 }
1729
1730
1731 /**
1732  * Calculate how long we should delay reading from the TCP socket to
1733  * ensure that we stay within our bandwidth limits (push back).
1734  *
1735  * @param session for which client should this be calculated
1736  */
1737 static struct GNUNET_TIME_Relative
1738 calculate_throttle_delay (struct Session *session)
1739 {
1740   struct GNUNET_TIME_Relative ret;
1741   struct GNUNET_TIME_Absolute now;
1742   uint64_t del;
1743   uint64_t avail;
1744   uint64_t excess;
1745
1746   now = GNUNET_TIME_absolute_get ();
1747   del = now.value - session->last_quota_update.value;
1748   if (del > MAX_BANDWIDTH_CARRY)
1749     {
1750       update_quota (session, GNUNET_YES);
1751       del = now.value - session->last_quota_update.value;
1752       GNUNET_assert (del <= MAX_BANDWIDTH_CARRY);
1753     }
1754   if (session->quota_in == 0)
1755     session->quota_in = 1;      /* avoid divison by zero */
1756   avail = del * session->quota_in;
1757   if (avail > session->last_received)
1758     return GNUNET_TIME_UNIT_ZERO;       /* can receive right now */
1759   excess = session->last_received - avail;
1760   ret.value = excess / session->quota_in;
1761   return ret;
1762 }
1763
1764
1765 /**
1766  * Task to signal the server that we can continue
1767  * receiving from the TCP client now.
1768  */
1769 static void
1770 delayed_done (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
1771 {
1772   struct Session *session = cls;
1773   GNUNET_SERVER_receive_done (session->client, GNUNET_OK);
1774 }
1775
1776
1777 /**
1778  * We've received data for this peer via TCP.  Unbox,
1779  * compute latency and forward.
1780  *
1781  * @param cls closure
1782  * @param client identification of the client
1783  * @param message the actual message
1784  */
1785 static void
1786 handle_tcp_data (void *cls,
1787                  struct GNUNET_SERVER_Client *client,
1788                  const struct GNUNET_MessageHeader *message)
1789 {
1790   struct Plugin *plugin = cls;
1791   struct Session *session;
1792   const struct DataMessage *dm;
1793   uint16_t msize;
1794   const struct GNUNET_MessageHeader *msg;
1795   struct GNUNET_TIME_Relative latency;
1796   struct GNUNET_TIME_Absolute ttime;
1797   struct GNUNET_TIME_Absolute now;
1798   struct GNUNET_TIME_Relative delay;
1799   uint64_t ack_in;
1800
1801   msize = ntohs (message->size);
1802   if ((msize <
1803        sizeof (struct DataMessage) + sizeof (struct GNUNET_MessageHeader)))
1804     {
1805       GNUNET_break_op (0);
1806       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1807       return;
1808     }
1809   session = find_session_by_client (plugin, client);
1810   if ((NULL == session) || (GNUNET_NO != session->expecting_welcome))
1811     {
1812       GNUNET_break_op (0);
1813       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1814       return;
1815     }
1816 #if DEBUG_TCP
1817   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1818                    "tcp", "Receiving %u bytes from `%4s'.\n",
1819                    msize,
1820                    GNUNET_i2s(&session->target));
1821 #endif
1822   dm = (const struct DataMessage *) message;
1823   session->max_in_msg_counter = GNUNET_MAX (session->max_in_msg_counter,
1824                                             GNUNET_ntohll (dm->ack_out));
1825   msg = (const struct GNUNET_MessageHeader *) &dm[1];
1826   if (msize != sizeof (struct DataMessage) + ntohs (msg->size))
1827     {
1828       GNUNET_break_op (0);
1829       GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
1830       return;
1831     }
1832   /* estimate latency */
1833   ack_in = GNUNET_ntohll (dm->ack_in);
1834   if ((ack_in <= session->out_msg_counter) &&
1835       (session->out_msg_counter - ack_in < ACK_LOG_SIZE))
1836     {
1837       delay = GNUNET_TIME_relative_ntoh (dm->delay);
1838       ttime = session->gen_time[ack_in % ACK_LOG_SIZE];
1839       now = GNUNET_TIME_absolute_get ();
1840       if (delay.value > now.value - ttime.value)
1841         delay.value = 0;        /* not plausible */
1842       /* update (round-trip) latency using ageing; we
1843          use 7:1 so that we can reasonably quickly react
1844          to changes, but not so fast that latency is largely
1845          jitter... */
1846       session->latency_estimate
1847         = ((7 * session->latency_estimate) +
1848            (now.value - ttime.value - delay.value)) / 8;
1849     }
1850   latency.value = (uint64_t) session->latency_estimate;
1851   /* deliver on */
1852 #if DEBUG_TCP
1853   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1854                    "tcp",
1855                    "Forwarding data of type %u to transport service.\n",
1856                    ntohs (msg->type));
1857 #endif
1858   session->service_context
1859     = plugin->env->receive (plugin->env->cls,
1860                             session->service_context,
1861                             latency, &session->target, msg);
1862   /* update bandwidth used */
1863   session->last_received += msize;
1864   update_quota (session, GNUNET_NO);
1865
1866   delay = calculate_throttle_delay (session);
1867   if (delay.value == 0)
1868     GNUNET_SERVER_receive_done (client, GNUNET_OK);
1869   else
1870     GNUNET_SCHEDULER_add_delayed (session->plugin->env->sched,
1871                                   delay, &delayed_done, session);
1872 }
1873
1874
1875 /**
1876  * Handlers for the various TCP messages.
1877  */
1878 static struct GNUNET_SERVER_MessageHandler my_handlers[] = {
1879   {&handle_tcp_ping, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_PING, 0},
1880   {&handle_tcp_pong, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_PONG, 0},
1881   {&handle_tcp_welcome, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME, 0},
1882   {&handle_tcp_data, NULL, GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_DATA, 0},
1883   {NULL, NULL, 0, 0}
1884 };
1885
1886
1887 static void
1888 create_tcp_handlers (struct Plugin *plugin)
1889 {
1890   unsigned int i;
1891   plugin->handlers = GNUNET_malloc (sizeof (my_handlers));
1892   memcpy (plugin->handlers, my_handlers, sizeof (my_handlers));
1893   for (i = 0;
1894        i <
1895        sizeof (my_handlers) / sizeof (struct GNUNET_SERVER_MessageHandler);
1896        i++)
1897     plugin->handlers[i].callback_cls = plugin;
1898   GNUNET_SERVER_add_handlers (plugin->server, plugin->handlers);
1899 }
1900
1901
1902 /**
1903  * Functions with this signature are called whenever a peer
1904  * is disconnected on the network level.
1905  *
1906  * @param cls closure
1907  * @param client identification of the client
1908  */
1909 static void
1910 disconnect_notify (void *cls, struct GNUNET_SERVER_Client *client)
1911 {
1912   struct Plugin *plugin = cls;
1913   struct Session *session;
1914
1915   session = find_session_by_client (plugin, client);
1916   if (session == NULL)
1917     return;                     /* unknown, nothing to do */
1918 #if DEBUG_TCP
1919   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
1920                    "tcp",
1921                    "Destroying session of `%4s' with %s (%p) due to network-level disconnect.\n",
1922                    GNUNET_i2s(&session->target),
1923                    (session->connect_addr != NULL) ? 
1924                    GNUNET_a2s(session->connect_addr,
1925                               session->connect_alen) : "*",
1926                    client);
1927 #endif
1928   disconnect_session (session);
1929 }
1930
1931
1932 /**
1933  * Add the IP of our network interface to the list of
1934  * our external IP addresses.
1935  */
1936 static int
1937 process_interfaces (void *cls,
1938                     const char *name,
1939                     int isDefault,
1940                     const struct sockaddr *addr, socklen_t addrlen)
1941 {
1942   struct Plugin *plugin = cls;
1943   int af;
1944   struct sockaddr_in *v4;
1945   struct sockaddr_in6 *v6;
1946
1947   af = addr->sa_family;
1948   if (af == AF_INET)
1949     {
1950       v4 = (struct sockaddr_in *) addr;
1951       v4->sin_port = htons (plugin->adv_port);
1952     }
1953   else
1954     {
1955       GNUNET_assert (af == AF_INET6);
1956       v6 = (struct sockaddr_in6 *) addr;
1957       v6->sin6_port = htons (plugin->adv_port);
1958     }
1959   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO |
1960                    GNUNET_ERROR_TYPE_BULK,
1961                    "tcp", _("Found address `%s' (%s)\n"), 
1962                    GNUNET_a2s(addr, addrlen), name);
1963   plugin->env->notify_address (plugin->env->cls,
1964                                "tcp",
1965                                addr, addrlen, GNUNET_TIME_UNIT_FOREVER_REL);
1966   return GNUNET_OK;
1967 }
1968
1969
1970 /**
1971  * Function called by the resolver for each address obtained from DNS
1972  * for our own hostname.  Add the addresses to the list of our
1973  * external IP addresses.
1974  *
1975  * @param cls closure
1976  * @param addr one of the addresses of the host, NULL for the last address
1977  * @param addrlen length of the address
1978  */
1979 static void
1980 process_hostname_ips (void *cls,
1981                       const struct sockaddr *addr, socklen_t addrlen)
1982 {
1983   struct Plugin *plugin = cls;
1984
1985   if (addr == NULL)
1986     {
1987       plugin->hostname_dns = NULL;
1988       return;
1989     }
1990   process_interfaces (plugin,
1991                       "<hostname>",
1992                       GNUNET_YES,
1993                       addr,
1994                       addrlen);
1995 }
1996
1997
1998 /**
1999  * Entry point for the plugin.
2000  */
2001 void *
2002 libgnunet_plugin_transport_tcp_init (void *cls)
2003 {
2004   struct GNUNET_TRANSPORT_PluginEnvironment *env = cls;
2005   struct GNUNET_TRANSPORT_PluginFunctions *api;
2006   struct Plugin *plugin;
2007   struct GNUNET_SERVICE_Context *service;
2008   unsigned long long aport;
2009   unsigned long long bport;
2010
2011   service = GNUNET_SERVICE_start ("transport-tcp", env->sched, env->cfg);
2012   if (service == NULL)
2013     {
2014       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING,
2015                        "tcp",
2016                        _
2017                        ("Failed to start service for `%s' transport plugin.\n"),
2018                        "tcp");
2019       return NULL;
2020     }
2021   aport = 0;
2022   if ((GNUNET_OK !=
2023        GNUNET_CONFIGURATION_get_value_number (env->cfg,
2024                                               "transport-tcp",
2025                                               "PORT",
2026                                               &bport)) ||
2027       (bport > 65535) ||
2028       ((GNUNET_OK ==
2029         GNUNET_CONFIGURATION_get_value_number (env->cfg,
2030                                                "transport-tcp",
2031                                                "ADVERTISED-PORT",
2032                                                &aport)) && (aport > 65535)))
2033     {
2034       GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
2035                        "tcp",
2036                        _
2037                        ("Require valid port number for service `%s' in configuration!\n"),
2038                        "transport-tcp");
2039       GNUNET_SERVICE_stop (service);
2040       return NULL;
2041     }
2042   if (aport == 0)
2043     aport = bport;
2044   plugin = GNUNET_malloc (sizeof (struct Plugin));
2045   plugin->open_port = bport;
2046   plugin->adv_port = aport;
2047   plugin->env = env;
2048   plugin->lsock = NULL;
2049   plugin->statistics = NULL;
2050   api = GNUNET_malloc (sizeof (struct GNUNET_TRANSPORT_PluginFunctions));
2051   api->cls = plugin;
2052   api->validate = &tcp_plugin_validate;
2053   api->send = &tcp_plugin_send;
2054   api->cancel = &tcp_plugin_cancel;
2055   api->address_pretty_printer = &tcp_plugin_address_pretty_printer;
2056   api->set_receive_quota = &tcp_plugin_set_receive_quota;
2057   api->address_suggested = &tcp_plugin_address_suggested;
2058   api->cost_estimate = 42;      /* TODO: ATS */
2059   plugin->service = service;
2060   plugin->server = GNUNET_SERVICE_get_server (service);
2061   create_tcp_handlers (plugin);
2062   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
2063                    "tcp", _("TCP transport listening on port %llu\n"), bport);
2064   if (aport != bport)
2065     GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
2066                      "tcp",
2067                      _
2068                      ("TCP transport advertises itself as being on port %llu\n"),
2069                      aport);
2070   GNUNET_SERVER_disconnect_notify (plugin->server, &disconnect_notify,
2071                                    plugin);
2072   GNUNET_OS_network_interfaces_list (&process_interfaces, plugin);
2073   plugin->hostname_dns = GNUNET_RESOLVER_hostname_resolve (env->sched,
2074                                                            env->cfg,
2075                                                            AF_UNSPEC,
2076                                                            HOSTNAME_RESOLVE_TIMEOUT,
2077                                                            &process_hostname_ips, plugin);
2078   return api;
2079 }
2080
2081
2082 /**
2083  * Exit point from the plugin.
2084  */
2085 void *
2086 libgnunet_plugin_transport_tcp_done (void *cls)
2087 {
2088   struct GNUNET_TRANSPORT_PluginFunctions *api = cls;
2089   struct Plugin *plugin = api->cls;
2090   struct Session *session;
2091
2092   while (NULL != (session = plugin->sessions))
2093     disconnect_session (session);    
2094   if (NULL != plugin->hostname_dns)
2095     {
2096       GNUNET_RESOLVER_request_cancel (plugin->hostname_dns);
2097       plugin->hostname_dns = NULL;
2098     }
2099   GNUNET_SERVICE_stop (plugin->service);
2100   GNUNET_free (plugin->handlers);
2101   GNUNET_free (plugin);
2102   GNUNET_free (api);
2103   return NULL;
2104 }
2105
2106 /* end of plugin_transport_tcp.c */