From 8ef239b448c52485cf94d334c1d8b1c6de37d976 Mon Sep 17 00:00:00 2001 From: sfan5 Date: Mon, 20 Apr 2020 23:22:00 +0200 Subject: [PATCH] Improve protocol-level receiving code (#9617) --- src/network/connection.cpp | 4 +- src/network/connectionthreads.cpp | 250 +++++++++++++++--------------- src/network/connectionthreads.h | 2 +- 3 files changed, 126 insertions(+), 130 deletions(-) diff --git a/src/network/connection.cpp b/src/network/connection.cpp index 15eda7725..3692e45a9 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -1173,7 +1173,9 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, m_bc_peerhandler(peerhandler) { - m_udpSocket.setTimeoutMs(5); + /* Amount of time Receive() will wait for data, this is entirely different + * from the connection timeout */ + m_udpSocket.setTimeoutMs(500); m_sendThread->setParent(this); m_receiveThread->setParent(this); diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp index 13d82e06d..1f33d2ded 100644 --- a/src/network/connectionthreads.cpp +++ b/src/network/connectionthreads.cpp @@ -812,6 +812,14 @@ void *ConnectionReceiveThread::run() ThreadIdentifier); PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]"); + // use IPv6 minimum allowed MTU as receive buffer size as this is + // theoretical reliable upper boundary of a udp packet for all IPv6 enabled + // infrastructure + const unsigned int packet_maxsize = 1500; + SharedBuffer packetdata(packet_maxsize); + + bool packet_queued = true; + #ifdef DEBUG_CONNECTION_KBPS u64 curtime = porting::getTimeMs(); u64 lasttime = curtime; @@ -830,7 +838,7 @@ void *ConnectionReceiveThread::run() #endif /* receive packets */ - receive(); + receive(packetdata, packet_queued); #ifdef DEBUG_CONNECTION_KBPS debug_print_timer += dtime; @@ -892,157 +900,142 @@ void *ConnectionReceiveThread::run() } // Receive packets from the network and buffers and create ConnectionEvents -void ConnectionReceiveThread::receive() +void ConnectionReceiveThread::receive(SharedBuffer &packetdata, + bool &packet_queued) { - // use IPv6 minimum allowed MTU as receive buffer size as this is - // theoretical reliable upper boundary of a udp packet for all IPv6 enabled - // infrastructure - unsigned int packet_maxsize = 1500; - SharedBuffer packetdata(packet_maxsize); - - bool packet_queued = true; - - unsigned int loop_count = 0; - - /* first of all read packets from socket */ - /* check for incoming data available */ - while ((loop_count < 10) && - (m_connection->m_udpSocket.WaitData(50))) { - loop_count++; - try { - if (packet_queued) { - bool data_left = true; - session_t peer_id; - SharedBuffer resultdata; - while (data_left) { - try { - data_left = getFromBuffers(peer_id, resultdata); - if (data_left) { - ConnectionEvent e; - e.dataReceived(peer_id, resultdata); - m_connection->putEvent(e); - } - } - catch (ProcessedSilentlyException &e) { - /* try reading again */ + try { + // First, see if there any buffered packets we can process now + if (packet_queued) { + bool data_left = true; + session_t peer_id; + SharedBuffer resultdata; + while (data_left) { + try { + data_left = getFromBuffers(peer_id, resultdata); + if (data_left) { + ConnectionEvent e; + e.dataReceived(peer_id, resultdata); + m_connection->putEvent(e); } } - packet_queued = false; - } - - Address sender; - s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata, - packet_maxsize); - - if ((received_size < BASE_HEADER_SIZE) || - (readU32(&packetdata[0]) != m_connection->GetProtocolID())) { - LOG(derr_con << m_connection->getDesc() - << "Receive(): Invalid incoming packet, " - << "size: " << received_size - << ", protocol: " - << ((received_size >= 4) ? readU32(&packetdata[0]) : -1) - << std::endl); - continue; + catch (ProcessedSilentlyException &e) { + /* try reading again */ + } } + packet_queued = false; + } - session_t peer_id = readPeerId(*packetdata); - u8 channelnum = readChannel(*packetdata); + // Call Receive() to wait for incoming data + Address sender; + s32 received_size = m_connection->m_udpSocket.Receive(sender, + *packetdata, packetdata.getSize()); + if (received_size < 0) + return; - if (channelnum > CHANNEL_COUNT - 1) { - LOG(derr_con << m_connection->getDesc() - << "Receive(): Invalid channel " << (u32)channelnum << std::endl); - throw InvalidIncomingDataException("Channel doesn't exist"); - } + if ((received_size < BASE_HEADER_SIZE) || + (readU32(&packetdata[0]) != m_connection->GetProtocolID())) { + LOG(derr_con << m_connection->getDesc() + << "Receive(): Invalid incoming packet, " + << "size: " << received_size + << ", protocol: " + << ((received_size >= 4) ? readU32(&packetdata[0]) : -1) + << std::endl); + return; + } - /* Try to identify peer by sender address (may happen on join) */ - if (peer_id == PEER_ID_INEXISTENT) { - peer_id = m_connection->lookupPeer(sender); - // We do not have to remind the peer of its - // peer id as the CONTROLTYPE_SET_PEER_ID - // command was sent reliably. - } + session_t peer_id = readPeerId(*packetdata); + u8 channelnum = readChannel(*packetdata); - /* The peer was not found in our lists. Add it. */ - if (peer_id == PEER_ID_INEXISTENT) { - peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0); - } + if (channelnum > CHANNEL_COUNT - 1) { + LOG(derr_con << m_connection->getDesc() + << "Receive(): Invalid channel " << (u32)channelnum << std::endl); + return; + } - PeerHelper peer = m_connection->getPeerNoEx(peer_id); + /* Try to identify peer by sender address (may happen on join) */ + if (peer_id == PEER_ID_INEXISTENT) { + peer_id = m_connection->lookupPeer(sender); + // We do not have to remind the peer of its + // peer id as the CONTROLTYPE_SET_PEER_ID + // command was sent reliably. + } - if (!peer) { - LOG(dout_con << m_connection->getDesc() - << " got packet from unknown peer_id: " - << peer_id << " Ignoring." << std::endl); - continue; - } + /* The peer was not found in our lists. Add it. */ + if (peer_id == PEER_ID_INEXISTENT) { + peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0); + } - // Validate peer address + PeerHelper peer = m_connection->getPeerNoEx(peer_id); + if (!peer) { + LOG(dout_con << m_connection->getDesc() + << " got packet from unknown peer_id: " + << peer_id << " Ignoring." << std::endl); + return; + } - Address peer_address; + // Validate peer address - if (peer->getAddress(MTP_UDP, peer_address)) { - if (peer_address != sender) { - LOG(derr_con << m_connection->getDesc() - << m_connection->getDesc() - << " Peer " << peer_id << " sending from different address." - " Ignoring." << std::endl); - continue; - } - } else { - - bool invalid_address = true; - if (invalid_address) { - LOG(derr_con << m_connection->getDesc() - << m_connection->getDesc() - << " Peer " << peer_id << " unknown." - " Ignoring." << std::endl); - continue; - } + Address peer_address; + if (peer->getAddress(MTP_UDP, peer_address)) { + if (peer_address != sender) { + LOG(derr_con << m_connection->getDesc() + << " Peer " << peer_id << " sending from different address." + " Ignoring." << std::endl); + return; } + } else { + LOG(derr_con << m_connection->getDesc() + << " Peer " << peer_id << " doesn't have an address?!" + " Ignoring." << std::endl); + return; + } - peer->ResetTimeout(); - - Channel *channel = 0; + peer->ResetTimeout(); - if (dynamic_cast(&peer) != 0) { - channel = &(dynamic_cast(&peer)->channels[channelnum]); - } + Channel *channel = nullptr; + if (dynamic_cast(&peer)) { + channel = &dynamic_cast(&peer)->channels[channelnum]; + } else { + LOG(derr_con << m_connection->getDesc() + << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!" + " Ignoring." << std::endl); + return; + } - if (channel != 0) { - channel->UpdateBytesReceived(received_size); - } + channel->UpdateBytesReceived(received_size); - // Throw the received packet to channel->processPacket() + // Throw the received packet to channel->processPacket() - // Make a new SharedBuffer from the data without the base headers - SharedBuffer strippeddata(received_size - BASE_HEADER_SIZE); - memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], - strippeddata.getSize()); + // Make a new SharedBuffer from the data without the base headers + SharedBuffer strippeddata(received_size - BASE_HEADER_SIZE); + memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], + strippeddata.getSize()); - try { - // Process it (the result is some data with no headers made by us) - SharedBuffer resultdata = processPacket - (channel, strippeddata, peer_id, channelnum, false); + try { + // Process it (the result is some data with no headers made by us) + SharedBuffer resultdata = processPacket + (channel, strippeddata, peer_id, channelnum, false); - LOG(dout_con << m_connection->getDesc() - << " ProcessPacket from peer_id: " << peer_id - << ", channel: " << (u32)channelnum << ", returned " - << resultdata.getSize() << " bytes" << std::endl); + LOG(dout_con << m_connection->getDesc() + << " ProcessPacket from peer_id: " << peer_id + << ", channel: " << (u32)channelnum << ", returned " + << resultdata.getSize() << " bytes" << std::endl); - ConnectionEvent e; - e.dataReceived(peer_id, resultdata); - m_connection->putEvent(e); - } - catch (ProcessedSilentlyException &e) { - } - catch (ProcessedQueued &e) { - packet_queued = true; - } - } - catch (InvalidIncomingDataException &e) { + ConnectionEvent e; + e.dataReceived(peer_id, resultdata); + m_connection->putEvent(e); } catch (ProcessedSilentlyException &e) { } + catch (ProcessedQueued &e) { + // we set it to true anyway (see below) + } + + /* Every time we receive a packet it can happen that a previously + * buffered packet is now ready to process. */ + packet_queued = true; + } + catch (InvalidIncomingDataException &e) { } } @@ -1189,7 +1182,8 @@ SharedBuffer ConnectionReceiveThread::handlePacketType_Control(Channel *chan m_connection->TriggerSend(); } catch (NotFoundException &e) { LOG(derr_con << m_connection->getDesc() - << "WARNING: ACKed packet not in outgoing queue" << std::endl); + << "WARNING: ACKed packet not in outgoing queue" + << " seqnum=" << seqnum << std::endl); channel->UpdatePacketTooLateCounter(); } diff --git a/src/network/connectionthreads.h b/src/network/connectionthreads.h index da4ea92f5..612407c3b 100644 --- a/src/network/connectionthreads.h +++ b/src/network/connectionthreads.h @@ -101,7 +101,7 @@ public: } private: - void receive(); + void receive(SharedBuffer &packetdata, bool &packet_queued); // Returns next data from a buffer if possible // If found, returns true; if not, false. -- 2.25.1