3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include "connectionthreads.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
31 /******************************************************************************/
32 /* defines used for debugging and profiling */
33 /******************************************************************************/
37 #undef DEBUG_CONNECTION_KBPS
39 /* this mutex is used to achieve log message consistency */
40 std::mutex log_conthread_mutex;
43 MutexAutoLock loglock(log_conthread_mutex); \
47 //#define DEBUG_CONNECTION_KBPS
48 #undef DEBUG_CONNECTION_KBPS
51 /* maximum number of retries for reliable packets */
52 #define MAX_RELIABLE_RETRY 5
56 static session_t readPeerId(u8 *packetdata)
58 return readU16(&packetdata[4]);
60 static u8 readChannel(u8 *packetdata)
62 return readU8(&packetdata[6]);
65 /******************************************************************************/
66 /* Connection Threads */
67 /******************************************************************************/
69 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
71 Thread("ConnectionSend"),
72 m_max_packet_size(max_packet_size),
74 m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
76 SANITY_CHECK(m_max_data_packets_per_iteration > 1);
79 void *ConnectionSendThread::run()
83 LOG(dout_con << m_connection->getDesc()
84 << "ConnectionSend thread started" << std::endl);
86 u64 curtime = porting::getTimeMs();
87 u64 lasttime = curtime;
89 PROFILE(std::stringstream ThreadIdentifier);
90 PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
92 /* if stop is requested don't stop immediately but try to send all */
94 while (!stopRequested() || packetsQueued()) {
95 BEGIN_DEBUG_EXCEPTION_HANDLER
96 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
98 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
100 /* wait for trigger or timeout */
101 m_send_sleep_semaphore.wait(50);
103 /* remove all triggers */
104 while (m_send_sleep_semaphore.wait(0)) {
108 curtime = porting::getTimeMs();
109 float dtime = CALC_DTIME(lasttime, curtime);
111 /* first resend timed-out packets */
113 if (m_iteration_packets_avaialble == 0) {
114 LOG(warningstream << m_connection->getDesc()
115 << " Packet quota used up after re-sending packets, "
116 << "max=" << m_max_data_packets_per_iteration << std::endl);
119 /* translate commands to packets */
120 ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
121 while (c.type != CONNCMD_NONE) {
123 processReliableCommand(c);
125 processNonReliableCommand(c);
127 c = m_connection->m_command_queue.pop_frontNoEx(0);
130 /* send queued packets */
133 END_DEBUG_EXCEPTION_HANDLER
136 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
140 void ConnectionSendThread::Trigger()
142 m_send_sleep_semaphore.post();
145 bool ConnectionSendThread::packetsQueued()
147 std::list<session_t> peerIds = m_connection->getPeerIDs();
149 if (!m_outgoing_queue.empty() && !peerIds.empty())
152 for (session_t peerId : peerIds) {
153 PeerHelper peer = m_connection->getPeerNoEx(peerId);
158 if (dynamic_cast<UDPPeer *>(&peer) == 0)
161 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
162 if (!channel.queued_commands.empty()) {
172 void ConnectionSendThread::runTimeouts(float dtime)
174 std::list<session_t> timeouted_peers;
175 std::list<session_t> peerIds = m_connection->getPeerIDs();
177 for (session_t &peerId : peerIds) {
178 PeerHelper peer = m_connection->getPeerNoEx(peerId);
183 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
187 PROFILE(std::stringstream peerIdentifier);
188 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
189 << ";" << peerId << ";RELIABLE]");
190 PROFILE(ScopeProfiler
191 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
193 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
198 if (peer->isTimedOut(m_timeout)) {
199 infostream << m_connection->getDesc()
200 << "RunTimeouts(): Peer " << peer->id
203 // Add peer to the list
204 timeouted_peers.push_back(peer->id);
205 // Don't bother going through the buffers of this one
209 float resend_timeout = udpPeer->getResendTimeout();
210 bool retry_count_exceeded = false;
211 for (Channel &channel : udpPeer->channels) {
212 std::list<BufferedPacket> timed_outs;
214 // Remove timed out incomplete unreliable split packets
215 channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
217 // Increment reliable packet times
218 channel.outgoing_reliables_sent.incrementTimeouts(dtime);
220 unsigned int numpeers = m_connection->m_peers.size();
225 // Re-send timed out outgoing reliables
226 timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
227 (m_max_data_packets_per_iteration / numpeers));
229 channel.UpdatePacketLossCounter(timed_outs.size());
230 g_profiler->graphAdd("packets_lost", timed_outs.size());
232 m_iteration_packets_avaialble -= timed_outs.size();
234 for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
235 k != timed_outs.end(); ++k) {
236 session_t peer_id = readPeerId(*(k->data));
237 u8 channelnum = readChannel(*(k->data));
238 u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
240 channel.UpdateBytesLost(k->data.getSize());
243 if (k->resend_count > MAX_RELIABLE_RETRY) {
244 retry_count_exceeded = true;
245 timeouted_peers.push_back(peer->id);
246 /* no need to check additional packets if a single one did timeout*/
250 LOG(derr_con << m_connection->getDesc()
251 << "RE-SENDING timed-out RELIABLE to "
252 << k->address.serializeString()
253 << "(t/o=" << resend_timeout << "): "
254 << "from_peer_id=" << peer_id
255 << ", channel=" << ((int) channelnum & 0xff)
256 << ", seqnum=" << seqnum
261 // do not handle rtt here as we can't decide if this packet was
262 // lost or really takes more time to transmit
265 if (retry_count_exceeded) {
266 break; /* no need to check other channels if we already did timeout */
269 channel.UpdateTimers(dtime);
272 /* skip to next peer if we did timeout */
273 if (retry_count_exceeded)
276 /* send ping if necessary */
277 if (udpPeer->Ping(dtime, data)) {
278 LOG(dout_con << m_connection->getDesc()
279 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
280 /* this may fail if there ain't a sequence number left */
281 if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
282 //retrigger with reduced ping interval
283 udpPeer->Ping(4.0, data);
287 udpPeer->RunCommandQueues(m_max_packet_size,
288 m_max_commands_per_iteration,
289 m_max_packets_requeued);
292 // Remove timed out peers
293 for (u16 timeouted_peer : timeouted_peers) {
294 LOG(dout_con << m_connection->getDesc()
295 << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
296 m_connection->deletePeer(timeouted_peer, true);
300 void ConnectionSendThread::rawSend(const BufferedPacket &packet)
303 m_connection->m_udpSocket.Send(packet.address, *packet.data,
304 packet.data.getSize());
305 LOG(dout_con << m_connection->getDesc()
306 << " rawSend: " << packet.data.getSize()
307 << " bytes sent" << std::endl);
308 } catch (SendFailedException &e) {
309 LOG(derr_con << m_connection->getDesc()
310 << "Connection::rawSend(): SendFailedException: "
311 << packet.address.serializeString() << std::endl);
315 void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
318 p.absolute_send_time = porting::getTimeMs();
320 channel->outgoing_reliables_sent.insert(p,
321 (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
322 % (MAX_RELIABLE_WINDOW_SIZE + 1));
324 catch (AlreadyExistsException &e) {
325 LOG(derr_con << m_connection->getDesc()
326 << "WARNING: Going to send a reliable packet"
327 << " in outgoing buffer" << std::endl);
334 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
335 const SharedBuffer<u8> &data, bool reliable)
337 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
339 LOG(dout_con << m_connection->getDesc()
340 << " INFO: dropped packet for non existent peer_id: "
341 << peer_id << std::endl);
342 FATAL_ERROR_IF(!reliable,
343 "Trying to send raw packet reliable but no peer found!");
346 Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
349 bool have_sequence_number_for_raw_packet = true;
351 channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
353 if (!have_sequence_number_for_raw_packet)
356 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
357 Address peer_address;
358 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
360 // Add base headers and make a packet
361 BufferedPacket p = con::makePacket(peer_address, reliable,
362 m_connection->GetProtocolID(), m_connection->GetPeerID(),
365 // first check if our send window is already maxed out
366 if (channel->outgoing_reliables_sent.size()
367 < channel->getWindowSize()) {
368 LOG(dout_con << m_connection->getDesc()
369 << " INFO: sending a reliable packet to peer_id " << peer_id
370 << " channel: " << (u32)channelnum
371 << " seqnum: " << seqnum << std::endl);
372 sendAsPacketReliable(p, channel);
376 LOG(dout_con << m_connection->getDesc()
377 << " INFO: queueing reliable packet for peer_id: " << peer_id
378 << " channel: " << (u32)channelnum
379 << " seqnum: " << seqnum << std::endl);
380 channel->queued_reliables.push(p);
384 Address peer_address;
385 if (peer->getAddress(MTP_UDP, peer_address)) {
386 // Add base headers and make a packet
387 BufferedPacket p = con::makePacket(peer_address, data,
388 m_connection->GetProtocolID(), m_connection->GetPeerID(),
396 LOG(dout_con << m_connection->getDesc()
397 << " INFO: dropped unreliable packet for peer_id: " << peer_id
398 << " because of (yet) missing udp address" << std::endl);
402 void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
404 assert(c.reliable); // Pre-condition
408 LOG(dout_con << m_connection->getDesc()
409 << "UDP processing reliable CONNCMD_NONE" << std::endl);
413 LOG(dout_con << m_connection->getDesc()
414 << "UDP processing reliable CONNCMD_SEND" << std::endl);
418 case CONNCMD_SEND_TO_ALL:
419 LOG(dout_con << m_connection->getDesc()
420 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
421 sendToAllReliable(c);
424 case CONCMD_CREATE_PEER:
425 LOG(dout_con << m_connection->getDesc()
426 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
427 if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
428 /* put to queue if we couldn't send it immediately */
434 case CONNCMD_CONNECT:
435 case CONNCMD_DISCONNECT:
437 FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
439 LOG(dout_con << m_connection->getDesc()
440 << " Invalid reliable command type: " << c.type << std::endl);
445 void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
447 assert(!c.reliable); // Pre-condition
451 LOG(dout_con << m_connection->getDesc()
452 << " UDP processing CONNCMD_NONE" << std::endl);
455 LOG(dout_con << m_connection->getDesc()
456 << " UDP processing CONNCMD_SERVE port="
457 << c.address.serializeString() << std::endl);
460 case CONNCMD_CONNECT:
461 LOG(dout_con << m_connection->getDesc()
462 << " UDP processing CONNCMD_CONNECT" << std::endl);
465 case CONNCMD_DISCONNECT:
466 LOG(dout_con << m_connection->getDesc()
467 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
470 case CONNCMD_DISCONNECT_PEER:
471 LOG(dout_con << m_connection->getDesc()
472 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
473 disconnect_peer(c.peer_id);
476 LOG(dout_con << m_connection->getDesc()
477 << " UDP processing CONNCMD_SEND" << std::endl);
478 send(c.peer_id, c.channelnum, c.data);
480 case CONNCMD_SEND_TO_ALL:
481 LOG(dout_con << m_connection->getDesc()
482 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
483 sendToAll(c.channelnum, c.data);
486 LOG(dout_con << m_connection->getDesc()
487 << " UDP processing CONCMD_ACK" << std::endl);
488 sendAsPacket(c.peer_id, c.channelnum, c.data, true);
490 case CONCMD_CREATE_PEER:
491 FATAL_ERROR("Got command that should be reliable as unreliable command");
493 LOG(dout_con << m_connection->getDesc()
494 << " Invalid command type: " << c.type << std::endl);
498 void ConnectionSendThread::serve(Address bind_address)
500 LOG(dout_con << m_connection->getDesc()
501 << "UDP serving at port " << bind_address.serializeString() << std::endl);
503 m_connection->m_udpSocket.Bind(bind_address);
504 m_connection->SetPeerID(PEER_ID_SERVER);
506 catch (SocketException &e) {
510 m_connection->putEvent(ce);
514 void ConnectionSendThread::connect(Address address)
516 LOG(dout_con << m_connection->getDesc() << " connecting to "
517 << address.serializeString()
518 << ":" << address.getPort() << std::endl);
520 UDPPeer *peer = m_connection->createServerPeer(address);
524 e.peerAdded(peer->id, peer->address);
525 m_connection->putEvent(e);
529 if (address.isIPv6())
530 bind_addr.setAddress((IPv6AddressBytes *) NULL);
532 bind_addr.setAddress(0, 0, 0, 0);
534 m_connection->m_udpSocket.Bind(bind_addr);
536 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
537 m_connection->SetPeerID(PEER_ID_INEXISTENT);
538 NetworkPacket pkt(0, 0);
539 m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
542 void ConnectionSendThread::disconnect()
544 LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
546 // Create and send DISCO packet
547 SharedBuffer<u8> data(2);
548 writeU8(&data[0], PACKET_TYPE_CONTROL);
549 writeU8(&data[1], CONTROLTYPE_DISCO);
553 std::list<session_t> peerids = m_connection->getPeerIDs();
555 for (session_t peerid : peerids) {
556 sendAsPacket(peerid, 0, data, false);
560 void ConnectionSendThread::disconnect_peer(session_t peer_id)
562 LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
564 // Create and send DISCO packet
565 SharedBuffer<u8> data(2);
566 writeU8(&data[0], PACKET_TYPE_CONTROL);
567 writeU8(&data[1], CONTROLTYPE_DISCO);
568 sendAsPacket(peer_id, 0, data, false);
570 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
575 if (dynamic_cast<UDPPeer *>(&peer) == 0) {
579 dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
582 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
583 const SharedBuffer<u8> &data)
585 assert(channelnum < CHANNEL_COUNT); // Pre-condition
587 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
589 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
590 << ">>>NOT<<< found on sending packet"
591 << ", channel " << (channelnum % 0xFF)
592 << ", size: " << data.getSize() << std::endl);
596 LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
597 << ", channel " << (channelnum % 0xFF)
598 << ", size: " << data.getSize() << std::endl);
600 u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
602 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
603 std::list<SharedBuffer<u8>> originals;
605 makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
607 peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
609 for (const SharedBuffer<u8> &original : originals) {
610 sendAsPacket(peer_id, channelnum, original);
614 void ConnectionSendThread::sendReliable(ConnectionCommand &c)
616 PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
620 peer->PutReliableSendCommand(c, m_max_packet_size);
623 void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
625 std::list<session_t> peerids = m_connection->getPeerIDs();
627 for (session_t peerid : peerids) {
628 send(peerid, channelnum, data);
632 void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
634 std::list<session_t> peerids = m_connection->getPeerIDs();
636 for (session_t peerid : peerids) {
637 PeerHelper peer = m_connection->getPeerNoEx(peerid);
642 peer->PutReliableSendCommand(c, m_max_packet_size);
646 void ConnectionSendThread::sendPackets(float dtime)
648 std::list<session_t> peerIds = m_connection->getPeerIDs();
649 std::list<session_t> pendingDisconnect;
650 std::map<session_t, bool> pending_unreliable;
652 const unsigned int peer_packet_quota = m_iteration_packets_avaialble
653 / MYMAX(peerIds.size(), 1);
655 for (session_t peerId : peerIds) {
656 PeerHelper peer = m_connection->getPeerNoEx(peerId);
657 //peer may have been removed
659 LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
664 peer->m_increment_packets_remaining = peer_packet_quota;
666 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
672 if (udpPeer->m_pending_disconnect) {
673 pendingDisconnect.push_back(peerId);
676 PROFILE(std::stringstream
679 peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
681 PROFILE(ScopeProfiler
682 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
684 LOG(dout_con << m_connection->getDesc()
685 << " Handle per peer queues: peer_id=" << peerId
686 << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
688 // first send queued reliable packets for all peers (if possible)
689 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
690 Channel &channel = udpPeer->channels[i];
693 channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
694 u16 next_to_receive = 0;
695 channel.incoming_reliables.getFirstSeqnum(next_to_receive);
697 LOG(dout_con << m_connection->getDesc() << "\t channel: "
698 << i << ", peer quota:"
699 << peer->m_increment_packets_remaining
701 << "\t\t\treliables on wire: "
702 << channel.outgoing_reliables_sent.size()
703 << ", waiting for ack for " << next_to_ack
705 << "\t\t\tincoming_reliables: "
706 << channel.incoming_reliables.size()
707 << ", next reliable packet: "
708 << channel.readNextIncomingSeqNum()
709 << ", next queued: " << next_to_receive
711 << "\t\t\treliables queued : "
712 << channel.queued_reliables.size()
714 << "\t\t\tqueued commands : "
715 << channel.queued_commands.size()
718 while (!channel.queued_reliables.empty() &&
719 channel.outgoing_reliables_sent.size()
720 < channel.getWindowSize() &&
721 peer->m_increment_packets_remaining > 0) {
722 BufferedPacket p = channel.queued_reliables.front();
723 channel.queued_reliables.pop();
724 LOG(dout_con << m_connection->getDesc()
725 << " INFO: sending a queued reliable packet "
727 << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
729 sendAsPacketReliable(p, &channel);
730 peer->m_increment_packets_remaining--;
735 if (!m_outgoing_queue.empty()) {
736 LOG(dout_con << m_connection->getDesc()
737 << " Handle non reliable queue ("
738 << m_outgoing_queue.size() << " pkts)" << std::endl);
741 unsigned int initial_queuesize = m_outgoing_queue.size();
742 /* send non reliable packets*/
743 for (unsigned int i = 0; i < initial_queuesize; i++) {
744 OutgoingPacket packet = m_outgoing_queue.front();
745 m_outgoing_queue.pop();
750 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
752 LOG(dout_con << m_connection->getDesc()
753 << " Outgoing queue: peer_id=" << packet.peer_id
754 << ">>>NOT<<< found on sending packet"
755 << ", channel " << (packet.channelnum % 0xFF)
756 << ", size: " << packet.data.getSize() << std::endl);
760 /* send acks immediately */
761 if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
762 rawSendAsPacket(packet.peer_id, packet.channelnum,
763 packet.data, packet.reliable);
764 if (peer->m_increment_packets_remaining > 0)
765 peer->m_increment_packets_remaining--;
767 m_outgoing_queue.push(packet);
768 pending_unreliable[packet.peer_id] = true;
772 if (peer_packet_quota > 0) {
773 for (session_t peerId : peerIds) {
774 PeerHelper peer = m_connection->getPeerNoEx(peerId);
777 if (peer->m_increment_packets_remaining == 0) {
778 LOG(warningstream << m_connection->getDesc()
779 << " Packet quota used up for peer_id=" << peerId
780 << ", was " << peer_packet_quota << " pkts" << std::endl);
785 for (session_t peerId : pendingDisconnect) {
786 if (!pending_unreliable[peerId]) {
787 m_connection->deletePeer(peerId, false);
792 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
793 const SharedBuffer<u8> &data, bool ack)
795 OutgoingPacket packet(peer_id, channelnum, data, false, ack);
796 m_outgoing_queue.push(packet);
799 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
800 Thread("ConnectionReceive")
804 void *ConnectionReceiveThread::run()
806 assert(m_connection);
808 LOG(dout_con << m_connection->getDesc()
809 << "ConnectionReceive thread started" << std::endl);
811 PROFILE(std::stringstream
813 PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
815 // use IPv6 minimum allowed MTU as receive buffer size as this is
816 // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
818 const unsigned int packet_maxsize = 1500;
819 SharedBuffer<u8> packetdata(packet_maxsize);
821 bool packet_queued = true;
823 #ifdef DEBUG_CONNECTION_KBPS
824 u64 curtime = porting::getTimeMs();
825 u64 lasttime = curtime;
826 float debug_print_timer = 0.0;
829 while (!stopRequested()) {
830 BEGIN_DEBUG_EXCEPTION_HANDLER
831 PROFILE(ScopeProfiler
832 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
834 #ifdef DEBUG_CONNECTION_KBPS
836 curtime = porting::getTimeMs();
837 float dtime = CALC_DTIME(lasttime,curtime);
840 /* receive packets */
841 receive(packetdata, packet_queued);
843 #ifdef DEBUG_CONNECTION_KBPS
844 debug_print_timer += dtime;
845 if (debug_print_timer > 20.0) {
846 debug_print_timer -= 20.0;
848 std::list<session_t> peerids = m_connection->getPeerIDs();
850 for (std::list<session_t>::iterator i = peerids.begin();
854 PeerHelper peer = m_connection->getPeerNoEx(*i);
858 float peer_current = 0.0;
859 float peer_loss = 0.0;
860 float avg_rate = 0.0;
861 float avg_loss = 0.0;
863 for(u16 j=0; j<CHANNEL_COUNT; j++)
865 peer_current +=peer->channels[j].getCurrentDownloadRateKB();
866 peer_loss += peer->channels[j].getCurrentLossRateKB();
867 avg_rate += peer->channels[j].getAvgDownloadRateKB();
868 avg_loss += peer->channels[j].getAvgLossRateKB();
871 std::stringstream output;
872 output << std::fixed << std::setprecision(1);
873 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
874 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
875 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
876 output << std::setfill(' ');
877 for(u16 j=0; j<CHANNEL_COUNT; j++)
879 output << "\tcha " << j << ":"
880 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
881 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
882 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
884 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
885 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
886 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
887 << " / WS: " << peer->channels[j].getWindowSize()
891 fprintf(stderr,"%s\n",output.str().c_str());
895 END_DEBUG_EXCEPTION_HANDLER
898 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
902 // Receive packets from the network and buffers and create ConnectionEvents
903 void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
907 // First, see if there any buffered packets we can process now
909 bool data_left = true;
911 SharedBuffer<u8> resultdata;
914 data_left = getFromBuffers(peer_id, resultdata);
917 e.dataReceived(peer_id, resultdata);
918 m_connection->putEvent(e);
921 catch (ProcessedSilentlyException &e) {
922 /* try reading again */
925 packet_queued = false;
928 // Call Receive() to wait for incoming data
930 s32 received_size = m_connection->m_udpSocket.Receive(sender,
931 *packetdata, packetdata.getSize());
932 if (received_size < 0)
935 if ((received_size < BASE_HEADER_SIZE) ||
936 (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
937 LOG(derr_con << m_connection->getDesc()
938 << "Receive(): Invalid incoming packet, "
939 << "size: " << received_size
941 << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
946 session_t peer_id = readPeerId(*packetdata);
947 u8 channelnum = readChannel(*packetdata);
949 if (channelnum > CHANNEL_COUNT - 1) {
950 LOG(derr_con << m_connection->getDesc()
951 << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
955 /* Try to identify peer by sender address (may happen on join) */
956 if (peer_id == PEER_ID_INEXISTENT) {
957 peer_id = m_connection->lookupPeer(sender);
958 // We do not have to remind the peer of its
959 // peer id as the CONTROLTYPE_SET_PEER_ID
960 // command was sent reliably.
963 /* The peer was not found in our lists. Add it. */
964 if (peer_id == PEER_ID_INEXISTENT) {
965 peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
968 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
970 LOG(dout_con << m_connection->getDesc()
971 << " got packet from unknown peer_id: "
972 << peer_id << " Ignoring." << std::endl);
976 // Validate peer address
978 Address peer_address;
979 if (peer->getAddress(MTP_UDP, peer_address)) {
980 if (peer_address != sender) {
981 LOG(derr_con << m_connection->getDesc()
982 << " Peer " << peer_id << " sending from different address."
983 " Ignoring." << std::endl);
987 LOG(derr_con << m_connection->getDesc()
988 << " Peer " << peer_id << " doesn't have an address?!"
989 " Ignoring." << std::endl);
993 peer->ResetTimeout();
995 Channel *channel = nullptr;
996 if (dynamic_cast<UDPPeer *>(&peer)) {
997 channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
999 LOG(derr_con << m_connection->getDesc()
1000 << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
1001 " Ignoring." << std::endl);
1005 channel->UpdateBytesReceived(received_size);
1007 // Throw the received packet to channel->processPacket()
1009 // Make a new SharedBuffer from the data without the base headers
1010 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1011 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1012 strippeddata.getSize());
1015 // Process it (the result is some data with no headers made by us)
1016 SharedBuffer<u8> resultdata = processPacket
1017 (channel, strippeddata, peer_id, channelnum, false);
1019 LOG(dout_con << m_connection->getDesc()
1020 << " ProcessPacket from peer_id: " << peer_id
1021 << ", channel: " << (u32)channelnum << ", returned "
1022 << resultdata.getSize() << " bytes" << std::endl);
1025 e.dataReceived(peer_id, resultdata);
1026 m_connection->putEvent(e);
1028 catch (ProcessedSilentlyException &e) {
1030 catch (ProcessedQueued &e) {
1031 // we set it to true anyway (see below)
1034 /* Every time we receive a packet it can happen that a previously
1035 * buffered packet is now ready to process. */
1036 packet_queued = true;
1038 catch (InvalidIncomingDataException &e) {
1042 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1044 std::list<session_t> peerids = m_connection->getPeerIDs();
1046 for (session_t peerid : peerids) {
1047 PeerHelper peer = m_connection->getPeerNoEx(peerid);
1051 if (dynamic_cast<UDPPeer *>(&peer) == 0)
1054 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
1055 if (checkIncomingBuffers(&channel, peer_id, dst)) {
1063 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1064 session_t &peer_id, SharedBuffer<u8> &dst)
1066 u16 firstseqnum = 0;
1067 if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
1068 if (firstseqnum == channel->readNextIncomingSeqNum()) {
1069 BufferedPacket p = channel->incoming_reliables.popFirst();
1070 peer_id = readPeerId(*p.data);
1071 u8 channelnum = readChannel(*p.data);
1072 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
1074 LOG(dout_con << m_connection->getDesc()
1075 << "UNBUFFERING TYPE_RELIABLE"
1076 << " seqnum=" << seqnum
1077 << " peer_id=" << peer_id
1078 << " channel=" << ((int) channelnum & 0xff)
1081 channel->incNextIncomingSeqNum();
1083 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1084 // Get out the inside packet and re-process it
1085 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1086 memcpy(*payload, &p.data[headers_size], payload.getSize());
1088 dst = processPacket(channel, payload, peer_id, channelnum, true);
1095 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1096 const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
1098 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1101 errorstream << "Peer not found (possible timeout)" << std::endl;
1102 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1105 if (packetdata.getSize() < 1)
1106 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1108 u8 type = readU8(&(packetdata[0]));
1110 if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1111 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1112 errorstream << errmsg << std::endl;
1113 throw InvalidIncomingDataException(errmsg.c_str());
1116 if (type >= PACKET_TYPE_MAX) {
1117 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1119 throw InvalidIncomingDataException("Invalid packet type");
1122 const PacketTypeHandler &pHandle = packetTypeRouter[type];
1123 return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
1126 const ConnectionReceiveThread::PacketTypeHandler
1127 ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1128 {&ConnectionReceiveThread::handlePacketType_Control},
1129 {&ConnectionReceiveThread::handlePacketType_Original},
1130 {&ConnectionReceiveThread::handlePacketType_Split},
1131 {&ConnectionReceiveThread::handlePacketType_Reliable},
1134 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1135 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1137 if (packetdata.getSize() < 2)
1138 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1140 u8 controltype = readU8(&(packetdata[1]));
1142 if (controltype == CONTROLTYPE_ACK) {
1143 assert(channel != NULL);
1145 if (packetdata.getSize() < 4) {
1146 throw InvalidIncomingDataException(
1147 "packetdata.getSize() < 4 (ACK header size)");
1150 u16 seqnum = readU16(&packetdata[2]);
1151 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1152 << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1153 << seqnum << " ]" << std::endl);
1156 BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1158 // only calculate rtt from straight sent packets
1159 if (p.resend_count == 0) {
1160 // Get round trip time
1161 u64 current_time = porting::getTimeMs();
1163 // a overflow is quite unlikely but as it'd result in major
1164 // rtt miscalculation we handle it here
1165 if (current_time > p.absolute_send_time) {
1166 float rtt = (current_time - p.absolute_send_time) / 1000.0;
1168 // Let peer calculate stuff according to it
1169 // (avg_rtt and resend_timeout)
1170 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1171 } else if (p.totaltime > 0) {
1172 float rtt = p.totaltime;
1174 // Let peer calculate stuff according to it
1175 // (avg_rtt and resend_timeout)
1176 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1179 // put bytes for max bandwidth calculation
1180 channel->UpdateBytesSent(p.data.getSize(), 1);
1181 if (channel->outgoing_reliables_sent.size() == 0)
1182 m_connection->TriggerSend();
1183 } catch (NotFoundException &e) {
1184 LOG(derr_con << m_connection->getDesc()
1185 << "WARNING: ACKed packet not in outgoing queue"
1186 << " seqnum=" << seqnum << std::endl);
1187 channel->UpdatePacketTooLateCounter();
1190 throw ProcessedSilentlyException("Got an ACK");
1191 } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1192 // Got a packet to set our peer id
1193 if (packetdata.getSize() < 4)
1194 throw InvalidIncomingDataException
1195 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1196 session_t peer_id_new = readU16(&packetdata[2]);
1197 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1198 << "... " << std::endl);
1200 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1201 LOG(derr_con << m_connection->getDesc()
1202 << "WARNING: Not changing existing peer id." << std::endl);
1204 LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1206 m_connection->SetPeerID(peer_id_new);
1209 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1210 } else if (controltype == CONTROLTYPE_PING) {
1211 // Just ignore it, the incoming data already reset
1212 // the timeout counter
1213 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1214 throw ProcessedSilentlyException("Got a PING");
1215 } else if (controltype == CONTROLTYPE_DISCO) {
1216 // Just ignore it, the incoming data already reset
1217 // the timeout counter
1218 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1219 << peer->id << std::endl);
1221 if (!m_connection->deletePeer(peer->id, false)) {
1222 derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1225 throw ProcessedSilentlyException("Got a DISCO");
1227 LOG(derr_con << m_connection->getDesc()
1228 << "INVALID TYPE_CONTROL: invalid controltype="
1229 << ((int) controltype & 0xff) << std::endl);
1230 throw InvalidIncomingDataException("Invalid control type");
1234 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1235 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1237 if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1238 throw InvalidIncomingDataException
1239 ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1240 LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1242 // Get the inside packet out and return it
1243 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1244 memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1248 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1249 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1251 Address peer_address;
1253 if (peer->getAddress(MTP_UDP, peer_address)) {
1254 // We have to create a packet again for buffering
1255 // This isn't actually too bad an idea.
1256 BufferedPacket packet = makePacket(peer_address,
1258 m_connection->GetProtocolID(),
1262 // Buffer the packet
1263 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1265 if (data.getSize() != 0) {
1266 LOG(dout_con << m_connection->getDesc()
1267 << "RETURNING TYPE_SPLIT: Constructed full data, "
1268 << "size=" << data.getSize() << std::endl);
1271 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1272 throw ProcessedSilentlyException("Buffered a split packet chunk");
1275 // We should never get here.
1276 FATAL_ERROR("Invalid execution point");
1279 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1280 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1282 assert(channel != NULL);
1284 // Recursive reliable packets not allowed
1286 throw InvalidIncomingDataException("Found nested reliable packets");
1288 if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1289 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1291 u16 seqnum = readU16(&packetdata[1]);
1292 bool is_future_packet = false;
1293 bool is_old_packet = false;
1295 /* packet is within our receive window send ack */
1296 if (seqnum_in_window(seqnum,
1297 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1298 m_connection->sendAck(peer->id, channelnum, seqnum);
1300 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1301 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1303 /* packet is not within receive window, don't send ack. *
1304 * if this was a valid packet it's gonna be retransmitted */
1305 if (is_future_packet)
1306 throw ProcessedSilentlyException(
1307 "Received packet newer then expected, not sending ack");
1309 /* seems like our ack was lost, send another one for a old packet */
1310 if (is_old_packet) {
1311 LOG(dout_con << m_connection->getDesc()
1312 << "RE-SENDING ACK: peer_id: " << peer->id
1313 << ", channel: " << (channelnum & 0xFF)
1314 << ", seqnum: " << seqnum << std::endl;)
1315 m_connection->sendAck(peer->id, channelnum, seqnum);
1317 // we already have this packet so this one was on wire at least
1318 // the current timeout
1319 // we don't know how long this packet was on wire don't do silly guessing
1320 // dynamic_cast<UDPPeer*>(&peer)->
1321 // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1323 throw ProcessedSilentlyException("Retransmitting ack for old packet");
1327 if (seqnum != channel->readNextIncomingSeqNum()) {
1328 Address peer_address;
1330 // this is a reliable packet so we have a udp address for sure
1331 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1332 // This one comes later, buffer it.
1333 // Actually we have to make a packet to buffer one.
1334 // Well, we have all the ingredients, so just do it.
1335 BufferedPacket packet = con::makePacket(
1338 m_connection->GetProtocolID(),
1342 channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1344 LOG(dout_con << m_connection->getDesc()
1345 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1346 << ", channel: " << (channelnum & 0xFF)
1347 << ", seqnum: " << seqnum << std::endl;)
1349 throw ProcessedQueued("Buffered future reliable packet");
1350 } catch (AlreadyExistsException &e) {
1351 } catch (IncomingDataCorruption &e) {
1352 ConnectionCommand discon;
1353 discon.disconnect_peer(peer->id);
1354 m_connection->putCommand(discon);
1356 LOG(derr_con << m_connection->getDesc()
1357 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1358 << ", channel: " << (channelnum & 0xFF)
1359 << ", seqnum: " << seqnum
1360 << "DROPPING CLIENT!" << std::endl;)
1364 /* we got a packet to process right now */
1365 LOG(dout_con << m_connection->getDesc()
1366 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1367 << ", channel: " << (channelnum & 0xFF)
1368 << ", seqnum: " << seqnum << std::endl;)
1371 /* check for resend case */
1372 u16 queued_seqnum = 0;
1373 if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1374 if (queued_seqnum == seqnum) {
1375 BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
1376 /** TODO find a way to verify the new against the old packet */
1380 channel->incNextIncomingSeqNum();
1382 // Get out the inside packet and re-process it
1383 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1384 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1386 return processPacket(channel, payload, peer->id, channelnum, true);