X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fnetwork%2Fconnection.cpp;h=d1ab948db8db5bc0426aa999341908f99da34d50;hb=c772e0e18c90e48f237e37a45230a1dacd945786;hp=69deb4dd5fa1f9e04d86200261f9f54685eec80d;hpb=083c8c734e8af037d8ad3a1a35475a5d4756572c;p=oweals%2Fminetest.git diff --git a/src/network/connection.cpp b/src/network/connection.cpp index 69deb4dd5..d1ab948db 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -20,10 +20,10 @@ with this program; if not, write to the Free Software Foundation, Inc., #include #include #include "connection.h" -#include "main.h" #include "serialization.h" #include "log.h" #include "porting.h" +#include "network/networkpacket.h" #include "util/serialize.h" #include "util/numeric.h" #include "util/string.h" @@ -42,10 +42,10 @@ namespace con #undef DEBUG_CONNECTION_KBPS #else /* this mutex is used to achieve log message consistency */ -JMutex log_message_mutex; +std::mutex log_message_mutex; #define LOG(a) \ { \ - JMutexAutoLock loglock(log_message_mutex); \ + MutexAutoLock loglock(log_message_mutex); \ a; \ } #define PROFILE(a) a @@ -54,23 +54,19 @@ JMutex log_message_mutex; #endif -static inline float CALC_DTIME(unsigned int lasttime, unsigned int curtime) { +static inline float CALC_DTIME(u64 lasttime, u64 curtime) +{ float value = ( curtime - lasttime) / 1000.0; return MYMAX(MYMIN(value,0.1),0.0); } -/* maximum window size to use, 0xFFFF is theoretical maximum don't think about - * touching it, the less you're away from it the more likely data corruption - * will occur - */ -#define MAX_RELIABLE_WINDOW_SIZE 0x8000 - /* starting value for window size */ -#define MIN_RELIABLE_WINDOW_SIZE 0x40 - #define MAX_UDP_PEERS 65535 #define PING_TIMEOUT 5.0 +/* maximum number of retries for reliable packets */ +#define MAX_RELIABLE_RETRY 5 + static u16 readPeerId(u8 *packetdata) { return readU16(&packetdata[4]); @@ -205,11 +201,9 @@ SharedBuffer makeReliablePacket( ReliablePacketBuffer */ -ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {} - void ReliablePacketBuffer::print() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl); unsigned int index = 0; for(std::list::iterator i = m_list.begin(); @@ -223,7 +217,7 @@ void ReliablePacketBuffer::print() } bool ReliablePacketBuffer::empty() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); return m_list.empty(); } @@ -256,7 +250,7 @@ RPBSearchResult ReliablePacketBuffer::notFound() } bool ReliablePacketBuffer::getFirstSeqnum(u16& result) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) return false; BufferedPacket p = *m_list.begin(); @@ -266,7 +260,7 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result) BufferedPacket ReliablePacketBuffer::popFirst() { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); if (m_list.empty()) throw NotFoundException("Buffer is empty"); BufferedPacket p = *m_list.begin(); @@ -283,7 +277,7 @@ BufferedPacket ReliablePacketBuffer::popFirst() } BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); RPBSearchResult r = findPacket(seqnum); if (r == notFound()) { LOG(dout_con<<"Sequence number: " << seqnum @@ -294,7 +288,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) RPBSearchResult next = r; - next++; + ++next; if (next != notFound()) { u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1])); m_oldest_non_answered_ack = s; @@ -311,14 +305,30 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) } void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) { - JMutexAutoLock listlock(m_list_mutex); - FATAL_ERROR_IF(p.data.getSize() < BASE_HEADER_SIZE+3, "Invalid data size"); - u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); - sanity_check(type == TYPE_RELIABLE); - u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); + MutexAutoLock listlock(m_list_mutex); + if (p.data.getSize() < BASE_HEADER_SIZE + 3) { + errorstream << "ReliablePacketBuffer::insert(): Invalid data size for " + "reliable packet" << std::endl; + return; + } + u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]); + if (type != TYPE_RELIABLE) { + errorstream << "ReliablePacketBuffer::insert(): type is not reliable" + << std::endl; + return; + } + u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]); - sanity_check(seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)); - sanity_check(seqnum != next_expected); + if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) { + errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of " + "expected window " << std::endl; + return; + } + if (seqnum == next_expected) { + errorstream << "ReliablePacketBuffer::insert(): seqnum is next expected" + << std::endl; + return; + } ++m_list_size; sanity_check(m_list_size <= SEQNUM_MAX+1); // FIXME: Handle the error? @@ -342,7 +352,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) /* this is true e.g. on wrap around */ if (seqnum < next_expected) { while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) { - i++; + ++i; if (i != m_list.end()) s = readU16(&(i->data[BASE_HEADER_SIZE+1])); } @@ -351,7 +361,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) else { while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) { - i++; + ++i; if (i != m_list.end()) s = readU16(&(i->data[BASE_HEADER_SIZE+1])); } @@ -377,10 +387,6 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) throw IncomingDataCorruption("duplicated packet isn't same as original one"); } - sanity_check(readU16(&(i->data[BASE_HEADER_SIZE+1])) == seqnum); - sanity_check(i->data.getSize() == p.data.getSize()); - sanity_check(i->address == p.address); - /* nothing to do this seems to be a resent packet */ /* for paranoia reason data should be compared */ --m_list_size; @@ -399,7 +405,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected) void ReliablePacketBuffer::incrementTimeouts(float dtime) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) { @@ -411,7 +417,7 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) std::list ReliablePacketBuffer::getTimedOuts(float timeout, unsigned int max_packets) { - JMutexAutoLock listlock(m_list_mutex); + MutexAutoLock listlock(m_list_mutex); std::list timed_outs; for(std::list::iterator i = m_list.begin(); i != m_list.end(); ++i) @@ -434,7 +440,7 @@ std::list ReliablePacketBuffer::getTimedOuts(float timeout, IncomingSplitBuffer::~IncomingSplitBuffer() { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); for(std::map::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { @@ -447,15 +453,23 @@ IncomingSplitBuffer::~IncomingSplitBuffer() */ SharedBuffer IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); u32 headersize = BASE_HEADER_SIZE + 7; - FATAL_ERROR_IF(p.data.getSize() < headersize, "Invalid data size"); + if (p.data.getSize() < headersize) { + errorstream << "Invalid data size for split packet" << std::endl; + return SharedBuffer(); + } u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]); - sanity_check(type == TYPE_SPLIT); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]); u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]); + if (type != TYPE_SPLIT) { + errorstream << "IncomingSplitBuffer::insert(): type is not split" + << std::endl; + return SharedBuffer(); + } + // Add if doesn't exist if (m_buf.find(seqnum) == m_buf.end()) { @@ -526,7 +540,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) { std::list remove_queue; { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); for(std::map::iterator i = m_buf.begin(); i != m_buf.end(); ++i) { @@ -542,7 +556,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) for(std::list::iterator j = remove_queue.begin(); j != remove_queue.end(); ++j) { - JMutexAutoLock listlock(m_map_mutex); + MutexAutoLock listlock(m_map_mutex); LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"< window_size) { - successfull = false; + successful = false; return 0; } } @@ -638,7 +622,7 @@ u16 Channel::getOutgoingSequenceNumber(bool& successfull) // but we already made sure it won't happen in this case if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) > window_size) { - successfull = false; + successful = false; return 0; } } @@ -650,7 +634,7 @@ u16 Channel::getOutgoingSequenceNumber(bool& successfull) u16 Channel::readOutgoingSequenceNumber() { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); return next_outgoing_seqnum; } @@ -666,32 +650,32 @@ bool Channel::putBackSequenceNumber(u16 seqnum) void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_bytes_transfered += bytes; current_packet_successfull += packets; } void Channel::UpdateBytesReceived(unsigned int bytes) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_bytes_received += bytes; } void Channel::UpdateBytesLost(unsigned int bytes) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_bytes_lost += bytes; } void Channel::UpdatePacketLossCounter(unsigned int count) { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_packet_loss += count; } void Channel::UpdatePacketTooLateCounter() { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); current_packet_too_late++; } @@ -711,7 +695,7 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer) bool reasonable_amount_of_data_transmitted = false; { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); packet_loss = current_packet_loss; //packet_too_late = current_packet_too_late; packets_successfull = current_packet_successfull; @@ -782,7 +766,7 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer) if (bpm_counter > 10.0) { { - JMutexAutoLock internal(m_internal_mutex); + MutexAutoLock internal(m_internal_mutex); cur_kbps = (((float) current_bytes_transfered)/bpm_counter)/1024.0; current_bytes_transfered = 0; @@ -825,40 +809,26 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer) Peer */ -PeerHelper::PeerHelper() : - m_peer(0) -{} - PeerHelper::PeerHelper(Peer* peer) : m_peer(peer) { - if (peer != NULL) - { - if (!peer->IncUseCount()) - { - m_peer = 0; - } - } + if (peer && !peer->IncUseCount()) + m_peer = nullptr; } PeerHelper::~PeerHelper() { - if (m_peer != 0) + if (m_peer) m_peer->DecUseCount(); - m_peer = 0; + m_peer = nullptr; } PeerHelper& PeerHelper::operator=(Peer* peer) { m_peer = peer; - if (peer != NULL) - { - if (!peer->IncUseCount()) - { - m_peer = 0; - } - } + if (peer && !peer->IncUseCount()) + m_peer = nullptr; return *this; } @@ -883,10 +853,9 @@ bool PeerHelper::operator!=(void* ptr) bool Peer::IncUseCount() { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); - if (!m_pending_deletion) - { + if (!m_pending_deletion) { this->m_usage++; return true; } @@ -897,7 +866,7 @@ bool Peer::IncUseCount() void Peer::DecUseCount() { { - JMutexAutoLock lock(m_exclusive_access_mutex); + MutexAutoLock lock(m_exclusive_access_mutex); sanity_check(m_usage > 0); m_usage--; @@ -907,7 +876,7 @@ void Peer::DecUseCount() delete this; } -void Peer::RTTStatistics(float rtt, std::string profiler_id, +void Peer::RTTStatistics(float rtt, const std::string &profiler_id, unsigned int num_samples) { if (m_last_rtt > 0) { @@ -946,8 +915,7 @@ void Peer::RTTStatistics(float rtt, std::string profiler_id, m_rtt.jitter_avg = m_rtt.jitter_avg * (num_samples/(num_samples-1)) + jitter * (1/num_samples); - if (profiler_id != "") - { + if (profiler_id != "") { g_profiler->graphAdd(profiler_id + "_rtt", rtt); g_profiler->graphAdd(profiler_id + "_jitter", jitter); } @@ -958,8 +926,8 @@ void Peer::RTTStatistics(float rtt, std::string profiler_id, bool Peer::isTimedOut(float timeout) { - JMutexAutoLock lock(m_exclusive_access_mutex); - u32 current_time = porting::getTimeMs(); + MutexAutoLock lock(m_exclusive_access_mutex); + u64 current_time = porting::getTimeMs(); float dtime = CALC_DTIME(m_last_timeout_check,current_time); m_last_timeout_check = current_time; @@ -972,7 +940,7 @@ bool Peer::isTimedOut(float timeout) void Peer::Drop() { { - JMutexAutoLock usage_lock(m_exclusive_access_mutex); + MutexAutoLock usage_lock(m_exclusive_access_mutex); m_pending_deletion = true; if (m_usage != 0) return; @@ -991,10 +959,7 @@ void Peer::Drop() } UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) : - Peer(a_address,a_id,connection), - m_pending_disconnect(false), - resend_timeout(0.5), - m_legacy_peer(true) + Peer(a_address,a_id,connection) { } @@ -1031,7 +996,7 @@ void UDPPeer::reportRTT(float rtt) if (timeout > RESEND_TIMEOUT_MAX) timeout = RESEND_TIMEOUT_MAX; - JMutexAutoLock usage_lock(m_exclusive_access_mutex); + MutexAutoLock usage_lock(m_exclusive_access_mutex); resend_timeout = timeout; } @@ -1063,14 +1028,14 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, <<" processing reliable command for peer id: " << c.peer_id <<" data size: " << c.data.getSize() << std::endl); if (!processReliableSendCommand(c,max_packet_size)) { - channels[c.channelnum].queued_commands.push(c); + channels[c.channelnum].queued_commands.push_back(c); } } else { LOG(dout_con<getDesc() <<" Queueing reliable command for peer id: " << c.peer_id <<" data size: " << c.data.getSize() < 0) && (channels[i].queued_reliables.size() < maxtransfer) && - (commands_processed < maxcommands)) - { + (commands_processed < maxcommands)) { try { ConnectionCommand c = channels[i].queued_commands.front(); - channels[i].queued_commands.pop(); - LOG(dout_con<getDesc() - <<" processing queued reliable command "<getDesc() + + LOG(dout_con << m_connection->getDesc() + << " processing queued reliable command " << std::endl); + + // Packet is processed, remove it from queue + if (processReliableSendCommand(c,max_packet_size)) { + channels[i].queued_commands.pop_front(); + } else { + LOG(dout_con << m_connection->getDesc() << " Failed to queue packets for peer_id: " << c.peer_id << ", delaying sending of " << c.data.getSize() << " bytes" << std::endl); - channels[i].queued_commands.push(c); } } catch (ItemNotFoundException &e) { @@ -1213,7 +1179,7 @@ void UDPPeer::RunCommandQueues( u16 UDPPeer::getNextSplitSequenceNumber(u8 channel) { assert(channel < CHANNEL_COUNT); // Pre-condition - return channels[channel].readNextIncomingSeqNum(); + return channels[channel].readNextSplitSeqNum(); } void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum) @@ -1234,47 +1200,41 @@ SharedBuffer UDPPeer::addSpiltPacket(u8 channel, /* Connection Threads */ /******************************************************************************/ -ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size, - float timeout) : - m_connection(NULL), +ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size, + float timeout) : + Thread("ConnectionSend"), m_max_packet_size(max_packet_size), m_timeout(timeout), - m_max_commands_per_iteration(1), - m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration")), - m_max_packets_requeued(256) + m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration")) { } -void * ConnectionSendThread::Thread() +void * ConnectionSendThread::run() { - assert(m_connection != NULL); - ThreadStarted(); - log_register_thread("ConnectionSend"); + assert(m_connection); LOG(dout_con<getDesc() <<"ConnectionSend thread started"<getDesc() << "]"); - porting::setThreadName("ConnectionSend"); - /* if stop is requested don't stop immediately but try to send all */ /* packets first */ - while(!StopRequested() || packetsQueued()) { + while(!stopRequested() || packetsQueued()) { BEGIN_DEBUG_EXCEPTION_HANDLER PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); m_iteration_packets_avaialble = m_max_data_packets_per_iteration; /* wait for trigger or timeout */ - m_send_sleep_semaphore.Wait(50); + m_send_sleep_semaphore.wait(50); /* remove all triggers */ - while(m_send_sleep_semaphore.Wait(0)) {} + while(m_send_sleep_semaphore.wait(0)) {} lasttime = curtime; curtime = porting::getTimeMs(); @@ -1298,7 +1258,7 @@ void * ConnectionSendThread::Thread() /* send non reliable packets */ sendPackets(dtime); - END_DEBUG_EXCEPTION_HANDLER(errorstream); + END_DEBUG_EXCEPTION_HANDLER } PROFILE(g_profiler->remove(ThreadIdentifier.str())); @@ -1307,7 +1267,7 @@ void * ConnectionSendThread::Thread() void ConnectionSendThread::Trigger() { - m_send_sleep_semaphore.Post(); + m_send_sleep_semaphore.post(); } bool ConnectionSendThread::packetsQueued() @@ -1328,12 +1288,10 @@ bool ConnectionSendThread::packetsQueued() if (dynamic_cast(&peer) == 0) continue; - for(u16 i=0; i(&peer))->channels[i]; - if (channel->queued_commands.size() > 0) - { + if (channel->queued_commands.size() > 0) { return true; } } @@ -1383,6 +1341,7 @@ void ConnectionSendThread::runTimeouts(float dtime) } float resend_timeout = dynamic_cast(&peer)->getResendTimeout(); + bool retry_count_exceeded = false; for(u16 i=0; i timed_outs; @@ -1422,6 +1381,13 @@ void ConnectionSendThread::runTimeouts(float dtime) channel->UpdateBytesLost(k->data.getSize()); k->resend_count++; + if (k-> resend_count > MAX_RELIABLE_RETRY) { + retry_count_exceeded = true; + timeouted_peers.push_back(peer->id); + /* no need to check additional packets if a single one did timeout*/ + break; + } + LOG(derr_con<getDesc() <<"RE-SENDING timed-out RELIABLE to " << k->address.serializeString() @@ -1436,9 +1402,18 @@ void ConnectionSendThread::runTimeouts(float dtime) // do not handle rtt here as we can't decide if this packet was // lost or really takes more time to transmit } + + if (retry_count_exceeded) { + break; /* no need to check other channels if we already did timeout */ + } + channel->UpdateTimers(dtime,dynamic_cast(&peer)->getLegacyPeer()); } + /* skip to next peer if we did timeout */ + if (retry_count_exceeded) + continue; + /* send ping if necessary */ if (dynamic_cast(&peer)->Ping(dtime,data)) { LOG(dout_con<getDesc() @@ -1744,7 +1719,7 @@ void ConnectionSendThread::disconnect() for (std::list::iterator i = peerids.begin(); i != peerids.end(); - i++) + ++i) { sendAsPacket(*i, 0,data,false); } @@ -1824,7 +1799,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer data) for (std::list::iterator i = peerids.begin(); i != peerids.end(); - i++) + ++i) { send(*i, channelnum, data); } @@ -1836,7 +1811,7 @@ void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c) for (std::list::iterator i = peerids.begin(); i != peerids.end(); - i++) + ++i) { PeerHelper peer = m_connection->getPeerNoEx(*i); @@ -1965,7 +1940,7 @@ void ConnectionSendThread::sendPackets(float dtime) } else if ( ( peer->m_increment_packets_remaining > 0) || - (StopRequested())) { + (stopRequested())) { rawSendAsPacket(packet.peer_id, packet.channelnum, packet.data, packet.reliable); peer->m_increment_packets_remaining--; @@ -1995,15 +1970,13 @@ void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum, } ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) : - m_connection(NULL) + Thread("ConnectionReceive") { } -void * ConnectionReceiveThread::Thread() +void * ConnectionReceiveThread::run() { - assert(m_connection != NULL); - ThreadStarted(); - log_register_thread("ConnectionReceive"); + assert(m_connection); LOG(dout_con<getDesc() <<"ConnectionReceive thread started"<getDesc() << "]"); - porting::setThreadName("ConnectionReceive"); - #ifdef DEBUG_CONNECTION_KBPS - u32 curtime = porting::getTimeMs(); - u32 lasttime = curtime; + u64 curtime = porting::getTimeMs(); + u64 lasttime = curtime; float debug_print_timer = 0.0; #endif - while(!StopRequested()) { + while(!stopRequested()) { BEGIN_DEBUG_EXCEPTION_HANDLER PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG)); @@ -2084,8 +2055,9 @@ void * ConnectionReceiveThread::Thread() } } #endif - END_DEBUG_EXCEPTION_HANDLER(errorstream); + END_DEBUG_EXCEPTION_HANDLER } + PROFILE(g_profiler->remove(ThreadIdentifier.str())); return NULL; } @@ -2153,12 +2125,12 @@ void ConnectionReceiveThread::receive() throw InvalidIncomingDataException("Channel doesn't exist"); } - /* preserve original peer_id for later usage */ - u16 packet_peer_id = peer_id; - /* Try to identify peer by sender address (may happen on join) */ if (peer_id == PEER_ID_INEXISTENT) { peer_id = m_connection->lookupPeer(sender); + // We do not have to remind the peer of its + // peer id as the CONTROLTYPE_SET_PEER_ID + // command was sent reliably. } /* The peer was not found in our lists. Add it. */ @@ -2200,11 +2172,6 @@ void ConnectionReceiveThread::receive() } } - - /* mark peer as seen with id */ - if (!(packet_peer_id == PEER_ID_INEXISTENT)) - peer->setSentWithID(); - peer->ResetTimeout(); Channel *channel = 0; @@ -2328,8 +2295,9 @@ SharedBuffer ConnectionReceiveThread::processPacket(Channel *channel, u8 type = readU8(&(packetdata[0])); if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) { - errorstream << "Something is wrong with peer_id" << std::endl; - FATAL_ERROR(""); + std::string errmsg = "Invalid peer_id=" + itos(peer_id); + errorstream << errmsg << std::endl; + throw InvalidIncomingDataException(errmsg.c_str()); } if (type == TYPE_CONTROL) @@ -2341,10 +2309,12 @@ SharedBuffer ConnectionReceiveThread::processPacket(Channel *channel, if (controltype == CONTROLTYPE_ACK) { - FATAL_ERROR_IF(channel == 0, "Invalid channel (0)"); - if (packetdata.getSize() < 4) - throw InvalidIncomingDataException - ("packetdata.getSize() < 4 (ACK header size)"); + assert(channel != NULL); + + if (packetdata.getSize() < 4) { + throw InvalidIncomingDataException( + "packetdata.getSize() < 4 (ACK header size)"); + } u16 seqnum = readU16(&packetdata[2]); LOG(dout_con<getDesc() @@ -2359,7 +2329,7 @@ SharedBuffer ConnectionReceiveThread::processPacket(Channel *channel, // only calculate rtt from straight sent packets if (p.resend_count == 0) { // Get round trip time - unsigned int current_time = porting::getTimeMs(); + u64 current_time = porting::getTimeMs(); // a overflow is quite unlikely but as it'd result in major // rtt miscalculation we handle it here @@ -2509,7 +2479,8 @@ SharedBuffer ConnectionReceiveThread::processPacket(Channel *channel, } else if (type == TYPE_RELIABLE) { - FATAL_ERROR_IF(channel == 0, "Invalid channel (0)"); + assert(channel != NULL); + // Recursive reliable packets not allowed if (reliable) throw InvalidIncomingDataException("Found nested reliable packets"); @@ -2640,44 +2611,13 @@ SharedBuffer ConnectionReceiveThread::processPacket(Channel *channel, Connection */ -Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, - bool ipv6) : - m_udpSocket(ipv6), - m_command_queue(), - m_event_queue(), - m_peer_id(0), - m_protocol_id(protocol_id), - m_sendThread(max_packet_size, timeout), - m_receiveThread(max_packet_size), - m_info_mutex(), - m_bc_peerhandler(0), - m_bc_receive_timeout(0), - m_shutting_down(false), - m_next_remote_peer_id(2) -{ - m_udpSocket.setTimeoutMs(5); - - m_sendThread.setParent(this); - m_receiveThread.setParent(this); - - m_sendThread.Start(); - m_receiveThread.Start(); -} - Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6, PeerHandler *peerhandler) : m_udpSocket(ipv6), - m_command_queue(), - m_event_queue(), - m_peer_id(0), m_protocol_id(protocol_id), m_sendThread(max_packet_size, timeout), m_receiveThread(max_packet_size), - m_info_mutex(), - m_bc_peerhandler(peerhandler), - m_bc_receive_timeout(0), - m_shutting_down(false), - m_next_remote_peer_id(2) + m_bc_peerhandler(peerhandler) { m_udpSocket.setTimeoutMs(5); @@ -2685,8 +2625,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, m_sendThread.setParent(this); m_receiveThread.setParent(this); - m_sendThread.Start(); - m_receiveThread.Start(); + m_sendThread.start(); + m_receiveThread.start(); } @@ -2695,8 +2635,8 @@ Connection::~Connection() { m_shutting_down = true; // request threads to stop - m_sendThread.Stop(); - m_receiveThread.Stop(); + m_sendThread.stop(); + m_receiveThread.stop(); //TODO for some unkonwn reason send/receive threads do not exit as they're // supposed to be but wait on peer timeout. To speed up shutdown we reduce @@ -2704,8 +2644,8 @@ Connection::~Connection() m_sendThread.setPeerTimeout(0.5); // wait for threads to finish - m_sendThread.Wait(); - m_receiveThread.Wait(); + m_sendThread.wait(); + m_receiveThread.wait(); // Delete peers for(std::map::iterator @@ -2725,7 +2665,7 @@ void Connection::putEvent(ConnectionEvent &e) PeerHelper Connection::getPeer(u16 peer_id) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map::iterator node = m_peers.find(peer_id); if (node == m_peers.end()) { @@ -2740,7 +2680,7 @@ PeerHelper Connection::getPeer(u16 peer_id) PeerHelper Connection::getPeerNoEx(u16 peer_id) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map::iterator node = m_peers.find(peer_id); if (node == m_peers.end()) { @@ -2756,13 +2696,13 @@ PeerHelper Connection::getPeerNoEx(u16 peer_id) /* find peer_id for address */ u16 Connection::lookupPeer(Address& sender) { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); std::map::iterator j; j = m_peers.begin(); for(; j != m_peers.end(); ++j) { Peer *peer = j->second; - if (peer->isActive()) + if (peer->isPendingDeletion()) continue; Address tocheck; @@ -2795,7 +2735,7 @@ bool Connection::deletePeer(u16 peer_id, bool timeout) /* lock list as short as possible */ { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); if (m_peers.find(peer_id) == m_peers.end()) return false; peer = m_peers[peer_id]; @@ -2818,16 +2758,6 @@ bool Connection::deletePeer(u16 peer_id, bool timeout) /* Interface */ -ConnectionEvent Connection::getEvent() -{ - if (m_event_queue.empty()) { - ConnectionEvent e; - e.type = CONNEVENT_NONE; - return e; - } - return m_event_queue.pop_frontNoEx(); -} - ConnectionEvent Connection::waitEvent(u32 timeout_ms) { try { @@ -2863,7 +2793,7 @@ void Connection::Connect(Address address) bool Connection::Connected() { - JMutexAutoLock peerlock(m_peers_mutex); + MutexAutoLock peerlock(m_peers_mutex); if (m_peers.size() != 1) return false; @@ -2885,30 +2815,36 @@ void Connection::Disconnect() putCommand(c); } -u32 Connection::Receive(u16 &peer_id, SharedBuffer &data) +void Connection::Receive(NetworkPacket* pkt) { for(;;) { ConnectionEvent e = waitEvent(m_bc_receive_timeout); if (e.type != CONNEVENT_NONE) - LOG(dout_con<(e.data); - return e.data.getSize(); + // Data size is lesser than command size, ignoring packet + if (e.data.getSize() < 2) { + continue; + } + + pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id); + return; case CONNEVENT_PEER_ADDED: { UDPPeer tmp(e.peer_id, e.address, this); if (m_bc_peerhandler) m_bc_peerhandler->peerAdded(&tmp); - continue; } + continue; + } case CONNEVENT_PEER_REMOVED: { UDPPeer tmp(e.peer_id, e.address, this); if (m_bc_peerhandler) m_bc_peerhandler->deletingPeer(&tmp, e.timeout); - continue; } + continue; + } case CONNEVENT_BIND_FAILED: throw ConnectionBindFailed("Failed to bind socket " "(port already in use?)"); @@ -2924,7 +2860,7 @@ void Connection::Send(u16 peer_id, u8 channelnum, ConnectionCommand c; - c.send(peer_id, channelnum, pkt->oldForgePacket(), reliable); + c.send(peer_id, channelnum, pkt, reliable); putCommand(c); } @@ -2992,7 +2928,7 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) /* Find an unused peer id */ - JMutexAutoLock lock(m_peers_mutex); + MutexAutoLock lock(m_peers_mutex); bool out_of_ids = false; for(;;) { // Check if exists @@ -3043,9 +2979,9 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd) void Connection::PrintInfo(std::ostream &out) { - m_info_mutex.Lock(); + m_info_mutex.lock(); out<id] = peer; m_peer_ids.push_back(peer->id); }