Miscellaneous networking improvements (#9611)
authorsfan5 <sfan5@live.de>
Wed, 8 Apr 2020 18:12:58 +0000 (20:12 +0200)
committerGitHub <noreply@github.com>
Wed, 8 Apr 2020 18:12:58 +0000 (20:12 +0200)
fixes #2862

src/network/clientopcodes.cpp
src/network/connection.cpp
src/network/connection.h
src/network/connectionthreads.cpp
src/network/serveropcodes.cpp

index 431455b766bd9a3a3210ce7ae0daca1334029e03..0f20047c048856d67f111e734c1bf04b9c6d9cd0 100644 (file)
@@ -126,6 +126,16 @@ const ToClientCommandHandler toClientCommandTable[TOCLIENT_NUM_MSG_TYPES] =
 
 const static ServerCommandFactory null_command_factory = { "TOSERVER_NULL", 0, false };
 
+/*
+       Channels used for Client -> Server communication
+       2: Notifications back to the server (e.g. GOTBLOCKS)
+       1: Init and Authentication
+       0: everything else
+
+       Packet order is only guaranteed inside a channel, so packets that operate on
+       the same objects are *required* to be in the same channel.
+*/
+
 const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] =
 {
        null_command_factory, // 0x00
@@ -143,7 +153,7 @@ const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] =
        null_command_factory, // 0x0c
        null_command_factory, // 0x0d
        null_command_factory, // 0x0e
-       null_command_factory, // 0x0F
+       null_command_factory, // 0x0f
        null_command_factory, // 0x10
        { "TOSERVER_INIT2",              1, true }, // 0x11
        null_command_factory, // 0x12
@@ -186,7 +196,7 @@ const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] =
        { "TOSERVER_PLAYERITEM",         0, true }, // 0x37
        { "TOSERVER_RESPAWN",            0, true }, // 0x38
        { "TOSERVER_INTERACT",           0, true }, // 0x39
-       { "TOSERVER_REMOVED_SOUNDS",     1, true }, // 0x3a
+       { "TOSERVER_REMOVED_SOUNDS",     2, true }, // 0x3a
        { "TOSERVER_NODEMETA_FIELDS",    0, true }, // 0x3b
        { "TOSERVER_INVENTORY_FIELDS",   0, true }, // 0x3c
        null_command_factory, // 0x3d
@@ -194,8 +204,8 @@ const ServerCommandFactory serverCommandFactoryTable[TOSERVER_NUM_MSG_TYPES] =
        null_command_factory, // 0x3f
        { "TOSERVER_REQUEST_MEDIA",      1, true }, // 0x40
        null_command_factory, // 0x41
-       { "TOSERVER_BREATH",             0, true }, // 0x42 old TOSERVER_BREATH. Ignored by servers
-       { "TOSERVER_CLIENT_READY",       0, true }, // 0x43
+       null_command_factory, // 0x42
+       { "TOSERVER_CLIENT_READY",       1, true }, // 0x43
        null_command_factory, // 0x44
        null_command_factory, // 0x45
        null_command_factory, // 0x46
index 36124ce3c0d5c2e1adfc6b5a1711ac1010e05060..15eda7725ed13869ba81a51b9d1a720376e65eac 100644 (file)
@@ -924,7 +924,7 @@ UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
        Peer(a_address,a_id,connection)
 {
        for (Channel &channel : channels)
-               channel.setWindowSize(g_settings->getU16("max_packets_per_iteration"));
+               channel.setWindowSize(START_RELIABLE_WINDOW_SIZE);
 }
 
 bool UDPPeer::getAddress(MTProtocols type,Address& toset)
@@ -975,22 +975,29 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
        if (m_pending_disconnect)
                return;
 
-       if ( channels[c.channelnum].queued_commands.empty() &&
+       Channel &chan = channels[c.channelnum];
+
+       if (chan.queued_commands.empty() &&
                        /* don't queue more packets then window size */
-                       (channels[c.channelnum].queued_reliables.size()
-                       < (channels[c.channelnum].getWindowSize()/2))) {
+                       (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
                LOG(dout_con<<m_connection->getDesc()
                                <<" processing reliable command for peer id: " << c.peer_id
                                <<" data size: " << c.data.getSize() << std::endl);
                if (!processReliableSendCommand(c,max_packet_size)) {
-                       channels[c.channelnum].queued_commands.push_back(c);
+                       chan.queued_commands.push_back(c);
                }
        }
        else {
                LOG(dout_con<<m_connection->getDesc()
                                <<" Queueing reliable command for peer id: " << c.peer_id
                                <<" data size: " << c.data.getSize() <<std::endl);
-               channels[c.channelnum].queued_commands.push_back(c);
+               chan.queued_commands.push_back(c);
+               if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
+                       LOG(derr_con << m_connection->getDesc()
+                                       << "Possible packet stall to peer id: " << c.peer_id
+                                       << " queued_commands=" << chan.queued_commands.size()
+                                       << std::endl);
+               }
        }
 }
 
@@ -1001,6 +1008,8 @@ bool UDPPeer::processReliableSendCommand(
        if (m_pending_disconnect)
                return true;
 
+       Channel &chan = channels[c.channelnum];
+
        u32 chunksize_max = max_packet_size
                                                        - BASE_HEADER_SIZE
                                                        - RELIABLE_HEADER_SIZE;
@@ -1008,13 +1017,13 @@ bool UDPPeer::processReliableSendCommand(
        sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
 
        std::list<SharedBuffer<u8>> originals;
-       u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
+       u16 split_sequence_number = chan.readNextSplitSeqNum();
 
        if (c.raw) {
                originals.emplace_back(c.data);
        } else {
                makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
-               channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
+               chan.setNextSplitSeqNum(split_sequence_number);
        }
 
        bool have_sequence_number = true;
@@ -1023,7 +1032,7 @@ bool UDPPeer::processReliableSendCommand(
        volatile u16 initial_sequence_number = 0;
 
        for (SharedBuffer<u8> &original : originals) {
-               u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
+               u16 seqnum = chan.getOutgoingSequenceNumber(have_sequence_number);
 
                /* oops, we don't have enough sequence numbers to send this packet */
                if (!have_sequence_number)
@@ -1055,10 +1064,10 @@ bool UDPPeer::processReliableSendCommand(
 //                                     << " channel: " << (c.channelnum&0xFF)
 //                                     << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
 //                                     << std::endl)
-                       channels[c.channelnum].queued_reliables.push(p);
+                       chan.queued_reliables.push(p);
                        pcount++;
                }
-               sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
+               sanity_check(chan.queued_reliables.size() < 0xFFFF);
                return true;
        }
 
@@ -1073,7 +1082,7 @@ bool UDPPeer::processReliableSendCommand(
                toadd.pop();
 
                bool successfully_put_back_sequence_number
-                       = channels[c.channelnum].putBackSequenceNumber(
+                       = chan.putBackSequenceNumber(
                                (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
 
                FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
@@ -1081,7 +1090,7 @@ bool UDPPeer::processReliableSendCommand(
 
        // DO NOT REMOVE n_queued! It avoids a deadlock of async locked
        // 'log_message_mutex' and 'm_list_mutex'.
-       u32 n_queued = channels[c.channelnum].outgoing_reliables_sent.size();
+       u32 n_queued = chan.outgoing_reliables_sent.size();
 
        LOG(dout_con<<m_connection->getDesc()
                        << " Windowsize exceeded on reliable sending "
index 0b12bf701fa6e2b16c7853a7383703440c30f454..85f021c4c424407cde6066a14e6e146bc76b4fb7 100644 (file)
@@ -141,7 +141,6 @@ private:
 === NOTES ===
 
 A packet is sent through a channel to a peer with a basic header:
-TODO: Should we have a receiver_peer_id also?
        Header (7 bytes):
        [0] u32 protocol_id
        [4] session_t sender_peer_id
@@ -152,8 +151,7 @@ sender_peer_id:
        value 1 (PEER_ID_SERVER) is reserved for server
        these constants are defined in constants.h
 channel:
-       The lower the number, the higher the priority is.
-       Only channels 0, 1 and 2 exist.
+       Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
 */
 #define BASE_HEADER_SIZE 7
 #define CHANNEL_COUNT 3
@@ -386,12 +384,14 @@ struct ConnectionCommand
        }
 };
 
-/* maximum window size to use, 0xFFFF is theoretical maximum  don't think about
+/* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
  * touching it, the less you're away from it the more likely data corruption
  * will occur
  */
 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
-       /* starting value for window size */
+/* starting value for window size */
+#define START_RELIABLE_WINDOW_SIZE 0x400
+/* minimum value for window size */
 #define MIN_RELIABLE_WINDOW_SIZE 0x40
 
 class Channel
@@ -555,15 +555,15 @@ class Peer {
 
                bool isTimedOut(float timeout);
 
-               unsigned int m_increment_packets_remaining = 9;
-               unsigned int m_increment_bytes_remaining = 0;
+               unsigned int m_increment_packets_remaining = 0;
 
                virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
                virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
                virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
                                bool reliable)
                {
-                       fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
+                       errorstream << "Peer::addSplitPacket called,"
+                                       << " this is supposed to be never called!" << std::endl;
                        return SharedBuffer<u8>(0);
                };
 
index f8b58c025fd4c886391df5d4895d7659b670e74c..48a4f51abffc6816cb56eb6513bdc8b5148e6237 100644 (file)
@@ -73,6 +73,7 @@ ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
        m_timeout(timeout),
        m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
 {
+       SANITY_CHECK(m_max_data_packets_per_iteration > 1);
 }
 
 void *ConnectionSendThread::run()
@@ -107,8 +108,13 @@ void *ConnectionSendThread::run()
                curtime = porting::getTimeMs();
                float dtime = CALC_DTIME(lasttime, curtime);
 
-               /* first do all the reliable stuff */
+               /* first resend timed-out packets */
                runTimeouts(dtime);
+               if (m_iteration_packets_avaialble == 0) {
+                       LOG(warningstream << m_connection->getDesc()
+                               << " Packet quota used up after re-sending packets, "
+                               << "max=" << m_max_data_packets_per_iteration << std::endl);
+               }
 
                /* translate commands to packets */
                ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
@@ -121,7 +127,7 @@ void *ConnectionSendThread::run()
                        c = m_connection->m_command_queue.pop_frontNoEx(0);
                }
 
-               /* send non reliable packets */
+               /* send queued packets */
                sendPackets(dtime);
 
                END_DEBUG_EXCEPTION_HANDLER
@@ -644,6 +650,9 @@ void ConnectionSendThread::sendPackets(float dtime)
        std::list<session_t> pendingDisconnect;
        std::map<session_t, bool> pending_unreliable;
 
+       const unsigned int peer_packet_quota = m_iteration_packets_avaialble
+               / MYMAX(peerIds.size(), 1);
+
        for (session_t peerId : peerIds) {
                PeerHelper peer = m_connection->getPeerNoEx(peerId);
                //peer may have been removed
@@ -653,8 +662,7 @@ void ConnectionSendThread::sendPackets(float dtime)
                                << std::endl);
                        continue;
                }
-               peer->m_increment_packets_remaining =
-                       m_iteration_packets_avaialble / m_connection->m_peers.size();
+               peer->m_increment_packets_remaining = peer_packet_quota;
 
                UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
 
@@ -751,23 +759,30 @@ void ConnectionSendThread::sendPackets(float dtime)
                }
 
                /* send acks immediately */
-               if (packet.ack) {
+               if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
                        rawSendAsPacket(packet.peer_id, packet.channelnum,
                                packet.data, packet.reliable);
-                       peer->m_increment_packets_remaining =
-                               MYMIN(0, peer->m_increment_packets_remaining--);
-               } else if (
-                       (peer->m_increment_packets_remaining > 0) ||
-                               (stopRequested())) {
-                       rawSendAsPacket(packet.peer_id, packet.channelnum,
-                               packet.data, packet.reliable);
-                       peer->m_increment_packets_remaining--;
+                       if (peer->m_increment_packets_remaining > 0)
+                               peer->m_increment_packets_remaining--;
                } else {
                        m_outgoing_queue.push(packet);
                        pending_unreliable[packet.peer_id] = true;
                }
        }
 
+       if (peer_packet_quota > 0) {
+               for (session_t peerId : peerIds) {
+                       PeerHelper peer = m_connection->getPeerNoEx(peerId);
+                       if (!peer)
+                               continue;
+                       if (peer->m_increment_packets_remaining == 0) {
+                               LOG(warningstream << m_connection->getDesc()
+                                       << " Packet quota used up for peer_id=" << peerId
+                                       << ", was " << peer_packet_quota << " pkts" << std::endl);
+                       }
+               }
+       }
+
        for (session_t peerId : pendingDisconnect) {
                if (!pending_unreliable[peerId]) {
                        m_connection->deletePeer(peerId, false);
index cca2e56ead948b8cfc867b9584b795a6190768df..6ee4ff25685e6583001840fd94daa8c327a881d2 100644 (file)
@@ -111,6 +111,16 @@ const ToServerCommandHandler toServerCommandTable[TOSERVER_NUM_MSG_TYPES] =
 
 const static ClientCommandFactory null_command_factory = { "TOCLIENT_NULL", 0, false };
 
+/*
+       Channels used for Server -> Client communication
+       2: Bulk data (mapblocks, media, ...)
+       1: HUD packets
+       0: everything else
+
+       Packet order is only guaranteed inside a channel, so packets that operate on
+       the same objects are *required* to be in the same channel.
+*/
+
 const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] =
 {
        null_command_factory, // 0x00
@@ -163,7 +173,7 @@ const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] =
        { "TOCLIENT_CHAT_MESSAGE",             0, true }, // 0x2F
        null_command_factory, // 0x30
        { "TOCLIENT_ACTIVE_OBJECT_REMOVE_ADD", 0, true }, // 0x31
-       { "TOCLIENT_ACTIVE_OBJECT_MESSAGES",   0, true }, // 0x32 Special packet, sent by 0 (rel) and 1 (unrel) channel
+       { "TOCLIENT_ACTIVE_OBJECT_MESSAGES",   0, true }, // 0x32 (may be sent as unrel over channel 1 too)
        { "TOCLIENT_HP",                       0, true }, // 0x33
        { "TOCLIENT_MOVE_PLAYER",              0, true }, // 0x34
        { "TOCLIENT_ACCESS_DENIED_LEGACY",     0, true }, // 0x35
@@ -176,7 +186,7 @@ const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] =
        { "TOCLIENT_ANNOUNCE_MEDIA",           0, true }, // 0x3C
        { "TOCLIENT_ITEMDEF",                  0, true }, // 0x3D
        null_command_factory, // 0x3E
-       { "TOCLIENT_PLAY_SOUND",               0, true }, // 0x3f
+       { "TOCLIENT_PLAY_SOUND",               0, true }, // 0x3f (may be sent as unrel too)
        { "TOCLIENT_STOP_SOUND",               0, true }, // 0x40
        { "TOCLIENT_PRIVILEGES",               0, true }, // 0x41
        { "TOCLIENT_INVENTORY_FORMSPEC",       0, true }, // 0x42
@@ -188,9 +198,9 @@ const ClientCommandFactory clientCommandFactoryTable[TOCLIENT_NUM_MSG_TYPES] =
        null_command_factory, // 0x48
        { "TOCLIENT_HUDADD",                   1, true }, // 0x49
        { "TOCLIENT_HUDRM",                    1, true }, // 0x4a
-       { "TOCLIENT_HUDCHANGE",                0, true }, // 0x4b
-       { "TOCLIENT_HUD_SET_FLAGS",            0, true }, // 0x4c
-       { "TOCLIENT_HUD_SET_PARAM",            0, true }, // 0x4d
+       { "TOCLIENT_HUDCHANGE",                1, true }, // 0x4b
+       { "TOCLIENT_HUD_SET_FLAGS",            1, true }, // 0x4c
+       { "TOCLIENT_HUD_SET_PARAM",            1, true }, // 0x4d
        { "TOCLIENT_BREATH",                   0, true }, // 0x4e
        { "TOCLIENT_SET_SKY",                  0, true }, // 0x4f
        { "TOCLIENT_OVERRIDE_DAY_NIGHT_RATIO", 0, true }, // 0x50