3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include "irrlichttypes_bloated.h"
23 #include "peerhandler.h"
25 #include "constants.h"
26 #include "util/pointer.h"
27 #include "util/container.h"
28 #include "util/thread.h"
29 #include "util/numeric.h"
30 #include "networkprotocol.h"
41 class ConnectionReceiveThread;
42 class ConnectionSendThread;
44 typedef enum MTProtocols {
47 MTP_MINETEST_RELIABLE_UDP
50 #define MAX_UDP_PEERS 65535
52 #define SEQNUM_MAX 65535
54 inline bool seqnum_higher(u16 totest, u16 base)
58 if ((totest - base) > (SEQNUM_MAX/2))
64 if ((base - totest) > (SEQNUM_MAX/2))
70 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
72 u16 window_start = next;
73 u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
75 if (window_start < window_end) {
76 return ((seqnum >= window_start) && (seqnum < window_end));
80 return ((seqnum < window_end) || (seqnum >= window_start));
83 static inline float CALC_DTIME(u64 lasttime, u64 curtime)
85 float value = ( curtime - lasttime) / 1000.0;
86 return MYMAX(MYMIN(value,0.1),0.0);
91 BufferedPacket(u8 *a_data, u32 a_size):
94 BufferedPacket(u32 a_size):
97 Buffer<u8> data; // Data of the packet, including headers
98 float time = 0.0f; // Seconds from buffering the packet or re-sending
99 float totaltime = 0.0f; // Seconds from buffering the packet
100 u64 absolute_send_time = -1;
101 Address address; // Sender or destination
102 unsigned int resend_count = 0;
105 // This adds the base headers to the data and makes a packet out of it
106 BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
107 u32 protocol_id, session_t sender_peer_id, u8 channel);
109 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
110 // Increments split_seqnum if a split packet is made
111 void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
112 u16 &split_seqnum, std::list<SharedBuffer<u8>> *list);
114 // Add the TYPE_RELIABLE header to the data
115 SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum);
117 struct IncomingSplitPacket
119 IncomingSplitPacket(u32 cc, bool r):
120 chunk_count(cc), reliable(r) {}
122 IncomingSplitPacket() = delete;
124 float time = 0.0f; // Seconds from adding
126 bool reliable; // If true, isn't deleted on timeout
128 bool allReceived() const
130 return (chunks.size() == chunk_count);
132 bool insert(u32 chunk_num, SharedBuffer<u8> &chunkdata);
133 SharedBuffer<u8> reassemble();
136 // Key is chunk number, value is data without headers
137 std::map<u16, SharedBuffer<u8>> chunks;
143 A packet is sent through a channel to a peer with a basic header:
146 [4] session_t sender_peer_id
150 value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
151 value 1 (PEER_ID_SERVER) is reserved for server
152 these constants are defined in constants.h
154 Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
156 #define BASE_HEADER_SIZE 7
157 #define CHANNEL_COUNT 3
161 CONTROL: This is a packet used by the protocol.
162 - When this is processed, nothing is handed to the user.
166 controltype and data description:
169 CONTROLTYPE_SET_PEER_ID
170 [2] session_t peer_id_new
172 - There is no actual reply, but this can be sent in a reliable
173 packet to get a reply
176 //#define TYPE_CONTROL 0
177 #define CONTROLTYPE_ACK 0
178 #define CONTROLTYPE_SET_PEER_ID 1
179 #define CONTROLTYPE_PING 2
180 #define CONTROLTYPE_DISCO 3
183 ORIGINAL: This is a plain packet with no control and no error
185 - When this is processed, it is directly handed to the user.
189 //#define TYPE_ORIGINAL 1
190 #define ORIGINAL_HEADER_SIZE 1
192 SPLIT: These are sequences of packets forming one bigger piece of
194 - When processed and all the packet_nums 0...packet_count-1 are
195 present (this should be buffered), the resulting data shall be
196 directly handed to the user.
197 - If the data fails to come up in a reasonable time, the buffer shall
198 be silently discarded.
199 - These can be sent as-is or atop of a RELIABLE packet stream.
206 //#define TYPE_SPLIT 2
208 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
209 and they shall be delivered in the same order as sent. This is done
210 with a buffer in the receiving and transmitting end.
211 - When this is processed, the contents of each packet is recursively
212 processed as packets.
218 //#define TYPE_RELIABLE 3
219 #define RELIABLE_HEADER_SIZE 3
220 #define SEQNUM_INITIAL 65500
222 enum PacketType: u8 {
223 PACKET_TYPE_CONTROL = 0,
224 PACKET_TYPE_ORIGINAL = 1,
225 PACKET_TYPE_SPLIT = 2,
226 PACKET_TYPE_RELIABLE = 3,
230 A buffer which stores reliable packets and sorts them internally
231 for fast access to the smallest one.
234 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
236 class ReliablePacketBuffer
239 ReliablePacketBuffer() = default;
241 bool getFirstSeqnum(u16& result);
243 BufferedPacket popFirst();
244 BufferedPacket popSeqnum(u16 seqnum);
245 void insert(BufferedPacket &p, u16 next_expected);
247 void incrementTimeouts(float dtime);
248 std::list<BufferedPacket> getTimedOuts(float timeout,
249 unsigned int max_packets);
253 RPBSearchResult notFound();
258 RPBSearchResult findPacket(u16 seqnum); // does not perform locking
260 std::list<BufferedPacket> m_list;
262 u16 m_oldest_non_answered_ack;
264 std::mutex m_list_mutex;
268 A buffer for reconstructing split packets
271 class IncomingSplitBuffer
274 ~IncomingSplitBuffer();
276 Returns a reference counted buffer of length != 0 when a full split
277 packet is constructed. If not, returns one of length 0.
279 SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
281 void removeUnreliableTimedOuts(float dtime, float timeout);
285 std::map<u16, IncomingSplitPacket*> m_buf;
287 std::mutex m_map_mutex;
290 struct OutgoingPacket
294 SharedBuffer<u8> data;
298 OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
299 bool reliable_,bool ack_=false):
301 channelnum(channelnum_),
309 enum ConnectionCommandType{
314 CONNCMD_DISCONNECT_PEER,
321 struct ConnectionCommand
323 enum ConnectionCommandType type = CONNCMD_NONE;
325 session_t peer_id = PEER_ID_INEXISTENT;
328 bool reliable = false;
331 ConnectionCommand() = default;
332 ConnectionCommand &operator=(const ConnectionCommand &other)
335 address = other.address;
336 peer_id = other.peer_id;
337 channelnum = other.channelnum;
338 // We must copy the buffer here to prevent race condition
339 data = SharedBuffer<u8>(*other.data, other.data.getSize());
340 reliable = other.reliable;
345 void serve(Address address_)
347 type = CONNCMD_SERVE;
350 void connect(Address address_)
352 type = CONNCMD_CONNECT;
357 type = CONNCMD_DISCONNECT;
359 void disconnect_peer(session_t peer_id_)
361 type = CONNCMD_DISCONNECT_PEER;
365 void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);
367 void ack(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
371 channelnum = channelnum_;
376 void createPeer(session_t peer_id_, const SharedBuffer<u8> &data_)
378 type = CONCMD_CREATE_PEER;
387 /* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
388 * touching it, the less you're away from it the more likely data corruption
391 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
392 /* starting value for window size */
393 #define START_RELIABLE_WINDOW_SIZE 0x400
394 /* minimum value for window size */
395 #define MIN_RELIABLE_WINDOW_SIZE 0x40
401 u16 readNextIncomingSeqNum();
402 u16 incNextIncomingSeqNum();
404 u16 getOutgoingSequenceNumber(bool& successfull);
405 u16 readOutgoingSequenceNumber();
406 bool putBackSequenceNumber(u16);
408 u16 readNextSplitSeqNum();
409 void setNextSplitSeqNum(u16 seqnum);
411 // This is for buffering the incoming packets that are coming in
413 ReliablePacketBuffer incoming_reliables;
414 // This is for buffering the sent packets so that the sender can
415 // re-send them if no ACK is received
416 ReliablePacketBuffer outgoing_reliables_sent;
418 //queued reliable packets
419 std::queue<BufferedPacket> queued_reliables;
421 //queue commands prior splitting to packets
422 std::deque<ConnectionCommand> queued_commands;
424 IncomingSplitBuffer incoming_splits;
427 ~Channel() = default;
429 void UpdatePacketLossCounter(unsigned int count);
430 void UpdatePacketTooLateCounter();
431 void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
432 void UpdateBytesLost(unsigned int bytes);
433 void UpdateBytesReceived(unsigned int bytes);
435 void UpdateTimers(float dtime);
437 const float getCurrentDownloadRateKB()
438 { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
439 const float getMaxDownloadRateKB()
440 { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
442 const float getCurrentLossRateKB()
443 { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
444 const float getMaxLossRateKB()
445 { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
447 const float getCurrentIncomingRateKB()
448 { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
449 const float getMaxIncomingRateKB()
450 { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
452 const float getAvgDownloadRateKB()
453 { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
454 const float getAvgLossRateKB()
455 { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
456 const float getAvgIncomingRateKB()
457 { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
459 const unsigned int getWindowSize() const { return window_size; };
461 void setWindowSize(unsigned int size) { window_size = size; };
463 std::mutex m_internal_mutex;
464 int window_size = MIN_RELIABLE_WINDOW_SIZE;
466 u16 next_incoming_seqnum = SEQNUM_INITIAL;
468 u16 next_outgoing_seqnum = SEQNUM_INITIAL;
469 u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
471 unsigned int current_packet_loss = 0;
472 unsigned int current_packet_too_late = 0;
473 unsigned int current_packet_successful = 0;
474 float packet_loss_counter = 0.0f;
476 unsigned int current_bytes_transfered = 0;
477 unsigned int current_bytes_received = 0;
478 unsigned int current_bytes_lost = 0;
479 float max_kbps = 0.0f;
480 float cur_kbps = 0.0f;
481 float avg_kbps = 0.0f;
482 float max_incoming_kbps = 0.0f;
483 float cur_incoming_kbps = 0.0f;
484 float avg_incoming_kbps = 0.0f;
485 float max_kbps_lost = 0.0f;
486 float cur_kbps_lost = 0.0f;
487 float avg_kbps_lost = 0.0f;
488 float bpm_counter = 0.0f;
490 unsigned int rate_samples = 0;
498 PeerHelper() = default;
499 PeerHelper(Peer* peer);
502 PeerHelper& operator=(Peer* peer);
503 Peer* operator->() const;
505 Peer* operator&() const;
506 bool operator!=(void* ptr);
509 Peer *m_peer = nullptr;
525 friend class PeerHelper;
527 Peer(Address address_,u16 id_,Connection* connection) :
529 m_connection(connection),
531 m_last_timeout_check(porting::getTimeMs())
536 MutexAutoLock usage_lock(m_exclusive_access_mutex);
537 FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
540 // Unique id of the peer
545 virtual void PutReliableSendCommand(ConnectionCommand &c,
546 unsigned int max_packet_size) {};
548 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
550 bool isPendingDeletion()
551 { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
554 {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
556 bool isTimedOut(float timeout);
558 unsigned int m_increment_packets_remaining = 0;
560 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
561 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
562 virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
565 errorstream << "Peer::addSplitPacket called,"
566 << " this is supposed to be never called!" << std::endl;
567 return SharedBuffer<u8>(0);
570 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
572 virtual float getStat(rtt_stat_type type) const {
575 return m_rtt.min_rtt;
577 return m_rtt.max_rtt;
579 return m_rtt.avg_rtt;
581 return m_rtt.jitter_min;
583 return m_rtt.jitter_max;
585 return m_rtt.jitter_avg;
590 virtual void reportRTT(float rtt) {};
592 void RTTStatistics(float rtt,
593 const std::string &profiler_id = "",
594 unsigned int num_samples = 1000);
599 std::mutex m_exclusive_access_mutex;
601 bool m_pending_deletion = false;
603 Connection* m_connection;
605 // Address of the peer
609 float m_ping_timer = 0.0f;
613 float jitter_min = FLT_MAX;
614 float jitter_max = 0.0f;
615 float jitter_avg = -1.0f;
616 float min_rtt = FLT_MAX;
617 float max_rtt = 0.0f;
618 float avg_rtt = -1.0f;
620 rttstats() = default;
624 float m_last_rtt = -1.0f;
626 // current usage count
627 unsigned int m_usage = 0;
629 // Seconds from last receive
630 float m_timeout_counter = 0.0f;
632 u64 m_last_timeout_check;
635 class UDPPeer : public Peer
639 friend class PeerHelper;
640 friend class ConnectionReceiveThread;
641 friend class ConnectionSendThread;
642 friend class Connection;
644 UDPPeer(u16 a_id, Address a_address, Connection* connection);
645 virtual ~UDPPeer() = default;
647 void PutReliableSendCommand(ConnectionCommand &c,
648 unsigned int max_packet_size);
650 bool getAddress(MTProtocols type, Address& toset);
652 u16 getNextSplitSequenceNumber(u8 channel);
653 void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
655 SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
660 Calculates avg_rtt and resend_timeout.
661 rtt=-1 only recalculates resend_timeout
663 void reportRTT(float rtt);
665 void RunCommandQueues(
666 unsigned int max_packet_size,
667 unsigned int maxcommands,
668 unsigned int maxtransfer);
670 float getResendTimeout()
671 { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
673 void setResendTimeout(float timeout)
674 { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
675 bool Ping(float dtime,SharedBuffer<u8>& data);
677 Channel channels[CHANNEL_COUNT];
678 bool m_pending_disconnect = false;
680 // This is changed dynamically
681 float resend_timeout = 0.5;
683 bool processReliableSendCommand(
684 ConnectionCommand &c,
685 unsigned int max_packet_size);
692 enum ConnectionEventType{
694 CONNEVENT_DATA_RECEIVED,
695 CONNEVENT_PEER_ADDED,
696 CONNEVENT_PEER_REMOVED,
697 CONNEVENT_BIND_FAILED,
700 struct ConnectionEvent
702 enum ConnectionEventType type = CONNEVENT_NONE;
703 session_t peer_id = 0;
705 bool timeout = false;
708 ConnectionEvent() = default;
710 std::string describe()
714 return "CONNEVENT_NONE";
715 case CONNEVENT_DATA_RECEIVED:
716 return "CONNEVENT_DATA_RECEIVED";
717 case CONNEVENT_PEER_ADDED:
718 return "CONNEVENT_PEER_ADDED";
719 case CONNEVENT_PEER_REMOVED:
720 return "CONNEVENT_PEER_REMOVED";
721 case CONNEVENT_BIND_FAILED:
722 return "CONNEVENT_BIND_FAILED";
724 return "Invalid ConnectionEvent";
727 void dataReceived(session_t peer_id_, const SharedBuffer<u8> &data_)
729 type = CONNEVENT_DATA_RECEIVED;
733 void peerAdded(session_t peer_id_, Address address_)
735 type = CONNEVENT_PEER_ADDED;
739 void peerRemoved(session_t peer_id_, bool timeout_, Address address_)
741 type = CONNEVENT_PEER_REMOVED;
748 type = CONNEVENT_BIND_FAILED;
757 friend class ConnectionSendThread;
758 friend class ConnectionReceiveThread;
760 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
761 PeerHandler *peerhandler);
765 ConnectionEvent waitEvent(u32 timeout_ms);
766 void putCommand(ConnectionCommand &c);
768 void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
769 void Serve(Address bind_addr);
770 void Connect(Address address);
773 void Receive(NetworkPacket* pkt);
774 bool TryReceive(NetworkPacket *pkt);
775 void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
776 session_t GetPeerID() const { return m_peer_id; }
777 Address GetPeerAddress(session_t peer_id);
778 float getPeerStat(session_t peer_id, rtt_stat_type type);
779 float getLocalStat(rate_stat_type type);
780 const u32 GetProtocolID() const { return m_protocol_id; };
781 const std::string getDesc();
782 void DisconnectPeer(session_t peer_id);
785 PeerHelper getPeerNoEx(session_t peer_id);
786 u16 lookupPeer(Address& sender);
788 u16 createPeer(Address& sender, MTProtocols protocol, int fd);
789 UDPPeer* createServerPeer(Address& sender);
790 bool deletePeer(session_t peer_id, bool timeout);
792 void SetPeerID(session_t id) { m_peer_id = id; }
794 void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
796 void PrintInfo(std::ostream &out);
798 std::list<session_t> getPeerIDs()
800 MutexAutoLock peerlock(m_peers_mutex);
804 UDPSocket m_udpSocket;
805 MutexedQueue<ConnectionCommand> m_command_queue;
807 bool Receive(NetworkPacket *pkt, u32 timeout);
809 void putEvent(ConnectionEvent &e);
813 MutexedQueue<ConnectionEvent> m_event_queue;
815 session_t m_peer_id = 0;
818 std::map<session_t, Peer *> m_peers;
819 std::list<session_t> m_peer_ids;
820 std::mutex m_peers_mutex;
822 std::unique_ptr<ConnectionSendThread> m_sendThread;
823 std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
825 std::mutex m_info_mutex;
827 // Backwards compatibility
828 PeerHandler *m_bc_peerhandler;
829 u32 m_bc_receive_timeout = 0;
831 bool m_shutting_down = false;
833 session_t m_next_remote_peer_id = 2;