Improve protocol-level receiving code (#9617)
authorsfan5 <sfan5@live.de>
Mon, 20 Apr 2020 21:22:00 +0000 (23:22 +0200)
committerGitHub <noreply@github.com>
Mon, 20 Apr 2020 21:22:00 +0000 (23:22 +0200)
src/network/connection.cpp
src/network/connectionthreads.cpp
src/network/connectionthreads.h

index 15eda7725ed13869ba81a51b9d1a720376e65eac..3692e45a9870519297252582f871f57670643df1 100644 (file)
@@ -1173,7 +1173,9 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
        m_bc_peerhandler(peerhandler)
 
 {
-       m_udpSocket.setTimeoutMs(5);
+       /* Amount of time Receive() will wait for data, this is entirely different
+        * from the connection timeout */
+       m_udpSocket.setTimeoutMs(500);
 
        m_sendThread->setParent(this);
        m_receiveThread->setParent(this);
index 13d82e06d12b6b0bbf2b4127d7ba80b6c1e9618f..1f33d2deddb615d70a6d9ad93cdb645580e89cef 100644 (file)
@@ -812,6 +812,14 @@ void *ConnectionReceiveThread::run()
        ThreadIdentifier);
        PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
 
+       // use IPv6 minimum allowed MTU as receive buffer size as this is
+       // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
+       // infrastructure
+       const unsigned int packet_maxsize = 1500;
+       SharedBuffer<u8> packetdata(packet_maxsize);
+
+       bool packet_queued = true;
+
 #ifdef DEBUG_CONNECTION_KBPS
        u64 curtime = porting::getTimeMs();
        u64 lasttime = curtime;
@@ -830,7 +838,7 @@ void *ConnectionReceiveThread::run()
 #endif
 
                /* receive packets */
-               receive();
+               receive(packetdata, packet_queued);
 
 #ifdef DEBUG_CONNECTION_KBPS
                debug_print_timer += dtime;
@@ -892,157 +900,142 @@ void *ConnectionReceiveThread::run()
 }
 
 // Receive packets from the network and buffers and create ConnectionEvents
-void ConnectionReceiveThread::receive()
+void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
+               bool &packet_queued)
 {
-       // use IPv6 minimum allowed MTU as receive buffer size as this is
-       // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
-       // infrastructure
-       unsigned int packet_maxsize = 1500;
-       SharedBuffer<u8> packetdata(packet_maxsize);
-
-       bool packet_queued = true;
-
-       unsigned int loop_count = 0;
-
-       /* first of all read packets from socket */
-       /* check for incoming data available */
-       while ((loop_count < 10) &&
-               (m_connection->m_udpSocket.WaitData(50))) {
-               loop_count++;
-               try {
-                       if (packet_queued) {
-                               bool data_left = true;
-                               session_t peer_id;
-                               SharedBuffer<u8> resultdata;
-                               while (data_left) {
-                                       try {
-                                               data_left = getFromBuffers(peer_id, resultdata);
-                                               if (data_left) {
-                                                       ConnectionEvent e;
-                                                       e.dataReceived(peer_id, resultdata);
-                                                       m_connection->putEvent(e);
-                                               }
-                                       }
-                                       catch (ProcessedSilentlyException &e) {
-                                               /* try reading again */
+       try {
+               // First, see if there any buffered packets we can process now
+               if (packet_queued) {
+                       bool data_left = true;
+                       session_t peer_id;
+                       SharedBuffer<u8> resultdata;
+                       while (data_left) {
+                               try {
+                                       data_left = getFromBuffers(peer_id, resultdata);
+                                       if (data_left) {
+                                               ConnectionEvent e;
+                                               e.dataReceived(peer_id, resultdata);
+                                               m_connection->putEvent(e);
                                        }
                                }
-                               packet_queued = false;
-                       }
-
-                       Address sender;
-                       s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
-                               packet_maxsize);
-
-                       if ((received_size < BASE_HEADER_SIZE) ||
-                               (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
-                               LOG(derr_con << m_connection->getDesc()
-                                       << "Receive(): Invalid incoming packet, "
-                                       << "size: " << received_size
-                                       << ", protocol: "
-                                       << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
-                                       << std::endl);
-                               continue;
+                               catch (ProcessedSilentlyException &e) {
+                                       /* try reading again */
+                               }
                        }
+                       packet_queued = false;
+               }
 
-                       session_t peer_id = readPeerId(*packetdata);
-                       u8 channelnum = readChannel(*packetdata);
+               // Call Receive() to wait for incoming data
+               Address sender;
+               s32 received_size = m_connection->m_udpSocket.Receive(sender,
+                       *packetdata, packetdata.getSize());
+               if (received_size < 0)
+                       return;
 
-                       if (channelnum > CHANNEL_COUNT - 1) {
-                               LOG(derr_con << m_connection->getDesc()
-                                       << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
-                               throw InvalidIncomingDataException("Channel doesn't exist");
-                       }
+               if ((received_size < BASE_HEADER_SIZE) ||
+                       (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
+                       LOG(derr_con << m_connection->getDesc()
+                               << "Receive(): Invalid incoming packet, "
+                               << "size: " << received_size
+                               << ", protocol: "
+                               << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
+                               << std::endl);
+                       return;
+               }
 
-                       /* Try to identify peer by sender address (may happen on join) */
-                       if (peer_id == PEER_ID_INEXISTENT) {
-                               peer_id = m_connection->lookupPeer(sender);
-                               // We do not have to remind the peer of its
-                               // peer id as the CONTROLTYPE_SET_PEER_ID
-                               // command was sent reliably.
-                       }
+               session_t peer_id = readPeerId(*packetdata);
+               u8 channelnum = readChannel(*packetdata);
 
-                       /* The peer was not found in our lists. Add it. */
-                       if (peer_id == PEER_ID_INEXISTENT) {
-                               peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
-                       }
+               if (channelnum > CHANNEL_COUNT - 1) {
+                       LOG(derr_con << m_connection->getDesc()
+                               << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
+                       return;
+               }
 
-                       PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+               /* Try to identify peer by sender address (may happen on join) */
+               if (peer_id == PEER_ID_INEXISTENT) {
+                       peer_id = m_connection->lookupPeer(sender);
+                       // We do not have to remind the peer of its
+                       // peer id as the CONTROLTYPE_SET_PEER_ID
+                       // command was sent reliably.
+               }
 
-                       if (!peer) {
-                               LOG(dout_con << m_connection->getDesc()
-                                       << " got packet from unknown peer_id: "
-                                       << peer_id << " Ignoring." << std::endl);
-                               continue;
-                       }
+               /* The peer was not found in our lists. Add it. */
+               if (peer_id == PEER_ID_INEXISTENT) {
+                       peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
+               }
 
-                       // Validate peer address
+               PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+               if (!peer) {
+                       LOG(dout_con << m_connection->getDesc()
+                               << " got packet from unknown peer_id: "
+                               << peer_id << " Ignoring." << std::endl);
+                       return;
+               }
 
-                       Address peer_address;
+               // Validate peer address
 
-                       if (peer->getAddress(MTP_UDP, peer_address)) {
-                               if (peer_address != sender) {
-                                       LOG(derr_con << m_connection->getDesc()
-                                               << m_connection->getDesc()
-                                               << " Peer " << peer_id << " sending from different address."
-                                               " Ignoring." << std::endl);
-                                       continue;
-                               }
-                       } else {
-
-                               bool invalid_address = true;
-                               if (invalid_address) {
-                                       LOG(derr_con << m_connection->getDesc()
-                                               << m_connection->getDesc()
-                                               << " Peer " << peer_id << " unknown."
-                                               " Ignoring." << std::endl);
-                                       continue;
-                               }
+               Address peer_address;
+               if (peer->getAddress(MTP_UDP, peer_address)) {
+                       if (peer_address != sender) {
+                               LOG(derr_con << m_connection->getDesc()
+                                       << " Peer " << peer_id << " sending from different address."
+                                       " Ignoring." << std::endl);
+                               return;
                        }
+               } else {
+                       LOG(derr_con << m_connection->getDesc()
+                               << " Peer " << peer_id << " doesn't have an address?!"
+                               " Ignoring." << std::endl);
+                       return;
+               }
 
-                       peer->ResetTimeout();
-
-                       Channel *channel = 0;
+               peer->ResetTimeout();
 
-                       if (dynamic_cast<UDPPeer *>(&peer) != 0) {
-                               channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
-                       }
+               Channel *channel = nullptr;
+               if (dynamic_cast<UDPPeer *>(&peer)) {
+                       channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
+               } else {
+                       LOG(derr_con << m_connection->getDesc()
+                               << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
+                               " Ignoring." << std::endl);
+                       return;
+               }
 
-                       if (channel != 0) {
-                               channel->UpdateBytesReceived(received_size);
-                       }
+               channel->UpdateBytesReceived(received_size);
 
-                       // Throw the received packet to channel->processPacket()
+               // Throw the received packet to channel->processPacket()
 
-                       // Make a new SharedBuffer from the data without the base headers
-                       SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
-                       memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
-                               strippeddata.getSize());
+               // Make a new SharedBuffer from the data without the base headers
+               SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
+               memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
+                       strippeddata.getSize());
 
-                       try {
-                               // Process it (the result is some data with no headers made by us)
-                               SharedBuffer<u8> resultdata = processPacket
-                                       (channel, strippeddata, peer_id, channelnum, false);
+               try {
+                       // Process it (the result is some data with no headers made by us)
+                       SharedBuffer<u8> resultdata = processPacket
+                               (channel, strippeddata, peer_id, channelnum, false);
 
-                               LOG(dout_con << m_connection->getDesc()
-                                       << " ProcessPacket from peer_id: " << peer_id
-                                       << ", channel: " << (u32)channelnum << ", returned "
-                                       << resultdata.getSize() << " bytes" << std::endl);
+                       LOG(dout_con << m_connection->getDesc()
+                               << " ProcessPacket from peer_id: " << peer_id
+                               << ", channel: " << (u32)channelnum << ", returned "
+                               << resultdata.getSize() << " bytes" << std::endl);
 
-                               ConnectionEvent e;
-                               e.dataReceived(peer_id, resultdata);
-                               m_connection->putEvent(e);
-                       }
-                       catch (ProcessedSilentlyException &e) {
-                       }
-                       catch (ProcessedQueued &e) {
-                               packet_queued = true;
-                       }
-               }
-               catch (InvalidIncomingDataException &e) {
+                       ConnectionEvent e;
+                       e.dataReceived(peer_id, resultdata);
+                       m_connection->putEvent(e);
                }
                catch (ProcessedSilentlyException &e) {
                }
+               catch (ProcessedQueued &e) {
+                       // we set it to true anyway (see below)
+               }
+
+               /* Every time we receive a packet it can happen that a previously
+                * buffered packet is now ready to process. */
+               packet_queued = true;
+       }
+       catch (InvalidIncomingDataException &e) {
        }
 }
 
@@ -1189,7 +1182,8 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
                                m_connection->TriggerSend();
                } catch (NotFoundException &e) {
                        LOG(derr_con << m_connection->getDesc()
-                               << "WARNING: ACKed packet not in outgoing queue" << std::endl);
+                               << "WARNING: ACKed packet not in outgoing queue"
+                               << " seqnum=" << seqnum << std::endl);
                        channel->UpdatePacketTooLateCounter();
                }
 
index da4ea92f513e3ef57e8081e78f1bca8dd97a8063..612407c3b5cf9190c6e665bbdb62736dc436ee99 100644 (file)
@@ -101,7 +101,7 @@ public:
        }
 
 private:
-       void receive();
+       void receive(SharedBuffer<u8> &packetdata, bool &packet_queued);
 
        // Returns next data from a buffer if possible
        // If found, returns true; if not, false.