Fix bone-attached entities (#10015)
[oweals/minetest.git] / src / network / connection.cpp
index a9f8ee082e065958dd3820cc9f1249651e17a665..3692e45a9870519297252582f871f57670643df1 100644 (file)
@@ -20,6 +20,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include <iomanip>
 #include <cerrno>
 #include <algorithm>
+#include <cmath>
 #include "connection.h"
 #include "serialization.h"
 #include "log.h"
@@ -40,22 +41,28 @@ namespace con
 /* defines used for debugging and profiling                                   */
 /******************************************************************************/
 #ifdef NDEBUG
-#define LOG(a) a
-#define PROFILE(a)
+       #define LOG(a) a
+       #define PROFILE(a)
 #else
-/* this mutex is used to achieve log message consistency */
-std::mutex log_message_mutex;
-#define LOG(a)                                                                 \
-       {                                                                          \
-       MutexAutoLock loglock(log_message_mutex);                                 \
-       a;                                                                         \
-       }
-#define PROFILE(a) a
+       #if 0
+       /* this mutex is used to achieve log message consistency */
+       std::mutex log_message_mutex;
+       #define LOG(a)                                                                 \
+               {                                                                          \
+               MutexAutoLock loglock(log_message_mutex);                                 \
+               a;                                                                         \
+               }
+       #else
+       // Prevent deadlocks until a solution is found after 5.2.0 (TODO)
+       #define LOG(a) a
+       #endif
+
+       #define PROFILE(a) a
 #endif
 
 #define PING_TIMEOUT 5.0
 
-BufferedPacket makePacket(Address &address, SharedBuffer<u8> data,
+BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
                u32 protocol_id, session_t sender_peer_id, u8 channel)
 {
        u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
@@ -125,7 +132,7 @@ void makeSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max, u16 seqnum
        }
 }
 
-void makeAutoSplitPacket(SharedBuffer<u8> data, u32 chunksize_max,
+void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
                u16 &split_seqnum, std::list<SharedBuffer<u8>> *list)
 {
        u32 original_header_size = 1;
@@ -139,7 +146,7 @@ void makeAutoSplitPacket(SharedBuffer<u8> data, u32 chunksize_max,
        list->push_back(makeOriginalPacket(data));
 }
 
-SharedBuffer<u8> makeReliablePacket(SharedBuffer<u8> data, u16 seqnum)
+SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum)
 {
        u32 header_size = 3;
        u32 packet_size = data.getSize() + header_size;
@@ -168,6 +175,7 @@ void ReliablePacketBuffer::print()
                index++;
        }
 }
+
 bool ReliablePacketBuffer::empty()
 {
        MutexAutoLock listlock(m_list_mutex);
@@ -176,12 +184,8 @@ bool ReliablePacketBuffer::empty()
 
 u32 ReliablePacketBuffer::size()
 {
-       return m_list_size;
-}
-
-bool ReliablePacketBuffer::containsPacket(u16 seqnum)
-{
-       return !(findPacket(seqnum) == m_list.end());
+       MutexAutoLock listlock(m_list_mutex);
+       return m_list.size();
 }
 
 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
@@ -190,24 +194,24 @@ RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
        for(; i != m_list.end(); ++i)
        {
                u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
-               /*dout_con<<"findPacket(): finding seqnum="<<seqnum
-                               <<", comparing to s="<<s<<std::endl;*/
                if (s == seqnum)
                        break;
        }
        return i;
 }
+
 RPBSearchResult ReliablePacketBuffer::notFound()
 {
        return m_list.end();
 }
+
 bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
 {
        MutexAutoLock listlock(m_list_mutex);
        if (m_list.empty())
                return false;
-       BufferedPacket p = *m_list.begin();
-       result = readU16(&p.data[BASE_HEADER_SIZE+1]);
+       const BufferedPacket &p = *m_list.begin();
+       result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
        return true;
 }
 
@@ -218,16 +222,16 @@ BufferedPacket ReliablePacketBuffer::popFirst()
                throw NotFoundException("Buffer is empty");
        BufferedPacket p = *m_list.begin();
        m_list.erase(m_list.begin());
-       --m_list_size;
 
-       if (m_list_size == 0) {
+       if (m_list.empty()) {
                m_oldest_non_answered_ack = 0;
        } else {
                m_oldest_non_answered_ack =
-                               readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
+                               readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
        }
        return p;
 }
+
 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
 {
        MutexAutoLock listlock(m_list_mutex);
@@ -248,15 +252,17 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
        }
 
        m_list.erase(r);
-       --m_list_size;
 
-       if (m_list_size == 0)
-       { m_oldest_non_answered_ack = 0; }
-       else
-       { m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);     }
+       if (m_list.empty()) {
+               m_oldest_non_answered_ack = 0;
+       } else {
+               m_oldest_non_answered_ack =
+                               readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
+       }
        return p;
 }
-void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
+
+void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
 {
        MutexAutoLock listlock(m_list_mutex);
        if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
@@ -283,8 +289,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
                return;
        }
 
-       ++m_list_size;
-       sanity_check(m_list_size <= SEQNUM_MAX+1);      // FIXME: Handle the error?
+       sanity_check(m_list.size() <= SEQNUM_MAX); // FIXME: Handle the error?
 
        // Find the right place for the packet and insert it there
        // If list is empty, just add it
@@ -321,6 +326,8 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
        }
 
        if (s == seqnum) {
+               /* nothing to do this seems to be a resent packet */
+               /* for paranoia reason data should be compared */
                if (
                        (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
                        (i->data.getSize() != p.data.getSize()) ||
@@ -339,16 +346,11 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
                                        p.address.serializeString().c_str());
                        throw IncomingDataCorruption("duplicated packet isn't same as original one");
                }
-
-               /* nothing to do this seems to be a resent packet */
-               /* for paranoia reason data should be compared */
-               --m_list_size;
        }
        /* insert or push back */
        else if (i != m_list.end()) {
                m_list.insert(i, p);
-       }
-       else {
+       } else {
                m_list.push_back(p);
        }
 
@@ -383,6 +385,48 @@ std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
        return timed_outs;
 }
 
+/*
+       IncomingSplitPacket
+*/
+
+bool IncomingSplitPacket::insert(u32 chunk_num, SharedBuffer<u8> &chunkdata)
+{
+       sanity_check(chunk_num < chunk_count);
+
+       // If chunk already exists, ignore it.
+       // Sometimes two identical packets may arrive when there is network
+       // lag and the server re-sends stuff.
+       if (chunks.find(chunk_num) != chunks.end())
+               return false;
+
+       // Set chunk data in buffer
+       chunks[chunk_num] = chunkdata;
+
+       return true;
+}
+
+SharedBuffer<u8> IncomingSplitPacket::reassemble()
+{
+       sanity_check(allReceived());
+
+       // Calculate total size
+       u32 totalsize = 0;
+       for (const auto &chunk : chunks)
+               totalsize += chunk.second.getSize();
+
+       SharedBuffer<u8> fulldata(totalsize);
+
+       // Copy chunks to data buffer
+       u32 start = 0;
+       for (u32 chunk_i = 0; chunk_i < chunk_count; chunk_i++) {
+               const SharedBuffer<u8> &buf = chunks[chunk_i];
+               memcpy(&fulldata[start], *buf, buf.getSize());
+               start += buf.getSize();
+       }
+
+       return fulldata;
+}
+
 /*
        IncomingSplitBuffer
 */
@@ -394,10 +438,7 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
                delete i.second;
        }
 }
-/*
-       This will throw a GotSplitPacketException when a full
-       split packet is constructed.
-*/
+
 SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
 {
        MutexAutoLock listlock(m_map_mutex);
@@ -416,57 +457,45 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
                        << std::endl;
                return SharedBuffer<u8>();
        }
+       if (chunk_num >= chunk_count) {
+               errorstream << "IncomingSplitBuffer::insert(): chunk_num=" << chunk_num
+                               << " >= chunk_count=" << chunk_count << std::endl;
+               return SharedBuffer<u8>();
+       }
 
        // Add if doesn't exist
+       IncomingSplitPacket *sp;
        if (m_buf.find(seqnum) == m_buf.end()) {
-               m_buf[seqnum] = new IncomingSplitPacket(chunk_count, reliable);
+               sp = new IncomingSplitPacket(chunk_count, reliable);
+               m_buf[seqnum] = sp;
+       } else {
+               sp = m_buf[seqnum];
        }
 
-       IncomingSplitPacket *sp = m_buf[seqnum];
-
-       if (chunk_count != sp->chunk_count)
-               LOG(derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
-                               <<" != sp->chunk_count="<<sp->chunk_count
-                               <<std::endl);
+       if (chunk_count != sp->chunk_count) {
+               errorstream << "IncomingSplitBuffer::insert(): chunk_count="
+                               << chunk_count << " != sp->chunk_count=" << sp->chunk_count
+                               << std::endl;
+               return SharedBuffer<u8>();
+       }
        if (reliable != sp->reliable)
                LOG(derr_con<<"Connection: WARNING: reliable="<<reliable
                                <<" != sp->reliable="<<sp->reliable
                                <<std::endl);
 
-       // If chunk already exists, ignore it.
-       // Sometimes two identical packets may arrive when there is network
-       // lag and the server re-sends stuff.
-       if (sp->chunks.find(chunk_num) != sp->chunks.end())
-               return SharedBuffer<u8>();
-
        // Cut chunk data out of packet
        u32 chunkdatasize = p.data.getSize() - headersize;
        SharedBuffer<u8> chunkdata(chunkdatasize);
        memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
 
-       // Set chunk data in buffer
-       sp->chunks[chunk_num] = chunkdata;
+       if (!sp->insert(chunk_num, chunkdata))
+               return SharedBuffer<u8>();
 
        // If not all chunks are received, return empty buffer
        if (!sp->allReceived())
                return SharedBuffer<u8>();
 
-       // Calculate total size
-       u32 totalsize = 0;
-       for (const auto &chunk : sp->chunks) {
-               totalsize += chunk.second.getSize();
-       }
-
-       SharedBuffer<u8> fulldata(totalsize);
-
-       // Copy chunks to data buffer
-       u32 start = 0;
-       for (u32 chunk_i=0; chunk_i<sp->chunk_count; chunk_i++) {
-               const SharedBuffer<u8> &buf = sp->chunks[chunk_i];
-               u16 buf_chunkdatasize = buf.getSize();
-               memcpy(&fulldata[start], *buf, buf_chunkdatasize);
-               start += buf_chunkdatasize;
-       }
+       SharedBuffer<u8> fulldata = sp->reassemble();
 
        // Remove sp from buffer
        m_buf.erase(seqnum);
@@ -474,6 +503,7 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
 
        return fulldata;
 }
+
 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
 {
        std::deque<u16> remove_queue;
@@ -600,7 +630,7 @@ void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets)
 {
        MutexAutoLock internal(m_internal_mutex);
        current_bytes_transfered += bytes;
-       current_packet_successfull += packets;
+       current_packet_successful += packets;
 }
 
 void Channel::UpdateBytesReceived(unsigned int bytes) {
@@ -627,17 +657,16 @@ void Channel::UpdatePacketTooLateCounter()
        current_packet_too_late++;
 }
 
-void Channel::UpdateTimers(float dtime,bool legacy_peer)
+void Channel::UpdateTimers(float dtime)
 {
        bpm_counter += dtime;
        packet_loss_counter += dtime;
 
-       if (packet_loss_counter > 1.0)
-       {
-               packet_loss_counter -= 1.0;
+       if (packet_loss_counter > 1.0f) {
+               packet_loss_counter -= 1.0f;
 
                unsigned int packet_loss = 11; /* use a neutral value for initialization */
-               unsigned int packets_successfull = 0;
+               unsigned int packets_successful = 0;
                //unsigned int packet_too_late = 0;
 
                bool reasonable_amount_of_data_transmitted = false;
@@ -646,94 +675,78 @@ void Channel::UpdateTimers(float dtime,bool legacy_peer)
                        MutexAutoLock internal(m_internal_mutex);
                        packet_loss = current_packet_loss;
                        //packet_too_late = current_packet_too_late;
-                       packets_successfull = current_packet_successfull;
+                       packets_successful = current_packet_successful;
 
-                       if (current_bytes_transfered > (unsigned int) (window_size*512/2))
-                       {
+                       if (current_bytes_transfered > (unsigned int) (window_size*512/2)) {
                                reasonable_amount_of_data_transmitted = true;
                        }
                        current_packet_loss = 0;
                        current_packet_too_late = 0;
-                       current_packet_successfull = 0;
+                       current_packet_successful = 0;
                }
 
-               /* dynamic window size is only available for non legacy peers */
-               if (!legacy_peer) {
-                       float successfull_to_lost_ratio = 0.0;
-                       bool done = false;
+               /* dynamic window size */
+               float successful_to_lost_ratio = 0.0f;
+               bool done = false;
+
+               if (packets_successful > 0) {
+                       successful_to_lost_ratio = packet_loss/packets_successful;
+               } else if (packet_loss > 0) {
+                       window_size = std::max(
+                                       (window_size - 10),
+                                       MIN_RELIABLE_WINDOW_SIZE);
+                       done = true;
+               }
 
-                       if (packets_successfull > 0) {
-                               successfull_to_lost_ratio = packet_loss/packets_successfull;
-                       }
-                       else if (packet_loss > 0)
-                       {
-                               window_size = MYMAX(
-                                               (window_size - 10),
+               if (!done) {
+                       if ((successful_to_lost_ratio < 0.01f) &&
+                               (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
+                               /* don't even think about increasing if we didn't even
+                                * use major parts of our window */
+                               if (reasonable_amount_of_data_transmitted)
+                                       window_size = std::min(
+                                                       (window_size + 100),
+                                                       MAX_RELIABLE_WINDOW_SIZE);
+                       } else if ((successful_to_lost_ratio < 0.05f) &&
+                                       (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
+                               /* don't even think about increasing if we didn't even
+                                * use major parts of our window */
+                               if (reasonable_amount_of_data_transmitted)
+                                       window_size = std::min(
+                                                       (window_size + 50),
+                                                       MAX_RELIABLE_WINDOW_SIZE);
+                       } else if (successful_to_lost_ratio > 0.15f) {
+                               window_size = std::max(
+                                               (window_size - 100),
+                                               MIN_RELIABLE_WINDOW_SIZE);
+                       } else if (successful_to_lost_ratio > 0.1f) {
+                               window_size = std::max(
+                                               (window_size - 50),
                                                MIN_RELIABLE_WINDOW_SIZE);
-                               done = true;
-                       }
-
-                       if (!done)
-                       {
-                               if ((successfull_to_lost_ratio < 0.01) &&
-                                       (window_size < MAX_RELIABLE_WINDOW_SIZE))
-                               {
-                                       /* don't even think about increasing if we didn't even
-                                        * use major parts of our window */
-                                       if (reasonable_amount_of_data_transmitted)
-                                               window_size = MYMIN(
-                                                               (window_size + 100),
-                                                               MAX_RELIABLE_WINDOW_SIZE);
-                               }
-                               else if ((successfull_to_lost_ratio < 0.05) &&
-                                               (window_size < MAX_RELIABLE_WINDOW_SIZE))
-                               {
-                                       /* don't even think about increasing if we didn't even
-                                        * use major parts of our window */
-                                       if (reasonable_amount_of_data_transmitted)
-                                               window_size = MYMIN(
-                                                               (window_size + 50),
-                                                               MAX_RELIABLE_WINDOW_SIZE);
-                               }
-                               else if (successfull_to_lost_ratio > 0.15)
-                               {
-                                       window_size = MYMAX(
-                                                       (window_size - 100),
-                                                       MIN_RELIABLE_WINDOW_SIZE);
-                               }
-                               else if (successfull_to_lost_ratio > 0.1)
-                               {
-                                       window_size = MYMAX(
-                                                       (window_size - 50),
-                                                       MIN_RELIABLE_WINDOW_SIZE);
-                               }
                        }
                }
        }
 
-       if (bpm_counter > 10.0)
-       {
+       if (bpm_counter > 10.0f) {
                {
                        MutexAutoLock internal(m_internal_mutex);
                        cur_kbps                 =
-                                       (((float) current_bytes_transfered)/bpm_counter)/1024.0;
+                                       (((float) current_bytes_transfered)/bpm_counter)/1024.0f;
                        current_bytes_transfered = 0;
                        cur_kbps_lost            =
-                                       (((float) current_bytes_lost)/bpm_counter)/1024.0;
+                                       (((float) current_bytes_lost)/bpm_counter)/1024.0f;
                        current_bytes_lost       = 0;
                        cur_incoming_kbps        =
-                                       (((float) current_bytes_received)/bpm_counter)/1024.0;
+                                       (((float) current_bytes_received)/bpm_counter)/1024.0f;
                        current_bytes_received   = 0;
-                       bpm_counter              = 0;
+                       bpm_counter              = 0.0f;
                }
 
-               if (cur_kbps > max_kbps)
-               {
+               if (cur_kbps > max_kbps) {
                        max_kbps = cur_kbps;
                }
 
-               if (cur_kbps_lost > max_kbps_lost)
-               {
+               if (cur_kbps_lost > max_kbps_lost) {
                        max_kbps_lost = cur_kbps_lost;
                }
 
@@ -825,9 +838,8 @@ void Peer::DecUseCount()
        delete this;
 }
 
-void Peer::RTTStatistics(float rtt, const std::string &profiler_id)
-{
-       static const float avg_factor = 100.0f / MAX_RELIABLE_WINDOW_SIZE;
+void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
+               unsigned int num_samples) {
 
        if (m_last_rtt > 0) {
                /* set min max values */
@@ -838,14 +850,21 @@ void Peer::RTTStatistics(float rtt, const std::string &profiler_id)
 
                /* do average calculation */
                if (m_rtt.avg_rtt < 0.0)
-                       m_rtt.avg_rtt = rtt;
+                       m_rtt.avg_rtt  = rtt;
                else
-                       m_rtt.avg_rtt += (rtt - m_rtt.avg_rtt) * avg_factor;
+                       m_rtt.avg_rtt  = m_rtt.avg_rtt * (num_samples/(num_samples-1)) +
+                                                               rtt * (1/num_samples);
 
                /* do jitter calculation */
 
                //just use some neutral value at beginning
-               float jitter = std::fabs(rtt - m_last_rtt);
+               float jitter = m_rtt.jitter_min;
+
+               if (rtt > m_last_rtt)
+                       jitter = rtt-m_last_rtt;
+
+               if (rtt <= m_last_rtt)
+                       jitter = m_last_rtt - rtt;
 
                if (jitter < m_rtt.jitter_min)
                        m_rtt.jitter_min = jitter;
@@ -853,13 +872,14 @@ void Peer::RTTStatistics(float rtt, const std::string &profiler_id)
                        m_rtt.jitter_max = jitter;
 
                if (m_rtt.jitter_avg < 0.0)
-                       m_rtt.jitter_avg = jitter;
+                       m_rtt.jitter_avg  = jitter;
                else
-                       m_rtt.jitter_avg += (jitter - m_rtt.jitter_avg) * avg_factor;
+                       m_rtt.jitter_avg  = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
+                                                               jitter * (1/num_samples);
 
                if (!profiler_id.empty()) {
-                       g_profiler->graphAdd(profiler_id + "_rtt", rtt);
-                       g_profiler->graphAdd(profiler_id + "_jitter", jitter);
+                       g_profiler->graphAdd(profiler_id + " RTT [ms]", rtt * 1000.f);
+                       g_profiler->graphAdd(profiler_id + " jitter [ms]", jitter * 1000.f);
                }
        }
        /* save values required for next loop */
@@ -903,6 +923,8 @@ void Peer::Drop()
 UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
        Peer(a_address,a_id,connection)
 {
+       for (Channel &channel : channels)
+               channel.setWindowSize(START_RELIABLE_WINDOW_SIZE);
 }
 
 bool UDPPeer::getAddress(MTProtocols type,Address& toset)
@@ -916,23 +938,18 @@ bool UDPPeer::getAddress(MTProtocols type,Address& toset)
        return false;
 }
 
-void UDPPeer::setNonLegacyPeer()
-{
-       m_legacy_peer = false;
-       for(unsigned int i=0; i< CHANNEL_COUNT; i++)
-       {
-               channels->setWindowSize(g_settings->getU16("max_packets_per_iteration"));
-       }
-}
-
 void UDPPeer::reportRTT(float rtt)
 {
-       assert(rtt >= 0.0f);
-
-       RTTStatistics(rtt, "rudp");
+       if (rtt < 0.0) {
+               return;
+       }
+       RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10);
 
        float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR;
-       timeout = rangelim(timeout, RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX);
+       if (timeout < RESEND_TIMEOUT_MIN)
+               timeout = RESEND_TIMEOUT_MIN;
+       if (timeout > RESEND_TIMEOUT_MAX)
+               timeout = RESEND_TIMEOUT_MAX;
 
        MutexAutoLock usage_lock(m_exclusive_access_mutex);
        resend_timeout = timeout;
@@ -958,22 +975,29 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
        if (m_pending_disconnect)
                return;
 
-       if ( channels[c.channelnum].queued_commands.empty() &&
+       Channel &chan = channels[c.channelnum];
+
+       if (chan.queued_commands.empty() &&
                        /* don't queue more packets then window size */
-                       (channels[c.channelnum].queued_reliables.size()
-                       < (channels[c.channelnum].getWindowSize()/2))) {
+                       (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
                LOG(dout_con<<m_connection->getDesc()
                                <<" processing reliable command for peer id: " << c.peer_id
                                <<" data size: " << c.data.getSize() << std::endl);
                if (!processReliableSendCommand(c,max_packet_size)) {
-                       channels[c.channelnum].queued_commands.push_back(c);
+                       chan.queued_commands.push_back(c);
                }
        }
        else {
                LOG(dout_con<<m_connection->getDesc()
                                <<" Queueing reliable command for peer id: " << c.peer_id
                                <<" data size: " << c.data.getSize() <<std::endl);
-               channels[c.channelnum].queued_commands.push_back(c);
+               chan.queued_commands.push_back(c);
+               if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
+                       LOG(derr_con << m_connection->getDesc()
+                                       << "Possible packet stall to peer id: " << c.peer_id
+                                       << " queued_commands=" << chan.queued_commands.size()
+                                       << std::endl);
+               }
        }
 }
 
@@ -984,6 +1008,8 @@ bool UDPPeer::processReliableSendCommand(
        if (m_pending_disconnect)
                return true;
 
+       Channel &chan = channels[c.channelnum];
+
        u32 chunksize_max = max_packet_size
                                                        - BASE_HEADER_SIZE
                                                        - RELIABLE_HEADER_SIZE;
@@ -991,13 +1017,13 @@ bool UDPPeer::processReliableSendCommand(
        sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
 
        std::list<SharedBuffer<u8>> originals;
-       u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
+       u16 split_sequence_number = chan.readNextSplitSeqNum();
 
        if (c.raw) {
                originals.emplace_back(c.data);
        } else {
                makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
-               channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
+               chan.setNextSplitSeqNum(split_sequence_number);
        }
 
        bool have_sequence_number = true;
@@ -1006,7 +1032,7 @@ bool UDPPeer::processReliableSendCommand(
        volatile u16 initial_sequence_number = 0;
 
        for (SharedBuffer<u8> &original : originals) {
-               u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
+               u16 seqnum = chan.getOutgoingSequenceNumber(have_sequence_number);
 
                /* oops, we don't have enough sequence numbers to send this packet */
                if (!have_sequence_number)
@@ -1038,10 +1064,10 @@ bool UDPPeer::processReliableSendCommand(
 //                                     << " channel: " << (c.channelnum&0xFF)
 //                                     << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
 //                                     << std::endl)
-                       channels[c.channelnum].queued_reliables.push(p);
+                       chan.queued_reliables.push(p);
                        pcount++;
                }
-               sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
+               sanity_check(chan.queued_reliables.size() < 0xFFFF);
                return true;
        }
 
@@ -1056,12 +1082,16 @@ bool UDPPeer::processReliableSendCommand(
                toadd.pop();
 
                bool successfully_put_back_sequence_number
-                       = channels[c.channelnum].putBackSequenceNumber(
+                       = chan.putBackSequenceNumber(
                                (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
 
                FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
        }
 
+       // DO NOT REMOVE n_queued! It avoids a deadlock of async locked
+       // 'log_message_mutex' and 'm_list_mutex'.
+       u32 n_queued = chan.outgoing_reliables_sent.size();
+
        LOG(dout_con<<m_connection->getDesc()
                        << " Windowsize exceeded on reliable sending "
                        << c.data.getSize() << " bytes"
@@ -1070,7 +1100,7 @@ bool UDPPeer::processReliableSendCommand(
                        << std::endl << "\t\tgot at most            : "
                        << packets_available << " packets"
                        << std::endl << "\t\tpackets queued         : "
-                       << channels[c.channelnum].outgoing_reliables_sent.size()
+                       << n_queued
                        << std::endl);
 
        return false;
@@ -1143,7 +1173,9 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
        m_bc_peerhandler(peerhandler)
 
 {
-       m_udpSocket.setTimeoutMs(5);
+       /* Amount of time Receive() will wait for data, this is entirely different
+        * from the connection timeout */
+       m_udpSocket.setTimeoutMs(500);
 
        m_sendThread->setParent(this);
        m_receiveThread->setParent(this);
@@ -1312,16 +1344,21 @@ void Connection::Disconnect()
        putCommand(c);
 }
 
-void Connection::Receive(NetworkPacket* pkt)
+bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
 {
+       /*
+               Note that this function can potentially wait infinitely if non-data
+               events keep happening before the timeout expires.
+               This is not considered to be a problem (is it?)
+       */
        for(;;) {
-               ConnectionEvent e = waitEvent(m_bc_receive_timeout);
+               ConnectionEvent e = waitEvent(timeout);
                if (e.type != CONNEVENT_NONE)
                        LOG(dout_con << getDesc() << ": Receive: got event: "
                                        << e.describe() << std::endl);
                switch(e.type) {
                case CONNEVENT_NONE:
-                       throw NoIncomingDataException("No incoming data");
+                       return false;
                case CONNEVENT_DATA_RECEIVED:
                        // Data size is lesser than command size, ignoring packet
                        if (e.data.getSize() < 2) {
@@ -1329,7 +1366,7 @@ void Connection::Receive(NetworkPacket* pkt)
                        }
 
                        pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
-                       return;
+                       return true;
                case CONNEVENT_PEER_ADDED: {
                        UDPPeer tmp(e.peer_id, e.address, this);
                        if (m_bc_peerhandler)
@@ -1347,7 +1384,19 @@ void Connection::Receive(NetworkPacket* pkt)
                                        "(port already in use?)");
                }
        }
-       throw NoIncomingDataException("No incoming data");
+       return false;
+}
+
+void Connection::Receive(NetworkPacket *pkt)
+{
+       bool any = Receive(pkt, m_bc_receive_timeout);
+       if (!any)
+               throw NoIncomingDataException("No incoming data");
+}
+
+bool Connection::TryReceive(NetworkPacket *pkt)
+{
+       return Receive(pkt, 0);
 }
 
 void Connection::Send(session_t peer_id, u8 channelnum,