3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #ifndef CONNECTION_HEADER
21 #define CONNECTION_HEADER
23 #include "irrlichttypes_bloated.h"
25 #include "exceptions.h"
26 #include "constants.h"
27 #include "util/pointer.h"
28 #include "util/container.h"
29 #include "util/thread.h"
30 #include "util/numeric.h"
42 class NotFoundException : public BaseException
45 NotFoundException(const char *s):
50 class PeerNotFoundException : public BaseException
53 PeerNotFoundException(const char *s):
58 class ConnectionException : public BaseException
61 ConnectionException(const char *s):
66 class ConnectionBindFailed : public BaseException
69 ConnectionBindFailed(const char *s):
74 class InvalidIncomingDataException : public BaseException
77 InvalidIncomingDataException(const char *s):
82 class InvalidOutgoingDataException : public BaseException
85 InvalidOutgoingDataException(const char *s):
90 class NoIncomingDataException : public BaseException
93 NoIncomingDataException(const char *s):
98 class ProcessedSilentlyException : public BaseException
101 ProcessedSilentlyException(const char *s):
106 class ProcessedQueued : public BaseException
109 ProcessedQueued(const char *s):
114 class IncomingDataCorruption : public BaseException
117 IncomingDataCorruption(const char *s):
122 typedef enum MTProtocols {
125 MTP_MINETEST_RELIABLE_UDP
128 #define SEQNUM_MAX 65535
129 inline bool seqnum_higher(u16 totest, u16 base)
133 if((totest - base) > (SEQNUM_MAX/2))
140 if((base - totest) > (SEQNUM_MAX/2))
147 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
149 u16 window_start = next;
150 u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
152 if (window_start < window_end)
154 return ((seqnum >= window_start) && (seqnum < window_end));
158 return ((seqnum < window_end) || (seqnum >= window_start));
162 struct BufferedPacket
164 BufferedPacket(u8 *a_data, u32 a_size):
165 data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
168 BufferedPacket(u32 a_size):
169 data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
172 SharedBuffer<u8> data; // Data of the packet, including headers
173 float time; // Seconds from buffering the packet or re-sending
174 float totaltime; // Seconds from buffering the packet
175 unsigned int absolute_send_time;
176 Address address; // Sender or destination
177 unsigned int resend_count;
180 // This adds the base headers to the data and makes a packet out of it
181 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
182 u32 protocol_id, u16 sender_peer_id, u8 channel);
183 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
184 u32 protocol_id, u16 sender_peer_id, u8 channel);
186 // Add the TYPE_ORIGINAL header to the data
187 SharedBuffer<u8> makeOriginalPacket(
188 SharedBuffer<u8> data);
190 // Split data in chunks and add TYPE_SPLIT headers to them
191 std::list<SharedBuffer<u8> > makeSplitPacket(
192 SharedBuffer<u8> data,
196 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
197 // Increments split_seqnum if a split packet is made
198 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
199 SharedBuffer<u8> data,
203 // Add the TYPE_RELIABLE header to the data
204 SharedBuffer<u8> makeReliablePacket(
205 SharedBuffer<u8> data,
208 struct IncomingSplitPacket
210 IncomingSplitPacket()
215 // Key is chunk number, value is data without headers
216 std::map<u16, SharedBuffer<u8> > chunks;
218 float time; // Seconds from adding
219 bool reliable; // If true, isn't deleted on timeout
223 return (chunks.size() == chunk_count);
230 A packet is sent through a channel to a peer with a basic header:
231 TODO: Should we have a receiver_peer_id also?
234 [4] u16 sender_peer_id
238 value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
239 value 1 (PEER_ID_SERVER) is reserved for server
240 these constants are defined in constants.h
242 The lower the number, the higher the priority is.
243 Only channels 0, 1 and 2 exist.
245 #define BASE_HEADER_SIZE 7
246 #define CHANNEL_COUNT 3
250 CONTROL: This is a packet used by the protocol.
251 - When this is processed, nothing is handed to the user.
255 controltype and data description:
258 CONTROLTYPE_SET_PEER_ID
261 - There is no actual reply, but this can be sent in a reliable
262 packet to get a reply
265 #define TYPE_CONTROL 0
266 #define CONTROLTYPE_ACK 0
267 #define CONTROLTYPE_SET_PEER_ID 1
268 #define CONTROLTYPE_PING 2
269 #define CONTROLTYPE_DISCO 3
270 #define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
273 ORIGINAL: This is a plain packet with no control and no error
275 - When this is processed, it is directly handed to the user.
279 #define TYPE_ORIGINAL 1
280 #define ORIGINAL_HEADER_SIZE 1
282 SPLIT: These are sequences of packets forming one bigger piece of
284 - When processed and all the packet_nums 0...packet_count-1 are
285 present (this should be buffered), the resulting data shall be
286 directly handed to the user.
287 - If the data fails to come up in a reasonable time, the buffer shall
288 be silently discarded.
289 - These can be sent as-is or atop of a RELIABLE packet stream.
298 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
299 and they shall be delivered in the same order as sent. This is done
300 with a buffer in the receiving and transmitting end.
301 - When this is processed, the contents of each packet is recursively
302 processed as packets.
308 #define TYPE_RELIABLE 3
309 #define RELIABLE_HEADER_SIZE 3
310 #define SEQNUM_INITIAL 65500
313 A buffer which stores reliable packets and sorts them internally
314 for fast access to the smallest one.
317 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
319 class ReliablePacketBuffer
322 ReliablePacketBuffer();
324 bool getFirstSeqnum(u16& result);
326 BufferedPacket popFirst();
327 BufferedPacket popSeqnum(u16 seqnum);
328 void insert(BufferedPacket &p,u16 next_expected);
330 void incrementTimeouts(float dtime);
331 std::list<BufferedPacket> getTimedOuts(float timeout,
332 unsigned int max_packets);
336 bool containsPacket(u16 seqnum);
337 RPBSearchResult notFound();
342 RPBSearchResult findPacket(u16 seqnum);
344 std::list<BufferedPacket> m_list;
347 u16 m_oldest_non_answered_ack;
353 A buffer for reconstructing split packets
356 class IncomingSplitBuffer
359 ~IncomingSplitBuffer();
361 Returns a reference counted buffer of length != 0 when a full split
362 packet is constructed. If not, returns one of length 0.
364 SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
366 void removeUnreliableTimedOuts(float dtime, float timeout);
370 std::map<u16, IncomingSplitPacket*> m_buf;
375 struct OutgoingPacket
379 SharedBuffer<u8> data;
383 OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
384 bool reliable_,bool ack_=false):
386 channelnum(channelnum_),
394 enum ConnectionCommandType{
399 CONNCMD_DISCONNECT_PEER,
404 CONCMD_DISABLE_LEGACY
407 struct ConnectionCommand
409 enum ConnectionCommandType type;
417 ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
419 void serve(Address address_)
421 type = CONNCMD_SERVE;
424 void connect(Address address_)
426 type = CONNCMD_CONNECT;
431 type = CONNCMD_DISCONNECT;
433 void disconnect_peer(u16 peer_id_)
435 type = CONNCMD_DISCONNECT_PEER;
438 void send(u16 peer_id_, u8 channelnum_,
439 SharedBuffer<u8> data_, bool reliable_)
443 channelnum = channelnum_;
445 reliable = reliable_;
447 void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
449 type = CONNCMD_SEND_TO_ALL;
450 channelnum = channelnum_;
452 reliable = reliable_;
455 void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
459 channelnum = channelnum_;
464 void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
466 type = CONCMD_CREATE_PEER;
474 void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
476 type = CONCMD_DISABLE_LEGACY;
489 u16 readNextIncomingSeqNum();
490 u16 incNextIncomingSeqNum();
492 u16 getOutgoingSequenceNumber(bool& successfull);
493 u16 readOutgoingSequenceNumber();
494 bool putBackSequenceNumber(u16);
496 u16 readNextSplitSeqNum();
497 void setNextSplitSeqNum(u16 seqnum);
499 // This is for buffering the incoming packets that are coming in
501 ReliablePacketBuffer incoming_reliables;
502 // This is for buffering the sent packets so that the sender can
503 // re-send them if no ACK is received
504 ReliablePacketBuffer outgoing_reliables_sent;
506 //queued reliable packets
507 Queue<BufferedPacket> queued_reliables;
509 //queue commands prior splitting to packets
510 Queue<ConnectionCommand> queued_commands;
512 IncomingSplitBuffer incoming_splits;
517 void UpdatePacketLossCounter(unsigned int count);
518 void UpdatePacketTooLateCounter();
519 void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
520 void UpdateBytesLost(unsigned int bytes);
521 void UpdateBytesReceived(unsigned int bytes);
523 void UpdateTimers(float dtime, bool legacy_peer);
525 const float getCurrentDownloadRateKB()
526 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
527 const float getMaxDownloadRateKB()
528 { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
530 const float getCurrentLossRateKB()
531 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
532 const float getMaxLossRateKB()
533 { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
535 const float getCurrentIncomingRateKB()
536 { JMutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
537 const float getMaxIncomingRateKB()
538 { JMutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
540 const float getAvgDownloadRateKB()
541 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
542 const float getAvgLossRateKB()
543 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
544 const float getAvgIncomingRateKB()
545 { JMutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
547 const unsigned int getWindowSize() const { return window_size; };
549 void setWindowSize(unsigned int size) { window_size = size; };
551 JMutex m_internal_mutex;
554 u16 next_incoming_seqnum;
556 u16 next_outgoing_seqnum;
557 u16 next_outgoing_split_seqnum;
559 unsigned int current_packet_loss;
560 unsigned int current_packet_too_late;
561 unsigned int current_packet_successfull;
562 float packet_loss_counter;
564 unsigned int current_bytes_transfered;
565 unsigned int current_bytes_received;
566 unsigned int current_bytes_lost;
570 float max_incoming_kbps;
571 float cur_incoming_kbps;
572 float avg_incoming_kbps;
578 unsigned int rate_samples;
602 virtual ~PeerHandler()
607 This is called after the Peer has been inserted into the
608 Connection's peer container.
610 virtual void peerAdded(Peer *peer) = 0;
612 This is called before the Peer has been removed from the
613 Connection's peer container.
615 virtual void deletingPeer(Peer *peer, bool timeout) = 0;
622 PeerHelper(Peer* peer);
625 PeerHelper& operator=(Peer* peer);
626 Peer* operator->() const;
628 Peer* operator&() const;
629 bool operator!=(void* ptr);
657 friend class PeerHelper;
659 Peer(Address address_,u16 id_,Connection* connection) :
661 m_increment_packets_remaining(9),
662 m_increment_bytes_remaining(0),
663 m_pending_deletion(false),
664 m_connection(connection),
669 m_timeout_counter(0.0),
670 m_last_timeout_check(porting::getTimeMs()),
671 m_has_sent_with_id(false)
673 m_rtt.avg_rtt = -1.0;
674 m_rtt.jitter_avg = -1.0;
675 m_rtt.jitter_max = 0.0;
677 m_rtt.jitter_min = FLT_MAX;
678 m_rtt.min_rtt = FLT_MAX;
682 JMutexAutoLock usage_lock(m_exclusive_access_mutex);
683 assert(m_usage == 0);
686 // Unique id of the peer
691 virtual void PutReliableSendCommand(ConnectionCommand &c,
692 unsigned int max_packet_size) {};
694 virtual bool isActive() { return false; };
696 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
699 {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
701 bool isTimedOut(float timeout);
704 { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
707 { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
709 unsigned int m_increment_packets_remaining;
710 unsigned int m_increment_bytes_remaining;
712 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
713 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
714 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
715 BufferedPacket toadd,
718 fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
719 return SharedBuffer<u8>(0);
722 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
724 virtual float getStat(rtt_stat_type type) const {
727 return m_rtt.min_rtt;
729 return m_rtt.max_rtt;
731 return m_rtt.avg_rtt;
733 return m_rtt.jitter_min;
735 return m_rtt.jitter_max;
737 return m_rtt.jitter_avg;
742 virtual void reportRTT(float rtt) {};
744 void RTTStatistics(float rtt,
745 std::string profiler_id="",
746 unsigned int num_samples=1000);
751 JMutex m_exclusive_access_mutex;
753 bool m_pending_deletion;
755 Connection* m_connection;
757 // Address of the peer
776 // current usage count
777 unsigned int m_usage;
779 // Seconds from last receive
780 float m_timeout_counter;
782 u32 m_last_timeout_check;
784 bool m_has_sent_with_id;
787 class UDPPeer : public Peer
791 friend class PeerHelper;
792 friend class ConnectionReceiveThread;
793 friend class ConnectionSendThread;
794 friend class Connection;
796 UDPPeer(u16 a_id, Address a_address, Connection* connection);
797 virtual ~UDPPeer() {};
799 void PutReliableSendCommand(ConnectionCommand &c,
800 unsigned int max_packet_size);
803 { return ((hasSentWithID()) && (!m_pending_deletion)); };
805 bool getAddress(MTProtocols type, Address& toset);
807 void setNonLegacyPeer();
810 { return m_legacy_peer; }
812 u16 getNextSplitSequenceNumber(u8 channel);
813 void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
815 SharedBuffer<u8> addSpiltPacket(u8 channel,
816 BufferedPacket toadd,
822 Calculates avg_rtt and resend_timeout.
823 rtt=-1 only recalculates resend_timeout
825 void reportRTT(float rtt);
827 void RunCommandQueues(
828 unsigned int max_packet_size,
829 unsigned int maxcommands,
830 unsigned int maxtransfer);
832 float getResendTimeout()
833 { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
835 void setResendTimeout(float timeout)
836 { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
837 bool Ping(float dtime,SharedBuffer<u8>& data);
839 Channel channels[CHANNEL_COUNT];
840 bool m_pending_disconnect;
842 // This is changed dynamically
843 float resend_timeout;
845 bool processReliableSendCommand(
846 ConnectionCommand &c,
847 unsigned int max_packet_size);
856 enum ConnectionEventType{
858 CONNEVENT_DATA_RECEIVED,
859 CONNEVENT_PEER_ADDED,
860 CONNEVENT_PEER_REMOVED,
861 CONNEVENT_BIND_FAILED,
864 struct ConnectionEvent
866 enum ConnectionEventType type;
872 ConnectionEvent(): type(CONNEVENT_NONE) {}
874 std::string describe()
878 return "CONNEVENT_NONE";
879 case CONNEVENT_DATA_RECEIVED:
880 return "CONNEVENT_DATA_RECEIVED";
881 case CONNEVENT_PEER_ADDED:
882 return "CONNEVENT_PEER_ADDED";
883 case CONNEVENT_PEER_REMOVED:
884 return "CONNEVENT_PEER_REMOVED";
885 case CONNEVENT_BIND_FAILED:
886 return "CONNEVENT_BIND_FAILED";
888 return "Invalid ConnectionEvent";
891 void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
893 type = CONNEVENT_DATA_RECEIVED;
897 void peerAdded(u16 peer_id_, Address address_)
899 type = CONNEVENT_PEER_ADDED;
903 void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
905 type = CONNEVENT_PEER_REMOVED;
912 type = CONNEVENT_BIND_FAILED;
916 class ConnectionSendThread : public JThread {
919 friend class UDPPeer;
921 ConnectionSendThread(Connection* parent,
922 unsigned int max_packet_size, float timeout);
928 void setPeerTimeout(float peer_timeout)
929 { m_timeout = peer_timeout; }
932 void runTimeouts (float dtime);
933 void rawSend (const BufferedPacket &packet);
934 bool rawSendAsPacket(u16 peer_id, u8 channelnum,
935 SharedBuffer<u8> data, bool reliable);
937 void processReliableCommand (ConnectionCommand &c);
938 void processNonReliableCommand (ConnectionCommand &c);
939 void serve (Address bind_address);
940 void connect (Address address);
942 void disconnect_peer(u16 peer_id);
943 void send (u16 peer_id, u8 channelnum,
944 SharedBuffer<u8> data);
945 void sendReliable (ConnectionCommand &c);
946 void sendToAll (u8 channelnum,
947 SharedBuffer<u8> data);
948 void sendToAllReliable(ConnectionCommand &c);
950 void sendPackets (float dtime);
952 void sendAsPacket (u16 peer_id, u8 channelnum,
953 SharedBuffer<u8> data,bool ack=false);
955 void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
957 bool packetsQueued();
959 Connection* m_connection;
960 unsigned int m_max_packet_size;
962 Queue<OutgoingPacket> m_outgoing_queue;
963 JSemaphore m_send_sleep_semaphore;
965 unsigned int m_iteration_packets_avaialble;
966 unsigned int m_max_commands_per_iteration;
967 unsigned int m_max_data_packets_per_iteration;
968 unsigned int m_max_packets_requeued;
971 class ConnectionReceiveThread : public JThread {
973 ConnectionReceiveThread(Connection* parent,
974 unsigned int max_packet_size);
981 // Returns next data from a buffer if possible
982 // If found, returns true; if not, false.
983 // If found, sets peer_id and dst
984 bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
986 bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
987 SharedBuffer<u8> &dst);
990 Processes a packet with the basic header stripped out.
992 packetdata: Data in packet (with no base headers)
993 peer_id: peer id of the sender of the packet in question
994 channelnum: channel on which the packet was sent
995 reliable: true if recursing into a reliable packet
997 SharedBuffer<u8> processPacket(Channel *channel,
998 SharedBuffer<u8> packetdata, u16 peer_id,
999 u8 channelnum, bool reliable);
1002 Connection* m_connection;
1008 friend class ConnectionSendThread;
1009 friend class ConnectionReceiveThread;
1011 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
1012 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
1013 PeerHandler *peerhandler);
1017 ConnectionEvent getEvent();
1018 ConnectionEvent waitEvent(u32 timeout_ms);
1019 void putCommand(ConnectionCommand &c);
1021 void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
1022 void Serve(Address bind_addr);
1023 void Connect(Address address);
1026 u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
1027 void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
1028 void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
1029 u16 GetPeerID(){ return m_peer_id; }
1030 Address GetPeerAddress(u16 peer_id);
1031 float getPeerStat(u16 peer_id, rtt_stat_type type);
1032 float getLocalStat(rate_stat_type type);
1033 const u32 GetProtocolID() const { return m_protocol_id; };
1034 const std::string getDesc();
1035 void DisconnectPeer(u16 peer_id);
1038 PeerHelper getPeer(u16 peer_id);
1039 PeerHelper getPeerNoEx(u16 peer_id);
1040 u16 lookupPeer(Address& sender);
1042 u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1043 UDPPeer* createServerPeer(Address& sender);
1044 bool deletePeer(u16 peer_id, bool timeout);
1046 void SetPeerID(u16 id){ m_peer_id = id; }
1048 void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1050 void PrintInfo(std::ostream &out);
1053 std::list<u16> getPeerIDs();
1055 UDPSocket m_udpSocket;
1056 MutexedQueue<ConnectionCommand> m_command_queue;
1058 void putEvent(ConnectionEvent &e);
1061 { m_sendThread.Trigger(); }
1063 std::list<Peer*> getPeers();
1065 MutexedQueue<ConnectionEvent> m_event_queue;
1070 std::map<u16, Peer*> m_peers;
1071 JMutex m_peers_mutex;
1073 ConnectionSendThread m_sendThread;
1074 ConnectionReceiveThread m_receiveThread;
1076 JMutex m_info_mutex;
1078 // Backwards compatibility
1079 PeerHandler *m_bc_peerhandler;
1080 int m_bc_receive_timeout;
1082 bool m_shutting_down;
1084 u16 m_next_remote_peer_id;