Rework packet receiving in ServerThread
authorsfan5 <sfan5@live.de>
Thu, 14 Nov 2019 16:38:15 +0000 (17:38 +0100)
committersfan5 <sfan5@live.de>
Tue, 19 Nov 2019 19:27:20 +0000 (20:27 +0100)
Notably it tries to receive all queued packets
between server steps, not just one.

src/network/connection.cpp
src/network/connection.h
src/network/networkpacket.cpp
src/network/networkpacket.h
src/server.cpp

index 0bc13a2f0f25f23a64d13bc6ee152dcc078e2ba4..a99e5b145229edb3d8aac18a18e4553edda0afc8 100644 (file)
@@ -1323,16 +1323,21 @@ void Connection::Disconnect()
        putCommand(c);
 }
 
-void Connection::Receive(NetworkPacket* pkt)
+bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
 {
+       /*
+               Note that this function can potentially wait infinitely if non-data
+               events keep happening before the timeout expires.
+               This is not considered to be a problem (is it?)
+       */
        for(;;) {
-               ConnectionEvent e = waitEvent(m_bc_receive_timeout);
+               ConnectionEvent e = waitEvent(timeout);
                if (e.type != CONNEVENT_NONE)
                        LOG(dout_con << getDesc() << ": Receive: got event: "
                                        << e.describe() << std::endl);
                switch(e.type) {
                case CONNEVENT_NONE:
-                       throw NoIncomingDataException("No incoming data");
+                       return false;
                case CONNEVENT_DATA_RECEIVED:
                        // Data size is lesser than command size, ignoring packet
                        if (e.data.getSize() < 2) {
@@ -1340,7 +1345,7 @@ void Connection::Receive(NetworkPacket* pkt)
                        }
 
                        pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
-                       return;
+                       return true;
                case CONNEVENT_PEER_ADDED: {
                        UDPPeer tmp(e.peer_id, e.address, this);
                        if (m_bc_peerhandler)
@@ -1358,7 +1363,19 @@ void Connection::Receive(NetworkPacket* pkt)
                                        "(port already in use?)");
                }
        }
-       throw NoIncomingDataException("No incoming data");
+       return false;
+}
+
+void Connection::Receive(NetworkPacket *pkt)
+{
+       bool any = Receive(pkt, m_bc_receive_timeout);
+       if (!any)
+               throw NoIncomingDataException("No incoming data");
+}
+
+bool Connection::TryReceive(NetworkPacket *pkt)
+{
+       return Receive(pkt, 0);
 }
 
 void Connection::Send(session_t peer_id, u8 channelnum,
index 057bd39f66a93acef96a46b6ed15ce682af158fb..d7f1e0fe869810790278bb31802937285662fa5d 100644 (file)
@@ -771,6 +771,7 @@ public:
        bool Connected();
        void Disconnect();
        void Receive(NetworkPacket* pkt);
+       bool TryReceive(NetworkPacket *pkt);
        void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
        session_t GetPeerID() const { return m_peer_id; }
        Address GetPeerAddress(session_t peer_id);
@@ -803,6 +804,8 @@ protected:
        UDPSocket m_udpSocket;
        MutexedQueue<ConnectionCommand> m_command_queue;
 
+       bool Receive(NetworkPacket *pkt, u32 timeout);
+
        void putEvent(ConnectionEvent &e);
 
        void TriggerSend();
index 22c035c5b9c8c4734ddd21b75b3c4d99ef5f9e51..4d531b6114d5c0e482e172e4985ddff9aa2bf897 100644 (file)
@@ -66,6 +66,15 @@ void NetworkPacket::putRawPacket(u8 *data, u32 datasize, session_t peer_id)
        memcpy(m_data.data(), &data[2], m_datasize);
 }
 
+void NetworkPacket::clear()
+{
+       m_data.clear();
+       m_datasize = 0;
+       m_read_offset = 0;
+       m_command = 0;
+       m_peer_id = 0;
+}
+
 const char* NetworkPacket::getString(u32 from_offset)
 {
        checkReadOffset(from_offset, 0);
index a8b741374f8b32d2d2ca678dd5d201e5580e9cac..fc86176515b336ccd9aeb8d4ed7545e51b036b8b 100644 (file)
@@ -35,6 +35,7 @@ public:
        ~NetworkPacket();
 
        void putRawPacket(u8 *data, u32 datasize, session_t peer_id);
+       void clear();
 
        // Getters
        u32 getSize() const { return m_datasize; }
index 4aa8375c80b9d35ea55edb72b2cbab5358e28c92..d914beead35c44d086ac1aad92240c4387ff0db9 100644 (file)
@@ -93,6 +93,15 @@ void *ServerThread::run()
 {
        BEGIN_DEBUG_EXCEPTION_HANDLER
 
+       /*
+        * The real business of the server happens on the ServerThread.
+        * How this works:
+        * AsyncRunStep() runs an actual server step as soon as enough time has
+        * passed (dedicated_server_loop keeps track of that).
+        * Receive() blocks at least(!) 30ms waiting for a packet (so this loop
+        * doesn't busy wait) and will process any remaining packets.
+        */
+
        m_server->AsyncRunStep(true);
 
        while (!stopRequested()) {
@@ -101,7 +110,6 @@ void *ServerThread::run()
 
                        m_server->Receive();
 
-               } catch (con::NoIncomingDataException &e) {
                } catch (con::PeerNotFoundException &e) {
                        infostream<<"Server: PeerNotFoundException"<<std::endl;
                } catch (ClientNotFoundException &e) {
@@ -911,24 +919,43 @@ void Server::AsyncRunStep(bool initial_step)
 
 void Server::Receive()
 {
-       session_t peer_id = 0;
-       try {
-               NetworkPacket pkt;
-               m_con->Receive(&pkt);
-               peer_id = pkt.getPeerId();
-               ProcessData(&pkt);
-       } catch (const con::InvalidIncomingDataException &e) {
-               infostream << "Server::Receive(): InvalidIncomingDataException: what()="
-                               << e.what() << std::endl;
-       } catch (const SerializationError &e) {
-               infostream << "Server::Receive(): SerializationError: what()="
-                               << e.what() << std::endl;
-       } catch (const ClientStateError &e) {
-               errorstream << "ProcessData: peer=" << peer_id << e.what() << std::endl;
-               DenyAccess_Legacy(peer_id, L"Your client sent something server didn't expect."
-                               L"Try reconnecting or updating your client");
-       } catch (const con::PeerNotFoundException &e) {
-               // Do nothing
+       NetworkPacket pkt;
+       session_t peer_id;
+       bool first = true;
+       for (;;) {
+               pkt.clear();
+               peer_id = 0;
+               try {
+                       /*
+                               In the first iteration *wait* for a packet, afterwards process
+                               all packets that are immediately available (no waiting).
+                       */
+                       if (first) {
+                               m_con->Receive(&pkt);
+                               first = false;
+                       } else {
+                               if (!m_con->TryReceive(&pkt))
+                                       return;
+                       }
+
+                       peer_id = pkt.getPeerId();
+                       ProcessData(&pkt);
+               } catch (const con::InvalidIncomingDataException &e) {
+                       infostream << "Server::Receive(): InvalidIncomingDataException: what()="
+                                       << e.what() << std::endl;
+               } catch (const SerializationError &e) {
+                       infostream << "Server::Receive(): SerializationError: what()="
+                                       << e.what() << std::endl;
+               } catch (const ClientStateError &e) {
+                       errorstream << "ProcessData: peer=" << peer_id << " what()="
+                                        << e.what() << std::endl;
+                       DenyAccess_Legacy(peer_id, L"Your client sent something server didn't expect."
+                                       L"Try reconnecting or updating your client");
+               } catch (const con::PeerNotFoundException &e) {
+                       // Do nothing
+               } catch (const con::NoIncomingDataException &e) {
+                       return;
+               }
        }
 }
 
@@ -3728,6 +3755,11 @@ void dedicated_server_loop(Server &server, bool &kill)
        static thread_local const float profiler_print_interval =
                        g_settings->getFloat("profiler_print_interval");
 
+       /*
+        * The dedicated server loop only does time-keeping (in Server::step) and
+        * provides a way to main.cpp to kill the server externally (bool &kill).
+        */
+
        for(;;) {
                // This is kind of a hack but can be done like this
                // because server.step() is very light