Improve Connection with threading and some kind of congestion control
authorPerttu Ahola <celeron55@gmail.com>
Thu, 20 Oct 2011 20:04:09 +0000 (23:04 +0300)
committerPerttu Ahola <celeron55@gmail.com>
Thu, 20 Oct 2011 20:04:09 +0000 (23:04 +0300)
12 files changed:
minetest.conf.example
src/client.cpp
src/client.h
src/connection.cpp
src/connection.h
src/defaultsettings.cpp
src/main.cpp
src/map.cpp
src/server.cpp
src/server.h
src/servercommand.cpp
src/test.cpp

index 01b3f3e7cd5fb62c48dd5f67b00a37451c64794e..30adc5e601cb472ce85232c65d44001bafcb3445 100644 (file)
 #active_object_send_range_blocks = 3
 #active_block_range = 2
 #max_simultaneous_block_sends_per_client = 2
-#max_simultaneous_block_sends_server_total = 2
+#max_simultaneous_block_sends_server_total = 8
 #max_block_send_distance = 7
 #max_block_generate_distance = 5
 #time_send_interval = 20
index 5ec53a524ef50b494890747b8d9ea6286cff0ca3..a777293a36dc01128025a93f0505fc3ec2a9c0dc 100644 (file)
@@ -246,7 +246,7 @@ void Client::connect(Address address)
 {
        DSTACK(__FUNCTION_NAME);
        //JMutexAutoLock lock(m_con_mutex); //bulk comment-out
-       m_con.setTimeoutMs(0);
+       m_con.SetTimeoutMs(0);
        m_con.Connect(address);
 }
 
@@ -563,8 +563,8 @@ void Client::step(float dtime)
                        counter = 0.0;
                        //JMutexAutoLock lock(m_con_mutex); //bulk comment-out
                        // connectedAndInitialized() is true, peer exists.
-                       con::Peer *peer = m_con.GetPeer(PEER_ID_SERVER);
-                       infostream<<"Client: avg_rtt="<<peer->avg_rtt<<std::endl;
+                       float avg_rtt = m_con.GetPeerAvgRTT(PEER_ID_SERVER);
+                       infostream<<"Client: avg_rtt="<<avg_rtt<<std::endl;
                }
        }
 
@@ -709,14 +709,6 @@ void Client::ProcessData(u8 *data, u32 datasize, u16 sender_peer_id)
                return;
        }
 
-       con::Peer *peer;
-       {
-               //JMutexAutoLock lock(m_con_mutex); //bulk comment-out
-               // All data is coming from the server
-               // PeerNotFoundException is handled by caller.
-               peer = m_con.GetPeer(PEER_ID_SERVER);
-       }
-
        u8 ser_version = m_server_ser_ver;
 
        //infostream<<"Client received command="<<(int)command<<std::endl;
@@ -2168,9 +2160,10 @@ ClientEvent Client::getClientEvent()
 
 float Client::getRTT(void)
 {
-       con::Peer *peer = m_con.GetPeerNoEx(PEER_ID_SERVER);
-       if(!peer)
-               return 0.0;
-       return peer->avg_rtt;
+       try{
+               return m_con.GetPeerAvgRTT(PEER_ID_SERVER);
+       } catch(con::PeerNotFoundException &e){
+               return 1337;
+       }
 }
 
index 52dd66ca2c6d2b1cd369d0bde77f80a8774b070c..8585f6d4a6a7c8514a4bc0a1df2de175785b3a38 100644 (file)
@@ -251,11 +251,11 @@ public:
 
        float getAvgRtt()
        {
-               //JMutexAutoLock lock(m_con_mutex); //bulk comment-out
-               con::Peer *peer = m_con.GetPeerNoEx(PEER_ID_SERVER);
-               if(peer == NULL)
-                       return 0.0;
-               return peer->avg_rtt;
+               try{
+                       return m_con.GetPeerAvgRTT(PEER_ID_SERVER);
+               } catch(con::PeerNotFoundException){
+                       return 1337;
+               }
        }
 
        bool getChatMessage(std::wstring &message)
index 623994c4c043785ca8e2255cd090b8585c939362..1c424839f96c739ff1676b8d661eec4c133f6031 100644 (file)
@@ -20,6 +20,8 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include "connection.h"
 #include "main.h"
 #include "serialization.h"
+#include "log.h"
+#include "porting.h"
 
 namespace con
 {
@@ -439,16 +441,19 @@ Channel::~Channel()
        Peer
 */
 
-Peer::Peer(u16 a_id, Address a_address)
+Peer::Peer(u16 a_id, Address a_address):
+       address(a_address),
+       id(a_id),
+       timeout_counter(0.0),
+       ping_timer(0.0),
+       resend_timeout(0.5),
+       avg_rtt(-1.0),
+       has_sent_with_id(false),
+       m_sendtime_accu(0),
+       m_max_packets_per_second(10),
+       m_num_sent(0),
+       m_max_num_sent(0)
 {
-       id = a_id;
-       address = a_address;
-       timeout_counter = 0.0;
-       //resend_timeout = RESEND_TIMEOUT_MINIMUM;
-       ping_timer = 0.0;
-       resend_timeout = 0.5;
-       avg_rtt = -1.0;
-       has_sent_with_id = false;
 }
 Peer::~Peer()
 {
@@ -456,6 +461,19 @@ Peer::~Peer()
 
 void Peer::reportRTT(float rtt)
 {
+       if(rtt >= 0.0){
+               if(rtt < 0.01){
+                       if(m_max_packets_per_second < 100)
+                               m_max_packets_per_second += 10;
+               } else if(rtt < 0.2){
+                       if(m_max_packets_per_second < 100)
+                               m_max_packets_per_second += 2;
+               } else {
+                       if(m_max_packets_per_second > 5)
+                               m_max_packets_per_second *= 0.5;
+               }
+       }
+
        if(rtt < -0.999)
        {}
        else if(avg_rtt < 0.0)
@@ -485,897 +503,1154 @@ void Peer::reportRTT(float rtt)
        Connection
 */
 
-Connection::Connection(
-       u32 protocol_id,
-       u32 max_packet_size,
-       float timeout,
-       PeerHandler *peerhandler
-)
+Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
+       m_protocol_id(protocol_id),
+       m_max_packet_size(max_packet_size),
+       m_timeout(timeout),
+       m_peer_id(0),
+       m_bc_peerhandler(NULL),
+       m_bc_receive_timeout(0),
+       m_indentation(0)
 {
-       assert(peerhandler != NULL);
+       m_socket.setTimeoutMs(5);
 
-       m_protocol_id = protocol_id;
-       m_max_packet_size = max_packet_size;
-       m_timeout = timeout;
-       m_peer_id = PEER_ID_INEXISTENT;
-       //m_waiting_new_peer_id = false;
-       m_indentation = 0;
-       m_peerhandler = peerhandler;
+       Start();
 }
 
-Connection::~Connection()
+Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
+               PeerHandler *peerhandler):
+       m_protocol_id(protocol_id),
+       m_max_packet_size(max_packet_size),
+       m_timeout(timeout),
+       m_peer_id(0),
+       m_bc_peerhandler(peerhandler),
+       m_bc_receive_timeout(0),
+       m_indentation(0)
 {
-       // Clear peers
-       core::map<u16, Peer*>::Iterator j;
-       j = m_peers.getIterator();
-       for(; j.atEnd() == false; j++)
-       {
-               Peer *peer = j.getNode()->getValue();
-               delete peer;
-       }
+       m_socket.setTimeoutMs(5);
+
+       Start();
 }
 
-void Connection::Serve(unsigned short port)
+
+Connection::~Connection()
 {
-       m_socket.Bind(port);
-       m_peer_id = PEER_ID_SERVER;
+       stop();
 }
 
-void Connection::Connect(Address address)
+/* Internal stuff */
+
+void * Connection::Thread()
 {
-       core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
-       if(node != NULL){
-               throw ConnectionException("Already connected to a server");
-       }
+       ThreadStarted();
+       log_register_thread("Connection");
 
-       Peer *peer = new Peer(PEER_ID_SERVER, address);
-       m_peers.insert(peer->id, peer);
-       m_peerhandler->peerAdded(peer);
-       
-       m_socket.Bind(0);
+       dout_con<<"Connection thread started"<<std::endl;
        
-       // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
-       m_peer_id = PEER_ID_INEXISTENT;
-       SharedBuffer<u8> data(0);
-       Send(PEER_ID_SERVER, 0, data, true);
+       u32 curtime = porting::getTimeMs();
+       u32 lasttime = curtime;
+
+       while(getRun())
+       {
+               BEGIN_DEBUG_EXCEPTION_HANDLER
+               
+               lasttime = curtime;
+               curtime = porting::getTimeMs();
+               float dtime = (float)(curtime - lasttime) / 1000.;
+               if(dtime > 0.1)
+                       dtime = 0.1;
+               if(dtime < 0.0)
+                       dtime = 0.0;
+               
+               runTimeouts(dtime);
+
+               while(m_command_queue.size() != 0){
+                       ConnectionCommand c = m_command_queue.pop_front();
+                       processCommand(c);
+               }
+
+               send(dtime);
+
+               receive();
+               
+               END_DEBUG_EXCEPTION_HANDLER(derr_con);
+       }
 
-       //m_waiting_new_peer_id = true;
+       return NULL;
 }
 
-void Connection::Disconnect()
+void Connection::putEvent(ConnectionEvent &e)
 {
-       // Create and send DISCO packet
-       SharedBuffer<u8> data(2);
-       writeU8(&data[0], TYPE_CONTROL);
-       writeU8(&data[1], CONTROLTYPE_DISCO);
-       
-       // Send to all
-       core::map<u16, Peer*>::Iterator j;
-       j = m_peers.getIterator();
-       for(; j.atEnd() == false; j++)
-       {
-               Peer *peer = j.getNode()->getValue();
-               SendAsPacket(peer->id, 0, data, false);
-       }
+       assert(e.type != CONNEVENT_NONE);
+       m_event_queue.push_back(e);
 }
 
-bool Connection::Connected()
+void Connection::processCommand(ConnectionCommand &c)
 {
-       if(m_peers.size() != 1)
-               return false;
-               
-       core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
-       if(node == NULL)
-               return false;
-       
-       if(m_peer_id == PEER_ID_INEXISTENT)
-               return false;
-       
-       return true;
+       switch(c.type){
+       case CONNCMD_NONE:
+               dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
+               return;
+       case CONNCMD_SERVE:
+               dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
+                               <<c.port<<std::endl;
+               serve(c.port);
+               return;
+       case CONNCMD_CONNECT:
+               dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
+               connect(c.address);
+               return;
+       case CONNCMD_DISCONNECT:
+               dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
+               disconnect();
+               return;
+       case CONNCMD_SEND:
+               dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
+               send(c.peer_id, c.channelnum, c.data, c.reliable);
+               return;
+       case CONNCMD_SEND_TO_ALL:
+               dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
+               sendToAll(c.channelnum, c.data, c.reliable);
+               return;
+       case CONNCMD_DELETE_PEER:
+               dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
+               deletePeer(c.peer_id, false);
+               return;
+       }
 }
 
-SharedBuffer<u8> Channel::ProcessPacket(
-               SharedBuffer<u8> packetdata,
-               Connection *con,
-               u16 peer_id,
-               u8 channelnum,
-               bool reliable)
+void Connection::send(float dtime)
 {
-       IndentationRaiser iraiser(&(con->m_indentation));
+       for(core::map<u16, Peer*>::Iterator
+                       j = m_peers.getIterator();
+                       j.atEnd() == false; j++)
+       {
+               Peer *peer = j.getNode()->getValue();
+               peer->m_sendtime_accu += dtime;
+               peer->m_num_sent = 0;
+               peer->m_max_num_sent = peer->m_sendtime_accu *
+                               peer->m_max_packets_per_second;
+       }
+       Queue<OutgoingPacket> postponed_packets;
+       while(m_outgoing_queue.size() != 0){
+               OutgoingPacket packet = m_outgoing_queue.pop_front();
+               Peer *peer = getPeerNoEx(packet.peer_id);
+               if(!peer)
+                       continue;
+               if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
+                       postponed_packets.push_back(packet);
+               } else if(peer->m_num_sent < peer->m_max_num_sent){
+                       rawSendAsPacket(packet.peer_id, packet.channelnum,
+                                       packet.data, packet.reliable);
+                       peer->m_num_sent++;
+               } else {
+                       postponed_packets.push_back(packet);
+               }
+       }
+       while(postponed_packets.size() != 0){
+               m_outgoing_queue.push_back(postponed_packets.pop_front());
+       }
+       for(core::map<u16, Peer*>::Iterator
+                       j = m_peers.getIterator();
+                       j.atEnd() == false; j++)
+       {
+               Peer *peer = j.getNode()->getValue();
+               peer->m_sendtime_accu -= (float)peer->m_num_sent /
+                               peer->m_max_packets_per_second;
+               if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
+                       peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
+       }
+}
 
-       if(packetdata.getSize() < 1)
-               throw InvalidIncomingDataException("packetdata.getSize() < 1");
+// Receive packets from the network and buffers and create ConnectionEvents
+void Connection::receive()
+{
+       u32 datasize = 100000;
+       // TODO: We can not know how many layers of header there are.
+       // For now, just assume there are no other than the base headers.
+       u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
+       Buffer<u8> packetdata(packet_maxsize);
 
-       u8 type = readU8(&packetdata[0]);
+       bool single_wait_done = false;
        
-       if(type == TYPE_CONTROL)
+       for(;;)
        {
-               if(packetdata.getSize() < 2)
-                       throw InvalidIncomingDataException("packetdata.getSize() < 2");
-
-               u8 controltype = readU8(&packetdata[1]);
-
-               if(controltype == CONTROLTYPE_ACK)
+       try{
+               /* Check if some buffer has relevant data */
                {
-                       if(packetdata.getSize() < 4)
-                               throw InvalidIncomingDataException
-                                               ("packetdata.getSize() < 4 (ACK header size)");
-
-                       u16 seqnum = readU16(&packetdata[2]);
-                       con->PrintInfo();
-                       dout_con<<"Got CONTROLTYPE_ACK: channelnum="
-                                       <<((int)channelnum&0xff)<<", peer_id="<<peer_id
-                                       <<", seqnum="<<seqnum<<std::endl;
-
-                       try{
-                               BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
-                               // Get round trip time
-                               float rtt = p.totaltime;
-
-                               // Let peer calculate stuff according to it
-                               // (avg_rtt and resend_timeout)
-                               Peer *peer = con->GetPeer(peer_id);
-                               peer->reportRTT(rtt);
-
-                               //con->PrintInfo(dout_con);
-                               //dout_con<<"RTT = "<<rtt<<std::endl;
-
-                               /*dout_con<<"OUTGOING: ";
-                               con->PrintInfo();
-                               outgoing_reliables.print();
-                               dout_con<<std::endl;*/
-                       }
-                       catch(NotFoundException &e){
-                               con->PrintInfo(derr_con);
-                               derr_con<<"WARNING: ACKed packet not "
-                                               "in outgoing queue"
-                                               <<std::endl;
+                       u16 peer_id;
+                       SharedBuffer<u8> resultdata;
+                       bool got = getFromBuffers(peer_id, resultdata);
+                       if(got){
+                               ConnectionEvent e;
+                               e.dataReceived(peer_id, resultdata);
+                               putEvent(e);
+                               continue;
                        }
+               }
+               
+               if(single_wait_done){
+                       if(m_socket.WaitData(0) == false)
+                               break;
+               }
+               
+               single_wait_done = true;
 
-                       throw ProcessedSilentlyException("Got an ACK");
+               Address sender;
+               s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
+
+               if(received_size < 0)
+                       break;
+               if(received_size < BASE_HEADER_SIZE)
+                       continue;
+               if(readU32(&packetdata[0]) != m_protocol_id)
+                       continue;
+               
+               u16 peer_id = readPeerId(*packetdata);
+               u8 channelnum = readChannel(*packetdata);
+               if(channelnum > CHANNEL_COUNT-1){
+                       PrintInfo(derr_con);
+                       derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
+                       throw InvalidIncomingDataException("Channel doesn't exist");
                }
-               else if(controltype == CONTROLTYPE_SET_PEER_ID)
+
+               if(peer_id == PEER_ID_INEXISTENT)
                {
-                       if(packetdata.getSize() < 4)
-                               throw InvalidIncomingDataException
-                                               ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
-                       u16 peer_id_new = readU16(&packetdata[2]);
-                       con->PrintInfo();
-                       dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
+                       /*
+                               Somebody is trying to send stuff to us with no peer id.
+                               
+                               Check if the same address and port was added to our peer
+                               list before.
+                               Allow only entries that have has_sent_with_id==false.
+                       */
 
-                       if(con->GetPeerID() != PEER_ID_INEXISTENT)
+                       core::map<u16, Peer*>::Iterator j;
+                       j = m_peers.getIterator();
+                       for(; j.atEnd() == false; j++)
                        {
-                               con->PrintInfo(derr_con);
-                               derr_con<<"WARNING: Not changing"
-                                               " existing peer id."<<std::endl;
+                               Peer *peer = j.getNode()->getValue();
+                               if(peer->has_sent_with_id)
+                                       continue;
+                               if(peer->address == sender)
+                                       break;
                        }
+                       
+                       /*
+                               If no peer was found with the same address and port,
+                               we shall assume it is a new peer and create an entry.
+                       */
+                       if(j.atEnd())
+                       {
+                               // Pass on to adding the peer
+                       }
+                       // Else: A peer was found.
                        else
                        {
-                               dout_con<<"changing."<<std::endl;
-                               con->SetPeerID(peer_id_new);
+                               Peer *peer = j.getNode()->getValue();
+                               peer_id = peer->id;
+                               PrintInfo(derr_con);
+                               derr_con<<"WARNING: Assuming unknown peer to be "
+                                               <<"peer_id="<<peer_id<<std::endl;
                        }
-                       throw ProcessedSilentlyException("Got a SET_PEER_ID");
-               }
-               else if(controltype == CONTROLTYPE_PING)
-               {
-                       // Just ignore it, the incoming data already reset
-                       // the timeout counter
-                       con->PrintInfo();
-                       dout_con<<"PING"<<std::endl;
-                       throw ProcessedSilentlyException("Got a PING");
                }
-               else if(controltype == CONTROLTYPE_DISCO)
+               
+               /*
+                       The peer was not found in our lists. Add it.
+               */
+               if(peer_id == PEER_ID_INEXISTENT)
                {
-                       // Just ignore it, the incoming data already reset
-                       // the timeout counter
-                       con->PrintInfo();
-                       dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
-                       
-                       if(con->deletePeer(peer_id, false) == false)
+                       // Somebody wants to make a new connection
+
+                       // Get a unique peer id (2 or higher)
+                       u16 peer_id_new = 2;
+                       /*
+                               Find an unused peer id
+                       */
+                       bool out_of_ids = false;
+                       for(;;)
                        {
-                               con->PrintInfo(derr_con);
-                               derr_con<<"DISCO: Peer not found"<<std::endl;
+                               // Check if exists
+                               if(m_peers.find(peer_id_new) == NULL)
+                                       break;
+                               // Check for overflow
+                               if(peer_id_new == 65535){
+                                       out_of_ids = true;
+                                       break;
+                               }
+                               peer_id_new++;
+                       }
+                       if(out_of_ids){
+                               errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
+                               continue;
                        }
 
-                       throw ProcessedSilentlyException("Got a DISCO");
-               }
-               else{
-                       con->PrintInfo(derr_con);
-                       derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
-                                       <<((int)controltype&0xff)<<std::endl;
-                       throw InvalidIncomingDataException("Invalid control type");
+                       PrintInfo();
+                       dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
+                                       " giving peer_id="<<peer_id_new<<std::endl;
+
+                       // Create a peer
+                       Peer *peer = new Peer(peer_id_new, sender);
+                       m_peers.insert(peer->id, peer);
+                       
+                       // Create peer addition event
+                       ConnectionEvent e;
+                       e.peerAdded(peer_id_new, sender);
+                       putEvent(e);
+                       
+                       // Create CONTROL packet to tell the peer id to the new peer.
+                       SharedBuffer<u8> reply(4);
+                       writeU8(&reply[0], TYPE_CONTROL);
+                       writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
+                       writeU16(&reply[2], peer_id_new);
+                       sendAsPacket(peer_id_new, 0, reply, true);
+                       
+                       // We're now talking to a valid peer_id
+                       peer_id = peer_id_new;
+
+                       // Go on and process whatever it sent
                }
-       }
-       else if(type == TYPE_ORIGINAL)
-       {
-               if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
-                       throw InvalidIncomingDataException
-                                       ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
-               con->PrintInfo();
-               dout_con<<"RETURNING TYPE_ORIGINAL to user"
-                               <<std::endl;
-               // Get the inside packet out and return it
-               SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
-               memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
-               return payload;
-       }
-       else if(type == TYPE_SPLIT)
-       {
-               // We have to create a packet again for buffering
-               // This isn't actually too bad an idea.
-               BufferedPacket packet = makePacket(
-                               con->GetPeer(peer_id)->address,
-                               packetdata,
-                               con->GetProtocolID(),
-                               peer_id,
-                               channelnum);
-               // Buffer the packet
-               SharedBuffer<u8> data = incoming_splits.insert(packet, reliable);
-               if(data.getSize() != 0)
-               {
-                       con->PrintInfo();
-                       dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
-                                       <<"size="<<data.getSize()<<std::endl;
-                       return data;
-               }
-               con->PrintInfo();
-               dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
-               throw ProcessedSilentlyException("Buffered a split packet chunk");
-       }
-       else if(type == TYPE_RELIABLE)
-       {
-               // Recursive reliable packets not allowed
-               assert(reliable == false);
-
-               if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
-                       throw InvalidIncomingDataException
-                                       ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
-
-               u16 seqnum = readU16(&packetdata[1]);
-
-               bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
-               bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
-               
-               con->PrintInfo();
-               if(is_future_packet)
-                       dout_con<<"BUFFERING";
-               else if(is_old_packet)
-                       dout_con<<"OLD";
-               else
-                       dout_con<<"RECUR";
-               dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
-                               <<" next="<<next_incoming_seqnum;
-               dout_con<<" [sending CONTROLTYPE_ACK"
-                               " to peer_id="<<peer_id<<"]";
-               dout_con<<std::endl;
-               
-               //DEBUG
-               //assert(incoming_reliables.size() < 100);
-
-               // Send a CONTROLTYPE_ACK
-               SharedBuffer<u8> reply(4);
-               writeU8(&reply[0], TYPE_CONTROL);
-               writeU8(&reply[1], CONTROLTYPE_ACK);
-               writeU16(&reply[2], seqnum);
-               con->SendAsPacket(peer_id, channelnum, reply, false);
 
-               //if(seqnum_higher(seqnum, next_incoming_seqnum))
-               if(is_future_packet)
-               {
-                       /*con->PrintInfo();
-                       dout_con<<"Buffering reliable packet (seqnum="
-                                       <<seqnum<<")"<<std::endl;*/
-                       
-                       // This one comes later, buffer it.
-                       // Actually we have to make a packet to buffer one.
-                       // Well, we have all the ingredients, so just do it.
-                       BufferedPacket packet = makePacket(
-                                       con->GetPeer(peer_id)->address,
-                                       packetdata,
-                                       con->GetProtocolID(),
-                                       peer_id,
-                                       channelnum);
-                       try{
-                               incoming_reliables.insert(packet);
-                               
-                               /*con->PrintInfo();
-                               dout_con<<"INCOMING: ";
-                               incoming_reliables.print();
-                               dout_con<<std::endl;*/
-                       }
-                       catch(AlreadyExistsException &e)
-                       {
-                       }
+               core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
 
-                       throw ProcessedSilentlyException("Buffered future reliable packet");
-               }
-               //else if(seqnum_higher(next_incoming_seqnum, seqnum))
-               else if(is_old_packet)
+               if(node == NULL)
                {
-                       // An old packet, dump it
-                       throw InvalidIncomingDataException("Got an old reliable packet");
+                       // Peer not found
+                       // This means that the peer id of the sender is not PEER_ID_INEXISTENT
+                       // and it is invalid.
+                       PrintInfo(derr_con);
+                       derr_con<<"Receive(): Peer not found"<<std::endl;
+                       throw InvalidIncomingDataException("Peer not found (possible timeout)");
                }
 
-               next_incoming_seqnum++;
-
-               // Get out the inside packet and re-process it
-               SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
-               memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
-
-               return ProcessPacket(payload, con, peer_id, channelnum, true);
-       }
-       else
-       {
-               con->PrintInfo(derr_con);
-               derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
-               throw InvalidIncomingDataException("Invalid packet type");
-       }
-       
-       // We should never get here.
-       // If you get here, add an exception or a return to some of the
-       // above conditionals.
-       assert(0);
-       throw BaseException("Error in Channel::ProcessPacket()");
-}
+               Peer *peer = node->getValue();
 
-SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
-               u16 &peer_id)
-{
-       u16 firstseqnum = 0;
-       // Clear old packets from start of buffer
-       try{
-       for(;;){
-               firstseqnum = incoming_reliables.getFirstSeqnum();
-               if(seqnum_higher(next_incoming_seqnum, firstseqnum))
-                       incoming_reliables.popFirst();
-               else
-                       break;
-       }
-       // This happens if all packets are old
-       }catch(con::NotFoundException)
-       {}
-       
-       if(incoming_reliables.empty() == false)
-       {
-               if(firstseqnum == next_incoming_seqnum)
+               // Validate peer address
+               if(peer->address != sender)
                {
-                       BufferedPacket p = incoming_reliables.popFirst();
-                       
-                       peer_id = readPeerId(*p.data);
-                       u8 channelnum = readChannel(*p.data);
-                       u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
+                       PrintInfo(derr_con);
+                       derr_con<<"Peer "<<peer_id<<" sending from different address."
+                                       " Ignoring."<<std::endl;
+                       continue;
+               }
+               
+               peer->timeout_counter = 0.0;
 
-                       con->PrintInfo();
-                       dout_con<<"UNBUFFERING TYPE_RELIABLE"
-                                       <<" seqnum="<<seqnum
-                                       <<" peer_id="<<peer_id
-                                       <<" channel="<<((int)channelnum&0xff)
-                                       <<std::endl;
+               Channel *channel = &(peer->channels[channelnum]);
+               
+               // Throw the received packet to channel->processPacket()
 
-                       next_incoming_seqnum++;
+               // Make a new SharedBuffer from the data without the base headers
+               SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
+               memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
+                               strippeddata.getSize());
+               
+               try{
+                       // Process it (the result is some data with no headers made by us)
+                       SharedBuffer<u8> resultdata = processPacket
+                                       (channel, strippeddata, peer_id, channelnum, false);
                        
-                       u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
-                       // Get out the inside packet and re-process it
-                       SharedBuffer<u8> payload(p.data.getSize() - headers_size);
-                       memcpy(*payload, &p.data[headers_size], payload.getSize());
-
-                       return ProcessPacket(payload, con, peer_id, channelnum, true);
+                       PrintInfo();
+                       dout_con<<"ProcessPacket returned data of size "
+                                       <<resultdata.getSize()<<std::endl;
+                       
+                       if(datasize < resultdata.getSize())
+                               throw InvalidIncomingDataException
+                                               ("Buffer too small for received data");
+                       
+                       ConnectionEvent e;
+                       e.dataReceived(peer_id, resultdata);
+                       putEvent(e);
+                       continue;
+               }catch(ProcessedSilentlyException &e){
                }
+       }catch(InvalidIncomingDataException &e){
        }
-               
-       throw NoIncomingDataException("No relevant data in buffers");
+       catch(ProcessedSilentlyException &e){
+       }
+       } // for
 }
 
-SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
+void Connection::runTimeouts(float dtime)
 {
+       core::list<u16> timeouted_peers;
        core::map<u16, Peer*>::Iterator j;
        j = m_peers.getIterator();
        for(; j.atEnd() == false; j++)
        {
                Peer *peer = j.getNode()->getValue();
+               
+               /*
+                       Check peer timeout
+               */
+               peer->timeout_counter += dtime;
+               if(peer->timeout_counter > m_timeout)
+               {
+                       PrintInfo(derr_con);
+                       derr_con<<"RunTimeouts(): Peer "<<peer->id
+                                       <<" has timed out."
+                                       <<" (source=peer->timeout_counter)"
+                                       <<std::endl;
+                       // Add peer to the list
+                       timeouted_peers.push_back(peer->id);
+                       // Don't bother going through the buffers of this one
+                       continue;
+               }
+
+               float resend_timeout = peer->resend_timeout;
                for(u16 i=0; i<CHANNEL_COUNT; i++)
                {
+                       core::list<BufferedPacket> timed_outs;
+                       core::list<BufferedPacket>::Iterator j;
+                       
                        Channel *channel = &peer->channels[i];
-                       try{
-                               SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
-                                               (this, peer_id);
 
-                               return resultdata;
-                       }
-                       catch(NoIncomingDataException &e)
-                       {
-                       }
-                       catch(InvalidIncomingDataException &e)
+                       // Remove timed out incomplete unreliable split packets
+                       channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
+                       
+                       // Increment reliable packet times
+                       channel->outgoing_reliables.incrementTimeouts(dtime);
+
+                       // Check reliable packet total times, remove peer if
+                       // over timeout.
+                       if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
                        {
+                               PrintInfo(derr_con);
+                               derr_con<<"RunTimeouts(): Peer "<<peer->id
+                                               <<" has timed out."
+                                               <<" (source=reliable packet totaltime)"
+                                               <<std::endl;
+                               // Add peer to the to-be-removed list
+                               timeouted_peers.push_back(peer->id);
+                               goto nextpeer;
                        }
-                       catch(ProcessedSilentlyException &e)
+
+                       // Re-send timed out outgoing reliables
+                       
+                       timed_outs = channel->
+                                       outgoing_reliables.getTimedOuts(resend_timeout);
+
+                       channel->outgoing_reliables.resetTimedOuts(resend_timeout);
+
+                       j = timed_outs.begin();
+                       for(; j != timed_outs.end(); j++)
                        {
+                               u16 peer_id = readPeerId(*(j->data));
+                               u8 channel = readChannel(*(j->data));
+                               u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
+
+                               PrintInfo(derr_con);
+                               derr_con<<"RE-SENDING timed-out RELIABLE to ";
+                               j->address.print(&derr_con);
+                               derr_con<<"(t/o="<<resend_timeout<<"): "
+                                               <<"from_peer_id="<<peer_id
+                                               <<", channel="<<((int)channel&0xff)
+                                               <<", seqnum="<<seqnum
+                                               <<std::endl;
+
+                               rawSend(*j);
+
+                               // Enlarge avg_rtt and resend_timeout:
+                               // The rtt will be at least the timeout.
+                               // NOTE: This won't affect the timeout of the next
+                               // checked channel because it was cached.
+                               peer->reportRTT(resend_timeout);
                        }
                }
-       }
-       throw NoIncomingDataException("No relevant data in buffers");
-}
-
-u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
-{
-       /*
-               Receive a packet from the network
-       */
-       
-       // TODO: We can not know how many layers of header there are.
-       // For now, just assume there are no other than the base headers.
-       u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
-       Buffer<u8> packetdata(packet_maxsize);
-       
-       for(;;)
-       {
-       try
-       {
+               
                /*
-                       Check if some buffer has relevant data
+                       Send pings
                */
-               try{
-                       SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
-
-                       if(datasize < resultdata.getSize())
-                               throw InvalidIncomingDataException
-                                               ("Buffer too small for received data");
-                               
-                       memcpy(data, *resultdata, resultdata.getSize());
-                       return resultdata.getSize();
-               }
-               catch(NoIncomingDataException &e)
+               peer->ping_timer += dtime;
+               if(peer->ping_timer >= 5.0)
                {
+                       // Create and send PING packet
+                       SharedBuffer<u8> data(2);
+                       writeU8(&data[0], TYPE_CONTROL);
+                       writeU8(&data[1], CONTROLTYPE_PING);
+                       rawSendAsPacket(peer->id, 0, data, true);
+
+                       peer->ping_timer = 0.0;
                }
-       
-               Address sender;
+               
+nextpeer:
+               continue;
+       }
 
-               s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
+       // Remove timed out peers
+       core::list<u16>::Iterator i = timeouted_peers.begin();
+       for(; i != timeouted_peers.end(); i++)
+       {
+               PrintInfo(derr_con);
+               derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
+               deletePeer(*i, true);
+       }
+}
 
-               if(received_size < 0)
-                       throw NoIncomingDataException("No incoming data");
-               if(received_size < BASE_HEADER_SIZE)
-                       throw InvalidIncomingDataException("No full header received");
-               if(readU32(&packetdata[0]) != m_protocol_id)
-                       throw InvalidIncomingDataException("Invalid protocol id");
-               
-               peer_id = readPeerId(*packetdata);
-               u8 channelnum = readChannel(*packetdata);
-               if(channelnum > CHANNEL_COUNT-1){
-                       PrintInfo(derr_con);
-                       derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
-                       throw InvalidIncomingDataException("Channel doesn't exist");
-               }
+void Connection::serve(u16 port)
+{
+       dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
+       m_socket.Bind(port);
+       m_peer_id = PEER_ID_SERVER;
+}
 
-               if(peer_id == PEER_ID_INEXISTENT)
-               {
-                       /*
-                               Somebody is trying to send stuff to us with no peer id.
-                               
-                               Check if the same address and port was added to our peer
-                               list before.
-                               Allow only entries that have has_sent_with_id==false.
-                       */
+void Connection::connect(Address address)
+{
+       dout_con<<getDesc()<<" connecting to "<<address.serializeString()
+                       <<":"<<address.getPort()<<std::endl;
 
-                       core::map<u16, Peer*>::Iterator j;
-                       j = m_peers.getIterator();
-                       for(; j.atEnd() == false; j++)
-                       {
-                               Peer *peer = j.getNode()->getValue();
-                               if(peer->has_sent_with_id)
-                                       continue;
-                               if(peer->address == sender)
-                                       break;
-                       }
-                       
-                       /*
-                               If no peer was found with the same address and port,
-                               we shall assume it is a new peer and create an entry.
-                       */
-                       if(j.atEnd())
-                       {
-                               // Pass on to adding the peer
-                       }
-                       // Else: A peer was found.
-                       else
-                       {
-                               Peer *peer = j.getNode()->getValue();
-                               peer_id = peer->id;
-                               PrintInfo(derr_con);
-                               derr_con<<"WARNING: Assuming unknown peer to be "
-                                               <<"peer_id="<<peer_id<<std::endl;
-                       }
-               }
-               
-               /*
-                       The peer was not found in our lists. Add it.
-               */
-               if(peer_id == PEER_ID_INEXISTENT)
-               {
-                       // Somebody wants to make a new connection
+       core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
+       if(node != NULL){
+               throw ConnectionException("Already connected to a server");
+       }
 
-                       // Get a unique peer id (2 or higher)
-                       u16 peer_id_new = 2;
-                       /*
-                               Find an unused peer id
-                       */
-                       for(;;)
-                       {
-                               // Check if exists
-                               if(m_peers.find(peer_id_new) == NULL)
-                                       break;
-                               // Check for overflow
-                               if(peer_id_new == 65535)
-                                       throw ConnectionException
-                                               ("Connection ran out of peer ids");
-                               peer_id_new++;
-                       }
+       Peer *peer = new Peer(PEER_ID_SERVER, address);
+       m_peers.insert(peer->id, peer);
 
-                       PrintInfo();
-                       dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
-                                       " giving peer_id="<<peer_id_new<<std::endl;
+       // Create event
+       ConnectionEvent e;
+       e.peerAdded(peer->id, peer->address);
+       putEvent(e);
+       
+       m_socket.Bind(0);
+       
+       // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
+       m_peer_id = PEER_ID_INEXISTENT;
+       SharedBuffer<u8> data(0);
+       Send(PEER_ID_SERVER, 0, data, true);
+}
 
-                       // Create a peer
-                       Peer *peer = new Peer(peer_id_new, sender);
-                       m_peers.insert(peer->id, peer);
-                       m_peerhandler->peerAdded(peer);
-                       
-                       // Create CONTROL packet to tell the peer id to the new peer.
-                       SharedBuffer<u8> reply(4);
-                       writeU8(&reply[0], TYPE_CONTROL);
-                       writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
-                       writeU16(&reply[2], peer_id_new);
-                       SendAsPacket(peer_id_new, 0, reply, true);
-                       
-                       // We're now talking to a valid peer_id
-                       peer_id = peer_id_new;
+void Connection::disconnect()
+{
+       dout_con<<getDesc()<<" disconnecting"<<std::endl;
 
-                       // Go on and process whatever it sent
-               }
+       // Create and send DISCO packet
+       SharedBuffer<u8> data(2);
+       writeU8(&data[0], TYPE_CONTROL);
+       writeU8(&data[1], CONTROLTYPE_DISCO);
+       
+       // Send to all
+       core::map<u16, Peer*>::Iterator j;
+       j = m_peers.getIterator();
+       for(; j.atEnd() == false; j++)
+       {
+               Peer *peer = j.getNode()->getValue();
+               rawSendAsPacket(peer->id, 0, data, false);
+       }
+}
 
-               core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
+void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
+{
+       core::map<u16, Peer*>::Iterator j;
+       j = m_peers.getIterator();
+       for(; j.atEnd() == false; j++)
+       {
+               Peer *peer = j.getNode()->getValue();
+               send(peer->id, channelnum, data, reliable);
+       }
+}
 
-               if(node == NULL)
-               {
-                       // Peer not found
-                       // This means that the peer id of the sender is not PEER_ID_INEXISTENT
-                       // and it is invalid.
-                       PrintInfo(derr_con);
-                       derr_con<<"Receive(): Peer not found"<<std::endl;
-                       throw InvalidIncomingDataException("Peer not found (possible timeout)");
-               }
+void Connection::send(u16 peer_id, u8 channelnum,
+               SharedBuffer<u8> data, bool reliable)
+{
+       dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
 
-               Peer *peer = node->getValue();
+       assert(channelnum < CHANNEL_COUNT);
+       
+       Peer *peer = getPeerNoEx(peer_id);
+       if(peer == NULL)
+               return;
+       Channel *channel = &(peer->channels[channelnum]);
 
-               // Validate peer address
-               if(peer->address != sender)
-               {
-                       PrintInfo(derr_con);
-                       derr_con<<"Peer "<<peer_id<<" sending from different address."
-                                       " Ignoring."<<std::endl;
-                       throw InvalidIncomingDataException
-                                       ("Peer sending from different address");
-                       /*// If there is more data, receive again
-                       if(m_socket.WaitData(0) == true)
-                               continue;
-                       throw NoIncomingDataException("No incoming data (2)");*/
-               }
-               
-               peer->timeout_counter = 0.0;
+       u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
+       if(reliable)
+               chunksize_max -= RELIABLE_HEADER_SIZE;
 
-               Channel *channel = &(peer->channels[channelnum]);
+       core::list<SharedBuffer<u8> > originals;
+       originals = makeAutoSplitPacket(data, chunksize_max,
+                       channel->next_outgoing_split_seqnum);
+       
+       core::list<SharedBuffer<u8> >::Iterator i;
+       i = originals.begin();
+       for(; i != originals.end(); i++)
+       {
+               SharedBuffer<u8> original = *i;
                
-               // Throw the received packet to channel->processPacket()
+               sendAsPacket(peer_id, channelnum, original, reliable);
+       }
+}
 
-               // Make a new SharedBuffer from the data without the base headers
-               SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
-               memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
-                               strippeddata.getSize());
+void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
+               SharedBuffer<u8> data, bool reliable)
+{
+       OutgoingPacket packet(peer_id, channelnum, data, reliable);
+       m_outgoing_queue.push_back(packet);
+}
+
+void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
+               SharedBuffer<u8> data, bool reliable)
+{
+       Peer *peer = getPeerNoEx(peer_id);
+       if(!peer)
+               return;
+       Channel *channel = &(peer->channels[channelnum]);
+
+       if(reliable)
+       {
+               u16 seqnum = channel->next_outgoing_seqnum;
+               channel->next_outgoing_seqnum++;
+
+               SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
+
+               // Add base headers and make a packet
+               BufferedPacket p = makePacket(peer->address, reliable,
+                               m_protocol_id, m_peer_id, channelnum);
                
                try{
-                       // Process it (the result is some data with no headers made by us)
-                       SharedBuffer<u8> resultdata = channel->ProcessPacket
-                                       (strippeddata, this, peer_id, channelnum);
-                       
-                       PrintInfo();
-                       dout_con<<"ProcessPacket returned data of size "
-                                       <<resultdata.getSize()<<std::endl;
-                       
-                       if(datasize < resultdata.getSize())
-                               throw InvalidIncomingDataException
-                                               ("Buffer too small for received data");
-                       
-                       memcpy(data, *resultdata, resultdata.getSize());
-                       return resultdata.getSize();
+                       // Buffer the packet
+                       channel->outgoing_reliables.insert(p);
                }
-               catch(ProcessedSilentlyException &e)
+               catch(AlreadyExistsException &e)
                {
-                       // If there is more data, receive again
-                       if(m_socket.WaitData(0) == true)
-                               continue;
+                       PrintInfo(derr_con);
+                       derr_con<<"WARNING: Going to send a reliable packet "
+                                       "seqnum="<<seqnum<<" that is already "
+                                       "in outgoing buffer"<<std::endl;
+                       //assert(0);
                }
-               throw NoIncomingDataException("No incoming data (2)");
-       } // try
-       catch(InvalidIncomingDataException &e)
-       {
-               // If there is more data, receive again
-               if(m_socket.WaitData(0) == true)
-                       continue;
+               
+               // Send the packet
+               rawSend(p);
        }
-       catch(SendFailedException &e)
+       else
        {
+               // Add base headers and make a packet
+               BufferedPacket p = makePacket(peer->address, data,
+                               m_protocol_id, m_peer_id, channelnum);
+
+               // Send the packet
+               rawSend(p);
        }
-       } // for
 }
 
-void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
+void Connection::rawSend(const BufferedPacket &packet)
+{
+       try{
+               m_socket.Send(packet.address, *packet.data, packet.data.getSize());
+       } catch(SendFailedException &e){
+               derr_con<<"Connection::rawSend(): SendFailedException: "
+                               <<packet.address.serializeString()<<std::endl;
+       }
+}
+
+Peer* Connection::getPeer(u16 peer_id)
+{
+       core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
+
+       if(node == NULL){
+               throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
+       }
+
+       // Error checking
+       assert(node->getValue()->id == peer_id);
+
+       return node->getValue();
+}
+
+Peer* Connection::getPeerNoEx(u16 peer_id)
+{
+       core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
+
+       if(node == NULL){
+               return NULL;
+       }
+
+       // Error checking
+       assert(node->getValue()->id == peer_id);
+
+       return node->getValue();
+}
+
+core::list<Peer*> Connection::getPeers()
 {
+       core::list<Peer*> list;
        core::map<u16, Peer*>::Iterator j;
        j = m_peers.getIterator();
        for(; j.atEnd() == false; j++)
        {
                Peer *peer = j.getNode()->getValue();
-               Send(peer->id, channelnum, data, reliable);
+               list.push_back(peer);
+       }
+       return list;
+}
+
+bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
+{
+       core::map<u16, Peer*>::Iterator j;
+       j = m_peers.getIterator();
+       for(; j.atEnd() == false; j++)
+       {
+               Peer *peer = j.getNode()->getValue();
+               for(u16 i=0; i<CHANNEL_COUNT; i++)
+               {
+                       Channel *channel = &peer->channels[i];
+                       SharedBuffer<u8> resultdata;
+                       bool got = checkIncomingBuffers(channel, peer_id, resultdata);
+                       if(got){
+                               dst = resultdata;
+                               return true;
+                       }
+               }
+       }
+       return false;
+}
+
+bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
+               SharedBuffer<u8> &dst)
+{
+       u16 firstseqnum = 0;
+       // Clear old packets from start of buffer
+       try{
+       for(;;){
+               firstseqnum = channel->incoming_reliables.getFirstSeqnum();
+               if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
+                       channel->incoming_reliables.popFirst();
+               else
+                       break;
+       }
+       // This happens if all packets are old
+       }catch(con::NotFoundException)
+       {}
+       
+       if(channel->incoming_reliables.empty() == false)
+       {
+               if(firstseqnum == channel->next_incoming_seqnum)
+               {
+                       BufferedPacket p = channel->incoming_reliables.popFirst();
+                       
+                       peer_id = readPeerId(*p.data);
+                       u8 channelnum = readChannel(*p.data);
+                       u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
+
+                       PrintInfo();
+                       dout_con<<"UNBUFFERING TYPE_RELIABLE"
+                                       <<" seqnum="<<seqnum
+                                       <<" peer_id="<<peer_id
+                                       <<" channel="<<((int)channelnum&0xff)
+                                       <<std::endl;
+
+                       channel->next_incoming_seqnum++;
+                       
+                       u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
+                       // Get out the inside packet and re-process it
+                       SharedBuffer<u8> payload(p.data.getSize() - headers_size);
+                       memcpy(*payload, &p.data[headers_size], payload.getSize());
+
+                       dst = processPacket(channel, payload, peer_id, channelnum, true);
+                       return true;
+               }
        }
+       return false;
 }
 
-void Connection::Send(u16 peer_id, u8 channelnum,
-               SharedBuffer<u8> data, bool reliable)
-{
-       assert(channelnum < CHANNEL_COUNT);
-       
-       Peer *peer = GetPeerNoEx(peer_id);
-       if(peer == NULL)
-               return;
-       Channel *channel = &(peer->channels[channelnum]);
+SharedBuffer<u8> Connection::processPacket(Channel *channel,
+               SharedBuffer<u8> packetdata, u16 peer_id,
+               u8 channelnum, bool reliable)
+{
+       IndentationRaiser iraiser(&(m_indentation));
+
+       if(packetdata.getSize() < 1)
+               throw InvalidIncomingDataException("packetdata.getSize() < 1");
+
+       u8 type = readU8(&packetdata[0]);
+       
+       if(type == TYPE_CONTROL)
+       {
+               if(packetdata.getSize() < 2)
+                       throw InvalidIncomingDataException("packetdata.getSize() < 2");
+
+               u8 controltype = readU8(&packetdata[1]);
+
+               if(controltype == CONTROLTYPE_ACK)
+               {
+                       if(packetdata.getSize() < 4)
+                               throw InvalidIncomingDataException
+                                               ("packetdata.getSize() < 4 (ACK header size)");
+
+                       u16 seqnum = readU16(&packetdata[2]);
+                       PrintInfo();
+                       dout_con<<"Got CONTROLTYPE_ACK: channelnum="
+                                       <<((int)channelnum&0xff)<<", peer_id="<<peer_id
+                                       <<", seqnum="<<seqnum<<std::endl;
+
+                       try{
+                               BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
+                               // Get round trip time
+                               float rtt = p.totaltime;
+
+                               // Let peer calculate stuff according to it
+                               // (avg_rtt and resend_timeout)
+                               Peer *peer = getPeer(peer_id);
+                               peer->reportRTT(rtt);
+
+                               //PrintInfo(dout_con);
+                               //dout_con<<"RTT = "<<rtt<<std::endl;
+
+                               /*dout_con<<"OUTGOING: ";
+                               PrintInfo();
+                               channel->outgoing_reliables.print();
+                               dout_con<<std::endl;*/
+                       }
+                       catch(NotFoundException &e){
+                               PrintInfo(derr_con);
+                               derr_con<<"WARNING: ACKed packet not "
+                                               "in outgoing queue"
+                                               <<std::endl;
+                       }
+
+                       throw ProcessedSilentlyException("Got an ACK");
+               }
+               else if(controltype == CONTROLTYPE_SET_PEER_ID)
+               {
+                       if(packetdata.getSize() < 4)
+                               throw InvalidIncomingDataException
+                                               ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
+                       u16 peer_id_new = readU16(&packetdata[2]);
+                       PrintInfo();
+                       dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
+
+                       if(GetPeerID() != PEER_ID_INEXISTENT)
+                       {
+                               PrintInfo(derr_con);
+                               derr_con<<"WARNING: Not changing"
+                                               " existing peer id."<<std::endl;
+                       }
+                       else
+                       {
+                               dout_con<<"changing."<<std::endl;
+                               SetPeerID(peer_id_new);
+                       }
+                       throw ProcessedSilentlyException("Got a SET_PEER_ID");
+               }
+               else if(controltype == CONTROLTYPE_PING)
+               {
+                       // Just ignore it, the incoming data already reset
+                       // the timeout counter
+                       PrintInfo();
+                       dout_con<<"PING"<<std::endl;
+                       throw ProcessedSilentlyException("Got a PING");
+               }
+               else if(controltype == CONTROLTYPE_DISCO)
+               {
+                       // Just ignore it, the incoming data already reset
+                       // the timeout counter
+                       PrintInfo();
+                       dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
+                       
+                       if(deletePeer(peer_id, false) == false)
+                       {
+                               PrintInfo(derr_con);
+                               derr_con<<"DISCO: Peer not found"<<std::endl;
+                       }
+
+                       throw ProcessedSilentlyException("Got a DISCO");
+               }
+               else{
+                       PrintInfo(derr_con);
+                       derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
+                                       <<((int)controltype&0xff)<<std::endl;
+                       throw InvalidIncomingDataException("Invalid control type");
+               }
+       }
+       else if(type == TYPE_ORIGINAL)
+       {
+               if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
+                       throw InvalidIncomingDataException
+                                       ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
+               PrintInfo();
+               dout_con<<"RETURNING TYPE_ORIGINAL to user"
+                               <<std::endl;
+               // Get the inside packet out and return it
+               SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
+               memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
+               return payload;
+       }
+       else if(type == TYPE_SPLIT)
+       {
+               // We have to create a packet again for buffering
+               // This isn't actually too bad an idea.
+               BufferedPacket packet = makePacket(
+                               getPeer(peer_id)->address,
+                               packetdata,
+                               GetProtocolID(),
+                               peer_id,
+                               channelnum);
+               // Buffer the packet
+               SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
+               if(data.getSize() != 0)
+               {
+                       PrintInfo();
+                       dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
+                                       <<"size="<<data.getSize()<<std::endl;
+                       return data;
+               }
+               PrintInfo();
+               dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
+               throw ProcessedSilentlyException("Buffered a split packet chunk");
+       }
+       else if(type == TYPE_RELIABLE)
+       {
+               // Recursive reliable packets not allowed
+               assert(reliable == false);
+
+               if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
+                       throw InvalidIncomingDataException
+                                       ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
+
+               u16 seqnum = readU16(&packetdata[1]);
+
+               bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
+               bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
+               
+               PrintInfo();
+               if(is_future_packet)
+                       dout_con<<"BUFFERING";
+               else if(is_old_packet)
+                       dout_con<<"OLD";
+               else
+                       dout_con<<"RECUR";
+               dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
+                               <<" next="<<channel->next_incoming_seqnum;
+               dout_con<<" [sending CONTROLTYPE_ACK"
+                               " to peer_id="<<peer_id<<"]";
+               dout_con<<std::endl;
+               
+               //DEBUG
+               //assert(channel->incoming_reliables.size() < 100);
+
+               // Send a CONTROLTYPE_ACK
+               SharedBuffer<u8> reply(4);
+               writeU8(&reply[0], TYPE_CONTROL);
+               writeU8(&reply[1], CONTROLTYPE_ACK);
+               writeU16(&reply[2], seqnum);
+               rawSendAsPacket(peer_id, channelnum, reply, false);
+
+               //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
+               if(is_future_packet)
+               {
+                       /*PrintInfo();
+                       dout_con<<"Buffering reliable packet (seqnum="
+                                       <<seqnum<<")"<<std::endl;*/
+                       
+                       // This one comes later, buffer it.
+                       // Actually we have to make a packet to buffer one.
+                       // Well, we have all the ingredients, so just do it.
+                       BufferedPacket packet = makePacket(
+                                       getPeer(peer_id)->address,
+                                       packetdata,
+                                       GetProtocolID(),
+                                       peer_id,
+                                       channelnum);
+                       try{
+                               channel->incoming_reliables.insert(packet);
+                               
+                               /*PrintInfo();
+                               dout_con<<"INCOMING: ";
+                               channel->incoming_reliables.print();
+                               dout_con<<std::endl;*/
+                       }
+                       catch(AlreadyExistsException &e)
+                       {
+                       }
+
+                       throw ProcessedSilentlyException("Buffered future reliable packet");
+               }
+               //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
+               else if(is_old_packet)
+               {
+                       // An old packet, dump it
+                       throw InvalidIncomingDataException("Got an old reliable packet");
+               }
+
+               channel->next_incoming_seqnum++;
 
-       u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
-       if(reliable)
-               chunksize_max -= RELIABLE_HEADER_SIZE;
+               // Get out the inside packet and re-process it
+               SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
+               memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
 
-       core::list<SharedBuffer<u8> > originals;
-       originals = makeAutoSplitPacket(data, chunksize_max,
-                       channel->next_outgoing_split_seqnum);
-       
-       core::list<SharedBuffer<u8> >::Iterator i;
-       i = originals.begin();
-       for(; i != originals.end(); i++)
+               return processPacket(channel, payload, peer_id, channelnum, true);
+       }
+       else
        {
-               SharedBuffer<u8> original = *i;
-               
-               SendAsPacket(peer_id, channelnum, original, reliable);
+               PrintInfo(derr_con);
+               derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
+               throw InvalidIncomingDataException("Invalid packet type");
        }
+       
+       // We should never get here.
+       // If you get here, add an exception or a return to some of the
+       // above conditionals.
+       assert(0);
+       throw BaseException("Error in Channel::ProcessPacket()");
 }
 
-void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
-               SharedBuffer<u8> data, bool reliable)
+bool Connection::deletePeer(u16 peer_id, bool timeout)
 {
-       Peer *peer = GetPeer(peer_id);
-       Channel *channel = &(peer->channels[channelnum]);
+       if(m_peers.find(peer_id) == NULL)
+               return false;
+       
+       Peer *peer = m_peers[peer_id];
 
-       if(reliable)
-       {
-               u16 seqnum = channel->next_outgoing_seqnum;
-               channel->next_outgoing_seqnum++;
+       // Create event
+       ConnectionEvent e;
+       e.peerRemoved(peer_id, timeout, peer->address);
+       putEvent(e);
 
-               SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
+       delete m_peers[peer_id];
+       m_peers.remove(peer_id);
+       return true;
+}
 
-               // Add base headers and make a packet
-               BufferedPacket p = makePacket(peer->address, reliable,
-                               m_protocol_id, m_peer_id, channelnum);
-               
-               try{
-                       // Buffer the packet
-                       channel->outgoing_reliables.insert(p);
-               }
-               catch(AlreadyExistsException &e)
-               {
-                       PrintInfo(derr_con);
-                       derr_con<<"WARNING: Going to send a reliable packet "
-                                       "seqnum="<<seqnum<<" that is already "
-                                       "in outgoing buffer"<<std::endl;
-                       //assert(0);
-               }
-               
-               // Send the packet
-               RawSend(p);
-       }
-       else
-       {
-               // Add base headers and make a packet
-               BufferedPacket p = makePacket(peer->address, data,
-                               m_protocol_id, m_peer_id, channelnum);
+/* Interface */
 
-               // Send the packet
-               RawSend(p);
+ConnectionEvent Connection::getEvent()
+{
+       if(m_event_queue.size() == 0){
+               ConnectionEvent e;
+               e.type = CONNEVENT_NONE;
+               return e;
        }
+       return m_event_queue.pop_front();
 }
 
-void Connection::RawSend(const BufferedPacket &packet)
+ConnectionEvent Connection::waitEvent(u32 timeout_ms)
 {
        try{
-               m_socket.Send(packet.address, *packet.data, packet.data.getSize());
-       } catch(SendFailedException &e){
-               derr_con<<"Connection::RawSend(): SendFailedException: "
-                               <<packet.address.serializeString()<<std::endl;
+               return m_event_queue.pop_front(timeout_ms);
+       } catch(ItemNotFoundException &e){
+               ConnectionEvent e;
+               e.type = CONNEVENT_NONE;
+               return e;
        }
 }
 
-void Connection::RunTimeouts(float dtime)
+void Connection::putCommand(ConnectionCommand &c)
 {
-       core::list<u16> timeouted_peers;
-       core::map<u16, Peer*>::Iterator j;
-       j = m_peers.getIterator();
-       for(; j.atEnd() == false; j++)
-       {
-               Peer *peer = j.getNode()->getValue();
-               
-               /*
-                       Check peer timeout
-               */
-               peer->timeout_counter += dtime;
-               if(peer->timeout_counter > m_timeout)
-               {
-                       PrintInfo(derr_con);
-                       derr_con<<"RunTimeouts(): Peer "<<peer->id
-                                       <<" has timed out."
-                                       <<" (source=peer->timeout_counter)"
-                                       <<std::endl;
-                       // Add peer to the list
-                       timeouted_peers.push_back(peer->id);
-                       // Don't bother going through the buffers of this one
-                       continue;
-               }
-
-               float resend_timeout = peer->resend_timeout;
-               for(u16 i=0; i<CHANNEL_COUNT; i++)
-               {
-                       core::list<BufferedPacket> timed_outs;
-                       core::list<BufferedPacket>::Iterator j;
-                       
-                       Channel *channel = &peer->channels[i];
-
-                       // Remove timed out incomplete unreliable split packets
-                       channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
-                       
-                       // Increment reliable packet times
-                       channel->outgoing_reliables.incrementTimeouts(dtime);
-
-                       // Check reliable packet total times, remove peer if
-                       // over timeout.
-                       if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
-                       {
-                               PrintInfo(derr_con);
-                               derr_con<<"RunTimeouts(): Peer "<<peer->id
-                                               <<" has timed out."
-                                               <<" (source=reliable packet totaltime)"
-                                               <<std::endl;
-                               // Add peer to the to-be-removed list
-                               timeouted_peers.push_back(peer->id);
-                               goto nextpeer;
-                       }
-
-                       // Re-send timed out outgoing reliables
-                       
-                       timed_outs = channel->
-                                       outgoing_reliables.getTimedOuts(resend_timeout);
-
-                       channel->outgoing_reliables.resetTimedOuts(resend_timeout);
-
-                       j = timed_outs.begin();
-                       for(; j != timed_outs.end(); j++)
-                       {
-                               u16 peer_id = readPeerId(*(j->data));
-                               u8 channel = readChannel(*(j->data));
-                               u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
+       m_command_queue.push_back(c);
+}
 
-                               PrintInfo(derr_con);
-                               derr_con<<"RE-SENDING timed-out RELIABLE to ";
-                               j->address.print(&derr_con);
-                               derr_con<<"(t/o="<<resend_timeout<<"): "
-                                               <<"from_peer_id="<<peer_id
-                                               <<", channel="<<((int)channel&0xff)
-                                               <<", seqnum="<<seqnum
-                                               <<std::endl;
+void Connection::Serve(unsigned short port)
+{
+       ConnectionCommand c;
+       c.serve(port);
+       putCommand(c);
+}
 
-                               RawSend(*j);
+void Connection::Connect(Address address)
+{
+       ConnectionCommand c;
+       c.connect(address);
+       putCommand(c);
+}
 
-                               // Enlarge avg_rtt and resend_timeout:
-                               // The rtt will be at least the timeout.
-                               // NOTE: This won't affect the timeout of the next
-                               // checked channel because it was cached.
-                               peer->reportRTT(resend_timeout);
-                       }
-               }
-               
-               /*
-                       Send pings
-               */
-               peer->ping_timer += dtime;
-               if(peer->ping_timer >= 5.0)
-               {
-                       // Create and send PING packet
-                       SharedBuffer<u8> data(2);
-                       writeU8(&data[0], TYPE_CONTROL);
-                       writeU8(&data[1], CONTROLTYPE_PING);
-                       SendAsPacket(peer->id, 0, data, true);
+bool Connection::Connected()
+{
+       JMutexAutoLock peerlock(m_peers_mutex);
 
-                       peer->ping_timer = 0.0;
-               }
+       if(m_peers.size() != 1)
+               return false;
                
-nextpeer:
-               continue;
-       }
-
-       // Remove timed out peers
-       core::list<u16>::Iterator i = timeouted_peers.begin();
-       for(; i != timeouted_peers.end(); i++)
-       {
-               PrintInfo(derr_con);
-               derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
-               deletePeer(*i, true);
-       }
+       core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
+       if(node == NULL)
+               return false;
+       
+       if(m_peer_id == PEER_ID_INEXISTENT)
+               return false;
+       
+       return true;
 }
 
-Peer* Connection::GetPeer(u16 peer_id)
+void Connection::Disconnect()
 {
-       core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
+       ConnectionCommand c;
+       c.disconnect();
+       putCommand(c);
+}
 
-       if(node == NULL){
-               // Peer not found
-               throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
+u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
+{
+       for(;;){
+               ConnectionEvent e = waitEvent(m_bc_receive_timeout);
+               if(e.type != CONNEVENT_NONE)
+                       dout_con<<getDesc()<<": Receive: got event: "
+                                       <<e.describe()<<std::endl;
+               switch(e.type){
+               case CONNEVENT_NONE:
+                       throw NoIncomingDataException("No incoming data");
+               case CONNEVENT_DATA_RECEIVED:
+                       peer_id = e.peer_id;
+                       memcpy(data, *e.data, e.data.getSize());
+                       return e.data.getSize();
+               case CONNEVENT_PEER_ADDED: {
+                       Peer tmp(e.peer_id, e.address);
+                       if(m_bc_peerhandler)
+                               m_bc_peerhandler->peerAdded(&tmp);
+                       continue; }
+               case CONNEVENT_PEER_REMOVED: {
+                       Peer tmp(e.peer_id, e.address);
+                       if(m_bc_peerhandler)
+                               m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
+                       continue; }
+               }
        }
+       throw NoIncomingDataException("No incoming data");
+}
 
-       // Error checking
-       assert(node->getValue()->id == peer_id);
+void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
+{
+       assert(channelnum < CHANNEL_COUNT);
 
-       return node->getValue();
+       ConnectionCommand c;
+       c.sendToAll(channelnum, data, reliable);
+       putCommand(c);
 }
 
-Peer* Connection::GetPeerNoEx(u16 peer_id)
+void Connection::Send(u16 peer_id, u8 channelnum,
+               SharedBuffer<u8> data, bool reliable)
 {
-       core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
+       assert(channelnum < CHANNEL_COUNT);
 
-       if(node == NULL){
-               return NULL;
-       }
+       ConnectionCommand c;
+       c.send(peer_id, channelnum, data, reliable);
+       putCommand(c);
+}
 
-       // Error checking
-       assert(node->getValue()->id == peer_id);
+void Connection::RunTimeouts(float dtime)
+{
+       // No-op
+}
 
-       return node->getValue();
+Address Connection::GetPeerAddress(u16 peer_id)
+{
+       JMutexAutoLock peerlock(m_peers_mutex);
+       return getPeer(peer_id)->address;
 }
 
-core::list<Peer*> Connection::GetPeers()
+float Connection::GetPeerAvgRTT(u16 peer_id)
 {
-       core::list<Peer*> list;
-       core::map<u16, Peer*>::Iterator j;
-       j = m_peers.getIterator();
-       for(; j.atEnd() == false; j++)
-       {
-               Peer *peer = j.getNode()->getValue();
-               list.push_back(peer);
-       }
-       return list;
+       JMutexAutoLock peerlock(m_peers_mutex);
+       return getPeer(peer_id)->avg_rtt;
 }
 
-bool Connection::deletePeer(u16 peer_id, bool timeout)
+void Connection::DeletePeer(u16 peer_id)
 {
-       if(m_peers.find(peer_id) == NULL)
-               return false;
-       m_peerhandler->deletingPeer(m_peers[peer_id], timeout);
-       delete m_peers[peer_id];
-       m_peers.remove(peer_id);
-       return true;
+       ConnectionCommand c;
+       c.deletePeer(peer_id);
+       putCommand(c);
 }
 
 void Connection::PrintInfo(std::ostream &out)
 {
-       out<<m_socket.GetHandle();
-       out<<" ";
-       out<<"con "<<m_peer_id<<": ";
-       for(s16 i=0; i<(s16)m_indentation-1; i++)
-               out<<"  ";
+       out<<getDesc()<<": ";
 }
 
 void Connection::PrintInfo()
@@ -1383,5 +1658,10 @@ void Connection::PrintInfo()
        PrintInfo(dout_con);
 }
 
+std::string Connection::getDesc()
+{
+       return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
+}
+
 } // namespace
 
index 6eb2f2824642cc86661fd4c788e5c090d515e366..570bc92ab22ba817873c0e549af2a59882b46940 100644 (file)
@@ -319,28 +319,6 @@ struct Channel
 {
        Channel();
        ~Channel();
-       /*
-               Processes a packet with the basic header stripped out.
-               Parameters:
-                       packetdata: Data in packet (with no base headers)
-                       con: The connection to which the channel is associated
-                            (used for sending back stuff (ACKs))
-                       peer_id: peer id of the sender of the packet in question
-                       channelnum: channel on which the packet was sent
-                       reliable: true if recursing into a reliable packet
-       */
-       SharedBuffer<u8> ProcessPacket(
-                       SharedBuffer<u8> packetdata,
-                       Connection *con,
-                       u16 peer_id,
-                       u8 channelnum,
-                       bool reliable=false);
-       
-       // Returns next data from a buffer if possible
-       // throws a NoIncomingDataException if no data is available
-       // If found, sets peer_id
-       SharedBuffer<u8> CheckIncomingBuffers(Connection *con,
-                       u16 &peer_id);
 
        u16 next_outgoing_seqnum;
        u16 next_incoming_seqnum;
@@ -412,78 +390,237 @@ public:
        // with the id we have given to it
        bool has_sent_with_id;
        
+       float m_sendtime_accu;
+       float m_max_packets_per_second;
+       int m_num_sent;
+       int m_max_num_sent;
+       
 private:
 };
 
-class Connection
+/*
+       Connection
+*/
+
+struct OutgoingPacket
+{
+       u16 peer_id;
+       u8 channelnum;
+       SharedBuffer<u8> data;
+       bool reliable;
+
+       OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
+                       bool reliable_):
+               peer_id(peer_id_),
+               channelnum(channelnum_),
+               data(data_),
+               reliable(reliable_)
+       {
+       }
+};
+
+enum ConnectionEventType{
+       CONNEVENT_NONE,
+       CONNEVENT_DATA_RECEIVED,
+       CONNEVENT_PEER_ADDED,
+       CONNEVENT_PEER_REMOVED,
+};
+
+struct ConnectionEvent
+{
+       enum ConnectionEventType type;
+       u16 peer_id;
+       SharedBuffer<u8> data;
+       bool timeout;
+       Address address;
+
+       ConnectionEvent(): type(CONNEVENT_NONE) {}
+
+       std::string describe()
+       {
+               switch(type){
+               case CONNEVENT_NONE:
+                       return "CONNEVENT_NONE";
+               case CONNEVENT_DATA_RECEIVED:
+                       return "CONNEVENT_DATA_RECEIVED";
+               case CONNEVENT_PEER_ADDED: 
+                       return "CONNEVENT_PEER_ADDED";
+               case CONNEVENT_PEER_REMOVED: 
+                       return "CONNEVENT_PEER_REMOVED";
+               }
+               return "Invalid ConnectionEvent";
+       }
+       
+       void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
+       {
+               type = CONNEVENT_DATA_RECEIVED;
+               peer_id = peer_id_;
+               data = data_;
+       }
+       void peerAdded(u16 peer_id_, Address address_)
+       {
+               type = CONNEVENT_PEER_ADDED;
+               peer_id = peer_id_;
+               address = address_;
+       }
+       void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
+       {
+               type = CONNEVENT_PEER_REMOVED;
+               peer_id = peer_id_;
+               timeout = timeout_;
+               address = address_;
+       }
+};
+
+enum ConnectionCommandType{
+       CONNCMD_NONE,
+       CONNCMD_SERVE,
+       CONNCMD_CONNECT,
+       CONNCMD_DISCONNECT,
+       CONNCMD_SEND,
+       CONNCMD_SEND_TO_ALL,
+       CONNCMD_DELETE_PEER,
+};
+
+struct ConnectionCommand
+{
+       enum ConnectionCommandType type;
+       u16 port;
+       Address address;
+       u16 peer_id;
+       u8 channelnum;
+       SharedBuffer<u8> data;
+       bool reliable;
+       
+       ConnectionCommand(): type(CONNCMD_NONE) {}
+
+       void serve(u16 port_)
+       {
+               type = CONNCMD_SERVE;
+               port = port_;
+       }
+       void connect(Address address_)
+       {
+               type = CONNCMD_CONNECT;
+               address = address_;
+       }
+       void disconnect()
+       {
+               type = CONNCMD_DISCONNECT;
+       }
+       void send(u16 peer_id_, u8 channelnum_,
+                       SharedBuffer<u8> data_, bool reliable_)
+       {
+               type = CONNCMD_SEND;
+               peer_id = peer_id_;
+               channelnum = channelnum_;
+               data = data_;
+               reliable = reliable_;
+       }
+       void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
+       {
+               type = CONNCMD_SEND_TO_ALL;
+               channelnum = channelnum_;
+               data = data_;
+               reliable = reliable_;
+       }
+       void deletePeer(u16 peer_id_)
+       {
+               type = CONNCMD_DELETE_PEER;
+               peer_id = peer_id_;
+       }
+};
+
+class Connection: public SimpleThread
 {
 public:
-       Connection(
-               u32 protocol_id,
-               u32 max_packet_size,
-               float timeout,
-               PeerHandler *peerhandler
-       );
+       Connection(u32 protocol_id, u32 max_packet_size, float timeout);
+       Connection(u32 protocol_id, u32 max_packet_size, float timeout,
+                       PeerHandler *peerhandler);
        ~Connection();
-       void setTimeoutMs(int timeout){ m_socket.setTimeoutMs(timeout); }
-       // Start being a server
+       void * Thread();
+
+       /* Interface */
+
+       ConnectionEvent getEvent();
+       ConnectionEvent waitEvent(u32 timeout_ms);
+       void putCommand(ConnectionCommand &c);
+       
+       void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
        void Serve(unsigned short port);
-       // Connect to a server
        void Connect(Address address);
        bool Connected();
-
        void Disconnect();
-
-       // Sets peer_id
-       SharedBuffer<u8> GetFromBuffers(u16 &peer_id);
-
-       // The peer_id of sender is stored in peer_id
-       // Return value: I guess this always throws an exception or
-       //               actually gets data
-       // May call PeerHandler methods
        u32 Receive(u16 &peer_id, u8 *data, u32 datasize);
-       
-       // These will automatically package the data as an original or split
        void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
        void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
-       // Send data as a packet; it will be wrapped in base header and
-       // optionally to a reliable packet.
-       void SendAsPacket(u16 peer_id, u8 channelnum,
+       void RunTimeouts(float dtime); // dummy
+       u16 GetPeerID(){ return m_peer_id; }
+       Address GetPeerAddress(u16 peer_id);
+       float GetPeerAvgRTT(u16 peer_id);
+       void DeletePeer(u16 peer_id);
+       
+private:
+       void putEvent(ConnectionEvent &e);
+       void processCommand(ConnectionCommand &c);
+       void send(float dtime);
+       void receive();
+       void runTimeouts(float dtime);
+       void serve(u16 port);
+       void connect(Address address);
+       void disconnect();
+       void sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
+       void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
+       void sendAsPacket(u16 peer_id, u8 channelnum,
                        SharedBuffer<u8> data, bool reliable);
-       // Sends a raw packet
-       void RawSend(const BufferedPacket &packet);
+       void rawSendAsPacket(u16 peer_id, u8 channelnum,
+                       SharedBuffer<u8> data, bool reliable);
+       void rawSend(const BufferedPacket &packet);
+       Peer* getPeer(u16 peer_id);
+       Peer* getPeerNoEx(u16 peer_id);
+       core::list<Peer*> getPeers();
+       bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
+       // Returns next data from a buffer if possible
+       // If found, returns true; if not, false.
+       // If found, sets peer_id and dst
+       bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
+                       SharedBuffer<u8> &dst);
+       /*
+               Processes a packet with the basic header stripped out.
+               Parameters:
+                       packetdata: Data in packet (with no base headers)
+                       peer_id: peer id of the sender of the packet in question
+                       channelnum: channel on which the packet was sent
+                       reliable: true if recursing into a reliable packet
+       */
+       SharedBuffer<u8> processPacket(Channel *channel,
+                       SharedBuffer<u8> packetdata, u16 peer_id,
+                       u8 channelnum, bool reliable);
+       bool deletePeer(u16 peer_id, bool timeout);
        
-       // May call PeerHandler methods
-       void RunTimeouts(float dtime);
-
-       // Can throw a PeerNotFoundException
-       Peer* GetPeer(u16 peer_id);
-       // returns NULL if failed
-       Peer* GetPeerNoEx(u16 peer_id);
-       core::list<Peer*> GetPeers();
+       Queue<OutgoingPacket> m_outgoing_queue;
+       MutexedQueue<ConnectionEvent> m_event_queue;
+       MutexedQueue<ConnectionCommand> m_command_queue;
        
-       // Calls PeerHandler::deletingPeer
-       // Returns false if peer was not found
-       bool deletePeer(u16 peer_id, bool timeout);
+       u32 m_protocol_id;
+       u32 m_max_packet_size;
+       float m_timeout;
+       UDPSocket m_socket;
+       u16 m_peer_id;
+       
+       core::map<u16, Peer*> m_peers;
+       JMutex m_peers_mutex;
 
+       // Backwards compatibility
+       PeerHandler *m_bc_peerhandler;
+       int m_bc_receive_timeout;
+       
        void SetPeerID(u16 id){ m_peer_id = id; }
-       u16 GetPeerID(){ return m_peer_id; }
        u32 GetProtocolID(){ return m_protocol_id; }
-
-       // For debug printing
        void PrintInfo(std::ostream &out);
        void PrintInfo();
+       std::string getDesc();
        u16 m_indentation;
-
-private:
-       u32 m_protocol_id;
-       float m_timeout;
-       PeerHandler *m_peerhandler;
-       core::map<u16, Peer*> m_peers;
-       u16 m_peer_id;
-       //bool m_waiting_new_peer_id;
-       u32 m_max_packet_size;
-       UDPSocket m_socket;
 };
 
 } // namespace
index 586eaa282a2143b731ed874f8661ed510b2d4e79..7f0d46a10b6c8cc5d1aa5ae29d1c3a314e92da79 100644 (file)
@@ -100,7 +100,7 @@ void set_default_settings(Settings *settings)
        //settings->setDefault("max_simultaneous_block_sends_per_client", "1");
        // This causes frametime jitter on client side, or does it?
        settings->setDefault("max_simultaneous_block_sends_per_client", "2");
-       settings->setDefault("max_simultaneous_block_sends_server_total", "2");
+       settings->setDefault("max_simultaneous_block_sends_server_total", "8");
        settings->setDefault("max_block_send_distance", "7");
        settings->setDefault("max_block_generate_distance", "5");
        settings->setDefault("time_send_interval", "20");
index df1347f121d3624aa8a494f707e3335e9322fd2a..bc44775bdeaba6246dc3898507169a63bf790d30 100644 (file)
@@ -481,6 +481,8 @@ MainGameCallback *g_gamecallback = NULL;
 // Connection
 std::ostream *dout_con_ptr = &dummyout;
 std::ostream *derr_con_ptr = &verbosestream;
+//std::ostream *dout_con_ptr = &infostream;
+//std::ostream *derr_con_ptr = &errorstream;
 
 // Server
 std::ostream *dout_server_ptr = &infostream;
index ba4130ca29e860a9b6196cc66a1be5ecdb990988..8aad4e539886b80acd62d10e46dd6ba22a5bcdcb 100644 (file)
@@ -21,7 +21,9 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include "mapsector.h"
 #include "mapblock.h"
 #include "main.h"
+#ifndef SERVER
 #include "client.h"
+#endif
 #include "filesys.h"
 #include "utility.h"
 #include "voxel.h"
index 1a441e8199a68fae397f68d6dbe253f506d433c3..37ba65a95957a604a9b2f447092b4212c5dccc35 100644 (file)
@@ -1074,7 +1074,7 @@ void Server::start(unsigned short port)
        m_thread.stop();
        
        // Initialize connection
-       m_con.setTimeoutMs(30);
+       m_con.SetTimeoutMs(30);
        m_con.Serve(port);
 
        // Start thread
@@ -1823,9 +1823,18 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
        JMutexAutoLock envlock(m_env_mutex);
        JMutexAutoLock conlock(m_con_mutex);
        
-       con::Peer *peer;
        try{
-               peer = m_con.GetPeer(peer_id);
+               Address address = m_con.GetPeerAddress(peer_id);
+
+               // drop player if is ip is banned
+               if(m_banmanager.isIpBanned(address.serializeString())){
+                       SendAccessDenied(m_con, peer_id,
+                                       L"Your ip is banned. Banned name was "
+                                       +narrow_to_wide(m_banmanager.getBanName(
+                                               address.serializeString())));
+                       m_con.DeletePeer(peer_id);
+                       return;
+               }
        }
        catch(con::PeerNotFoundException &e)
        {
@@ -1834,17 +1843,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
                return;
        }
 
-       // drop player if is ip is banned
-       if(m_banmanager.isIpBanned(peer->address.serializeString())){
-               SendAccessDenied(m_con, peer_id,
-                               L"Your ip is banned. Banned name was "
-                               +narrow_to_wide(m_banmanager.getBanName(
-                                       peer->address.serializeString())));
-               m_con.deletePeer(peer_id, false);
-               return;
-       }
-       
-       u8 peer_ser_ver = getClient(peer->id)->serialization_version;
+       u8 peer_ser_ver = getClient(peer_id)->serialization_version;
 
        try
        {
@@ -1865,7 +1864,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
                        return;
 
                infostream<<"Server: Got TOSERVER_INIT from "
-                               <<peer->id<<std::endl;
+                               <<peer_id<<std::endl;
 
                // First byte after command is maximum supported
                // serialization version
@@ -1878,7 +1877,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
                        deployed = SER_FMT_VER_INVALID;
 
                //peer->serialization_version = deployed;
-               getClient(peer->id)->pending_serialization_version = deployed;
+               getClient(peer_id)->pending_serialization_version = deployed;
                
                if(deployed == SER_FMT_VER_INVALID)
                {
@@ -1900,7 +1899,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
                        net_proto_version = readU16(&data[2+1+PLAYERNAME_SIZE+PASSWORD_SIZE]);
                }
 
-               getClient(peer->id)->net_proto_version = net_proto_version;
+               getClient(peer_id)->net_proto_version = net_proto_version;
 
                if(net_proto_version == 0)
                {
@@ -2045,11 +2044,11 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
        if(command == TOSERVER_INIT2)
        {
                infostream<<"Server: Got TOSERVER_INIT2 from "
-                               <<peer->id<<std::endl;
+                               <<peer_id<<std::endl;
 
 
-               getClient(peer->id)->serialization_version
-                               = getClient(peer->id)->pending_serialization_version;
+               getClient(peer_id)->serialization_version
+                               = getClient(peer_id)->pending_serialization_version;
 
                /*
                        Send some initialization data
@@ -2059,8 +2058,8 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
                SendPlayerInfos();
 
                // Send inventory to player
-               UpdateCrafting(peer->id);
-               SendInventory(peer->id);
+               UpdateCrafting(peer_id);
+               SendInventory(peer_id);
 
                // Send player items to all players
                SendPlayerItems();
@@ -2074,7 +2073,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
                {
                        SharedBuffer<u8> data = makePacket_TOCLIENT_TIME_OF_DAY(
                                        m_env.getTimeOfDay());
-                       m_con.Send(peer->id, 0, data, true);
+                       m_con.Send(peer_id, 0, data, true);
                }
                
                // Send information about server to player in chat
@@ -2095,7 +2094,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
                }
                
                // Warnings about protocol version can be issued here
-               if(getClient(peer->id)->net_proto_version < PROTOCOL_VERSION)
+               if(getClient(peer_id)->net_proto_version < PROTOCOL_VERSION)
                {
                        SendChatMessage(peer_id, L"# Server: WARNING: YOUR CLIENT IS OLD AND MAY WORK PROPERLY WITH THIS SERVER");
                }
@@ -2402,7 +2401,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
                else if(action == 2)
                {
 #if 0
-                       RemoteClient *client = getClient(peer->id);
+                       RemoteClient *client = getClient(peer_id);
                        JMutexAutoLock digmutex(client->m_dig_mutex);
                        client->m_dig_tool_item = -1;
 #endif
@@ -2685,7 +2684,7 @@ void Server::ProcessData(u8 *data, u32 datasize, u16 peer_id)
                                }
 
                                // Reset build time counter
-                               getClient(peer->id)->m_time_from_building = 0.0;
+                               getClient(peer_id)->m_time_from_building = 0.0;
                                
                                // Create node data
                                MaterialItem *mitem = (MaterialItem*)item;
@@ -3428,11 +3427,10 @@ core::list<PlayerInfo> Server::getPlayerInfo()
                Player *player = *i;
 
                try{
-                       con::Peer *peer = m_con.GetPeer(player->peer_id);
-                       // Copy info from peer to info struct
-                       info.id = peer->id;
-                       info.address = peer->address;
-                       info.avg_rtt = peer->avg_rtt;
+                       // Copy info from connection to info struct
+                       info.id = player->peer_id;
+                       info.address = m_con.GetPeerAddress(player->peer_id);
+                       info.avg_rtt = m_con.GetPeerAvgRTT(player->peer_id);
                }
                catch(con::PeerNotFoundException &e)
                {
index dac7e2826314b9ccd17e26cbe13081223141952c..b238bec26e86029b191c03101ce776808d4fb6f0 100644 (file)
@@ -468,9 +468,9 @@ public:
                return m_banmanager.getBanDescription(ip_or_name);
        }
 
-       con::Peer* getPeerNoEx(u16 peer_id)
+       Address getPeerAddress(u16 peer_id)
        {
-               return m_con.GetPeerNoEx(peer_id);
+               return m_con.GetPeerAddress(peer_id);
        }
        
        // Envlock and conlock should be locked when calling this
index a090039609a1f4fcaa52d732b346704cc3bfc1c8..1565989409b96e0b5343ae67dcdf9732e2fe9a22 100644 (file)
@@ -250,20 +250,19 @@ void cmd_banunban(std::wostringstream &os, ServerCommandContext *ctx)
                        os<<L"-!- No such player";
                        return;
                }
-
-               con::Peer *peer = ctx->server->getPeerNoEx(player->peer_id);
-               if(peer == NULL)
-               {
+               
+               try{
+                       Address address = ctx->server->getPeerAddress(player->peer_id);
+                       std::string ip_string = address.serializeString();
+                       ctx->server->setIpBanned(ip_string, player->getName());
+                       os<<L"-!- Banned "<<narrow_to_wide(ip_string)<<L"|"
+                                       <<narrow_to_wide(player->getName());
+
+                       actionstream<<ctx->player->getName()<<" bans "
+                                       <<player->getName()<<" / "<<ip_string<<std::endl;
+               } catch(con::PeerNotFoundException){
                        dstream<<__FUNCTION_NAME<<": peer was not found"<<std::endl;
-                       return;
                }
-               std::string ip_string = peer->address.serializeString();
-               ctx->server->setIpBanned(ip_string, player->getName());
-               os<<L"-!- Banned "<<narrow_to_wide(ip_string)<<L"|"
-                               <<narrow_to_wide(player->getName());
-
-               actionstream<<ctx->player->getName()<<" bans "
-                               <<player->getName()<<" / "<<ip_string<<std::endl;
        }
        else
        {
index 6b9ef4b6f7bb87f6f753518db8e36c9e80f18268..c1f04b2ef2b238039ab5d20979972d20ded79ef5 100644 (file)
@@ -819,7 +819,10 @@ struct TestConnection
 
                /*
                        Test some real connections
+
+                       NOTE: This mostly tests the legacy interface.
                */
+
                u32 proto_id = 0xad26846a;
 
                Handler hand_server("server");
@@ -843,11 +846,30 @@ struct TestConnection
 
                sleep_ms(50);
                
+               // Client should not have added client yet
+               assert(hand_client.count == 0);
+               
+               try
+               {
+                       u16 peer_id;
+                       u8 data[100];
+                       infostream<<"** running client.Receive()"<<std::endl;
+                       u32 size = client.Receive(peer_id, data, 100);
+                       infostream<<"** Client received: peer_id="<<peer_id
+                                       <<", size="<<size
+                                       <<std::endl;
+               }
+               catch(con::NoIncomingDataException &e)
+               {
+               }
+
                // Client should have added server now
                assert(hand_client.count == 1);
                assert(hand_client.last_id == 1);
-               // But server should not have added client
+               // Server should not have added client yet
                assert(hand_server.count == 0);
+               
+               sleep_ms(50);
 
                try
                {
@@ -930,7 +952,7 @@ struct TestConnection
                }
                
                u16 peer_id_client = 2;
-
+#if 0
                {
                        /*
                                Send consequent packets in different order
@@ -941,13 +963,13 @@ struct TestConnection
                        SharedBuffer<u8> data2 = SharedBufferFromString("Hello2");
 
                        Address client_address =
-                                       server.GetPeer(peer_id_client)->address;
+                                       server.GetPeerAddress(peer_id_client);
                        
                        infostream<<"*** Sending packets in wrong order (2,1,2)"
                                        <<std::endl;
                        
                        u8 chn = 0;
-                       con::Channel *ch = &server.GetPeer(peer_id_client)->channels[chn];
+                       con::Channel *ch = &server.getPeer(peer_id_client)->channels[chn];
                        u16 sn = ch->next_outgoing_seqnum;
                        ch->next_outgoing_seqnum = sn+1;
                        server.Send(peer_id_client, chn, data2, true);
@@ -1004,6 +1026,7 @@ struct TestConnection
                        }
                        assert(got_exception);
                }
+#endif
                {
                        const int datasize = 30000;
                        SharedBuffer<u8> data1(datasize);
@@ -1022,12 +1045,25 @@ struct TestConnection
                        
                        server.Send(peer_id_client, 0, data1, true);
 
-                       sleep_ms(50);
+                       sleep_ms(3000);
                        
                        u8 recvdata[datasize + 1000];
                        infostream<<"** running client.Receive()"<<std::endl;
                        u16 peer_id = 132;
-                       u16 size = client.Receive(peer_id, recvdata, datasize + 1000);
+                       u16 size = 0;
+                       bool received = false;
+                       u32 timems0 = porting::getTimeMs();
+                       for(;;){
+                               if(porting::getTimeMs() - timems0 > 5000)
+                                       break;
+                               try{
+                                       size = client.Receive(peer_id, recvdata, datasize + 1000);
+                                       received = true;
+                               }catch(con::NoIncomingDataException &e){
+                               }
+                               sleep_ms(10);
+                       }
+                       assert(received);
                        infostream<<"** Client received: peer_id="<<peer_id
                                        <<", size="<<size
                                        <<std::endl;