C++11 cleanup inventorymanager (#6077)
[oweals/minetest.git] / src / network / connection.cpp
index 69deb4dd5fa1f9e04d86200261f9f54685eec80d..d1ab948db8db5bc0426aa999341908f99da34d50 100644 (file)
@@ -20,10 +20,10 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include <iomanip>
 #include <errno.h>
 #include "connection.h"
-#include "main.h"
 #include "serialization.h"
 #include "log.h"
 #include "porting.h"
+#include "network/networkpacket.h"
 #include "util/serialize.h"
 #include "util/numeric.h"
 #include "util/string.h"
@@ -42,10 +42,10 @@ namespace con
 #undef DEBUG_CONNECTION_KBPS
 #else
 /* this mutex is used to achieve log message consistency */
-JMutex log_message_mutex;
+std::mutex log_message_mutex;
 #define LOG(a)                                                                 \
        {                                                                          \
-       JMutexAutoLock loglock(log_message_mutex);                                 \
+       MutexAutoLock loglock(log_message_mutex);                                 \
        a;                                                                         \
        }
 #define PROFILE(a) a
@@ -54,23 +54,19 @@ JMutex log_message_mutex;
 #endif
 
 
-static inline float CALC_DTIME(unsigned int lasttime, unsigned int curtime) {
+static inline float CALC_DTIME(u64 lasttime, u64 curtime)
+{
        float value = ( curtime - lasttime) / 1000.0;
        return MYMAX(MYMIN(value,0.1),0.0);
 }
 
-/* maximum window size to use, 0xFFFF is theoretical maximum  don't think about
- * touching it, the less you're away from it the more likely data corruption
- * will occur
- */
-#define MAX_RELIABLE_WINDOW_SIZE 0x8000
- /* starting value for window size */
-#define MIN_RELIABLE_WINDOW_SIZE 0x40
-
 #define MAX_UDP_PEERS 65535
 
 #define PING_TIMEOUT 5.0
 
+/* maximum number of retries for reliable packets */
+#define MAX_RELIABLE_RETRY 5
+
 static u16 readPeerId(u8 *packetdata)
 {
        return readU16(&packetdata[4]);
@@ -205,11 +201,9 @@ SharedBuffer<u8> makeReliablePacket(
        ReliablePacketBuffer
 */
 
-ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
-
 void ReliablePacketBuffer::print()
 {
-       JMutexAutoLock listlock(m_list_mutex);
+       MutexAutoLock listlock(m_list_mutex);
        LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
        unsigned int index = 0;
        for(std::list<BufferedPacket>::iterator i = m_list.begin();
@@ -223,7 +217,7 @@ void ReliablePacketBuffer::print()
 }
 bool ReliablePacketBuffer::empty()
 {
-       JMutexAutoLock listlock(m_list_mutex);
+       MutexAutoLock listlock(m_list_mutex);
        return m_list.empty();
 }
 
@@ -256,7 +250,7 @@ RPBSearchResult ReliablePacketBuffer::notFound()
 }
 bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
 {
-       JMutexAutoLock listlock(m_list_mutex);
+       MutexAutoLock listlock(m_list_mutex);
        if (m_list.empty())
                return false;
        BufferedPacket p = *m_list.begin();
@@ -266,7 +260,7 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
 
 BufferedPacket ReliablePacketBuffer::popFirst()
 {
-       JMutexAutoLock listlock(m_list_mutex);
+       MutexAutoLock listlock(m_list_mutex);
        if (m_list.empty())
                throw NotFoundException("Buffer is empty");
        BufferedPacket p = *m_list.begin();
@@ -283,7 +277,7 @@ BufferedPacket ReliablePacketBuffer::popFirst()
 }
 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
 {
-       JMutexAutoLock listlock(m_list_mutex);
+       MutexAutoLock listlock(m_list_mutex);
        RPBSearchResult r = findPacket(seqnum);
        if (r == notFound()) {
                LOG(dout_con<<"Sequence number: " << seqnum
@@ -294,7 +288,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
 
 
        RPBSearchResult next = r;
-       next++;
+       ++next;
        if (next != notFound()) {
                u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
                m_oldest_non_answered_ack = s;
@@ -311,14 +305,30 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
 }
 void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
 {
-       JMutexAutoLock listlock(m_list_mutex);
-       FATAL_ERROR_IF(p.data.getSize() < BASE_HEADER_SIZE+3, "Invalid data size");
-       u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
-       sanity_check(type == TYPE_RELIABLE);
-       u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
+       MutexAutoLock listlock(m_list_mutex);
+       if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
+               errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
+                       "reliable packet" << std::endl;
+               return;
+       }
+       u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]);
+       if (type != TYPE_RELIABLE) {
+               errorstream << "ReliablePacketBuffer::insert(): type is not reliable"
+                       << std::endl;
+               return;
+       }
+       u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
 
-       sanity_check(seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE));
-       sanity_check(seqnum != next_expected);
+       if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
+               errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
+                       "expected window " << std::endl;
+               return;
+       }
+       if (seqnum == next_expected) {
+               errorstream << "ReliablePacketBuffer::insert(): seqnum is next expected"
+                       << std::endl;
+               return;
+       }
 
        ++m_list_size;
        sanity_check(m_list_size <= SEQNUM_MAX+1);      // FIXME: Handle the error?
@@ -342,7 +352,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
        /* this is true e.g. on wrap around */
        if (seqnum < next_expected) {
                while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
-                       i++;
+                       ++i;
                        if (i != m_list.end())
                                s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
                }
@@ -351,7 +361,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
        else
        {
                while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
-                       i++;
+                       ++i;
                        if (i != m_list.end())
                                s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
                }
@@ -377,10 +387,6 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
                        throw IncomingDataCorruption("duplicated packet isn't same as original one");
                }
 
-               sanity_check(readU16(&(i->data[BASE_HEADER_SIZE+1])) == seqnum);
-               sanity_check(i->data.getSize() == p.data.getSize());
-               sanity_check(i->address == p.address);
-
                /* nothing to do this seems to be a resent packet */
                /* for paranoia reason data should be compared */
                --m_list_size;
@@ -399,7 +405,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
 
 void ReliablePacketBuffer::incrementTimeouts(float dtime)
 {
-       JMutexAutoLock listlock(m_list_mutex);
+       MutexAutoLock listlock(m_list_mutex);
        for(std::list<BufferedPacket>::iterator i = m_list.begin();
                i != m_list.end(); ++i)
        {
@@ -411,7 +417,7 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime)
 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
                                                                                                        unsigned int max_packets)
 {
-       JMutexAutoLock listlock(m_list_mutex);
+       MutexAutoLock listlock(m_list_mutex);
        std::list<BufferedPacket> timed_outs;
        for(std::list<BufferedPacket>::iterator i = m_list.begin();
                i != m_list.end(); ++i)
@@ -434,7 +440,7 @@ std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
 
 IncomingSplitBuffer::~IncomingSplitBuffer()
 {
-       JMutexAutoLock listlock(m_map_mutex);
+       MutexAutoLock listlock(m_map_mutex);
        for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
                i != m_buf.end(); ++i)
        {
@@ -447,15 +453,23 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
 */
 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
 {
-       JMutexAutoLock listlock(m_map_mutex);
+       MutexAutoLock listlock(m_map_mutex);
        u32 headersize = BASE_HEADER_SIZE + 7;
-       FATAL_ERROR_IF(p.data.getSize() < headersize, "Invalid data size");
+       if (p.data.getSize() < headersize) {
+               errorstream << "Invalid data size for split packet" << std::endl;
+               return SharedBuffer<u8>();
+       }
        u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
-       sanity_check(type == TYPE_SPLIT);
        u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
        u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
        u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
 
+       if (type != TYPE_SPLIT) {
+               errorstream << "IncomingSplitBuffer::insert(): type is not split"
+                       << std::endl;
+               return SharedBuffer<u8>();
+       }
+
        // Add if doesn't exist
        if (m_buf.find(seqnum) == m_buf.end())
        {
@@ -526,7 +540,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
 {
        std::list<u16> remove_queue;
        {
-               JMutexAutoLock listlock(m_map_mutex);
+               MutexAutoLock listlock(m_map_mutex);
                for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
                        i != m_buf.end(); ++i)
                {
@@ -542,7 +556,7 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
        for(std::list<u16>::iterator j = remove_queue.begin();
                j != remove_queue.end(); ++j)
        {
-               JMutexAutoLock listlock(m_map_mutex);
+               MutexAutoLock listlock(m_map_mutex);
                LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl);
                delete m_buf[*j];
                m_buf.erase(*j);
@@ -553,45 +567,15 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
        Channel
 */
 
-Channel::Channel() :
-               window_size(MIN_RELIABLE_WINDOW_SIZE),
-               next_incoming_seqnum(SEQNUM_INITIAL),
-               next_outgoing_seqnum(SEQNUM_INITIAL),
-               next_outgoing_split_seqnum(SEQNUM_INITIAL),
-               current_packet_loss(0),
-               current_packet_too_late(0),
-               current_packet_successfull(0),
-               packet_loss_counter(0),
-               current_bytes_transfered(0),
-               current_bytes_received(0),
-               current_bytes_lost(0),
-               max_kbps(0.0),
-               cur_kbps(0.0),
-               avg_kbps(0.0),
-               max_incoming_kbps(0.0),
-               cur_incoming_kbps(0.0),
-               avg_incoming_kbps(0.0),
-               max_kbps_lost(0.0),
-               cur_kbps_lost(0.0),
-               avg_kbps_lost(0.0),
-               bpm_counter(0.0),
-               rate_samples(0)
-{
-}
-
-Channel::~Channel()
-{
-}
-
 u16 Channel::readNextIncomingSeqNum()
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        return next_incoming_seqnum;
 }
 
 u16 Channel::incNextIncomingSeqNum()
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        u16 retval = next_incoming_seqnum;
        next_incoming_seqnum++;
        return retval;
@@ -599,18 +583,18 @@ u16 Channel::incNextIncomingSeqNum()
 
 u16 Channel::readNextSplitSeqNum()
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        return next_outgoing_split_seqnum;
 }
 void Channel::setNextSplitSeqNum(u16 seqnum)
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        next_outgoing_split_seqnum = seqnum;
 }
 
-u16 Channel::getOutgoingSequenceNumber(bool& successfull)
+u16 Channel::getOutgoingSequenceNumber(bool& successful)
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        u16 retval = next_outgoing_seqnum;
        u16 lowest_unacked_seqnumber;
 
@@ -628,7 +612,7 @@ u16 Channel::getOutgoingSequenceNumber(bool& successfull)
                        // know about difference of two unsigned may be negative in general
                        // but we already made sure it won't happen in this case
                        if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
-                               successfull = false;
+                               successful = false;
                                return 0;
                        }
                }
@@ -638,7 +622,7 @@ u16 Channel::getOutgoingSequenceNumber(bool& successfull)
                        // but we already made sure it won't happen in this case
                        if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
                                window_size) {
-                               successfull = false;
+                               successful = false;
                                return 0;
                        }
                }
@@ -650,7 +634,7 @@ u16 Channel::getOutgoingSequenceNumber(bool& successfull)
 
 u16 Channel::readOutgoingSequenceNumber()
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        return next_outgoing_seqnum;
 }
 
@@ -666,32 +650,32 @@ bool Channel::putBackSequenceNumber(u16 seqnum)
 
 void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets)
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        current_bytes_transfered += bytes;
        current_packet_successfull += packets;
 }
 
 void Channel::UpdateBytesReceived(unsigned int bytes) {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        current_bytes_received += bytes;
 }
 
 void Channel::UpdateBytesLost(unsigned int bytes)
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        current_bytes_lost += bytes;
 }
 
 
 void Channel::UpdatePacketLossCounter(unsigned int count)
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        current_packet_loss += count;
 }
 
 void Channel::UpdatePacketTooLateCounter()
 {
-       JMutexAutoLock internal(m_internal_mutex);
+       MutexAutoLock internal(m_internal_mutex);
        current_packet_too_late++;
 }
 
@@ -711,7 +695,7 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer)
                bool reasonable_amount_of_data_transmitted = false;
 
                {
-                       JMutexAutoLock internal(m_internal_mutex);
+                       MutexAutoLock internal(m_internal_mutex);
                        packet_loss = current_packet_loss;
                        //packet_too_late = current_packet_too_late;
                        packets_successfull = current_packet_successfull;
@@ -782,7 +766,7 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer)
        if (bpm_counter > 10.0)
        {
                {
-                       JMutexAutoLock internal(m_internal_mutex);
+                       MutexAutoLock internal(m_internal_mutex);
                        cur_kbps                 =
                                        (((float) current_bytes_transfered)/bpm_counter)/1024.0;
                        current_bytes_transfered = 0;
@@ -825,40 +809,26 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer)
        Peer
 */
 
-PeerHelper::PeerHelper() :
-       m_peer(0)
-{}
-
 PeerHelper::PeerHelper(Peer* peer) :
        m_peer(peer)
 {
-       if (peer != NULL)
-       {
-               if (!peer->IncUseCount())
-               {
-                       m_peer = 0;
-               }
-       }
+       if (peer && !peer->IncUseCount())
+               m_peer = nullptr;
 }
 
 PeerHelper::~PeerHelper()
 {
-       if (m_peer != 0)
+       if (m_peer)
                m_peer->DecUseCount();
 
-       m_peer = 0;
+       m_peer = nullptr;
 }
 
 PeerHelper& PeerHelper::operator=(Peer* peer)
 {
        m_peer = peer;
-       if (peer != NULL)
-       {
-               if (!peer->IncUseCount())
-               {
-                       m_peer = 0;
-               }
-       }
+       if (peer && !peer->IncUseCount())
+               m_peer = nullptr;
        return *this;
 }
 
@@ -883,10 +853,9 @@ bool PeerHelper::operator!=(void* ptr)
 
 bool Peer::IncUseCount()
 {
-       JMutexAutoLock lock(m_exclusive_access_mutex);
+       MutexAutoLock lock(m_exclusive_access_mutex);
 
-       if (!m_pending_deletion)
-       {
+       if (!m_pending_deletion) {
                this->m_usage++;
                return true;
        }
@@ -897,7 +866,7 @@ bool Peer::IncUseCount()
 void Peer::DecUseCount()
 {
        {
-               JMutexAutoLock lock(m_exclusive_access_mutex);
+               MutexAutoLock lock(m_exclusive_access_mutex);
                sanity_check(m_usage > 0);
                m_usage--;
 
@@ -907,7 +876,7 @@ void Peer::DecUseCount()
        delete this;
 }
 
-void Peer::RTTStatistics(float rtt, std::string profiler_id,
+void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
                unsigned int num_samples) {
 
        if (m_last_rtt > 0) {
@@ -946,8 +915,7 @@ void Peer::RTTStatistics(float rtt, std::string profiler_id,
                        m_rtt.jitter_avg  = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
                                                                jitter * (1/num_samples);
 
-               if (profiler_id != "")
-               {
+               if (profiler_id != "") {
                        g_profiler->graphAdd(profiler_id + "_rtt", rtt);
                        g_profiler->graphAdd(profiler_id + "_jitter", jitter);
                }
@@ -958,8 +926,8 @@ void Peer::RTTStatistics(float rtt, std::string profiler_id,
 
 bool Peer::isTimedOut(float timeout)
 {
-       JMutexAutoLock lock(m_exclusive_access_mutex);
-       u32 current_time = porting::getTimeMs();
+       MutexAutoLock lock(m_exclusive_access_mutex);
+       u64 current_time = porting::getTimeMs();
 
        float dtime = CALC_DTIME(m_last_timeout_check,current_time);
        m_last_timeout_check = current_time;
@@ -972,7 +940,7 @@ bool Peer::isTimedOut(float timeout)
 void Peer::Drop()
 {
        {
-               JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+               MutexAutoLock usage_lock(m_exclusive_access_mutex);
                m_pending_deletion = true;
                if (m_usage != 0)
                        return;
@@ -991,10 +959,7 @@ void Peer::Drop()
 }
 
 UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
-       Peer(a_address,a_id,connection),
-       m_pending_disconnect(false),
-       resend_timeout(0.5),
-       m_legacy_peer(true)
+       Peer(a_address,a_id,connection)
 {
 }
 
@@ -1031,7 +996,7 @@ void UDPPeer::reportRTT(float rtt)
        if (timeout > RESEND_TIMEOUT_MAX)
                timeout = RESEND_TIMEOUT_MAX;
 
-       JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+       MutexAutoLock usage_lock(m_exclusive_access_mutex);
        resend_timeout = timeout;
 }
 
@@ -1063,14 +1028,14 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
                                <<" processing reliable command for peer id: " << c.peer_id
                                <<" data size: " << c.data.getSize() << std::endl);
                if (!processReliableSendCommand(c,max_packet_size)) {
-                       channels[c.channelnum].queued_commands.push(c);
+                       channels[c.channelnum].queued_commands.push_back(c);
                }
        }
        else {
                LOG(dout_con<<m_connection->getDesc()
                                <<" Queueing reliable command for peer id: " << c.peer_id
                                <<" data size: " << c.data.getSize() <<std::endl);
-               channels[c.channelnum].queued_commands.push(c);
+               channels[c.channelnum].queued_commands.push_back(c);
        }
 }
 
@@ -1182,25 +1147,26 @@ void UDPPeer::RunCommandQueues(
                                                        unsigned int maxtransfer)
 {
 
-       for (unsigned int i = 0; i < CHANNEL_COUNT; i++)
-       {
+       for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
                unsigned int commands_processed = 0;
 
                if ((channels[i].queued_commands.size() > 0) &&
                                (channels[i].queued_reliables.size() < maxtransfer) &&
-                               (commands_processed < maxcommands))
-               {
+                               (commands_processed < maxcommands)) {
                        try {
                                ConnectionCommand c = channels[i].queued_commands.front();
-                               channels[i].queued_commands.pop();
-                               LOG(dout_con<<m_connection->getDesc()
-                                               <<" processing queued reliable command "<<std::endl);
-                               if (!processReliableSendCommand(c,max_packet_size)) {
-                                       LOG(dout_con<<m_connection->getDesc()
+
+                               LOG(dout_con << m_connection->getDesc()
+                                               << " processing queued reliable command " << std::endl);
+
+                               // Packet is processed, remove it from queue
+                               if (processReliableSendCommand(c,max_packet_size)) {
+                                       channels[i].queued_commands.pop_front();
+                               } else {
+                                       LOG(dout_con << m_connection->getDesc()
                                                        << " Failed to queue packets for peer_id: " << c.peer_id
                                                        << ", delaying sending of " << c.data.getSize()
                                                        << " bytes" << std::endl);
-                                       channels[i].queued_commands.push(c);
                                }
                        }
                        catch (ItemNotFoundException &e) {
@@ -1213,7 +1179,7 @@ void UDPPeer::RunCommandQueues(
 u16 UDPPeer::getNextSplitSequenceNumber(u8 channel)
 {
        assert(channel < CHANNEL_COUNT); // Pre-condition
-       return channels[channel].readNextIncomingSeqNum();
+       return channels[channel].readNextSplitSeqNum();
 }
 
 void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
@@ -1234,47 +1200,41 @@ SharedBuffer<u8> UDPPeer::addSpiltPacket(u8 channel,
 /* Connection Threads                                                         */
 /******************************************************************************/
 
-ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size,
-                                                                                       float timeout) :
-       m_connection(NULL),
+ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
+               float timeout) :
+       Thread("ConnectionSend"),
        m_max_packet_size(max_packet_size),
        m_timeout(timeout),
-       m_max_commands_per_iteration(1),
-       m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration")),
-       m_max_packets_requeued(256)
+       m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
 {
 }
 
-void * ConnectionSendThread::Thread()
+void * ConnectionSendThread::run()
 {
-       assert(m_connection != NULL);
-       ThreadStarted();
-       log_register_thread("ConnectionSend");
+       assert(m_connection);
 
        LOG(dout_con<<m_connection->getDesc()
                        <<"ConnectionSend thread started"<<std::endl);
 
-       u32 curtime = porting::getTimeMs();
-       u32 lasttime = curtime;
+       u64 curtime = porting::getTimeMs();
+       u64 lasttime = curtime;
 
        PROFILE(std::stringstream ThreadIdentifier);
        PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
 
-       porting::setThreadName("ConnectionSend");
-
        /* if stop is requested don't stop immediately but try to send all        */
        /* packets first */
-       while(!StopRequested() || packetsQueued()) {
+       while(!stopRequested() || packetsQueued()) {
                BEGIN_DEBUG_EXCEPTION_HANDLER
                PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
 
                m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
 
                /* wait for trigger or timeout */
-               m_send_sleep_semaphore.Wait(50);
+               m_send_sleep_semaphore.wait(50);
 
                /* remove all triggers */
-               while(m_send_sleep_semaphore.Wait(0)) {}
+               while(m_send_sleep_semaphore.wait(0)) {}
 
                lasttime = curtime;
                curtime = porting::getTimeMs();
@@ -1298,7 +1258,7 @@ void * ConnectionSendThread::Thread()
                /* send non reliable packets */
                sendPackets(dtime);
 
-               END_DEBUG_EXCEPTION_HANDLER(errorstream);
+               END_DEBUG_EXCEPTION_HANDLER
        }
 
        PROFILE(g_profiler->remove(ThreadIdentifier.str()));
@@ -1307,7 +1267,7 @@ void * ConnectionSendThread::Thread()
 
 void ConnectionSendThread::Trigger()
 {
-       m_send_sleep_semaphore.Post();
+       m_send_sleep_semaphore.post();
 }
 
 bool ConnectionSendThread::packetsQueued()
@@ -1328,12 +1288,10 @@ bool ConnectionSendThread::packetsQueued()
                if (dynamic_cast<UDPPeer*>(&peer) == 0)
                        continue;
 
-               for(u16 i=0; i<CHANNEL_COUNT; i++)
-               {
+               for(u16 i=0; i < CHANNEL_COUNT; i++) {
                        Channel *channel = &(dynamic_cast<UDPPeer*>(&peer))->channels[i];
 
-                       if (channel->queued_commands.size() > 0)
-                       {
+                       if (channel->queued_commands.size() > 0) {
                                return true;
                        }
                }
@@ -1383,6 +1341,7 @@ void ConnectionSendThread::runTimeouts(float dtime)
                }
 
                float resend_timeout = dynamic_cast<UDPPeer*>(&peer)->getResendTimeout();
+               bool retry_count_exceeded = false;
                for(u16 i=0; i<CHANNEL_COUNT; i++)
                {
                        std::list<BufferedPacket> timed_outs;
@@ -1422,6 +1381,13 @@ void ConnectionSendThread::runTimeouts(float dtime)
                                channel->UpdateBytesLost(k->data.getSize());
                                k->resend_count++;
 
+                               if (k-> resend_count > MAX_RELIABLE_RETRY) {
+                                       retry_count_exceeded = true;
+                                       timeouted_peers.push_back(peer->id);
+                                       /* no need to check additional packets if a single one did timeout*/
+                                       break;
+                               }
+
                                LOG(derr_con<<m_connection->getDesc()
                                                <<"RE-SENDING timed-out RELIABLE to "
                                                << k->address.serializeString()
@@ -1436,9 +1402,18 @@ void ConnectionSendThread::runTimeouts(float dtime)
                                // do not handle rtt here as we can't decide if this packet was
                                // lost or really takes more time to transmit
                        }
+
+                       if (retry_count_exceeded) {
+                               break; /* no need to check other channels if we already did timeout */
+                       }
+
                        channel->UpdateTimers(dtime,dynamic_cast<UDPPeer*>(&peer)->getLegacyPeer());
                }
 
+               /* skip to next peer if we did timeout */
+               if (retry_count_exceeded)
+                       continue;
+
                /* send ping if necessary */
                if (dynamic_cast<UDPPeer*>(&peer)->Ping(dtime,data)) {
                        LOG(dout_con<<m_connection->getDesc()
@@ -1744,7 +1719,7 @@ void ConnectionSendThread::disconnect()
 
        for (std::list<u16>::iterator i = peerids.begin();
                        i != peerids.end();
-                       i++)
+                       ++i)
        {
                sendAsPacket(*i, 0,data,false);
        }
@@ -1824,7 +1799,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
 
        for (std::list<u16>::iterator i = peerids.begin();
                        i != peerids.end();
-                       i++)
+                       ++i)
        {
                send(*i, channelnum, data);
        }
@@ -1836,7 +1811,7 @@ void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
 
        for (std::list<u16>::iterator i = peerids.begin();
                        i != peerids.end();
-                       i++)
+                       ++i)
        {
                PeerHelper peer = m_connection->getPeerNoEx(*i);
 
@@ -1965,7 +1940,7 @@ void ConnectionSendThread::sendPackets(float dtime)
                }
                else if (
                        ( peer->m_increment_packets_remaining > 0) ||
-                       (StopRequested())) {
+                       (stopRequested())) {
                        rawSendAsPacket(packet.peer_id, packet.channelnum,
                                        packet.data, packet.reliable);
                        peer->m_increment_packets_remaining--;
@@ -1995,15 +1970,13 @@ void ConnectionSendThread::sendAsPacket(u16 peer_id, u8 channelnum,
 }
 
 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
-       m_connection(NULL)
+       Thread("ConnectionReceive")
 {
 }
 
-void * ConnectionReceiveThread::Thread()
+void * ConnectionReceiveThread::run()
 {
-       assert(m_connection != NULL);
-       ThreadStarted();
-       log_register_thread("ConnectionReceive");
+       assert(m_connection);
 
        LOG(dout_con<<m_connection->getDesc()
                        <<"ConnectionReceive thread started"<<std::endl);
@@ -2011,15 +1984,13 @@ void * ConnectionReceiveThread::Thread()
        PROFILE(std::stringstream ThreadIdentifier);
        PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
 
-       porting::setThreadName("ConnectionReceive");
-
 #ifdef DEBUG_CONNECTION_KBPS
-       u32 curtime = porting::getTimeMs();
-       u32 lasttime = curtime;
+       u64 curtime = porting::getTimeMs();
+       u64 lasttime = curtime;
        float debug_print_timer = 0.0;
 #endif
 
-       while(!StopRequested()) {
+       while(!stopRequested()) {
                BEGIN_DEBUG_EXCEPTION_HANDLER
                PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
 
@@ -2084,8 +2055,9 @@ void * ConnectionReceiveThread::Thread()
                        }
                }
 #endif
-               END_DEBUG_EXCEPTION_HANDLER(errorstream);
+               END_DEBUG_EXCEPTION_HANDLER
        }
+
        PROFILE(g_profiler->remove(ThreadIdentifier.str()));
        return NULL;
 }
@@ -2153,12 +2125,12 @@ void ConnectionReceiveThread::receive()
                                throw InvalidIncomingDataException("Channel doesn't exist");
                        }
 
-                       /* preserve original peer_id for later usage */
-                       u16 packet_peer_id   = peer_id;
-
                        /* Try to identify peer by sender address (may happen on join) */
                        if (peer_id == PEER_ID_INEXISTENT) {
                                peer_id = m_connection->lookupPeer(sender);
+                               // We do not have to remind the peer of its
+                               // peer id as the CONTROLTYPE_SET_PEER_ID
+                               // command was sent reliably.
                        }
 
                        /* The peer was not found in our lists. Add it. */
@@ -2200,11 +2172,6 @@ void ConnectionReceiveThread::receive()
                                }
                        }
 
-
-                       /* mark peer as seen with id */
-                       if (!(packet_peer_id == PEER_ID_INEXISTENT))
-                               peer->setSentWithID();
-
                        peer->ResetTimeout();
 
                        Channel *channel = 0;
@@ -2328,8 +2295,9 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
        u8 type = readU8(&(packetdata[0]));
 
        if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
-               errorstream << "Something is wrong with peer_id" << std::endl;
-               FATAL_ERROR("");
+               std::string errmsg = "Invalid peer_id=" + itos(peer_id);
+               errorstream << errmsg << std::endl;
+               throw InvalidIncomingDataException(errmsg.c_str());
        }
 
        if (type == TYPE_CONTROL)
@@ -2341,10 +2309,12 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
 
                if (controltype == CONTROLTYPE_ACK)
                {
-                       FATAL_ERROR_IF(channel == 0, "Invalid channel (0)");
-                       if (packetdata.getSize() < 4)
-                               throw InvalidIncomingDataException
-                                               ("packetdata.getSize() < 4 (ACK header size)");
+                       assert(channel != NULL);
+
+                       if (packetdata.getSize() < 4) {
+                               throw InvalidIncomingDataException(
+                                       "packetdata.getSize() < 4 (ACK header size)");
+                       }
 
                        u16 seqnum = readU16(&packetdata[2]);
                        LOG(dout_con<<m_connection->getDesc()
@@ -2359,7 +2329,7 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
                                // only calculate rtt from straight sent packets
                                if (p.resend_count == 0) {
                                        // Get round trip time
-                                       unsigned int current_time = porting::getTimeMs();
+                                       u64 current_time = porting::getTimeMs();
 
                                        // a overflow is quite unlikely but as it'd result in major
                                        // rtt miscalculation we handle it here
@@ -2509,7 +2479,8 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
        }
        else if (type == TYPE_RELIABLE)
        {
-               FATAL_ERROR_IF(channel == 0, "Invalid channel (0)");
+               assert(channel != NULL);
+
                // Recursive reliable packets not allowed
                if (reliable)
                        throw InvalidIncomingDataException("Found nested reliable packets");
@@ -2640,44 +2611,13 @@ SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
        Connection
 */
 
-Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
-               bool ipv6) :
-       m_udpSocket(ipv6),
-       m_command_queue(),
-       m_event_queue(),
-       m_peer_id(0),
-       m_protocol_id(protocol_id),
-       m_sendThread(max_packet_size, timeout),
-       m_receiveThread(max_packet_size),
-       m_info_mutex(),
-       m_bc_peerhandler(0),
-       m_bc_receive_timeout(0),
-       m_shutting_down(false),
-       m_next_remote_peer_id(2)
-{
-       m_udpSocket.setTimeoutMs(5);
-
-       m_sendThread.setParent(this);
-       m_receiveThread.setParent(this);
-
-       m_sendThread.Start();
-       m_receiveThread.Start();
-}
-
 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
                bool ipv6, PeerHandler *peerhandler) :
        m_udpSocket(ipv6),
-       m_command_queue(),
-       m_event_queue(),
-       m_peer_id(0),
        m_protocol_id(protocol_id),
        m_sendThread(max_packet_size, timeout),
        m_receiveThread(max_packet_size),
-       m_info_mutex(),
-       m_bc_peerhandler(peerhandler),
-       m_bc_receive_timeout(0),
-       m_shutting_down(false),
-       m_next_remote_peer_id(2)
+       m_bc_peerhandler(peerhandler)
 
 {
        m_udpSocket.setTimeoutMs(5);
@@ -2685,8 +2625,8 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
        m_sendThread.setParent(this);
        m_receiveThread.setParent(this);
 
-       m_sendThread.Start();
-       m_receiveThread.Start();
+       m_sendThread.start();
+       m_receiveThread.start();
 
 }
 
@@ -2695,8 +2635,8 @@ Connection::~Connection()
 {
        m_shutting_down = true;
        // request threads to stop
-       m_sendThread.Stop();
-       m_receiveThread.Stop();
+       m_sendThread.stop();
+       m_receiveThread.stop();
 
        //TODO for some unkonwn reason send/receive threads do not exit as they're
        // supposed to be but wait on peer timeout. To speed up shutdown we reduce
@@ -2704,8 +2644,8 @@ Connection::~Connection()
        m_sendThread.setPeerTimeout(0.5);
 
        // wait for threads to finish
-       m_sendThread.Wait();
-       m_receiveThread.Wait();
+       m_sendThread.wait();
+       m_receiveThread.wait();
 
        // Delete peers
        for(std::map<u16, Peer*>::iterator
@@ -2725,7 +2665,7 @@ void Connection::putEvent(ConnectionEvent &e)
 
 PeerHelper Connection::getPeer(u16 peer_id)
 {
-       JMutexAutoLock peerlock(m_peers_mutex);
+       MutexAutoLock peerlock(m_peers_mutex);
        std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
 
        if (node == m_peers.end()) {
@@ -2740,7 +2680,7 @@ PeerHelper Connection::getPeer(u16 peer_id)
 
 PeerHelper Connection::getPeerNoEx(u16 peer_id)
 {
-       JMutexAutoLock peerlock(m_peers_mutex);
+       MutexAutoLock peerlock(m_peers_mutex);
        std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
 
        if (node == m_peers.end()) {
@@ -2756,13 +2696,13 @@ PeerHelper Connection::getPeerNoEx(u16 peer_id)
 /* find peer_id for address */
 u16 Connection::lookupPeer(Address& sender)
 {
-       JMutexAutoLock peerlock(m_peers_mutex);
+       MutexAutoLock peerlock(m_peers_mutex);
        std::map<u16, Peer*>::iterator j;
        j = m_peers.begin();
        for(; j != m_peers.end(); ++j)
        {
                Peer *peer = j->second;
-               if (peer->isActive())
+               if (peer->isPendingDeletion())
                        continue;
 
                Address tocheck;
@@ -2795,7 +2735,7 @@ bool Connection::deletePeer(u16 peer_id, bool timeout)
 
        /* lock list as short as possible */
        {
-               JMutexAutoLock peerlock(m_peers_mutex);
+               MutexAutoLock peerlock(m_peers_mutex);
                if (m_peers.find(peer_id) == m_peers.end())
                        return false;
                peer = m_peers[peer_id];
@@ -2818,16 +2758,6 @@ bool Connection::deletePeer(u16 peer_id, bool timeout)
 
 /* Interface */
 
-ConnectionEvent Connection::getEvent()
-{
-       if (m_event_queue.empty()) {
-               ConnectionEvent e;
-               e.type = CONNEVENT_NONE;
-               return e;
-       }
-       return m_event_queue.pop_frontNoEx();
-}
-
 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
 {
        try {
@@ -2863,7 +2793,7 @@ void Connection::Connect(Address address)
 
 bool Connection::Connected()
 {
-       JMutexAutoLock peerlock(m_peers_mutex);
+       MutexAutoLock peerlock(m_peers_mutex);
 
        if (m_peers.size() != 1)
                return false;
@@ -2885,30 +2815,36 @@ void Connection::Disconnect()
        putCommand(c);
 }
 
-u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
+void Connection::Receive(NetworkPacket* pkt)
 {
        for(;;) {
                ConnectionEvent e = waitEvent(m_bc_receive_timeout);
                if (e.type != CONNEVENT_NONE)
-                       LOG(dout_con<<getDesc()<<": Receive: got event: "
-                                       <<e.describe()<<std::endl);
+                       LOG(dout_con << getDesc() << ": Receive: got event: "
+                                       << e.describe() << std::endl);
                switch(e.type) {
                case CONNEVENT_NONE:
                        throw NoIncomingDataException("No incoming data");
                case CONNEVENT_DATA_RECEIVED:
-                       peer_id = e.peer_id;
-                       data = SharedBuffer<u8>(e.data);
-                       return e.data.getSize();
+                       // Data size is lesser than command size, ignoring packet
+                       if (e.data.getSize() < 2) {
+                               continue;
+                       }
+
+                       pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
+                       return;
                case CONNEVENT_PEER_ADDED: {
                        UDPPeer tmp(e.peer_id, e.address, this);
                        if (m_bc_peerhandler)
                                m_bc_peerhandler->peerAdded(&tmp);
-                       continue; }
+                       continue;
+               }
                case CONNEVENT_PEER_REMOVED: {
                        UDPPeer tmp(e.peer_id, e.address, this);
                        if (m_bc_peerhandler)
                                m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
-                       continue; }
+                       continue;
+               }
                case CONNEVENT_BIND_FAILED:
                        throw ConnectionBindFailed("Failed to bind socket "
                                        "(port already in use?)");
@@ -2924,7 +2860,7 @@ void Connection::Send(u16 peer_id, u8 channelnum,
 
        ConnectionCommand c;
 
-       c.send(peer_id, channelnum, pkt->oldForgePacket(), reliable);
+       c.send(peer_id, channelnum, pkt, reliable);
        putCommand(c);
 }
 
@@ -2992,7 +2928,7 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
        /*
                Find an unused peer id
        */
-       JMutexAutoLock lock(m_peers_mutex);
+       MutexAutoLock lock(m_peers_mutex);
        bool out_of_ids = false;
        for(;;) {
                // Check if exists
@@ -3043,9 +2979,9 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
 
 void Connection::PrintInfo(std::ostream &out)
 {
-       m_info_mutex.Lock();
+       m_info_mutex.lock();
        out<<getDesc()<<": ";
-       m_info_mutex.Unlock();
+       m_info_mutex.unlock();
 }
 
 void Connection::PrintInfo()
@@ -3096,7 +3032,7 @@ UDPPeer* Connection::createServerPeer(Address& address)
        UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this);
 
        {
-               JMutexAutoLock lock(m_peers_mutex);
+               MutexAutoLock lock(m_peers_mutex);
                m_peers[peer->id] = peer;
                m_peer_ids.push_back(peer->id);
        }