3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include "connectionthreads.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
31 /******************************************************************************/
32 /* defines used for debugging and profiling */
33 /******************************************************************************/
37 #undef DEBUG_CONNECTION_KBPS
39 /* this mutex is used to achieve log message consistency */
40 std::mutex log_conthread_mutex;
43 MutexAutoLock loglock(log_conthread_mutex); \
47 //#define DEBUG_CONNECTION_KBPS
48 #undef DEBUG_CONNECTION_KBPS
51 /* maximum number of retries for reliable packets */
52 #define MAX_RELIABLE_RETRY 5
56 static session_t readPeerId(u8 *packetdata)
58 return readU16(&packetdata[4]);
60 static u8 readChannel(u8 *packetdata)
62 return readU8(&packetdata[6]);
65 /******************************************************************************/
66 /* Connection Threads */
67 /******************************************************************************/
69 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
71 Thread("ConnectionSend"),
72 m_max_packet_size(max_packet_size),
74 m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
78 void *ConnectionSendThread::run()
82 LOG(dout_con << m_connection->getDesc()
83 << "ConnectionSend thread started" << std::endl);
85 u64 curtime = porting::getTimeMs();
86 u64 lasttime = curtime;
88 PROFILE(std::stringstream ThreadIdentifier);
89 PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
91 /* if stop is requested don't stop immediately but try to send all */
93 while (!stopRequested() || packetsQueued()) {
94 BEGIN_DEBUG_EXCEPTION_HANDLER
95 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
97 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
99 /* wait for trigger or timeout */
100 m_send_sleep_semaphore.wait(50);
102 /* remove all triggers */
103 while (m_send_sleep_semaphore.wait(0)) {
107 curtime = porting::getTimeMs();
108 float dtime = CALC_DTIME(lasttime, curtime);
110 /* first do all the reliable stuff */
113 /* translate commands to packets */
114 ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
115 while (c.type != CONNCMD_NONE) {
117 processReliableCommand(c);
119 processNonReliableCommand(c);
121 c = m_connection->m_command_queue.pop_frontNoEx(0);
124 /* send non reliable packets */
127 END_DEBUG_EXCEPTION_HANDLER
130 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
134 void ConnectionSendThread::Trigger()
136 m_send_sleep_semaphore.post();
139 bool ConnectionSendThread::packetsQueued()
141 std::list<session_t> peerIds = m_connection->getPeerIDs();
143 if (!m_outgoing_queue.empty() && !peerIds.empty())
146 for (session_t peerId : peerIds) {
147 PeerHelper peer = m_connection->getPeerNoEx(peerId);
152 if (dynamic_cast<UDPPeer *>(&peer) == 0)
155 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
156 if (!channel.queued_commands.empty()) {
166 void ConnectionSendThread::runTimeouts(float dtime)
168 std::list<session_t> timeouted_peers;
169 std::list<session_t> peerIds = m_connection->getPeerIDs();
171 for (session_t &peerId : peerIds) {
172 PeerHelper peer = m_connection->getPeerNoEx(peerId);
177 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
181 PROFILE(std::stringstream peerIdentifier);
182 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
183 << ";" << peerId << ";RELIABLE]");
184 PROFILE(ScopeProfiler
185 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
187 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
192 if (peer->isTimedOut(m_timeout)) {
193 infostream << m_connection->getDesc()
194 << "RunTimeouts(): Peer " << peer->id
196 << " (source=peer->timeout_counter)"
198 // Add peer to the list
199 timeouted_peers.push_back(peer->id);
200 // Don't bother going through the buffers of this one
204 float resend_timeout = udpPeer->getResendTimeout();
205 bool retry_count_exceeded = false;
206 for (Channel &channel : udpPeer->channels) {
207 std::list<BufferedPacket> timed_outs;
209 if (udpPeer->getLegacyPeer())
210 channel.setWindowSize(WINDOW_SIZE);
212 // Remove timed out incomplete unreliable split packets
213 channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
215 // Increment reliable packet times
216 channel.outgoing_reliables_sent.incrementTimeouts(dtime);
218 unsigned int numpeers = m_connection->m_peers.size();
223 // Re-send timed out outgoing reliables
224 timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
225 (m_max_data_packets_per_iteration / numpeers));
227 channel.UpdatePacketLossCounter(timed_outs.size());
228 g_profiler->graphAdd("packets_lost", timed_outs.size());
230 m_iteration_packets_avaialble -= timed_outs.size();
232 for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
233 k != timed_outs.end(); ++k) {
234 session_t peer_id = readPeerId(*(k->data));
235 u8 channelnum = readChannel(*(k->data));
236 u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
238 channel.UpdateBytesLost(k->data.getSize());
241 if (k->resend_count > MAX_RELIABLE_RETRY) {
242 retry_count_exceeded = true;
243 timeouted_peers.push_back(peer->id);
244 /* no need to check additional packets if a single one did timeout*/
248 LOG(derr_con << m_connection->getDesc()
249 << "RE-SENDING timed-out RELIABLE to "
250 << k->address.serializeString()
251 << "(t/o=" << resend_timeout << "): "
252 << "from_peer_id=" << peer_id
253 << ", channel=" << ((int) channelnum & 0xff)
254 << ", seqnum=" << seqnum
259 // do not handle rtt here as we can't decide if this packet was
260 // lost or really takes more time to transmit
263 if (retry_count_exceeded) {
264 break; /* no need to check other channels if we already did timeout */
267 channel.UpdateTimers(dtime, udpPeer->getLegacyPeer());
270 /* skip to next peer if we did timeout */
271 if (retry_count_exceeded)
274 /* send ping if necessary */
275 if (udpPeer->Ping(dtime, data)) {
276 LOG(dout_con << m_connection->getDesc()
277 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
278 /* this may fail if there ain't a sequence number left */
279 if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
280 //retrigger with reduced ping interval
281 udpPeer->Ping(4.0, data);
285 udpPeer->RunCommandQueues(m_max_packet_size,
286 m_max_commands_per_iteration,
287 m_max_packets_requeued);
290 // Remove timed out peers
291 for (u16 timeouted_peer : timeouted_peers) {
292 LOG(derr_con << m_connection->getDesc()
293 << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
294 m_connection->deletePeer(timeouted_peer, true);
298 void ConnectionSendThread::rawSend(const BufferedPacket &packet)
301 m_connection->m_udpSocket.Send(packet.address, *packet.data,
302 packet.data.getSize());
303 LOG(dout_con << m_connection->getDesc()
304 << " rawSend: " << packet.data.getSize()
305 << " bytes sent" << std::endl);
306 } catch (SendFailedException &e) {
307 LOG(derr_con << m_connection->getDesc()
308 << "Connection::rawSend(): SendFailedException: "
309 << packet.address.serializeString() << std::endl);
313 void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
316 p.absolute_send_time = porting::getTimeMs();
318 channel->outgoing_reliables_sent.insert(p,
319 (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
320 % (MAX_RELIABLE_WINDOW_SIZE + 1));
322 catch (AlreadyExistsException &e) {
323 LOG(derr_con << m_connection->getDesc()
324 << "WARNING: Going to send a reliable packet"
325 << " in outgoing buffer" << std::endl);
332 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
333 SharedBuffer<u8> data, bool reliable)
335 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
337 LOG(dout_con << m_connection->getDesc()
338 << " INFO: dropped packet for non existent peer_id: "
339 << peer_id << std::endl);
340 FATAL_ERROR_IF(!reliable,
341 "Trying to send raw packet reliable but no peer found!");
344 Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
347 bool have_sequence_number_for_raw_packet = true;
349 channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
351 if (!have_sequence_number_for_raw_packet)
354 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
355 Address peer_address;
356 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
358 // Add base headers and make a packet
359 BufferedPacket p = con::makePacket(peer_address, reliable,
360 m_connection->GetProtocolID(), m_connection->GetPeerID(),
363 // first check if our send window is already maxed out
364 if (channel->outgoing_reliables_sent.size()
365 < channel->getWindowSize()) {
366 LOG(dout_con << m_connection->getDesc()
367 << " INFO: sending a reliable packet to peer_id " << peer_id
368 << " channel: " << channelnum
369 << " seqnum: " << seqnum << std::endl);
370 sendAsPacketReliable(p, channel);
374 LOG(dout_con << m_connection->getDesc()
375 << " INFO: queueing reliable packet for peer_id: " << peer_id
376 << " channel: " << channelnum
377 << " seqnum: " << seqnum << std::endl);
378 channel->queued_reliables.push(p);
382 Address peer_address;
383 if (peer->getAddress(MTP_UDP, peer_address)) {
384 // Add base headers and make a packet
385 BufferedPacket p = con::makePacket(peer_address, data,
386 m_connection->GetProtocolID(), m_connection->GetPeerID(),
394 LOG(dout_con << m_connection->getDesc()
395 << " INFO: dropped unreliable packet for peer_id: " << peer_id
396 << " because of (yet) missing udp address" << std::endl);
400 void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
402 assert(c.reliable); // Pre-condition
406 LOG(dout_con << m_connection->getDesc()
407 << "UDP processing reliable CONNCMD_NONE" << std::endl);
411 LOG(dout_con << m_connection->getDesc()
412 << "UDP processing reliable CONNCMD_SEND" << std::endl);
416 case CONNCMD_SEND_TO_ALL:
417 LOG(dout_con << m_connection->getDesc()
418 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
419 sendToAllReliable(c);
422 case CONCMD_CREATE_PEER:
423 LOG(dout_con << m_connection->getDesc()
424 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
425 if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
426 /* put to queue if we couldn't send it immediately */
431 case CONCMD_DISABLE_LEGACY:
432 LOG(dout_con << m_connection->getDesc()
433 << "UDP processing reliable CONCMD_DISABLE_LEGACY" << std::endl);
434 if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
435 /* put to queue if we couldn't send it immediately */
441 case CONNCMD_CONNECT:
442 case CONNCMD_DISCONNECT:
444 FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
446 LOG(dout_con << m_connection->getDesc()
447 << " Invalid reliable command type: " << c.type << std::endl);
452 void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
454 assert(!c.reliable); // Pre-condition
458 LOG(dout_con << m_connection->getDesc()
459 << " UDP processing CONNCMD_NONE" << std::endl);
462 LOG(dout_con << m_connection->getDesc()
463 << " UDP processing CONNCMD_SERVE port="
464 << c.address.serializeString() << std::endl);
467 case CONNCMD_CONNECT:
468 LOG(dout_con << m_connection->getDesc()
469 << " UDP processing CONNCMD_CONNECT" << std::endl);
472 case CONNCMD_DISCONNECT:
473 LOG(dout_con << m_connection->getDesc()
474 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
477 case CONNCMD_DISCONNECT_PEER:
478 LOG(dout_con << m_connection->getDesc()
479 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
480 disconnect_peer(c.peer_id);
483 LOG(dout_con << m_connection->getDesc()
484 << " UDP processing CONNCMD_SEND" << std::endl);
485 send(c.peer_id, c.channelnum, c.data);
487 case CONNCMD_SEND_TO_ALL:
488 LOG(dout_con << m_connection->getDesc()
489 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
490 sendToAll(c.channelnum, c.data);
493 LOG(dout_con << m_connection->getDesc()
494 << " UDP processing CONCMD_ACK" << std::endl);
495 sendAsPacket(c.peer_id, c.channelnum, c.data, true);
497 case CONCMD_CREATE_PEER:
498 FATAL_ERROR("Got command that should be reliable as unreliable command");
500 LOG(dout_con << m_connection->getDesc()
501 << " Invalid command type: " << c.type << std::endl);
505 void ConnectionSendThread::serve(Address bind_address)
507 LOG(dout_con << m_connection->getDesc()
508 << "UDP serving at port " << bind_address.serializeString() << std::endl);
510 m_connection->m_udpSocket.Bind(bind_address);
511 m_connection->SetPeerID(PEER_ID_SERVER);
513 catch (SocketException &e) {
517 m_connection->putEvent(ce);
521 void ConnectionSendThread::connect(Address address)
523 LOG(dout_con << m_connection->getDesc() << " connecting to "
524 << address.serializeString()
525 << ":" << address.getPort() << std::endl);
527 UDPPeer *peer = m_connection->createServerPeer(address);
531 e.peerAdded(peer->id, peer->address);
532 m_connection->putEvent(e);
536 if (address.isIPv6())
537 bind_addr.setAddress((IPv6AddressBytes *) NULL);
539 bind_addr.setAddress(0, 0, 0, 0);
541 m_connection->m_udpSocket.Bind(bind_addr);
543 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
544 m_connection->SetPeerID(PEER_ID_INEXISTENT);
545 NetworkPacket pkt(0, 0);
546 m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
549 void ConnectionSendThread::disconnect()
551 LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
553 // Create and send DISCO packet
554 SharedBuffer<u8> data(2);
555 writeU8(&data[0], PACKET_TYPE_CONTROL);
556 writeU8(&data[1], CONTROLTYPE_DISCO);
560 std::list<session_t> peerids = m_connection->getPeerIDs();
562 for (session_t peerid : peerids) {
563 sendAsPacket(peerid, 0, data, false);
567 void ConnectionSendThread::disconnect_peer(session_t peer_id)
569 LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
571 // Create and send DISCO packet
572 SharedBuffer<u8> data(2);
573 writeU8(&data[0], PACKET_TYPE_CONTROL);
574 writeU8(&data[1], CONTROLTYPE_DISCO);
575 sendAsPacket(peer_id, 0, data, false);
577 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
582 if (dynamic_cast<UDPPeer *>(&peer) == 0) {
586 dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
589 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
590 SharedBuffer<u8> data)
592 assert(channelnum < CHANNEL_COUNT); // Pre-condition
594 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
596 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
597 << ">>>NOT<<< found on sending packet"
598 << ", channel " << (channelnum % 0xFF)
599 << ", size: " << data.getSize() << std::endl);
603 LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
604 << ", channel " << (channelnum % 0xFF)
605 << ", size: " << data.getSize() << std::endl);
607 u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
609 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
610 std::list<SharedBuffer<u8>> originals;
612 makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
614 peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
616 for (const SharedBuffer<u8> &original : originals) {
617 sendAsPacket(peer_id, channelnum, original);
621 void ConnectionSendThread::sendReliable(ConnectionCommand &c)
623 PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
627 peer->PutReliableSendCommand(c, m_max_packet_size);
630 void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
632 std::list<session_t> peerids = m_connection->getPeerIDs();
634 for (session_t peerid : peerids) {
635 send(peerid, channelnum, data);
639 void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
641 std::list<session_t> peerids = m_connection->getPeerIDs();
643 for (session_t peerid : peerids) {
644 PeerHelper peer = m_connection->getPeerNoEx(peerid);
649 peer->PutReliableSendCommand(c, m_max_packet_size);
653 void ConnectionSendThread::sendPackets(float dtime)
655 std::list<session_t> peerIds = m_connection->getPeerIDs();
656 std::list<session_t> pendingDisconnect;
657 std::map<session_t, bool> pending_unreliable;
659 for (session_t peerId : peerIds) {
660 PeerHelper peer = m_connection->getPeerNoEx(peerId);
661 //peer may have been removed
663 LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
668 peer->m_increment_packets_remaining =
669 m_iteration_packets_avaialble / m_connection->m_peers.size();
671 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
677 if (udpPeer->m_pending_disconnect) {
678 pendingDisconnect.push_back(peerId);
681 PROFILE(std::stringstream
684 peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
686 PROFILE(ScopeProfiler
687 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
689 LOG(dout_con << m_connection->getDesc()
690 << " Handle per peer queues: peer_id=" << peerId
691 << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
693 // first send queued reliable packets for all peers (if possible)
694 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
695 Channel &channel = udpPeer->channels[i];
698 channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
699 u16 next_to_receive = 0;
700 channel.incoming_reliables.getFirstSeqnum(next_to_receive);
702 LOG(dout_con << m_connection->getDesc() << "\t channel: "
703 << i << ", peer quota:"
704 << peer->m_increment_packets_remaining
706 << "\t\t\treliables on wire: "
707 << channel.outgoing_reliables_sent.size()
708 << ", waiting for ack for " << next_to_ack
710 << "\t\t\tincoming_reliables: "
711 << channel.incoming_reliables.size()
712 << ", next reliable packet: "
713 << channel.readNextIncomingSeqNum()
714 << ", next queued: " << next_to_receive
716 << "\t\t\treliables queued : "
717 << channel.queued_reliables.size()
719 << "\t\t\tqueued commands : "
720 << channel.queued_commands.size()
723 while (!channel.queued_reliables.empty() &&
724 channel.outgoing_reliables_sent.size()
725 < channel.getWindowSize() &&
726 peer->m_increment_packets_remaining > 0) {
727 BufferedPacket p = channel.queued_reliables.front();
728 channel.queued_reliables.pop();
729 LOG(dout_con << m_connection->getDesc()
730 << " INFO: sending a queued reliable packet "
732 << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
734 sendAsPacketReliable(p, &channel);
735 peer->m_increment_packets_remaining--;
740 if (!m_outgoing_queue.empty()) {
741 LOG(dout_con << m_connection->getDesc()
742 << " Handle non reliable queue ("
743 << m_outgoing_queue.size() << " pkts)" << std::endl);
746 unsigned int initial_queuesize = m_outgoing_queue.size();
747 /* send non reliable packets*/
748 for (unsigned int i = 0; i < initial_queuesize; i++) {
749 OutgoingPacket packet = m_outgoing_queue.front();
750 m_outgoing_queue.pop();
755 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
757 LOG(dout_con << m_connection->getDesc()
758 << " Outgoing queue: peer_id=" << packet.peer_id
759 << ">>>NOT<<< found on sending packet"
760 << ", channel " << (packet.channelnum % 0xFF)
761 << ", size: " << packet.data.getSize() << std::endl);
765 /* send acks immediately */
767 rawSendAsPacket(packet.peer_id, packet.channelnum,
768 packet.data, packet.reliable);
769 peer->m_increment_packets_remaining =
770 MYMIN(0, peer->m_increment_packets_remaining--);
772 (peer->m_increment_packets_remaining > 0) ||
774 rawSendAsPacket(packet.peer_id, packet.channelnum,
775 packet.data, packet.reliable);
776 peer->m_increment_packets_remaining--;
778 m_outgoing_queue.push(packet);
779 pending_unreliable[packet.peer_id] = true;
783 for (session_t peerId : pendingDisconnect) {
784 if (!pending_unreliable[peerId]) {
785 m_connection->deletePeer(peerId, false);
790 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
791 SharedBuffer<u8> data, bool ack)
793 OutgoingPacket packet(peer_id, channelnum, data, false, ack);
794 m_outgoing_queue.push(packet);
797 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
798 Thread("ConnectionReceive")
802 void *ConnectionReceiveThread::run()
804 assert(m_connection);
806 LOG(dout_con << m_connection->getDesc()
807 << "ConnectionReceive thread started" << std::endl);
809 PROFILE(std::stringstream
811 PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
813 #ifdef DEBUG_CONNECTION_KBPS
814 u64 curtime = porting::getTimeMs();
815 u64 lasttime = curtime;
816 float debug_print_timer = 0.0;
819 while (!stopRequested()) {
820 BEGIN_DEBUG_EXCEPTION_HANDLER
821 PROFILE(ScopeProfiler
822 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
824 #ifdef DEBUG_CONNECTION_KBPS
826 curtime = porting::getTimeMs();
827 float dtime = CALC_DTIME(lasttime,curtime);
830 /* receive packets */
833 #ifdef DEBUG_CONNECTION_KBPS
834 debug_print_timer += dtime;
835 if (debug_print_timer > 20.0) {
836 debug_print_timer -= 20.0;
838 std::list<session_t> peerids = m_connection->getPeerIDs();
840 for (std::list<session_t>::iterator i = peerids.begin();
844 PeerHelper peer = m_connection->getPeerNoEx(*i);
848 float peer_current = 0.0;
849 float peer_loss = 0.0;
850 float avg_rate = 0.0;
851 float avg_loss = 0.0;
853 for(u16 j=0; j<CHANNEL_COUNT; j++)
855 peer_current +=peer->channels[j].getCurrentDownloadRateKB();
856 peer_loss += peer->channels[j].getCurrentLossRateKB();
857 avg_rate += peer->channels[j].getAvgDownloadRateKB();
858 avg_loss += peer->channels[j].getAvgLossRateKB();
861 std::stringstream output;
862 output << std::fixed << std::setprecision(1);
863 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
864 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
865 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
866 output << std::setfill(' ');
867 for(u16 j=0; j<CHANNEL_COUNT; j++)
869 output << "\tcha " << j << ":"
870 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
871 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
872 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
874 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
875 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
876 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
877 << " / WS: " << peer->channels[j].getWindowSize()
881 fprintf(stderr,"%s\n",output.str().c_str());
885 END_DEBUG_EXCEPTION_HANDLER
888 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
892 // Receive packets from the network and buffers and create ConnectionEvents
893 void ConnectionReceiveThread::receive()
895 // use IPv6 minimum allowed MTU as receive buffer size as this is
896 // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
898 unsigned int packet_maxsize = 1500;
899 SharedBuffer<u8> packetdata(packet_maxsize);
901 bool packet_queued = true;
903 unsigned int loop_count = 0;
905 /* first of all read packets from socket */
906 /* check for incoming data available */
907 while ((loop_count < 10) &&
908 (m_connection->m_udpSocket.WaitData(50))) {
912 bool data_left = true;
914 SharedBuffer<u8> resultdata;
917 data_left = getFromBuffers(peer_id, resultdata);
920 e.dataReceived(peer_id, resultdata);
921 m_connection->putEvent(e);
924 catch (ProcessedSilentlyException &e) {
925 /* try reading again */
928 packet_queued = false;
932 s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
935 if ((received_size < BASE_HEADER_SIZE) ||
936 (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
937 LOG(derr_con << m_connection->getDesc()
938 << "Receive(): Invalid incoming packet, "
939 << "size: " << received_size
941 << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
946 session_t peer_id = readPeerId(*packetdata);
947 u8 channelnum = readChannel(*packetdata);
949 if (channelnum > CHANNEL_COUNT - 1) {
950 LOG(derr_con << m_connection->getDesc()
951 << "Receive(): Invalid channel " << channelnum << std::endl);
952 throw InvalidIncomingDataException("Channel doesn't exist");
955 /* Try to identify peer by sender address (may happen on join) */
956 if (peer_id == PEER_ID_INEXISTENT) {
957 peer_id = m_connection->lookupPeer(sender);
958 // We do not have to remind the peer of its
959 // peer id as the CONTROLTYPE_SET_PEER_ID
960 // command was sent reliably.
963 /* The peer was not found in our lists. Add it. */
964 if (peer_id == PEER_ID_INEXISTENT) {
965 peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
968 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
971 LOG(dout_con << m_connection->getDesc()
972 << " got packet from unknown peer_id: "
973 << peer_id << " Ignoring." << std::endl);
977 // Validate peer address
979 Address peer_address;
981 if (peer->getAddress(MTP_UDP, peer_address)) {
982 if (peer_address != sender) {
983 LOG(derr_con << m_connection->getDesc()
984 << m_connection->getDesc()
985 << " Peer " << peer_id << " sending from different address."
986 " Ignoring." << std::endl);
991 bool invalid_address = true;
992 if (invalid_address) {
993 LOG(derr_con << m_connection->getDesc()
994 << m_connection->getDesc()
995 << " Peer " << peer_id << " unknown."
996 " Ignoring." << std::endl);
1001 peer->ResetTimeout();
1003 Channel *channel = 0;
1005 if (dynamic_cast<UDPPeer *>(&peer) != 0) {
1006 channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
1010 channel->UpdateBytesReceived(received_size);
1013 // Throw the received packet to channel->processPacket()
1015 // Make a new SharedBuffer from the data without the base headers
1016 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1017 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1018 strippeddata.getSize());
1021 // Process it (the result is some data with no headers made by us)
1022 SharedBuffer<u8> resultdata = processPacket
1023 (channel, strippeddata, peer_id, channelnum, false);
1025 LOG(dout_con << m_connection->getDesc()
1026 << " ProcessPacket from peer_id: " << peer_id
1027 << ",channel: " << (channelnum & 0xFF) << ", returned "
1028 << resultdata.getSize() << " bytes" << std::endl);
1031 e.dataReceived(peer_id, resultdata);
1032 m_connection->putEvent(e);
1034 catch (ProcessedSilentlyException &e) {
1036 catch (ProcessedQueued &e) {
1037 packet_queued = true;
1040 catch (InvalidIncomingDataException &e) {
1042 catch (ProcessedSilentlyException &e) {
1047 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1049 std::list<session_t> peerids = m_connection->getPeerIDs();
1051 for (session_t peerid : peerids) {
1052 PeerHelper peer = m_connection->getPeerNoEx(peerid);
1056 if (dynamic_cast<UDPPeer *>(&peer) == 0)
1059 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
1060 if (checkIncomingBuffers(&channel, peer_id, dst)) {
1068 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1069 session_t &peer_id, SharedBuffer<u8> &dst)
1071 u16 firstseqnum = 0;
1072 if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
1073 if (firstseqnum == channel->readNextIncomingSeqNum()) {
1074 BufferedPacket p = channel->incoming_reliables.popFirst();
1075 peer_id = readPeerId(*p.data);
1076 u8 channelnum = readChannel(*p.data);
1077 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
1079 LOG(dout_con << m_connection->getDesc()
1080 << "UNBUFFERING TYPE_RELIABLE"
1081 << " seqnum=" << seqnum
1082 << " peer_id=" << peer_id
1083 << " channel=" << ((int) channelnum & 0xff)
1086 channel->incNextIncomingSeqNum();
1088 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1089 // Get out the inside packet and re-process it
1090 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1091 memcpy(*payload, &p.data[headers_size], payload.getSize());
1093 dst = processPacket(channel, payload, peer_id, channelnum, true);
1100 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1101 SharedBuffer<u8> packetdata, session_t peer_id, u8 channelnum, bool reliable)
1103 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1106 errorstream << "Peer not found (possible timeout)" << std::endl;
1107 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1110 if (packetdata.getSize() < 1)
1111 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1113 u8 type = readU8(&(packetdata[0]));
1115 if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1116 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1117 errorstream << errmsg << std::endl;
1118 throw InvalidIncomingDataException(errmsg.c_str());
1121 if (type >= PACKET_TYPE_MAX) {
1122 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1124 throw InvalidIncomingDataException("Invalid packet type");
1127 const PacketTypeHandler &pHandle = packetTypeRouter[type];
1128 return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
1131 const ConnectionReceiveThread::PacketTypeHandler
1132 ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1133 {&ConnectionReceiveThread::handlePacketType_Control},
1134 {&ConnectionReceiveThread::handlePacketType_Original},
1135 {&ConnectionReceiveThread::handlePacketType_Split},
1136 {&ConnectionReceiveThread::handlePacketType_Reliable},
1139 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1140 SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
1142 if (packetdata.getSize() < 2)
1143 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1145 u8 controltype = readU8(&(packetdata[1]));
1147 if (controltype == CONTROLTYPE_ACK) {
1148 assert(channel != NULL);
1150 if (packetdata.getSize() < 4) {
1151 throw InvalidIncomingDataException(
1152 "packetdata.getSize() < 4 (ACK header size)");
1155 u16 seqnum = readU16(&packetdata[2]);
1156 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1157 << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1158 << seqnum << " ]" << std::endl);
1161 BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1163 // only calculate rtt from straight sent packets
1164 if (p.resend_count == 0) {
1165 // Get round trip time
1166 u64 current_time = porting::getTimeMs();
1168 // a overflow is quite unlikely but as it'd result in major
1169 // rtt miscalculation we handle it here
1170 if (current_time > p.absolute_send_time) {
1171 float rtt = (current_time - p.absolute_send_time) / 1000.0;
1173 // Let peer calculate stuff according to it
1174 // (avg_rtt and resend_timeout)
1175 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1176 } else if (p.totaltime > 0) {
1177 float rtt = p.totaltime;
1179 // Let peer calculate stuff according to it
1180 // (avg_rtt and resend_timeout)
1181 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1184 // put bytes for max bandwidth calculation
1185 channel->UpdateBytesSent(p.data.getSize(), 1);
1186 if (channel->outgoing_reliables_sent.size() == 0)
1187 m_connection->TriggerSend();
1188 } catch (NotFoundException &e) {
1189 LOG(derr_con << m_connection->getDesc()
1190 << "WARNING: ACKed packet not in outgoing queue" << std::endl);
1191 channel->UpdatePacketTooLateCounter();
1194 throw ProcessedSilentlyException("Got an ACK");
1195 } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1196 // Got a packet to set our peer id
1197 if (packetdata.getSize() < 4)
1198 throw InvalidIncomingDataException
1199 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1200 session_t peer_id_new = readU16(&packetdata[2]);
1201 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1202 << "... " << std::endl);
1204 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1205 LOG(derr_con << m_connection->getDesc()
1206 << "WARNING: Not changing existing peer id." << std::endl);
1208 LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1210 m_connection->SetPeerID(peer_id_new);
1213 ConnectionCommand cmd;
1215 SharedBuffer<u8> reply(2);
1216 writeU8(&reply[0], PACKET_TYPE_CONTROL);
1217 writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
1218 cmd.disableLegacy(PEER_ID_SERVER, reply);
1219 m_connection->putCommand(cmd);
1221 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1222 } else if (controltype == CONTROLTYPE_PING) {
1223 // Just ignore it, the incoming data already reset
1224 // the timeout counter
1225 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1226 throw ProcessedSilentlyException("Got a PING");
1227 } else if (controltype == CONTROLTYPE_DISCO) {
1228 // Just ignore it, the incoming data already reset
1229 // the timeout counter
1230 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1231 << peer->id << std::endl);
1233 if (!m_connection->deletePeer(peer->id, false)) {
1234 derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1237 throw ProcessedSilentlyException("Got a DISCO");
1238 } else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW) {
1239 dynamic_cast<UDPPeer *>(peer)->setNonLegacyPeer();
1240 throw ProcessedSilentlyException("Got non legacy control");
1242 LOG(derr_con << m_connection->getDesc()
1243 << "INVALID TYPE_CONTROL: invalid controltype="
1244 << ((int) controltype & 0xff) << std::endl);
1245 throw InvalidIncomingDataException("Invalid control type");
1249 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1250 SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
1252 if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1253 throw InvalidIncomingDataException
1254 ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1255 LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1257 // Get the inside packet out and return it
1258 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1259 memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1263 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1264 SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
1266 Address peer_address;
1268 if (peer->getAddress(MTP_UDP, peer_address)) {
1269 // We have to create a packet again for buffering
1270 // This isn't actually too bad an idea.
1271 BufferedPacket packet = makePacket(peer_address,
1273 m_connection->GetProtocolID(),
1277 // Buffer the packet
1278 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1280 if (data.getSize() != 0) {
1281 LOG(dout_con << m_connection->getDesc()
1282 << "RETURNING TYPE_SPLIT: Constructed full data, "
1283 << "size=" << data.getSize() << std::endl);
1286 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1287 throw ProcessedSilentlyException("Buffered a split packet chunk");
1290 // We should never get here.
1291 FATAL_ERROR("Invalid execution point");
1294 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1295 SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
1297 assert(channel != NULL);
1299 // Recursive reliable packets not allowed
1301 throw InvalidIncomingDataException("Found nested reliable packets");
1303 if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1304 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1306 u16 seqnum = readU16(&packetdata[1]);
1307 bool is_future_packet = false;
1308 bool is_old_packet = false;
1310 /* packet is within our receive window send ack */
1311 if (seqnum_in_window(seqnum,
1312 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1313 m_connection->sendAck(peer->id, channelnum, seqnum);
1315 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1316 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1318 /* packet is not within receive window, don't send ack. *
1319 * if this was a valid packet it's gonna be retransmitted */
1320 if (is_future_packet)
1321 throw ProcessedSilentlyException(
1322 "Received packet newer then expected, not sending ack");
1324 /* seems like our ack was lost, send another one for a old packet */
1325 if (is_old_packet) {
1326 LOG(dout_con << m_connection->getDesc()
1327 << "RE-SENDING ACK: peer_id: " << peer->id
1328 << ", channel: " << (channelnum & 0xFF)
1329 << ", seqnum: " << seqnum << std::endl;)
1330 m_connection->sendAck(peer->id, channelnum, seqnum);
1332 // we already have this packet so this one was on wire at least
1333 // the current timeout
1334 // we don't know how long this packet was on wire don't do silly guessing
1335 // dynamic_cast<UDPPeer*>(&peer)->
1336 // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1338 throw ProcessedSilentlyException("Retransmitting ack for old packet");
1342 if (seqnum != channel->readNextIncomingSeqNum()) {
1343 Address peer_address;
1345 // this is a reliable packet so we have a udp address for sure
1346 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1347 // This one comes later, buffer it.
1348 // Actually we have to make a packet to buffer one.
1349 // Well, we have all the ingredients, so just do it.
1350 BufferedPacket packet = con::makePacket(
1353 m_connection->GetProtocolID(),
1357 channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1359 LOG(dout_con << m_connection->getDesc()
1360 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1361 << ", channel: " << (channelnum & 0xFF)
1362 << ", seqnum: " << seqnum << std::endl;)
1364 throw ProcessedQueued("Buffered future reliable packet");
1365 } catch (AlreadyExistsException &e) {
1366 } catch (IncomingDataCorruption &e) {
1367 ConnectionCommand discon;
1368 discon.disconnect_peer(peer->id);
1369 m_connection->putCommand(discon);
1371 LOG(derr_con << m_connection->getDesc()
1372 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1373 << ", channel: " << (channelnum & 0xFF)
1374 << ", seqnum: " << seqnum
1375 << "DROPPING CLIENT!" << std::endl;)
1379 /* we got a packet to process right now */
1380 LOG(dout_con << m_connection->getDesc()
1381 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1382 << ", channel: " << (channelnum & 0xFF)
1383 << ", seqnum: " << seqnum << std::endl;)
1386 /* check for resend case */
1387 u16 queued_seqnum = 0;
1388 if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1389 if (queued_seqnum == seqnum) {
1390 BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
1391 /** TODO find a way to verify the new against the old packet */
1395 channel->incNextIncomingSeqNum();
1397 // Get out the inside packet and re-process it
1398 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1399 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1401 return processPacket(channel, payload, peer->id, channelnum, true);