3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include "connectionthreads.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
31 /******************************************************************************/
32 /* defines used for debugging and profiling */
33 /******************************************************************************/
/* DEBUG_CONNECTION_KBPS is left undefined here, so the
 * "#ifdef DEBUG_CONNECTION_KBPS" sections of ConnectionReceiveThread::run()
 * are compiled out. */
37 #undef DEBUG_CONNECTION_KBPS
39 /* this mutex is used to achieve log message consistency: it is taken through a MutexAutoLock around logged statements */
40 std::mutex log_conthread_mutex;
43 MutexAutoLock loglock(log_conthread_mutex); \
47 //#define DEBUG_CONNECTION_KBPS
48 #undef DEBUG_CONNECTION_KBPS
51 /* maximum number of retries for reliable packets; runTimeouts() adds a peer to its timed-out list once a packet's resend_count exceeds this value */
52 #define MAX_RELIABLE_RETRY 5
/* Reads the peer id field of a raw packet: readU16() at byte offset 4. */
56 static session_t readPeerId(u8 *packetdata)
58 return readU16(&packetdata[4]);
/* Reads the channel number of a raw packet: readU8() at byte offset 6. */
60 static u8 readChannel(u8 *packetdata)
62 return readU8(&packetdata[6]);
65 /******************************************************************************/
66 /* Connection Threads */
67 /******************************************************************************/
/*
 * Sets up the send thread under the thread name "ConnectionSend".
 * Stores max_packet_size and loads the per-iteration data packet budget
 * from the "max_packets_per_iteration" setting.
 */
69 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
71 Thread("ConnectionSend"),
72 m_max_packet_size(max_packet_size),
74 m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
// The budget is divided between peers in runTimeouts() and sendPackets();
// it is checked here to be greater than 1.
76 SANITY_CHECK(m_max_data_packets_per_iteration > 1);
/*
 * Main loop of the send thread.
 *
 * Each iteration resets the packet budget, sleeps on the send semaphore
 * (woken early by Trigger()), computes the elapsed time and drains the
 * connection's command queue into processReliableCommand() /
 * processNonReliableCommand(). The loop keeps running after a stop request
 * for as long as packetsQueued() is true.
 */
79 void *ConnectionSendThread::run()
83 LOG(dout_con << m_connection->getDesc()
84 << "ConnectionSend thread started" << std::endl);
// timestamps used to derive the per-iteration dtime below
86 u64 curtime = porting::getTimeMs();
87 u64 lasttime = curtime;
89 PROFILE(std::stringstream ThreadIdentifier);
90 PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
92 /* if stop is requested don't stop immediately but try to send all */
94 while (!stopRequested() || packetsQueued()) {
95 BEGIN_DEBUG_EXCEPTION_HANDLER
96 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
// every iteration starts with the full packet budget from the settings
98 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
100 /* wait for trigger or timeout (50 is presumably milliseconds - TODO confirm) */
101 m_send_sleep_semaphore.wait(50);
103 /* remove all triggers */
104 while (m_send_sleep_semaphore.wait(0)) {
108 curtime = porting::getTimeMs();
109 float dtime = CALC_DTIME(lasttime, curtime);
111 /* first resend timed-out packets */
// warn when the budget is already exhausted before new commands are handled
113 if (m_iteration_packets_avaialble == 0) {
114 LOG(warningstream << m_connection->getDesc()
115 << " Packet quota used up after re-sending packets, "
116 << "max=" << m_max_data_packets_per_iteration << std::endl);
119 /* translate commands to packets */
// pop commands without blocking until a CONNCMD_NONE command comes back
120 ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
121 while (c.type != CONNCMD_NONE) {
123 processReliableCommand(c);
125 processNonReliableCommand(c);
127 c = m_connection->m_command_queue.pop_frontNoEx(0);
130 /* send queued packets */
133 END_DEBUG_EXCEPTION_HANDLER
// drop this thread's profiler entry once the loop has ended
136 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
/* Wakes the send thread by posting the semaphore that run() waits on. */
140 void ConnectionSendThread::Trigger()
142 m_send_sleep_semaphore.post();
/*
 * Checks for pending outgoing work; run() uses the result to keep looping
 * after a stop request.
 *
 * Two places are inspected: the shared outgoing queue (only while at least
 * one peer exists) and the queued_commands of every channel of every
 * UDP peer.
 */
145 bool ConnectionSendThread::packetsQueued()
147 std::list<session_t> peerIds = m_connection->getPeerIDs();
// queued packets only matter while there is a peer left to send them to
149 if (!m_outgoing_queue.empty() && !peerIds.empty())
152 for (session_t peerId : peerIds) {
153 PeerHelper peer = m_connection->getPeerNoEx(peerId);
// only UDP peers carry channels
158 if (dynamic_cast<UDPPeer *>(&peer) == 0)
161 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
162 if (!channel.queued_commands.empty()) {
/*
 * Timeout housekeeping for all peers.
 *
 * dtime is the elapsed time that is fed into the per-channel timeout
 * counters, the channel timers and the ping logic.
 *
 * For every UDP peer this:
 *  - collects the peer for removal if isTimedOut(m_timeout) is true,
 *  - per channel: drops timed-out unreliable split packets, advances the
 *    reliable packet timers and fetches timed-out reliables,
 *  - collects the peer for removal if one of those packets has a
 *    resend_count above MAX_RELIABLE_RETRY,
 *  - sends a ping when UDPPeer::Ping() asks for one,
 *  - runs the peer's command queues.
 * Collected peers are deleted at the end.
 */
172 void ConnectionSendThread::runTimeouts(float dtime)
174 std::list<session_t> timeouted_peers;
175 std::list<session_t> peerIds = m_connection->getPeerIDs();
177 for (session_t &peerId : peerIds) {
178 PeerHelper peer = m_connection->getPeerNoEx(peerId);
183 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
187 PROFILE(std::stringstream peerIdentifier);
188 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
189 << ";" << peerId << ";RELIABLE]");
190 PROFILE(ScopeProfiler
191 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
193 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
198 if (peer->isTimedOut(m_timeout)) {
199 infostream << m_connection->getDesc()
200 << "RunTimeouts(): Peer " << peer->id
203 // Add peer to the list
204 timeouted_peers.push_back(peer->id);
205 // Don't bother going through the buffers of this one
209 float resend_timeout = udpPeer->getResendTimeout();
210 bool retry_count_exceeded = false;
211 for (Channel &channel : udpPeer->channels) {
212 std::list<BufferedPacket> timed_outs;
214 // Remove timed out incomplete unreliable split packets
215 channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
217 // Increment reliable packet times
218 channel.outgoing_reliables_sent.incrementTimeouts(dtime);
// NOTE(review): numpeers is used as a divisor below - confirm it cannot be 0 here
220 unsigned int numpeers = m_connection->m_peers.size();
225 // Re-send timed out outgoing reliables
// at most (packet budget / number of peers) packets are fetched per channel
226 timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
227 (m_max_data_packets_per_iteration / numpeers));
// every timed-out packet is counted as lost, both per channel and in the profiler graph
229 channel.UpdatePacketLossCounter(timed_outs.size());
230 g_profiler->graphAdd("packets_lost", timed_outs.size());
// timed-out reliables count against this iteration's packet budget
232 m_iteration_packets_avaialble -= timed_outs.size();
234 for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
235 k != timed_outs.end(); ++k) {
// header fields of the timed-out packet; they are used in the log message below
236 session_t peer_id = readPeerId(*(k->data));
237 u8 channelnum = readChannel(*(k->data));
238 u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
240 channel.UpdateBytesLost(k->data.getSize());
// too many resends of a single packet: treat the whole peer as timed out
243 if (k->resend_count > MAX_RELIABLE_RETRY) {
244 retry_count_exceeded = true;
245 timeouted_peers.push_back(peer->id);
246 /* no need to check additional packets if a single one did timeout*/
250 LOG(derr_con << m_connection->getDesc()
251 << "RE-SENDING timed-out RELIABLE to "
252 << k->address.serializeString()
253 << "(t/o=" << resend_timeout << "): "
254 << "from_peer_id=" << peer_id
255 << ", channel=" << ((int) channelnum & 0xff)
256 << ", seqnum=" << seqnum
261 // do not handle rtt here as we can't decide if this packet was
262 // lost or really takes more time to transmit
265 if (retry_count_exceeded) {
266 break; /* no need to check other channels if we already did timeout */
269 channel.UpdateTimers(dtime);
272 /* skip to next peer if we did timeout */
273 if (retry_count_exceeded)
276 /* send ping if necessary */
277 if (udpPeer->Ping(dtime, data)) {
278 LOG(dout_con << m_connection->getDesc()
279 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
280 /* this may fail if there ain't a sequence number left */
281 if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
282 //retrigger with reduced ping interval
283 udpPeer->Ping(4.0, data);
287 udpPeer->RunCommandQueues(m_max_packet_size,
288 m_max_commands_per_iteration,
289 m_max_packets_requeued);
292 // Remove timed out peers
// NOTE(review): iterates as u16 although the list stores session_t - confirm the types agree
293 for (u16 timeouted_peer : timeouted_peers) {
294 LOG(dout_con << m_connection->getDesc()
295 << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
296 m_connection->deletePeer(timeouted_peer, true);
/*
 * Sends an already assembled packet to packet.address over the
 * connection's UDP socket and logs the number of bytes sent.
 * A SendFailedException is caught and logged together with the
 * destination address.
 */
300 void ConnectionSendThread::rawSend(const BufferedPacket &packet)
303 m_connection->m_udpSocket.Send(packet.address, *packet.data,
304 packet.data.getSize());
305 LOG(dout_con << m_connection->getDesc()
306 << " rawSend: " << packet.data.getSize()
307 << " bytes sent" << std::endl);
308 } catch (SendFailedException &e) {
309 LOG(derr_con << m_connection->getDesc()
310 << "Connection::rawSend(): SendFailedException: "
311 << packet.address.serializeString() << std::endl);
/*
 * Stamps p with the current time as its absolute_send_time and records it
 * in the channel's outgoing_reliables_sent buffer.
 * An AlreadyExistsException raised by the insert is caught and logged.
 */
315 void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
// send timestamp; the ACK handler reads it to compute the round trip time
318 p.absolute_send_time = porting::getTimeMs();
// second argument: derived from the current outgoing sequence number and the
// maximum window size - presumably the oldest sequence number still inside
// the window (TODO confirm against the buffer's insert())
320 channel->outgoing_reliables_sent.insert(p,
321 (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
322 % (MAX_RELIABLE_WINDOW_SIZE + 1));
324 catch (AlreadyExistsException &e) {
325 LOG(derr_con << m_connection->getDesc()
326 << "WARNING: Going to send a reliable packet"
327 << " in outgoing buffer" << std::endl);
/*
 * Builds one packet from data for peer_id / channelnum and sends or queues it.
 *
 * Reliable packets get a sequence number and a reliable header; they are
 * sent right away while the channel's send window has room and are pushed
 * onto channel->queued_reliables otherwise.
 * Unreliable packets need a known MTP_UDP address of the peer; without one
 * the packet is logged as dropped.
 *
 * Returns a bool; callers (runTimeouts(), processReliableCommand()) treat
 * false as "could not be sent now".
 */
334 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
335 const SharedBuffer<u8> &data, bool reliable)
337 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
339 LOG(errorstream << m_connection->getDesc()
340 << " dropped " << (reliable ? "reliable " : "")
341 << "packet for non existent peer_id: " << peer_id << std::endl);
344 Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
// set by getOutgoingSequenceNumber() below; tells whether a sequence number was available
347 bool have_sequence_number_for_raw_packet = true;
349 channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
351 if (!have_sequence_number_for_raw_packet)
// NOTE: this local buffer shadows the bool parameter "reliable" within its scope
354 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
355 Address peer_address;
356 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
358 // Add base headers and make a packet
359 BufferedPacket p = con::makePacket(peer_address, reliable,
360 m_connection->GetProtocolID(), m_connection->GetPeerID(),
363 // first check if our send window is already maxed out
364 if (channel->outgoing_reliables_sent.size()
365 < channel->getWindowSize()) {
366 LOG(dout_con << m_connection->getDesc()
367 << " INFO: sending a reliable packet to peer_id " << peer_id
368 << " channel: " << (u32)channelnum
369 << " seqnum: " << seqnum << std::endl);
370 sendAsPacketReliable(p, channel);
// window full: keep the packet; sendPackets() drains queued_reliables later
374 LOG(dout_con << m_connection->getDesc()
375 << " INFO: queueing reliable packet for peer_id: " << peer_id
376 << " channel: " << (u32)channelnum
377 << " seqnum: " << seqnum << std::endl);
378 channel->queued_reliables.push(p);
// unreliable packet: only possible once the peer's UDP address is known
382 Address peer_address;
383 if (peer->getAddress(MTP_UDP, peer_address)) {
384 // Add base headers and make a packet
385 BufferedPacket p = con::makePacket(peer_address, data,
386 m_connection->GetProtocolID(), m_connection->GetPeerID(),
394 LOG(dout_con << m_connection->getDesc()
395 << " INFO: dropped unreliable packet for peer_id: " << peer_id
396 << " because of (yet) missing udp address" << std::endl);
/*
 * Handles a command that is flagged as reliable (asserted on entry) by
 * dispatching on its type.
 *
 * CONNCMD_SEND_TO_ALL goes to sendToAllReliable(), CONCMD_CREATE_PEER is
 * sent through rawSendAsPacket(), and CONNCMD_CONNECT / CONNCMD_DISCONNECT
 * are among the types that end in FATAL_ERROR because they must not be
 * reliable. Unknown types are only logged.
 */
400 void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
402 assert(c.reliable); // Pre-condition
406 LOG(dout_con << m_connection->getDesc()
407 << "UDP processing reliable CONNCMD_NONE" << std::endl);
411 LOG(dout_con << m_connection->getDesc()
412 << "UDP processing reliable CONNCMD_SEND" << std::endl);
416 case CONNCMD_SEND_TO_ALL:
417 LOG(dout_con << m_connection->getDesc()
418 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
419 sendToAllReliable(c);
422 case CONCMD_CREATE_PEER:
423 LOG(dout_con << m_connection->getDesc()
424 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
425 if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
426 /* put to queue if we couldn't send it immediately */
// command types that are never valid as reliable commands
432 case CONNCMD_CONNECT:
433 case CONNCMD_DISCONNECT:
435 FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
437 LOG(dout_con << m_connection->getDesc()
438 << " Invalid reliable command type: " << c.type << std::endl);
/*
 * Handles a command that is not flagged as reliable (asserted on entry) by
 * dispatching on its type.
 *
 * Visible actions: CONNCMD_DISCONNECT_PEER calls disconnect_peer(), the
 * send command calls send(), CONNCMD_SEND_TO_ALL calls sendToAll() and the
 * ACK command queues its data through sendAsPacket() with ack = true.
 * CONCMD_CREATE_PEER ends in FATAL_ERROR because it must be reliable.
 * Unknown types are only logged.
 */
443 void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
445 assert(!c.reliable); // Pre-condition
449 LOG(dout_con << m_connection->getDesc()
450 << " UDP processing CONNCMD_NONE" << std::endl);
453 LOG(dout_con << m_connection->getDesc()
454 << " UDP processing CONNCMD_SERVE port="
455 << c.address.serializeString() << std::endl);
458 case CONNCMD_CONNECT:
459 LOG(dout_con << m_connection->getDesc()
460 << " UDP processing CONNCMD_CONNECT" << std::endl);
463 case CONNCMD_DISCONNECT:
464 LOG(dout_con << m_connection->getDesc()
465 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
468 case CONNCMD_DISCONNECT_PEER:
469 LOG(dout_con << m_connection->getDesc()
470 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
471 disconnect_peer(c.peer_id);
474 LOG(dout_con << m_connection->getDesc()
475 << " UDP processing CONNCMD_SEND" << std::endl);
476 send(c.peer_id, c.channelnum, c.data);
478 case CONNCMD_SEND_TO_ALL:
479 LOG(dout_con << m_connection->getDesc()
480 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
481 sendToAll(c.channelnum, c.data);
484 LOG(dout_con << m_connection->getDesc()
485 << " UDP processing CONCMD_ACK" << std::endl);
// queued with ack = true; sendPackets() sends ack packets regardless of the peer quota
486 sendAsPacket(c.peer_id, c.channelnum, c.data, true);
488 case CONCMD_CREATE_PEER:
489 FATAL_ERROR("Got command that should be reliable as unreliable command");
491 LOG(dout_con << m_connection->getDesc()
492 << " Invalid command type: " << c.type << std::endl);
/*
 * Starts serving: binds the connection's UDP socket to bind_address and
 * sets the own peer id to PEER_ID_SERVER.
 * A SocketException is caught; an event (ce) is handed to
 * m_connection->putEvent() - presumably to report the bind failure,
 * TODO confirm.
 */
496 void ConnectionSendThread::serve(Address bind_address)
498 LOG(dout_con << m_connection->getDesc()
499 << "UDP serving at port " << bind_address.serializeString() << std::endl);
501 m_connection->m_udpSocket.Bind(bind_address);
502 m_connection->SetPeerID(PEER_ID_SERVER);
504 catch (SocketException &e) {
508 m_connection->putEvent(ce);
/*
 * Starts a client connection to address.
 *
 * Creates the server peer, posts a peerAdded event for it, binds the local
 * UDP socket to an unspecified address of the same family (IPv6 or IPv4)
 * and sends an empty packet to PEER_ID_SERVER while the own peer id is
 * still PEER_ID_INEXISTENT.
 */
512 void ConnectionSendThread::connect(Address address)
514 LOG(dout_con << m_connection->getDesc() << " connecting to "
515 << address.serializeString()
516 << ":" << address.getPort() << std::endl);
518 UDPPeer *peer = m_connection->createServerPeer(address);
// announce the new server peer to the event consumer
522 e.peerAdded(peer->id, peer->address);
523 m_connection->putEvent(e);
// pick the bind address matching the server's address family
527 if (address.isIPv6())
528 bind_addr.setAddress((IPv6AddressBytes *) NULL);
530 bind_addr.setAddress(0, 0, 0, 0);
532 m_connection->m_udpSocket.Bind(bind_addr);
534 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
535 m_connection->SetPeerID(PEER_ID_INEXISTENT);
// empty packet on channel 0; the final "true" is presumably the reliable flag - TODO confirm
536 NetworkPacket pkt(0, 0);
537 m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
/*
 * Announces a disconnect to every known peer: a two byte control packet
 * (PACKET_TYPE_CONTROL, CONTROLTYPE_DISCO) is queued for each peer id on
 * channel 0 through sendAsPacket() with ack = false.
 */
540 void ConnectionSendThread::disconnect()
542 LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
544 // Create and send DISCO packet
545 SharedBuffer<u8> data(2);
546 writeU8(&data[0], PACKET_TYPE_CONTROL);
547 writeU8(&data[1], CONTROLTYPE_DISCO);
551 std::list<session_t> peerids = m_connection->getPeerIDs();
553 for (session_t peerid : peerids) {
554 sendAsPacket(peerid, 0, data, false);
/*
 * Disconnects a single peer: queues a DISCO control packet for peer_id on
 * channel 0 and, if the peer is a UDPPeer, marks it as pending disconnect.
 * sendPackets() deletes peers marked this way once they have no unreliable
 * packets pending.
 */
558 void ConnectionSendThread::disconnect_peer(session_t peer_id)
560 LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
562 // Create and send DISCO packet
563 SharedBuffer<u8> data(2);
564 writeU8(&data[0], PACKET_TYPE_CONTROL);
565 writeU8(&data[1], CONTROLTYPE_DISCO);
566 sendAsPacket(peer_id, 0, data, false);
568 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
// only UDP peers have the pending disconnect flag
573 if (dynamic_cast<UDPPeer *>(&peer) == 0) {
577 dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
/*
 * Queues data for unreliable delivery to peer_id on channelnum.
 *
 * channelnum must be below CHANNEL_COUNT (asserted). The data is passed to
 * makeAutoSplitPacket() with a chunk limit of
 * m_max_packet_size - BASE_HEADER_SIZE, and every resulting buffer is
 * queued with sendAsPacket(). The peer's split sequence number for the
 * channel is read before and written back after the split.
 */
580 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
581 const SharedBuffer<u8> &data)
583 assert(channelnum < CHANNEL_COUNT); // Pre-condition
585 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
// NOTE(review): the log messages below use "% 0xFF", which maps 255 to 0;
// other log sites in this file use "& 0xff" - confirm which is intended
587 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
588 << ">>>NOT<<< found on sending packet"
589 << ", channel " << (channelnum % 0xFF)
590 << ", size: " << data.getSize() << std::endl);
594 LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
595 << ", channel " << (channelnum % 0xFF)
596 << ", size: " << data.getSize() << std::endl);
598 u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
// payload room per packet once the base header is accounted for
600 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
601 std::list<SharedBuffer<u8>> originals;
603 makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
// store the sequence number back after makeAutoSplitPacket() had the chance to change it
605 peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
607 for (const SharedBuffer<u8> &original : originals) {
608 sendAsPacket(peer_id, channelnum, original);
/*
 * Hands a reliable send command to its target peer (c.peer_id) through
 * PutReliableSendCommand(), passing the maximum packet size along.
 */
612 void ConnectionSendThread::sendReliable(ConnectionCommand &c)
614 PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
618 peer->PutReliableSendCommand(c, m_max_packet_size);
/* Calls send() with the same channel and data for every known peer id. */
621 void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
623 std::list<session_t> peerids = m_connection->getPeerIDs();
625 for (session_t peerid : peerids) {
626 send(peerid, channelnum, data);
/*
 * Hands the reliable send command c to every known peer through
 * PutReliableSendCommand(), passing the maximum packet size along.
 */
630 void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
632 std::list<session_t> peerids = m_connection->getPeerIDs();
634 for (session_t peerid : peerids) {
635 PeerHelper peer = m_connection->getPeerNoEx(peerid);
640 peer->PutReliableSendCommand(c, m_max_packet_size);
/*
 * Sends what is queued, within this iteration's packet budget.
 *
 * 1. The remaining budget is split evenly between the peers.
 * 2. Per UDP peer and channel, queued reliable packets are sent while the
 *    send window has room and the peer still has quota. Peers flagged
 *    m_pending_disconnect are remembered.
 * 3. The shared outgoing queue is walked once over its initial size:
 *    acks, packets for peers with quota left, and everything during
 *    shutdown are sent; the rest is pushed back and the peer is noted in
 *    pending_unreliable.
 * 4. A warning is logged for peers whose quota reached 0.
 * 5. Pending-disconnect peers with no unreliable packets left are deleted.
 */
644 void ConnectionSendThread::sendPackets(float dtime)
646 std::list<session_t> peerIds = m_connection->getPeerIDs();
647 std::list<session_t> pendingDisconnect;
648 std::map<session_t, bool> pending_unreliable;
// even share of the remaining budget; MYMAX avoids dividing by zero when there are no peers
650 const unsigned int peer_packet_quota = m_iteration_packets_avaialble
651 / MYMAX(peerIds.size(), 1);
653 for (session_t peerId : peerIds) {
654 PeerHelper peer = m_connection->getPeerNoEx(peerId);
655 //peer may have been removed
657 LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
662 peer->m_increment_packets_remaining = peer_packet_quota;
664 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
// remember peers waiting for their disconnect; they are handled at the end
670 if (udpPeer->m_pending_disconnect) {
671 pendingDisconnect.push_back(peerId);
674 PROFILE(std::stringstream
677 peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
679 PROFILE(ScopeProfiler
680 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
682 LOG(dout_con << m_connection->getDesc()
683 << " Handle per peer queues: peer_id=" << peerId
684 << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
686 // first send queued reliable packets for all peers (if possible)
687 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
688 Channel &channel = udpPeer->channels[i];
// first sequence numbers of both reliable buffers; they are used in the debug log below
691 channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
692 u16 next_to_receive = 0;
693 channel.incoming_reliables.getFirstSeqnum(next_to_receive);
695 LOG(dout_con << m_connection->getDesc() << "\t channel: "
696 << i << ", peer quota:"
697 << peer->m_increment_packets_remaining
699 << "\t\t\treliables on wire: "
700 << channel.outgoing_reliables_sent.size()
701 << ", waiting for ack for " << next_to_ack
703 << "\t\t\tincoming_reliables: "
704 << channel.incoming_reliables.size()
705 << ", next reliable packet: "
706 << channel.readNextIncomingSeqNum()
707 << ", next queued: " << next_to_receive
709 << "\t\t\treliables queued : "
710 << channel.queued_reliables.size()
712 << "\t\t\tqueued commands : "
713 << channel.queued_commands.size()
// send queued reliables while: something is queued, the window has room, quota remains
716 while (!channel.queued_reliables.empty() &&
717 channel.outgoing_reliables_sent.size()
718 < channel.getWindowSize() &&
719 peer->m_increment_packets_remaining > 0) {
720 BufferedPacket p = channel.queued_reliables.front();
721 channel.queued_reliables.pop();
722 LOG(dout_con << m_connection->getDesc()
723 << " INFO: sending a queued reliable packet "
725 << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
727 sendAsPacketReliable(p, &channel);
728 peer->m_increment_packets_remaining--;
733 if (!m_outgoing_queue.empty()) {
734 LOG(dout_con << m_connection->getDesc()
735 << " Handle non reliable queue ("
736 << m_outgoing_queue.size() << " pkts)" << std::endl);
// packets that get pushed back below must not be visited again in this pass,
// so the loop is bounded by the size seen before it starts
739 unsigned int initial_queuesize = m_outgoing_queue.size();
740 /* send non reliable packets*/
741 for (unsigned int i = 0; i < initial_queuesize; i++) {
742 OutgoingPacket packet = m_outgoing_queue.front();
743 m_outgoing_queue.pop();
748 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
750 LOG(dout_con << m_connection->getDesc()
751 << " Outgoing queue: peer_id=" << packet.peer_id
752 << ">>>NOT<<< found on sending packet"
753 << ", channel " << (packet.channelnum % 0xFF)
754 << ", size: " << packet.data.getSize() << std::endl);
758 /* send acks immediately; other packets need remaining quota unless a stop was requested */
759 if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
760 rawSendAsPacket(packet.peer_id, packet.channelnum,
761 packet.data, packet.reliable);
// the quota never goes below zero, even when acks are sent beyond it
762 if (peer->m_increment_packets_remaining > 0)
763 peer->m_increment_packets_remaining--;
// not sent this time: requeue and remember that this peer still has data pending
765 m_outgoing_queue.push(packet);
766 pending_unreliable[packet.peer_id] = true;
// report peers that used up their whole share (skipped when the share was 0 to begin with)
770 if (peer_packet_quota > 0) {
771 for (session_t peerId : peerIds) {
772 PeerHelper peer = m_connection->getPeerNoEx(peerId);
775 if (peer->m_increment_packets_remaining == 0) {
776 LOG(warningstream << m_connection->getDesc()
777 << " Packet quota used up for peer_id=" << peerId
778 << ", was " << peer_packet_quota << " pkts" << std::endl);
// finish pending disconnects once nothing unreliable is left to send to the peer
783 for (session_t peerId : pendingDisconnect) {
784 if (!pending_unreliable[peerId]) {
785 m_connection->deletePeer(peerId, false);
/*
 * Wraps data into an OutgoingPacket and appends it to m_outgoing_queue;
 * sendPackets() sends it later.
 * ack marks acknowledgement packets, which sendPackets() sends regardless
 * of the peer's remaining quota. The fourth constructor argument is fixed
 * to false (presumably the reliable flag - TODO confirm).
 */
790 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
791 const SharedBuffer<u8> &data, bool ack)
793 OutgoingPacket packet(peer_id, channelnum, data, false, ack);
794 m_outgoing_queue.push(packet);
/* Sets up the receive thread under the thread name "ConnectionReceive". */
797 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
798 Thread("ConnectionReceive")
/*
 * Main loop of the receive thread: allocates one receive buffer and calls
 * receive() until a stop is requested.
 * When DEBUG_CONNECTION_KBPS is defined, per-peer transfer and loss rates
 * are additionally printed to stderr every 20 units of accumulated dtime.
 */
802 void *ConnectionReceiveThread::run()
804 assert(m_connection);
806 LOG(dout_con << m_connection->getDesc()
807 << "ConnectionReceive thread started" << std::endl);
809 PROFILE(std::stringstream
811 PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
813 // use IPv6 minimum allowed MTU as receive buffer size as this is
814 // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
// NOTE(review): the IPv6 minimum link MTU is 1280 bytes (RFC 8200); 1500 is
// the usual Ethernet MTU - confirm which one the comment above refers to
816 const unsigned int packet_maxsize = 1500;
817 SharedBuffer<u8> packetdata(packet_maxsize);
// passed to receive() by reference; starts as true so buffered packets are checked on the first call
819 bool packet_queued = true;
821 #ifdef DEBUG_CONNECTION_KBPS
822 u64 curtime = porting::getTimeMs();
823 u64 lasttime = curtime;
824 float debug_print_timer = 0.0;
827 while (!stopRequested()) {
828 BEGIN_DEBUG_EXCEPTION_HANDLER
829 PROFILE(ScopeProfiler
830 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
832 #ifdef DEBUG_CONNECTION_KBPS
834 curtime = porting::getTimeMs();
835 float dtime = CALC_DTIME(lasttime,curtime);
838 /* receive packets */
839 receive(packetdata, packet_queued);
841 #ifdef DEBUG_CONNECTION_KBPS
842 debug_print_timer += dtime;
843 if (debug_print_timer > 20.0) {
844 debug_print_timer -= 20.0;
846 std::list<session_t> peerids = m_connection->getPeerIDs();
848 for (std::list<session_t>::iterator i = peerids.begin();
852 PeerHelper peer = m_connection->getPeerNoEx(*i);
856 float peer_current = 0.0;
857 float peer_loss = 0.0;
858 float avg_rate = 0.0;
859 float avg_loss = 0.0;
861 for(u16 j=0; j<CHANNEL_COUNT; j++)
863 peer_current +=peer->channels[j].getCurrentDownloadRateKB();
864 peer_loss += peer->channels[j].getCurrentLossRateKB();
865 avg_rate += peer->channels[j].getAvgDownloadRateKB();
866 avg_loss += peer->channels[j].getAvgLossRateKB();
869 std::stringstream output;
870 output << std::fixed << std::setprecision(1);
871 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
872 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
873 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
874 output << std::setfill(' ');
875 for(u16 j=0; j<CHANNEL_COUNT; j++)
877 output << "\tcha " << j << ":"
878 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
879 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
880 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
882 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
883 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
884 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
885 << " / WS: " << peer->channels[j].getWindowSize()
889 fprintf(stderr,"%s\n",output.str().c_str());
893 END_DEBUG_EXCEPTION_HANDLER
// drop this thread's profiler entry once the loop has ended
896 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
900 // Receive packets from the network and buffers and create ConnectionEvents
//
// packetdata is the reusable receive buffer owned by run().
//
// Buffered packets are tried first through getFromBuffers(). Then one
// datagram is read from the UDP socket and validated (minimum size,
// protocol id, channel number, known peer, matching sender address)
// before its payload is handed to processPacket(). Data that comes out
// of either path is posted as a dataReceived event.
901 void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
905 // First, see if there any buffered packets we can process now
907 bool data_left = true;
909 SharedBuffer<u8> resultdata;
912 data_left = getFromBuffers(peer_id, resultdata);
915 e.dataReceived(peer_id, resultdata);
916 m_connection->putEvent(e);
919 catch (ProcessedSilentlyException &e) {
920 /* try reading again */
923 packet_queued = false;
926 // Call Receive() to wait for incoming data
928 s32 received_size = m_connection->m_udpSocket.Receive(sender,
929 *packetdata, packetdata.getSize());
930 if (received_size < 0)
// reject datagrams that are shorter than the base header or carry a foreign protocol id
// NOTE(review): if readU32() returns an unsigned type, the "-1" arm of the
// conditional in the log below is converted to unsigned and prints as a
// large number - confirm
933 if ((received_size < BASE_HEADER_SIZE) ||
934 (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
935 LOG(derr_con << m_connection->getDesc()
936 << "Receive(): Invalid incoming packet, "
937 << "size: " << received_size
939 << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
944 session_t peer_id = readPeerId(*packetdata);
945 u8 channelnum = readChannel(*packetdata);
// the channel number comes straight from the wire and is used as an array index below
947 if (channelnum > CHANNEL_COUNT - 1) {
948 LOG(derr_con << m_connection->getDesc()
949 << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
953 /* Try to identify peer by sender address (may happen on join) */
954 if (peer_id == PEER_ID_INEXISTENT) {
955 peer_id = m_connection->lookupPeer(sender);
956 // We do not have to remind the peer of its
957 // peer id as the CONTROLTYPE_SET_PEER_ID
958 // command was sent reliably.
961 /* The peer was not found in our lists. Add it. */
962 if (peer_id == PEER_ID_INEXISTENT) {
963 peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
966 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
968 LOG(dout_con << m_connection->getDesc()
969 << " got packet from unknown peer_id: "
970 << peer_id << " Ignoring." << std::endl);
974 // Validate peer address
976 Address peer_address;
977 if (peer->getAddress(MTP_UDP, peer_address)) {
978 if (peer_address != sender) {
979 LOG(derr_con << m_connection->getDesc()
980 << " Peer " << peer_id << " sending from different address."
981 " Ignoring." << std::endl);
985 LOG(derr_con << m_connection->getDesc()
986 << " Peer " << peer_id << " doesn't have an address?!"
987 " Ignoring." << std::endl);
// reset the peer's timeout now that a packet from its known address has arrived
991 peer->ResetTimeout();
993 Channel *channel = nullptr;
994 if (dynamic_cast<UDPPeer *>(&peer)) {
995 channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
997 LOG(derr_con << m_connection->getDesc()
998 << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
999 " Ignoring." << std::endl);
1003 channel->UpdateBytesReceived(received_size);
1005 // Throw the received packet to channel->processPacket()
1007 // Make a new SharedBuffer from the data without the base headers
1008 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1009 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1010 strippeddata.getSize());
1013 // Process it (the result is some data with no headers made by us)
1014 SharedBuffer<u8> resultdata = processPacket
1015 (channel, strippeddata, peer_id, channelnum, false);
1017 LOG(dout_con << m_connection->getDesc()
1018 << " ProcessPacket from peer_id: " << peer_id
1019 << ", channel: " << (u32)channelnum << ", returned "
1020 << resultdata.getSize() << " bytes" << std::endl);
1023 e.dataReceived(peer_id, resultdata);
1024 m_connection->putEvent(e);
1026 catch (ProcessedSilentlyException &e) {
1028 catch (ProcessedQueued &e) {
1029 // we set it to true anyway (see below)
1032 /* Every time we receive a packet it can happen that a previously
1033 * buffered packet is now ready to process. */
1034 packet_queued = true;
1036 catch (InvalidIncomingDataException &e) {
/*
 * Looks for a buffered reliable packet that can be processed now.
 * Walks every channel of every UDP peer and asks checkIncomingBuffers();
 * peer_id and dst are the output parameters that call fills in.
 */
1040 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1042 std::list<session_t> peerids = m_connection->getPeerIDs();
1044 for (session_t peerid : peerids) {
1045 PeerHelper peer = m_connection->getPeerNoEx(peerid);
// only UDP peers carry channels
1049 if (dynamic_cast<UDPPeer *>(&peer) == 0)
1052 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
1053 if (checkIncomingBuffers(&channel, peer_id, dst)) {
/*
 * Releases the next in-order reliable packet of a channel, if it is
 * already buffered.
 *
 * When the first buffered sequence number equals the channel's next
 * expected incoming sequence number, the packet is popped, the expected
 * number is advanced and the payload (after the base and reliable headers)
 * is run through processPacket() with reliable = true. peer_id receives
 * the packet's peer id and dst the processed data.
 */
1061 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1062 session_t &peer_id, SharedBuffer<u8> &dst)
1064 u16 firstseqnum = 0;
1065 if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
// only the packet that is next in sequence may leave the buffer
1066 if (firstseqnum == channel->readNextIncomingSeqNum()) {
1067 BufferedPacket p = channel->incoming_reliables.popFirst();
1068 peer_id = readPeerId(*p.data);
1069 u8 channelnum = readChannel(*p.data);
1070 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
1072 LOG(dout_con << m_connection->getDesc()
1073 << "UNBUFFERING TYPE_RELIABLE"
1074 << " seqnum=" << seqnum
1075 << " peer_id=" << peer_id
1076 << " channel=" << ((int) channelnum & 0xff)
1079 channel->incNextIncomingSeqNum();
// skip both headers to reach the wrapped packet
1081 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1082 // Get out the inside packet and re-process it
1083 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1084 memcpy(*payload, &p.data[headers_size], payload.getSize());
1086 dst = processPacket(channel, payload, peer_id, channelnum, true);
/*
 * Validates a packet whose base header has already been removed and routes
 * it to the handler for its type.
 *
 * The first byte of packetdata is the packet type; it indexes
 * packetTypeRouter. The handler's result is returned.
 *
 * Throws ProcessedSilentlyException for the "Peer not found" case and
 * InvalidIncomingDataException for empty data, an out-of-range peer id or
 * an invalid packet type.
 */
1093 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1094 const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
1096 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1099 errorstream << "Peer not found (possible timeout)" << std::endl;
1100 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1103 if (packetdata.getSize() < 1)
1104 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1106 u8 type = readU8(&(packetdata[0]));
// the first half of the condition skips the check when MAX_UDP_PEERS exceeds 65535
1108 if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1109 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1110 errorstream << errmsg << std::endl;
1111 throw InvalidIncomingDataException(errmsg.c_str());
// type is used as an index into packetTypeRouter below
1114 if (type >= PACKET_TYPE_MAX) {
1115 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1117 throw InvalidIncomingDataException("Invalid packet type");
1120 const PacketTypeHandler &pHandle = packetTypeRouter[type];
1121 return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
/*
 * Dispatch table used by processPacket(): the packet type byte is the
 * index. The entry order (Control, Original, Split, Reliable) presumably
 * has to match the numeric values of the packet type constants - TODO
 * confirm against their definition.
 */
1124 const ConnectionReceiveThread::PacketTypeHandler
1125 ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1126 {&ConnectionReceiveThread::handlePacketType_Control},
1127 {&ConnectionReceiveThread::handlePacketType_Original},
1128 {&ConnectionReceiveThread::handlePacketType_Split},
1129 {&ConnectionReceiveThread::handlePacketType_Reliable},
1132 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1133 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1135 if (packetdata.getSize() < 2)
1136 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1138 u8 controltype = readU8(&(packetdata[1]));
1140 if (controltype == CONTROLTYPE_ACK) {
1141 assert(channel != NULL);
1143 if (packetdata.getSize() < 4) {
1144 throw InvalidIncomingDataException(
1145 "packetdata.getSize() < 4 (ACK header size)");
1148 u16 seqnum = readU16(&packetdata[2]);
1149 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1150 << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1151 << seqnum << " ]" << std::endl);
1154 BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1156 // only calculate rtt from straight sent packets
1157 if (p.resend_count == 0) {
1158 // Get round trip time
1159 u64 current_time = porting::getTimeMs();
1161 // an overflow is quite unlikely but as it'd result in major
1162 // rtt miscalculation we handle it here
1163 if (current_time > p.absolute_send_time) {
1164 float rtt = (current_time - p.absolute_send_time) / 1000.0;
1166 // Let peer calculate stuff according to it
1167 // (avg_rtt and resend_timeout)
1168 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1169 } else if (p.totaltime > 0) {
1170 float rtt = p.totaltime;
1172 // Let peer calculate stuff according to it
1173 // (avg_rtt and resend_timeout)
1174 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1177 // put bytes for max bandwidth calculation
1178 channel->UpdateBytesSent(p.data.getSize(), 1);
1179 if (channel->outgoing_reliables_sent.size() == 0)
1180 m_connection->TriggerSend();
1181 } catch (NotFoundException &e) {
1182 LOG(derr_con << m_connection->getDesc()
1183 << "WARNING: ACKed packet not in outgoing queue"
1184 << " seqnum=" << seqnum << std::endl);
1185 channel->UpdatePacketTooLateCounter();
1188 throw ProcessedSilentlyException("Got an ACK");
1189 } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1190 // Got a packet to set our peer id
1191 if (packetdata.getSize() < 4)
1192 throw InvalidIncomingDataException
1193 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1194 session_t peer_id_new = readU16(&packetdata[2]);
1195 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1196 << "... " << std::endl);
1198 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1199 LOG(derr_con << m_connection->getDesc()
1200 << "WARNING: Not changing existing peer id." << std::endl);
1202 LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1204 m_connection->SetPeerID(peer_id_new);
1207 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1208 } else if (controltype == CONTROLTYPE_PING) {
1209 // Just ignore it, the incoming data already reset
1210 // the timeout counter
1211 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1212 throw ProcessedSilentlyException("Got a PING");
1213 } else if (controltype == CONTROLTYPE_DISCO) {
1214 // Just ignore it, the incoming data already reset
1215 // the timeout counter
1216 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1217 << peer->id << std::endl);
1219 if (!m_connection->deletePeer(peer->id, false)) {
1220 derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1223 throw ProcessedSilentlyException("Got a DISCO");
1225 LOG(derr_con << m_connection->getDesc()
1226 << "INVALID TYPE_CONTROL: invalid controltype="
1227 << ((int) controltype & 0xff) << std::endl);
1228 throw InvalidIncomingDataException("Invalid control type");
// Handles a packet of the original type: skips the first
// ORIGINAL_HEADER_SIZE bytes of packetdata and copies the remaining bytes
// into a newly constructed buffer, which is the payload meant for the user.
//
// A packet whose size is not larger than ORIGINAL_HEADER_SIZE is rejected
// with InvalidIncomingDataException, so the copied payload is never empty.
// channel, peer, channelnum and reliable are not read by the statements here.
1232 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1233 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1235 if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1236 throw InvalidIncomingDataException
1237 ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1238 LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1240 // Get the inside packet out and return it
// The size check above guarantees that this subtraction cannot underflow.
1241 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1242 memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
// Handles one chunk of a split packet.
//
// When the peer has an MTP_UDP address, the chunk is wrapped into a
// BufferedPacket via makePacket and handed to peer->addSplitPacket together
// with channelnum and the reliable flag. A non-empty result is logged as the
// fully constructed data; an empty result means the chunk was only buffered,
// which is reported by throwing ProcessedSilentlyException.
//
// NOTE(review): FATAL_ERROR at the end is presumably reached only when
// getAddress fails for the peer - confirm.
1246 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1247 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1249 Address peer_address;
1251 if (peer->getAddress(MTP_UDP, peer_address)) {
1252 // We have to create a packet again for buffering
1253 // This isn't actually too bad an idea.
1254 BufferedPacket packet = makePacket(peer_address,
1256 m_connection->GetProtocolID(),
1260 // Buffer the packet
1261 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
// A non-empty buffer means addSplitPacket produced the complete data.
1263 if (data.getSize() != 0) {
1264 LOG(dout_con << m_connection->getDesc()
1265 << "RETURNING TYPE_SPLIT: Constructed full data, "
1266 << "size=" << data.getSize() << std::endl);
// Otherwise the chunk has only been stored so far.
1269 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1270 throw ProcessedSilentlyException("Buffered a split packet chunk");
1273 // We should never get here.
1274 FATAL_ERROR("Invalid execution point");
// Handles a packet of the reliable type.
//
// The sequence number is read from offset 1 and compared with
// channel->readNextIncomingSeqNum():
//  - inside the receive window (MAX_RELIABLE_WINDOW_SIZE): an ACK is sent;
//  - outside the window and newer than expected: no ACK, the packet is
//    dropped with ProcessedSilentlyException;
//  - outside the window and older than expected: the ACK is sent again and
//    ProcessedSilentlyException is thrown;
//  - acknowledged but not the next expected number: the packet is stored in
//    channel->incoming_reliables and ProcessedQueued is thrown;
//  - the next expected number: the expected sequence number is advanced and
//    the bytes after RELIABLE_HEADER_SIZE are passed to processPacket.
//
// channel must not be NULL (asserted). Nested reliable packets and packets
// shorter than RELIABLE_HEADER_SIZE raise InvalidIncomingDataException.
1277 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1278 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1280 assert(channel != NULL);
1282 // Recursive reliable packets not allowed
1284 throw InvalidIncomingDataException("Found nested reliable packets");
// A packet of exactly RELIABLE_HEADER_SIZE bytes passes this check; its
// payload is then empty.
1286 if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1287 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
// The sequence number of this reliable packet is stored at offset 1.
1289 u16 seqnum = readU16(&packetdata[1]);
1290 bool is_future_packet = false;
1291 bool is_old_packet = false;
1293 /* packet is within our receive window send ack */
1294 if (seqnum_in_window(seqnum,
1295 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1296 m_connection->sendAck(peer->id, channelnum, seqnum);
// Classify the packet relative to the next expected sequence number.
1298 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1299 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1301 /* packet is not within receive window, don't send ack. *
1302 * if this was a valid packet it's gonna be retransmitted */
1303 if (is_future_packet)
1304 throw ProcessedSilentlyException(
1305 "Received packet newer then expected, not sending ack");
1307 /* seems like our ack was lost, send another one for a old packet */
1308 if (is_old_packet) {
1309 LOG(dout_con << m_connection->getDesc()
1310 << "RE-SENDING ACK: peer_id: " << peer->id
1311 << ", channel: " << (channelnum & 0xFF)
1312 << ", seqnum: " << seqnum << std::endl;)
1313 m_connection->sendAck(peer->id, channelnum, seqnum);
1315 // we already have this packet so this one was on wire at least
1316 // the current timeout
1317 // we don't know how long this packet was on wire don't do silly guessing
1318 // dynamic_cast<UDPPeer*>(&peer)->
1319 // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1321 throw ProcessedSilentlyException("Retransmitting ack for old packet");
// Not the next expected sequence number: store the packet in
// incoming_reliables instead of processing it now.
1325 if (seqnum != channel->readNextIncomingSeqNum()) {
1326 Address peer_address;
1328 // this is a reliable packet so we have a udp address for sure
// NOTE(review): the result of getAddress is not checked here.
1329 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1330 // This one comes later, buffer it.
1331 // Actually we have to make a packet to buffer one.
1332 // Well, we have all the ingredients, so just do it.
1333 BufferedPacket packet = con::makePacket(
1336 m_connection->GetProtocolID(),
1340 channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1342 LOG(dout_con << m_connection->getDesc()
1343 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1344 << ", channel: " << (channelnum & 0xFF)
1345 << ", seqnum: " << seqnum << std::endl;)
1347 throw ProcessedQueued("Buffered future reliable packet");
// NOTE(review): AlreadyExistsException is swallowed without any handling,
// and the IncomingDataCorruption handler only queues a disconnect command
// and logs. Confirm that control does not continue from these handlers into
// the immediate-processing path below, which would treat this out-of-order
// packet as the expected one.
1348 } catch (AlreadyExistsException &e) {
1349 } catch (IncomingDataCorruption &e) {
// Ask the connection to disconnect the peer that sent the corrupt data.
1350 ConnectionCommand discon;
1351 discon.disconnect_peer(peer->id);
1352 m_connection->putCommand(discon);
// NOTE(review): the message below has no separator between the seqnum value
// and the DROPPING CLIENT text.
1354 LOG(derr_con << m_connection->getDesc()
1355 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1356 << ", channel: " << (channelnum & 0xFF)
1357 << ", seqnum: " << seqnum
1358 << "DROPPING CLIENT!" << std::endl;)
1362 /* we got a packet to process right now */
1363 LOG(dout_con << m_connection->getDesc()
1364 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1365 << ", channel: " << (channelnum & 0xFF)
1366 << ", seqnum: " << seqnum << std::endl;)
1369 /* check for resend case */
// If the first entry of incoming_reliables carries this same sequence
// number, it is removed from the buffer; the packet received now is the one
// that gets processed.
1370 u16 queued_seqnum = 0;
1371 if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1372 if (queued_seqnum == seqnum) {
1373 BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
1374 /** TODO find a way to verify the new against the old packet */
// Advance the next expected incoming sequence number before unwrapping.
1378 channel->incNextIncomingSeqNum();
1380 // Get out the inside packet and re-process it
1381 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1382 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
// The wrapped packet is processed with reliable set to true, presumably so
// that a nested reliable packet is rejected by the check at the top.
1384 return processPacket(channel, payload, peer->id, channelnum, true);