3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #ifndef CONNECTION_HEADER
21 #define CONNECTION_HEADER
23 #include "irrlichttypes_bloated.h"
25 #include "exceptions.h"
26 #include "constants.h"
27 #include "util/pointer.h"
28 #include "util/container.h"
29 #include "util/thread.h"
30 #include "util/numeric.h"
42 class NotFoundException : public BaseException
45 NotFoundException(const char *s):
50 class PeerNotFoundException : public BaseException
53 PeerNotFoundException(const char *s):
58 class ConnectionException : public BaseException
61 ConnectionException(const char *s):
66 class ConnectionBindFailed : public BaseException
69 ConnectionBindFailed(const char *s):
74 class InvalidIncomingDataException : public BaseException
77 InvalidIncomingDataException(const char *s):
82 class InvalidOutgoingDataException : public BaseException
85 InvalidOutgoingDataException(const char *s):
90 class NoIncomingDataException : public BaseException
93 NoIncomingDataException(const char *s):
98 class ProcessedSilentlyException : public BaseException
101 ProcessedSilentlyException(const char *s):
106 class ProcessedQueued : public BaseException
109 ProcessedQueued(const char *s):
114 class IncomingDataCorruption : public BaseException
117 IncomingDataCorruption(const char *s):
122 typedef enum MTProtocols {
125 MTP_MINETEST_RELIABLE_UDP
128 #define SEQNUM_MAX 65535
129 inline bool seqnum_higher(u16 totest, u16 base)
133 if((totest - base) > (SEQNUM_MAX/2))
140 if((base - totest) > (SEQNUM_MAX/2))
147 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
149 u16 window_start = next;
150 u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
152 if (window_start < window_end)
154 return ((seqnum >= window_start) && (seqnum < window_end));
158 return ((seqnum < window_end) || (seqnum >= window_start));
162 struct BufferedPacket
164 BufferedPacket(u8 *a_data, u32 a_size):
165 data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
167 BufferedPacket(u32 a_size):
168 data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
170 SharedBuffer<u8> data; // Data of the packet, including headers
171 float time; // Seconds from buffering the packet or re-sending
172 float totaltime; // Seconds from buffering the packet
173 unsigned int absolute_send_time;
174 Address address; // Sender or destination
177 // This adds the base headers to the data and makes a packet out of it
178 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
179 u32 protocol_id, u16 sender_peer_id, u8 channel);
180 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
181 u32 protocol_id, u16 sender_peer_id, u8 channel);
183 // Add the TYPE_ORIGINAL header to the data
184 SharedBuffer<u8> makeOriginalPacket(
185 SharedBuffer<u8> data);
187 // Split data in chunks and add TYPE_SPLIT headers to them
188 std::list<SharedBuffer<u8> > makeSplitPacket(
189 SharedBuffer<u8> data,
193 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
194 // Increments split_seqnum if a split packet is made
195 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
196 SharedBuffer<u8> data,
200 // Add the TYPE_RELIABLE header to the data
201 SharedBuffer<u8> makeReliablePacket(
202 SharedBuffer<u8> data,
205 struct IncomingSplitPacket
207 IncomingSplitPacket()
212 // Key is chunk number, value is data without headers
213 std::map<u16, SharedBuffer<u8> > chunks;
215 float time; // Seconds from adding
216 bool reliable; // If true, isn't deleted on timeout
220 return (chunks.size() == chunk_count);
227 A packet is sent through a channel to a peer with a basic header:
228 TODO: Should we have a receiver_peer_id also?
231 [4] u16 sender_peer_id
235 value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
236 value 1 (PEER_ID_SERVER) is reserved for server
237 these constants are defined in constants.h
239 The lower the number, the higher the priority is.
240 Only channels 0, 1 and 2 exist.
242 #define BASE_HEADER_SIZE 7
243 #define CHANNEL_COUNT 3
247 CONTROL: This is a packet used by the protocol.
248 - When this is processed, nothing is handed to the user.
252 controltype and data description:
255 CONTROLTYPE_SET_PEER_ID
258 - There is no actual reply, but this can be sent in a reliable
259 packet to get a reply
262 #define TYPE_CONTROL 0
263 #define CONTROLTYPE_ACK 0
264 #define CONTROLTYPE_SET_PEER_ID 1
265 #define CONTROLTYPE_PING 2
266 #define CONTROLTYPE_DISCO 3
267 #define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
270 ORIGINAL: This is a plain packet with no control and no error
272 - When this is processed, it is directly handed to the user.
276 #define TYPE_ORIGINAL 1
277 #define ORIGINAL_HEADER_SIZE 1
279 SPLIT: These are sequences of packets forming one bigger piece of
281 - When processed and all the packet_nums 0...packet_count-1 are
282 present (this should be buffered), the resulting data shall be
283 directly handed to the user.
284 - If the data fails to arrive within a reasonable time, the buffer shall
285 be silently discarded.
286 - These can be sent as-is or on top of a RELIABLE packet stream.
295 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
296 and they shall be delivered in the same order as sent. This is done
297 with a buffer in the receiving and transmitting end.
298 - When this is processed, the contents of each packet are recursively
299 processed as packets.
305 #define TYPE_RELIABLE 3
306 #define RELIABLE_HEADER_SIZE 3
307 #define SEQNUM_INITIAL 65500
310 A buffer which stores reliable packets and sorts them internally
311 for fast access to the smallest one.
314 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
316 class ReliablePacketBuffer
319 ReliablePacketBuffer();
321 bool getFirstSeqnum(u16& result);
323 BufferedPacket popFirst();
324 BufferedPacket popSeqnum(u16 seqnum);
325 void insert(BufferedPacket &p,u16 next_expected);
327 void incrementTimeouts(float dtime);
328 std::list<BufferedPacket> getTimedOuts(float timeout,
329 unsigned int max_packets);
333 bool containsPacket(u16 seqnum);
334 RPBSearchResult notFound();
339 RPBSearchResult findPacket(u16 seqnum);
341 std::list<BufferedPacket> m_list;
344 u16 m_oldest_non_answered_ack;
350 A buffer for reconstructing split packets
353 class IncomingSplitBuffer
356 ~IncomingSplitBuffer();
358 Returns a reference counted buffer of length != 0 when a full split
359 packet is constructed. If not, returns one of length 0.
361 SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
363 void removeUnreliableTimedOuts(float dtime, float timeout);
367 std::map<u16, IncomingSplitPacket*> m_buf;
372 struct OutgoingPacket
376 SharedBuffer<u8> data;
380 OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
381 bool reliable_,bool ack_=false):
383 channelnum(channelnum_),
391 enum ConnectionCommandType{
396 CONNCMD_DISCONNECT_PEER,
401 CONCMD_DISABLE_LEGACY
404 struct ConnectionCommand
406 enum ConnectionCommandType type;
414 ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
416 void serve(Address address_)
418 type = CONNCMD_SERVE;
421 void connect(Address address_)
423 type = CONNCMD_CONNECT;
428 type = CONNCMD_DISCONNECT;
430 void disconnect_peer(u16 peer_id_)
432 type = CONNCMD_DISCONNECT_PEER;
435 void send(u16 peer_id_, u8 channelnum_,
436 SharedBuffer<u8> data_, bool reliable_)
440 channelnum = channelnum_;
442 reliable = reliable_;
444 void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
446 type = CONNCMD_SEND_TO_ALL;
447 channelnum = channelnum_;
449 reliable = reliable_;
452 void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
456 channelnum = channelnum_;
461 void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
463 type = CONCMD_CREATE_PEER;
471 void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
473 type = CONCMD_DISABLE_LEGACY;
486 u16 readNextIncomingSeqNum();
487 u16 incNextIncomingSeqNum();
489 u16 getOutgoingSequenceNumber(bool& successfull);
490 u16 readOutgoingSequenceNumber();
491 bool putBackSequenceNumber(u16);
493 u16 readNextSplitSeqNum();
494 void setNextSplitSeqNum(u16 seqnum);
496 // This is for buffering the incoming packets that are coming in
498 ReliablePacketBuffer incoming_reliables;
499 // This is for buffering the sent packets so that the sender can
500 // re-send them if no ACK is received
501 ReliablePacketBuffer outgoing_reliables_sent;
503 //queued reliable packets
504 Queue<BufferedPacket> queued_reliables;
506 // Commands queued prior to being split into packets
507 Queue<ConnectionCommand> queued_commands;
509 IncomingSplitBuffer incoming_splits;
514 void UpdatePacketLossCounter(unsigned int count);
515 void UpdatePacketTooLateCounter();
516 void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
517 void UpdateBytesLost(unsigned int bytes);
518 void UpdateBytesReceived(unsigned int bytes);
520 void UpdateTimers(float dtime, bool legacy_peer);
522 const float getCurrentDownloadRateKB()
523 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
524 const float getMaxDownloadRateKB()
525 { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
527 const float getCurrentLossRateKB()
528 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
529 const float getMaxLossRateKB()
530 { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
532 const float getCurrentIncomingRateKB()
533 { JMutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
534 const float getMaxIncomingRateKB()
535 { JMutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
537 const float getAvgDownloadRateKB()
538 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
539 const float getAvgLossRateKB()
540 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
541 const float getAvgIncomingRateKB()
542 { JMutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
544 const unsigned int getWindowSize() const { return window_size; };
546 void setWindowSize(unsigned int size) { window_size = size; };
548 JMutex m_internal_mutex;
551 u16 next_incoming_seqnum;
553 u16 next_outgoing_seqnum;
554 u16 next_outgoing_split_seqnum;
556 unsigned int current_packet_loss;
557 unsigned int current_packet_too_late;
558 unsigned int current_packet_successfull;
559 float packet_loss_counter;
561 unsigned int current_bytes_transfered;
562 unsigned int current_bytes_received;
563 unsigned int current_bytes_lost;
567 float max_incoming_kbps;
568 float cur_incoming_kbps;
569 float avg_incoming_kbps;
575 unsigned int rate_samples;
599 virtual ~PeerHandler()
604 This is called after the Peer has been inserted into the
605 Connection's peer container.
607 virtual void peerAdded(Peer *peer) = 0;
609 This is called before the Peer has been removed from the
610 Connection's peer container.
612 virtual void deletingPeer(Peer *peer, bool timeout) = 0;
619 PeerHelper(Peer* peer);
622 PeerHelper& operator=(Peer* peer);
623 Peer* operator->() const;
625 Peer* operator&() const;
626 bool operator!=(void* ptr);
654 friend class PeerHelper;
656 Peer(Address address_,u16 id_,Connection* connection) :
658 m_increment_packets_remaining(9),
659 m_increment_bytes_remaining(0),
660 m_pending_deletion(false),
661 m_connection(connection),
666 m_timeout_counter(0.0),
667 m_last_timeout_check(porting::getTimeMs()),
668 m_has_sent_with_id(false)
670 m_rtt.avg_rtt = -1.0;
671 m_rtt.jitter_avg = -1.0;
672 m_rtt.jitter_max = 0.0;
674 m_rtt.jitter_min = FLT_MAX;
675 m_rtt.min_rtt = FLT_MAX;
679 JMutexAutoLock usage_lock(m_exclusive_access_mutex);
680 assert(m_usage == 0);
683 // Unique id of the peer
688 virtual void PutReliableSendCommand(ConnectionCommand &c,
689 unsigned int max_packet_size) {};
691 virtual bool isActive() { return false; };
693 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
696 {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
698 bool isTimedOut(float timeout);
701 { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
704 { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
706 unsigned int m_increment_packets_remaining;
707 unsigned int m_increment_bytes_remaining;
709 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
710 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
711 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
712 BufferedPacket toadd,
715 fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
716 return SharedBuffer<u8>(0);
719 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
721 virtual float getStat(rtt_stat_type type) const {
724 return m_rtt.min_rtt;
726 return m_rtt.max_rtt;
728 return m_rtt.avg_rtt;
730 return m_rtt.jitter_min;
732 return m_rtt.jitter_max;
734 return m_rtt.jitter_avg;
739 virtual void reportRTT(float rtt) {};
741 void RTTStatistics(float rtt,
742 std::string profiler_id="",
743 unsigned int num_samples=1000);
748 JMutex m_exclusive_access_mutex;
750 bool m_pending_deletion;
752 Connection* m_connection;
754 // Address of the peer
773 // current usage count
774 unsigned int m_usage;
776 // Seconds since last receive
777 float m_timeout_counter;
779 u32 m_last_timeout_check;
781 bool m_has_sent_with_id;
784 class UDPPeer : public Peer
788 friend class PeerHelper;
789 friend class ConnectionReceiveThread;
790 friend class ConnectionSendThread;
791 friend class Connection;
793 UDPPeer(u16 a_id, Address a_address, Connection* connection);
794 virtual ~UDPPeer() {};
796 void PutReliableSendCommand(ConnectionCommand &c,
797 unsigned int max_packet_size);
800 { return ((hasSentWithID()) && (!m_pending_deletion)); };
802 bool getAddress(MTProtocols type, Address& toset);
804 void setNonLegacyPeer();
807 { return m_legacy_peer; }
809 u16 getNextSplitSequenceNumber(u8 channel);
810 void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
812 SharedBuffer<u8> addSpiltPacket(u8 channel,
813 BufferedPacket toadd,
819 Calculates avg_rtt and resend_timeout.
820 Passing rtt=-1 only recalculates resend_timeout.
822 void reportRTT(float rtt);
824 void RunCommandQueues(
825 unsigned int max_packet_size,
826 unsigned int maxcommands,
827 unsigned int maxtransfer);
829 float getResendTimeout()
830 { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
832 void setResendTimeout(float timeout)
833 { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
834 bool Ping(float dtime,SharedBuffer<u8>& data);
836 Channel channels[CHANNEL_COUNT];
837 bool m_pending_disconnect;
839 // This is changed dynamically
840 float resend_timeout;
842 bool processReliableSendCommand(
843 ConnectionCommand &c,
844 unsigned int max_packet_size);
853 enum ConnectionEventType{
855 CONNEVENT_DATA_RECEIVED,
856 CONNEVENT_PEER_ADDED,
857 CONNEVENT_PEER_REMOVED,
858 CONNEVENT_BIND_FAILED,
861 struct ConnectionEvent
863 enum ConnectionEventType type;
869 ConnectionEvent(): type(CONNEVENT_NONE) {}
871 std::string describe()
875 return "CONNEVENT_NONE";
876 case CONNEVENT_DATA_RECEIVED:
877 return "CONNEVENT_DATA_RECEIVED";
878 case CONNEVENT_PEER_ADDED:
879 return "CONNEVENT_PEER_ADDED";
880 case CONNEVENT_PEER_REMOVED:
881 return "CONNEVENT_PEER_REMOVED";
882 case CONNEVENT_BIND_FAILED:
883 return "CONNEVENT_BIND_FAILED";
885 return "Invalid ConnectionEvent";
888 void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
890 type = CONNEVENT_DATA_RECEIVED;
894 void peerAdded(u16 peer_id_, Address address_)
896 type = CONNEVENT_PEER_ADDED;
900 void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
902 type = CONNEVENT_PEER_REMOVED;
909 type = CONNEVENT_BIND_FAILED;
913 class ConnectionSendThread : public JThread {
916 friend class UDPPeer;
918 ConnectionSendThread(Connection* parent,
919 unsigned int max_packet_size, float timeout);
925 void setPeerTimeout(float peer_timeout)
926 { m_timeout = peer_timeout; }
929 void runTimeouts (float dtime);
930 void rawSend (const BufferedPacket &packet);
931 bool rawSendAsPacket(u16 peer_id, u8 channelnum,
932 SharedBuffer<u8> data, bool reliable);
934 void processReliableCommand (ConnectionCommand &c);
935 void processNonReliableCommand (ConnectionCommand &c);
936 void serve (Address bind_address);
937 void connect (Address address);
939 void disconnect_peer(u16 peer_id);
940 void send (u16 peer_id, u8 channelnum,
941 SharedBuffer<u8> data);
942 void sendReliable (ConnectionCommand &c);
943 void sendToAll (u8 channelnum,
944 SharedBuffer<u8> data);
945 void sendToAllReliable(ConnectionCommand &c);
947 void sendPackets (float dtime);
949 void sendAsPacket (u16 peer_id, u8 channelnum,
950 SharedBuffer<u8> data,bool ack=false);
952 void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
954 bool packetsQueued();
956 Connection* m_connection;
957 unsigned int m_max_packet_size;
959 Queue<OutgoingPacket> m_outgoing_queue;
960 JSemaphore m_send_sleep_semaphore;
962 unsigned int m_iteration_packets_avaialble;
963 unsigned int m_max_commands_per_iteration;
964 unsigned int m_max_data_packets_per_iteration;
965 unsigned int m_max_packets_requeued;
968 class ConnectionReceiveThread : public JThread {
970 ConnectionReceiveThread(Connection* parent,
971 unsigned int max_packet_size);
978 // Returns next data from a buffer if possible
979 // If found, returns true; if not, false.
980 // If found, sets peer_id and dst
981 bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
983 bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
984 SharedBuffer<u8> &dst);
987 Processes a packet with the basic header stripped out.
989 packetdata: Data in packet (with no base headers)
990 peer_id: peer id of the sender of the packet in question
991 channelnum: channel on which the packet was sent
992 reliable: true if recursing into a reliable packet
994 SharedBuffer<u8> processPacket(Channel *channel,
995 SharedBuffer<u8> packetdata, u16 peer_id,
996 u8 channelnum, bool reliable);
999 Connection* m_connection;
1005 friend class ConnectionSendThread;
1006 friend class ConnectionReceiveThread;
1008 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
1009 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
1010 PeerHandler *peerhandler);
1014 ConnectionEvent getEvent();
1015 ConnectionEvent waitEvent(u32 timeout_ms);
1016 void putCommand(ConnectionCommand &c);
1018 void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
1019 void Serve(Address bind_addr);
1020 void Connect(Address address);
1023 u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
1024 void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
1025 void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
1026 u16 GetPeerID(){ return m_peer_id; }
1027 Address GetPeerAddress(u16 peer_id);
1028 float getPeerStat(u16 peer_id, rtt_stat_type type);
1029 float getLocalStat(rate_stat_type type);
1030 const u32 GetProtocolID() const { return m_protocol_id; };
1031 const std::string getDesc();
1032 void DisconnectPeer(u16 peer_id);
1035 PeerHelper getPeer(u16 peer_id);
1036 PeerHelper getPeerNoEx(u16 peer_id);
1037 u16 lookupPeer(Address& sender);
1039 u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1040 UDPPeer* createServerPeer(Address& sender);
1041 bool deletePeer(u16 peer_id, bool timeout);
1043 void SetPeerID(u16 id){ m_peer_id = id; }
1045 void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1047 void PrintInfo(std::ostream &out);
1050 std::list<u16> getPeerIDs();
1052 UDPSocket m_udpSocket;
1053 MutexedQueue<ConnectionCommand> m_command_queue;
1055 void putEvent(ConnectionEvent &e);
1058 { m_sendThread.Trigger(); }
1060 std::list<Peer*> getPeers();
1062 MutexedQueue<ConnectionEvent> m_event_queue;
1067 std::map<u16, Peer*> m_peers;
1068 JMutex m_peers_mutex;
1070 ConnectionSendThread m_sendThread;
1071 ConnectionReceiveThread m_receiveThread;
1073 JMutex m_info_mutex;
1075 // Backwards compatibility
1076 PeerHandler *m_bc_peerhandler;
1077 int m_bc_receive_timeout;
1079 bool m_shutting_down;
1081 u16 m_next_remote_peer_id;