3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include "irrlichttypes_bloated.h"
23 #include "peerhandler.h"
25 #include "constants.h"
26 #include "util/pointer.h"
27 #include "util/container.h"
28 #include "util/thread.h"
29 #include "util/numeric.h"
40 typedef enum MTProtocols {
43 MTP_MINETEST_RELIABLE_UDP
46 #define SEQNUM_MAX 65535
47 inline bool seqnum_higher(u16 totest, u16 base)
51 if ((totest - base) > (SEQNUM_MAX/2))
57 if ((base - totest) > (SEQNUM_MAX/2))
63 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
65 u16 window_start = next;
66 u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
68 if (window_start < window_end) {
69 return ((seqnum >= window_start) && (seqnum < window_end));
73 return ((seqnum < window_end) || (seqnum >= window_start));
78 BufferedPacket(u8 *a_data, u32 a_size):
81 BufferedPacket(u32 a_size):
84 Buffer<u8> data; // Data of the packet, including headers
85 float time = 0.0f; // Seconds from buffering the packet or re-sending
86 float totaltime = 0.0f; // Seconds from buffering the packet
87 u64 absolute_send_time = -1;
88 Address address; // Sender or destination
89 unsigned int resend_count = 0;
92 // This adds the base headers to the data and makes a packet out of it
93 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
94 u32 protocol_id, u16 sender_peer_id, u8 channel);
95 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
96 u32 protocol_id, u16 sender_peer_id, u8 channel);
98 // Add the TYPE_ORIGINAL header to the data
99 SharedBuffer<u8> makeOriginalPacket(
100 SharedBuffer<u8> data);
102 // Split data in chunks and add TYPE_SPLIT headers to them
103 std::list<SharedBuffer<u8> > makeSplitPacket(
104 SharedBuffer<u8> data,
108 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
109 // Increments split_seqnum if a split packet is made
110 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
111 SharedBuffer<u8> data,
115 // Add the TYPE_RELIABLE header to the data
116 SharedBuffer<u8> makeReliablePacket(
117 const SharedBuffer<u8> &data,
120 struct IncomingSplitPacket
122 IncomingSplitPacket() = default;
124 // Key is chunk number, value is data without headers
125 std::map<u16, SharedBuffer<u8> > chunks;
127 float time = 0.0f; // Seconds from adding
128 bool reliable = false; // If true, isn't deleted on timeout
132 return (chunks.size() == chunk_count);
139 A packet is sent through a channel to a peer with a basic header:
140 TODO: Should we have a receiver_peer_id also?
143 [4] u16 sender_peer_id
147 value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
148 value 1 (PEER_ID_SERVER) is reserved for server
149 these constants are defined in constants.h
151 The lower the number, the higher the priority is.
152 Only channels 0, 1 and 2 exist.
154 #define BASE_HEADER_SIZE 7
155 #define CHANNEL_COUNT 3
159 CONTROL: This is a packet used by the protocol.
160 - When this is processed, nothing is handed to the user.
164 controltype and data description:
167 CONTROLTYPE_SET_PEER_ID
170 - There is no actual reply, but this can be sent in a reliable
171 packet to get a reply
174 #define TYPE_CONTROL 0
175 #define CONTROLTYPE_ACK 0
176 #define CONTROLTYPE_SET_PEER_ID 1
177 #define CONTROLTYPE_PING 2
178 #define CONTROLTYPE_DISCO 3
179 #define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
182 ORIGINAL: This is a plain packet with no control and no error
184 - When this is processed, it is directly handed to the user.
188 #define TYPE_ORIGINAL 1
189 #define ORIGINAL_HEADER_SIZE 1
191 SPLIT: These are sequences of packets forming one bigger piece of
193 - When processed and all the packet_nums 0...packet_count-1 are
194 present (this should be buffered), the resulting data shall be
195 directly handed to the user.
196 - If the data fails to come up in a reasonable time, the buffer shall
197 be silently discarded.
198 - These can be sent as-is or atop of a RELIABLE packet stream.
207 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
208 and they shall be delivered in the same order as sent. This is done
209 with a buffer in the receiving and transmitting end.
210 - When this is processed, the contents of each packet is recursively
211 processed as packets.
217 #define TYPE_RELIABLE 3
218 #define RELIABLE_HEADER_SIZE 3
219 #define SEQNUM_INITIAL 65500
222 A buffer which stores reliable packets and sorts them internally
223 for fast access to the smallest one.
226 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
228 class ReliablePacketBuffer
231 ReliablePacketBuffer() = default;
233 bool getFirstSeqnum(u16& result);
235 BufferedPacket popFirst();
236 BufferedPacket popSeqnum(u16 seqnum);
237 void insert(BufferedPacket &p,u16 next_expected);
239 void incrementTimeouts(float dtime);
240 std::list<BufferedPacket> getTimedOuts(float timeout,
241 unsigned int max_packets);
245 bool containsPacket(u16 seqnum);
246 RPBSearchResult notFound();
251 RPBSearchResult findPacket(u16 seqnum);
253 std::list<BufferedPacket> m_list;
256 u16 m_oldest_non_answered_ack;
258 std::mutex m_list_mutex;
262 A buffer for reconstructing split packets
265 class IncomingSplitBuffer
268 ~IncomingSplitBuffer();
270 Returns a reference counted buffer of length != 0 when a full split
271 packet is constructed. If not, returns one of length 0.
273 SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
275 void removeUnreliableTimedOuts(float dtime, float timeout);
279 std::map<u16, IncomingSplitPacket*> m_buf;
281 std::mutex m_map_mutex;
284 struct OutgoingPacket
288 SharedBuffer<u8> data;
292 OutgoingPacket(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
293 bool reliable_,bool ack_=false):
295 channelnum(channelnum_),
303 enum ConnectionCommandType{
308 CONNCMD_DISCONNECT_PEER,
313 CONCMD_DISABLE_LEGACY
316 struct ConnectionCommand
318 enum ConnectionCommandType type = CONNCMD_NONE;
320 u16 peer_id = PEER_ID_INEXISTENT;
323 bool reliable = false;
326 ConnectionCommand() = default;
328 void serve(Address address_)
330 type = CONNCMD_SERVE;
333 void connect(Address address_)
335 type = CONNCMD_CONNECT;
340 type = CONNCMD_DISCONNECT;
342 void disconnect_peer(u16 peer_id_)
344 type = CONNCMD_DISCONNECT_PEER;
348 void send(u16 peer_id_, u8 channelnum_, NetworkPacket* pkt, bool reliable_);
350 void ack(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
354 channelnum = channelnum_;
359 void createPeer(u16 peer_id_, const SharedBuffer<u8> &data_)
361 type = CONCMD_CREATE_PEER;
369 void disableLegacy(u16 peer_id_, const SharedBuffer<u8> &data_)
371 type = CONCMD_DISABLE_LEGACY;
380 /* maximum window size to use, 0xFFFF is theoretical maximum don't think about
381 * touching it, the less you're away from it the more likely data corruption
384 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
385 /* starting value for window size */
386 #define MIN_RELIABLE_WINDOW_SIZE 0x40
392 u16 readNextIncomingSeqNum();
393 u16 incNextIncomingSeqNum();
395 u16 getOutgoingSequenceNumber(bool& successfull);
396 u16 readOutgoingSequenceNumber();
397 bool putBackSequenceNumber(u16);
399 u16 readNextSplitSeqNum();
400 void setNextSplitSeqNum(u16 seqnum);
402 // This is for buffering the incoming packets that are coming in
404 ReliablePacketBuffer incoming_reliables;
405 // This is for buffering the sent packets so that the sender can
406 // re-send them if no ACK is received
407 ReliablePacketBuffer outgoing_reliables_sent;
409 //queued reliable packets
410 std::queue<BufferedPacket> queued_reliables;
412 //queue commands prior splitting to packets
413 std::deque<ConnectionCommand> queued_commands;
415 IncomingSplitBuffer incoming_splits;
418 ~Channel() = default;
420 void UpdatePacketLossCounter(unsigned int count);
421 void UpdatePacketTooLateCounter();
422 void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
423 void UpdateBytesLost(unsigned int bytes);
424 void UpdateBytesReceived(unsigned int bytes);
426 void UpdateTimers(float dtime, bool legacy_peer);
428 const float getCurrentDownloadRateKB()
429 { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
430 const float getMaxDownloadRateKB()
431 { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
433 const float getCurrentLossRateKB()
434 { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
435 const float getMaxLossRateKB()
436 { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
438 const float getCurrentIncomingRateKB()
439 { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
440 const float getMaxIncomingRateKB()
441 { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
443 const float getAvgDownloadRateKB()
444 { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
445 const float getAvgLossRateKB()
446 { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
447 const float getAvgIncomingRateKB()
448 { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
450 const unsigned int getWindowSize() const { return window_size; };
452 void setWindowSize(unsigned int size) { window_size = size; };
454 std::mutex m_internal_mutex;
455 int window_size = MIN_RELIABLE_WINDOW_SIZE;
457 u16 next_incoming_seqnum = SEQNUM_INITIAL;
459 u16 next_outgoing_seqnum = SEQNUM_INITIAL;
460 u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
462 unsigned int current_packet_loss = 0;
463 unsigned int current_packet_too_late = 0;
464 unsigned int current_packet_successfull = 0;
465 float packet_loss_counter = 0.0f;
467 unsigned int current_bytes_transfered = 0;
468 unsigned int current_bytes_received = 0;
469 unsigned int current_bytes_lost = 0;
470 float max_kbps = 0.0f;
471 float cur_kbps = 0.0f;
472 float avg_kbps = 0.0f;
473 float max_incoming_kbps = 0.0f;
474 float cur_incoming_kbps = 0.0f;
475 float avg_incoming_kbps = 0.0f;
476 float max_kbps_lost = 0.0f;
477 float cur_kbps_lost = 0.0f;
478 float avg_kbps_lost = 0.0f;
479 float bpm_counter = 0.0f;
481 unsigned int rate_samples = 0;
489 PeerHelper() = default;
490 PeerHelper(Peer* peer);
493 PeerHelper& operator=(Peer* peer);
494 Peer* operator->() const;
496 Peer* operator&() const;
497 bool operator!=(void* ptr);
500 Peer *m_peer = nullptr;
516 friend class PeerHelper;
518 Peer(Address address_,u16 id_,Connection* connection) :
520 m_connection(connection),
522 m_last_timeout_check(porting::getTimeMs())
527 MutexAutoLock usage_lock(m_exclusive_access_mutex);
528 FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
531 // Unique id of the peer
536 virtual void PutReliableSendCommand(ConnectionCommand &c,
537 unsigned int max_packet_size) {};
539 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
541 bool isPendingDeletion()
542 { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
545 {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
547 bool isTimedOut(float timeout);
549 unsigned int m_increment_packets_remaining = 9;
550 unsigned int m_increment_bytes_remaining = 0;
552 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
553 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
554 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
555 BufferedPacket toadd,
558 fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
559 return SharedBuffer<u8>(0);
562 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
564 virtual float getStat(rtt_stat_type type) const {
567 return m_rtt.min_rtt;
569 return m_rtt.max_rtt;
571 return m_rtt.avg_rtt;
573 return m_rtt.jitter_min;
575 return m_rtt.jitter_max;
577 return m_rtt.jitter_avg;
582 virtual void reportRTT(float rtt) {};
584 void RTTStatistics(float rtt,
585 const std::string &profiler_id = "",
586 unsigned int num_samples = 1000);
591 std::mutex m_exclusive_access_mutex;
593 bool m_pending_deletion = false;
595 Connection* m_connection;
597 // Address of the peer
601 float m_ping_timer = 0.0f;
605 float jitter_min = FLT_MAX;
606 float jitter_max = 0.0f;
607 float jitter_avg = -1.0f;
608 float min_rtt = FLT_MAX;
609 float max_rtt = 0.0f;
610 float avg_rtt = -1.0f;
612 rttstats() = default;
616 float m_last_rtt = -1.0f;
618 // current usage count
619 unsigned int m_usage = 0;
621 // Seconds from last receive
622 float m_timeout_counter = 0.0f;
624 u64 m_last_timeout_check;
627 class UDPPeer : public Peer
631 friend class PeerHelper;
632 friend class ConnectionReceiveThread;
633 friend class ConnectionSendThread;
634 friend class Connection;
636 UDPPeer(u16 a_id, Address a_address, Connection* connection);
637 virtual ~UDPPeer() = default;
639 void PutReliableSendCommand(ConnectionCommand &c,
640 unsigned int max_packet_size);
642 bool getAddress(MTProtocols type, Address& toset);
644 void setNonLegacyPeer();
647 { return m_legacy_peer; }
649 u16 getNextSplitSequenceNumber(u8 channel);
650 void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
652 SharedBuffer<u8> addSpiltPacket(u8 channel,
653 BufferedPacket toadd,
659 Calculates avg_rtt and resend_timeout.
660 rtt=-1 only recalculates resend_timeout
662 void reportRTT(float rtt);
664 void RunCommandQueues(
665 unsigned int max_packet_size,
666 unsigned int maxcommands,
667 unsigned int maxtransfer);
669 float getResendTimeout()
670 { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
672 void setResendTimeout(float timeout)
673 { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
674 bool Ping(float dtime,SharedBuffer<u8>& data);
676 Channel channels[CHANNEL_COUNT];
677 bool m_pending_disconnect = false;
679 // This is changed dynamically
680 float resend_timeout = 0.5;
682 bool processReliableSendCommand(
683 ConnectionCommand &c,
684 unsigned int max_packet_size);
686 bool m_legacy_peer = true;
693 enum ConnectionEventType{
695 CONNEVENT_DATA_RECEIVED,
696 CONNEVENT_PEER_ADDED,
697 CONNEVENT_PEER_REMOVED,
698 CONNEVENT_BIND_FAILED,
701 struct ConnectionEvent
703 enum ConnectionEventType type = CONNEVENT_NONE;
706 bool timeout = false;
709 ConnectionEvent() = default;
711 std::string describe()
715 return "CONNEVENT_NONE";
716 case CONNEVENT_DATA_RECEIVED:
717 return "CONNEVENT_DATA_RECEIVED";
718 case CONNEVENT_PEER_ADDED:
719 return "CONNEVENT_PEER_ADDED";
720 case CONNEVENT_PEER_REMOVED:
721 return "CONNEVENT_PEER_REMOVED";
722 case CONNEVENT_BIND_FAILED:
723 return "CONNEVENT_BIND_FAILED";
725 return "Invalid ConnectionEvent";
728 void dataReceived(u16 peer_id_, const SharedBuffer<u8> &data_)
730 type = CONNEVENT_DATA_RECEIVED;
734 void peerAdded(u16 peer_id_, Address address_)
736 type = CONNEVENT_PEER_ADDED;
740 void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
742 type = CONNEVENT_PEER_REMOVED;
749 type = CONNEVENT_BIND_FAILED;
753 class ConnectionSendThread : public Thread {
756 friend class UDPPeer;
758 ConnectionSendThread(unsigned int max_packet_size, float timeout);
764 void setParent(Connection* parent) {
765 assert(parent != NULL); // Pre-condition
766 m_connection = parent;
769 void setPeerTimeout(float peer_timeout)
770 { m_timeout = peer_timeout; }
773 void runTimeouts (float dtime);
774 void rawSend (const BufferedPacket &packet);
775 bool rawSendAsPacket(u16 peer_id, u8 channelnum,
776 SharedBuffer<u8> data, bool reliable);
778 void processReliableCommand (ConnectionCommand &c);
779 void processNonReliableCommand (ConnectionCommand &c);
780 void serve (Address bind_address);
781 void connect (Address address);
783 void disconnect_peer(u16 peer_id);
784 void send (u16 peer_id, u8 channelnum,
785 SharedBuffer<u8> data);
786 void sendReliable (ConnectionCommand &c);
787 void sendToAll (u8 channelnum,
788 SharedBuffer<u8> data);
789 void sendToAllReliable(ConnectionCommand &c);
791 void sendPackets (float dtime);
793 void sendAsPacket (u16 peer_id, u8 channelnum,
794 SharedBuffer<u8> data,bool ack=false);
796 void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
798 bool packetsQueued();
800 Connection *m_connection = nullptr;
801 unsigned int m_max_packet_size;
803 std::queue<OutgoingPacket> m_outgoing_queue;
804 Semaphore m_send_sleep_semaphore;
806 unsigned int m_iteration_packets_avaialble;
807 unsigned int m_max_commands_per_iteration = 1;
808 unsigned int m_max_data_packets_per_iteration;
809 unsigned int m_max_packets_requeued = 256;
812 class ConnectionReceiveThread : public Thread {
814 ConnectionReceiveThread(unsigned int max_packet_size);
818 void setParent(Connection *parent) {
819 assert(parent); // Pre-condition
820 m_connection = parent;
826 // Returns next data from a buffer if possible
827 // If found, returns true; if not, false.
828 // If found, sets peer_id and dst
829 bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
831 bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
832 SharedBuffer<u8> &dst);
835 Processes a packet with the basic header stripped out.
837 packetdata: Data in packet (with no base headers)
838 peer_id: peer id of the sender of the packet in question
839 channelnum: channel on which the packet was sent
840 reliable: true if recursing into a reliable packet
842 SharedBuffer<u8> processPacket(Channel *channel,
843 SharedBuffer<u8> packetdata, u16 peer_id,
844 u8 channelnum, bool reliable);
847 Connection *m_connection = nullptr;
855 friend class ConnectionSendThread;
856 friend class ConnectionReceiveThread;
858 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
859 PeerHandler *peerhandler);
863 ConnectionEvent waitEvent(u32 timeout_ms);
864 void putCommand(ConnectionCommand &c);
866 void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; }
867 void Serve(Address bind_addr);
868 void Connect(Address address);
871 void Receive(NetworkPacket* pkt);
872 void Send(u16 peer_id, u8 channelnum, NetworkPacket* pkt, bool reliable);
873 u16 GetPeerID() { return m_peer_id; }
874 Address GetPeerAddress(u16 peer_id);
875 float getPeerStat(u16 peer_id, rtt_stat_type type);
876 float getLocalStat(rate_stat_type type);
877 const u32 GetProtocolID() const { return m_protocol_id; };
878 const std::string getDesc();
879 void DisconnectPeer(u16 peer_id);
882 PeerHelper getPeer(u16 peer_id);
883 PeerHelper getPeerNoEx(u16 peer_id);
884 u16 lookupPeer(Address& sender);
886 u16 createPeer(Address& sender, MTProtocols protocol, int fd);
887 UDPPeer* createServerPeer(Address& sender);
888 bool deletePeer(u16 peer_id, bool timeout);
890 void SetPeerID(u16 id) { m_peer_id = id; }
892 void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
894 void PrintInfo(std::ostream &out);
897 std::list<u16> getPeerIDs()
899 MutexAutoLock peerlock(m_peers_mutex);
903 UDPSocket m_udpSocket;
904 MutexedQueue<ConnectionCommand> m_command_queue;
906 void putEvent(ConnectionEvent &e);
909 { m_sendThread.Trigger(); }
911 std::list<Peer*> getPeers();
913 MutexedQueue<ConnectionEvent> m_event_queue;
918 std::map<u16, Peer*> m_peers;
919 std::list<u16> m_peer_ids;
920 std::mutex m_peers_mutex;
922 ConnectionSendThread m_sendThread;
923 ConnectionReceiveThread m_receiveThread;
925 std::mutex m_info_mutex;
927 // Backwards compatibility
928 PeerHandler *m_bc_peerhandler;
929 int m_bc_receive_timeout = 0;
931 bool m_shutting_down = false;
933 u16 m_next_remote_peer_id = 2;