3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #ifndef CONNECTION_HEADER
21 #define CONNECTION_HEADER
23 #include "irrlichttypes_bloated.h"
25 #include "exceptions.h"
26 #include "constants.h"
27 #include "util/pointer.h"
28 #include "util/container.h"
29 #include "util/thread.h"
30 #include "util/numeric.h"
42 class NotFoundException : public BaseException
45 NotFoundException(const char *s):
50 class PeerNotFoundException : public BaseException
53 PeerNotFoundException(const char *s):
58 class ConnectionException : public BaseException
61 ConnectionException(const char *s):
66 class ConnectionBindFailed : public BaseException
69 ConnectionBindFailed(const char *s):
74 /*class ThrottlingException : public BaseException
77 ThrottlingException(const char *s):
82 class InvalidIncomingDataException : public BaseException
85 InvalidIncomingDataException(const char *s):
90 class InvalidOutgoingDataException : public BaseException
93 InvalidOutgoingDataException(const char *s):
98 class NoIncomingDataException : public BaseException
101 NoIncomingDataException(const char *s):
106 class ProcessedSilentlyException : public BaseException
109 ProcessedSilentlyException(const char *s):
114 class ProcessedQueued : public BaseException
117 ProcessedQueued(const char *s):
122 class IncomingDataCorruption : public BaseException
125 IncomingDataCorruption(const char *s):
130 typedef enum MTProtocols {
133 MINETEST_RELIABLE_UDP
136 #define SEQNUM_MAX 65535
137 inline bool seqnum_higher(u16 totest, u16 base)
141 if((totest - base) > (SEQNUM_MAX/2))
148 if((base - totest) > (SEQNUM_MAX/2))
155 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
157 u16 window_start = next;
158 u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
160 if (window_start < window_end)
162 return ((seqnum >= window_start) && (seqnum < window_end));
166 return ((seqnum < window_end) || (seqnum >= window_start));
170 struct BufferedPacket
172 BufferedPacket(u8 *a_data, u32 a_size):
173 data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
175 BufferedPacket(u32 a_size):
176 data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
178 SharedBuffer<u8> data; // Data of the packet, including headers
179 float time; // Seconds from buffering the packet or re-sending
180 float totaltime; // Seconds from buffering the packet
181 unsigned int absolute_send_time;
182 Address address; // Sender or destination
185 // This adds the base headers to the data and makes a packet out of it
186 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
187 u32 protocol_id, u16 sender_peer_id, u8 channel);
188 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
189 u32 protocol_id, u16 sender_peer_id, u8 channel);
191 // Add the TYPE_ORIGINAL header to the data
192 SharedBuffer<u8> makeOriginalPacket(
193 SharedBuffer<u8> data);
195 // Split data in chunks and add TYPE_SPLIT headers to them
196 std::list<SharedBuffer<u8> > makeSplitPacket(
197 SharedBuffer<u8> data,
201 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
202 // Increments split_seqnum if a split packet is made
203 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
204 SharedBuffer<u8> data,
208 // Add the TYPE_RELIABLE header to the data
209 SharedBuffer<u8> makeReliablePacket(
210 SharedBuffer<u8> data,
213 struct IncomingSplitPacket
215 IncomingSplitPacket()
220 // Key is chunk number, value is data without headers
221 std::map<u16, SharedBuffer<u8> > chunks;
223 float time; // Seconds from adding
224 bool reliable; // If true, isn't deleted on timeout
228 return (chunks.size() == chunk_count);
235 A packet is sent through a channel to a peer with a basic header:
236 TODO: Should we have a receiver_peer_id also?
239 [4] u16 sender_peer_id
243 value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
244 value 1 (PEER_ID_SERVER) is reserved for server
245 these constants are defined in constants.h
247 The lower the number, the higher the priority is.
248 Only channels 0, 1 and 2 exist.
250 #define BASE_HEADER_SIZE 7
251 #define CHANNEL_COUNT 3
255 CONTROL: This is a packet used by the protocol.
256 - When this is processed, nothing is handed to the user.
260 controltype and data description:
263 CONTROLTYPE_SET_PEER_ID
266 - There is no actual reply, but this can be sent in a reliable
267 packet to get a reply
270 #define TYPE_CONTROL 0
271 #define CONTROLTYPE_ACK 0
272 #define CONTROLTYPE_SET_PEER_ID 1
273 #define CONTROLTYPE_PING 2
274 #define CONTROLTYPE_DISCO 3
275 #define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
278 ORIGINAL: This is a plain packet with no control and no error
280 - When this is processed, it is directly handed to the user.
284 #define TYPE_ORIGINAL 1
285 #define ORIGINAL_HEADER_SIZE 1
287 SPLIT: These are sequences of packets forming one bigger piece of
289 - When processed and all the packet_nums 0...packet_count-1 are
290 present (this should be buffered), the resulting data shall be
291 directly handed to the user.
292 - If the data fails to come up in a reasonable time, the buffer shall
293 be silently discarded.
294 - These can be sent as-is or on top of a RELIABLE packet stream.
303 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
304 and they shall be delivered in the same order as sent. This is done
305 with a buffer in the receiving and transmitting end.
306 - When this is processed, the contents of each packet are recursively
307 processed as packets.
313 #define TYPE_RELIABLE 3
314 #define RELIABLE_HEADER_SIZE 3
315 #define SEQNUM_INITIAL 65500
318 A buffer which stores reliable packets and sorts them internally
319 for fast access to the smallest one.
322 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
324 class ReliablePacketBuffer
327 ReliablePacketBuffer();
329 bool getFirstSeqnum(u16& result);
331 BufferedPacket popFirst();
332 BufferedPacket popSeqnum(u16 seqnum);
333 void insert(BufferedPacket &p,u16 next_expected);
335 void incrementTimeouts(float dtime);
336 std::list<BufferedPacket> getTimedOuts(float timeout,
337 unsigned int max_packets);
341 bool containsPacket(u16 seqnum);
342 RPBSearchResult notFound();
347 RPBSearchResult findPacket(u16 seqnum);
349 std::list<BufferedPacket> m_list;
352 u16 m_oldest_non_answered_ack;
356 unsigned int writeptr;
360 A buffer for reconstructing split packets
363 class IncomingSplitBuffer
366 ~IncomingSplitBuffer();
368 Returns a reference counted buffer of length != 0 when a full split
369 packet is constructed. If not, returns one of length 0.
371 SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
373 void removeUnreliableTimedOuts(float dtime, float timeout);
377 std::map<u16, IncomingSplitPacket*> m_buf;
382 struct OutgoingPacket
386 SharedBuffer<u8> data;
390 OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
391 bool reliable_,bool ack_=false):
393 channelnum(channelnum_),
401 enum ConnectionCommandType{
406 CONNCMD_DISCONNECT_PEER,
412 CONCMD_DISABLE_LEGACY
415 struct ConnectionCommand
417 enum ConnectionCommandType type;
426 ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
428 void serve(u16 port_)
430 type = CONNCMD_SERVE;
433 void connect(Address address_)
435 type = CONNCMD_CONNECT;
440 type = CONNCMD_DISCONNECT;
442 void disconnect_peer(u16 peer_id_)
444 type = CONNCMD_DISCONNECT_PEER;
447 void send(u16 peer_id_, u8 channelnum_,
448 SharedBuffer<u8> data_, bool reliable_)
452 channelnum = channelnum_;
454 reliable = reliable_;
456 void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
458 type = CONNCMD_SEND_TO_ALL;
459 channelnum = channelnum_;
461 reliable = reliable_;
463 void deletePeer(u16 peer_id_)
465 type = CONNCMD_DELETE_PEER;
469 void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
473 channelnum = channelnum_;
478 void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
480 type = CONCMD_CREATE_PEER;
488 void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
490 type = CONCMD_DISABLE_LEGACY;
503 u16 readNextIncomingSeqNum();
504 u16 incNextIncomingSeqNum();
506 u16 getOutgoingSequenceNumber(bool& successfull);
507 u16 readOutgoingSequenceNumber();
508 bool putBackSequenceNumber(u16);
510 u16 readNextSplitSeqNum();
511 void setNextSplitSeqNum(u16 seqnum);
513 // This is for buffering the incoming packets that are coming in
515 ReliablePacketBuffer incoming_reliables;
516 // This is for buffering the sent packets so that the sender can
517 // re-send them if no ACK is received
518 ReliablePacketBuffer outgoing_reliables_sent;
520 //queued reliable packets
521 Queue<BufferedPacket> queued_reliables;
523 // queue commands prior to splitting them into packets
524 Queue<ConnectionCommand> queued_commands;
526 IncomingSplitBuffer incoming_splits;
531 void UpdatePacketLossCounter(unsigned int count);
532 void UpdatePacketTooLateCounter();
533 void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
534 void UpdateBytesLost(unsigned int bytes);
536 void UpdateTimers(float dtime);
538 const float getCurrentDownloadRateKB()
539 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
540 const float getMaxDownloadRateKB()
541 { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
543 const float getCurrentLossRateKB()
544 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
545 const float getMaxLossRateKB()
546 { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
548 const float getAvgDownloadRateKB()
549 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
550 const float getAvgLossRateKB()
551 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
553 const unsigned int getWindowSize() const { return window_size; };
555 void setWindowSize(unsigned int size) { window_size = size; };
557 JMutex m_internal_mutex;
558 unsigned int window_size;
560 u16 next_incoming_seqnum;
562 u16 next_outgoing_seqnum;
563 u16 next_outgoing_split_seqnum;
565 unsigned int current_packet_loss;
566 unsigned int current_packet_too_late;
567 unsigned int current_packet_successfull;
568 float packet_loss_counter;
570 unsigned int current_bytes_transfered;
571 unsigned int current_bytes_lost;
589 virtual ~PeerHandler()
594 This is called after the Peer has been inserted into the
595 Connection's peer container.
597 virtual void peerAdded(Peer *peer) = 0;
599 This is called before the Peer has been removed from the
600 Connection's peer container.
602 virtual void deletingPeer(Peer *peer, bool timeout) = 0;
609 PeerHelper(Peer* peer);
612 PeerHelper& operator=(Peer* peer);
613 Peer* operator->() const;
615 Peer* operator&() const;
616 bool operator!=(void* ptr);
624 typedef enum rtt_stat_type {
635 friend class PeerHelper;
637 Peer(Address address_,u16 id_,Connection* connection) :
639 m_increment_packets_remaining(9),
640 m_increment_bytes_remaining(0),
641 m_pending_deletion(false),
642 m_connection(connection),
647 m_timeout_counter(0.0),
648 m_last_timeout_check(porting::getTimeMs()),
649 m_has_sent_with_id(false)
651 m_rtt.avg_rtt = -1.0;
652 m_rtt.jitter_avg = -1.0;
653 m_rtt.jitter_max = 0.0;
655 m_rtt.jitter_min = FLT_MAX;
656 m_rtt.min_rtt = FLT_MAX;
660 JMutexAutoLock usage_lock(m_exclusive_access_mutex);
661 assert(m_usage == 0);
664 // Unique id of the peer
669 virtual void PutReliableSendCommand(ConnectionCommand &c,
670 unsigned int max_packet_size) {};
672 virtual bool isActive() { return false; };
674 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
677 {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
679 bool isTimedOut(float timeout);
682 { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
685 { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
687 unsigned int m_increment_packets_remaining;
688 unsigned int m_increment_bytes_remaining;
690 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
691 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
692 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
693 BufferedPacket toadd,
696 fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
697 return SharedBuffer<u8>(0);
700 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
702 virtual float getStat(rtt_stat_type type) const {
705 return m_rtt.min_rtt;
707 return m_rtt.max_rtt;
709 return m_rtt.avg_rtt;
711 return m_rtt.jitter_min;
713 return m_rtt.jitter_max;
715 return m_rtt.jitter_avg;
720 virtual void reportRTT(float rtt) {};
722 void RTTStatistics(float rtt,
723 std::string profiler_id="",
724 unsigned int num_samples=1000);
729 JMutex m_exclusive_access_mutex;
731 bool m_pending_deletion;
733 Connection* m_connection;
735 // Address of the peer
754 // current usage count
755 unsigned int m_usage;
757 // Seconds from last receive
758 float m_timeout_counter;
760 u32 m_last_timeout_check;
762 bool m_has_sent_with_id;
765 class UDPPeer : public Peer
769 friend class PeerHelper;
770 friend class ConnectionReceiveThread;
771 friend class ConnectionSendThread;
773 UDPPeer(u16 a_id, Address a_address, Connection* connection);
776 void PutReliableSendCommand(ConnectionCommand &c,
777 unsigned int max_packet_size);
780 { return ((hasSentWithID()) && (!m_pending_deletion)); };
782 bool getAddress(MTProtocols type, Address& toset);
784 void setNonLegacyPeer()
785 { m_legacy_peer = false; }
788 { return m_legacy_peer; }
790 u16 getNextSplitSequenceNumber(u8 channel);
791 void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
793 SharedBuffer<u8> addSpiltPacket(u8 channel,
794 BufferedPacket toadd,
798 Calculates avg_rtt and resend_timeout.
799 rtt=-1 only recalculates resend_timeout
801 void reportRTT(float rtt);
803 void RunCommandQueues(
804 unsigned int max_packet_size,
805 unsigned int maxcommands,
806 unsigned int maxtransfer);
808 float getResendTimeout()
809 { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
811 void setResendTimeout(float timeout)
812 { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
813 bool Ping(float dtime,SharedBuffer<u8>& data);
815 Channel channels[CHANNEL_COUNT];
817 // This is changed dynamically
818 float resend_timeout;
820 bool processReliableSendCommand(
821 ConnectionCommand &c,
822 unsigned int max_packet_size);
831 enum ConnectionEventType{
833 CONNEVENT_DATA_RECEIVED,
834 CONNEVENT_PEER_ADDED,
835 CONNEVENT_PEER_REMOVED,
836 CONNEVENT_BIND_FAILED,
839 struct ConnectionEvent
841 enum ConnectionEventType type;
847 ConnectionEvent(): type(CONNEVENT_NONE) {}
849 std::string describe()
853 return "CONNEVENT_NONE";
854 case CONNEVENT_DATA_RECEIVED:
855 return "CONNEVENT_DATA_RECEIVED";
856 case CONNEVENT_PEER_ADDED:
857 return "CONNEVENT_PEER_ADDED";
858 case CONNEVENT_PEER_REMOVED:
859 return "CONNEVENT_PEER_REMOVED";
860 case CONNEVENT_BIND_FAILED:
861 return "CONNEVENT_BIND_FAILED";
863 return "Invalid ConnectionEvent";
866 void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
868 type = CONNEVENT_DATA_RECEIVED;
872 void peerAdded(u16 peer_id_, Address address_)
874 type = CONNEVENT_PEER_ADDED;
878 void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
880 type = CONNEVENT_PEER_REMOVED;
887 type = CONNEVENT_BIND_FAILED;
891 class ConnectionSendThread : public JThread {
894 friend class UDPPeer;
896 ConnectionSendThread(Connection* parent,
897 unsigned int max_packet_size, float timeout);
903 void setPeerTimeout(float peer_timeout)
904 { m_timeout = peer_timeout; }
907 void runTimeouts (float dtime);
908 void rawSend (const BufferedPacket &packet);
909 bool rawSendAsPacket(u16 peer_id, u8 channelnum,
910 SharedBuffer<u8> data, bool reliable);
912 void processReliableCommand (ConnectionCommand &c);
913 void processNonReliableCommand (ConnectionCommand &c);
914 void serve (u16 port);
915 void connect (Address address);
917 void disconnect_peer(u16 peer_id);
918 void send (u16 peer_id, u8 channelnum,
919 SharedBuffer<u8> data);
920 void sendReliable (ConnectionCommand &c);
921 void sendToAll (u8 channelnum,
922 SharedBuffer<u8> data);
923 void sendToAllReliable(ConnectionCommand &c);
925 void sendPackets (float dtime);
927 void sendAsPacket (u16 peer_id, u8 channelnum,
928 SharedBuffer<u8> data,bool ack=false);
930 void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
932 bool packetsQueued();
934 Connection* m_connection;
935 unsigned int m_max_packet_size;
937 Queue<OutgoingPacket> m_outgoing_queue;
938 JSemaphore m_send_sleep_semaphore;
940 unsigned int m_iteration_packets_avaialble;
941 unsigned int m_max_commands_per_iteration;
942 unsigned int m_max_data_packets_per_iteration;
943 unsigned int m_max_packets_requeued;
946 class ConnectionReceiveThread : public JThread {
948 ConnectionReceiveThread(Connection* parent,
949 unsigned int max_packet_size);
956 // Returns next data from a buffer if possible
957 // If found, returns true; if not, false.
958 // If found, sets peer_id and dst
959 bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
961 bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
962 SharedBuffer<u8> &dst);
965 Processes a packet with the basic header stripped out.
967 packetdata: Data in packet (with no base headers)
968 peer_id: peer id of the sender of the packet in question
969 channelnum: channel on which the packet was sent
970 reliable: true if recursing into a reliable packet
972 SharedBuffer<u8> processPacket(Channel *channel,
973 SharedBuffer<u8> packetdata, u16 peer_id,
974 u8 channelnum, bool reliable);
977 Connection* m_connection;
978 unsigned int m_max_packet_size;
984 friend class ConnectionSendThread;
985 friend class ConnectionReceiveThread;
987 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
988 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
989 PeerHandler *peerhandler);
993 ConnectionEvent getEvent();
994 ConnectionEvent waitEvent(u32 timeout_ms);
995 void putCommand(ConnectionCommand &c);
997 void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
998 void Serve(unsigned short port);
999 void Connect(Address address);
1002 u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
1003 void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
1004 void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
1005 void RunTimeouts(float dtime); // dummy
1006 u16 GetPeerID(){ return m_peer_id; }
1007 Address GetPeerAddress(u16 peer_id);
1008 float GetPeerAvgRTT(u16 peer_id);
1009 void DeletePeer(u16 peer_id);
1010 const u32 GetProtocolID() const { return m_protocol_id; };
1011 const std::string getDesc();
1014 PeerHelper getPeer(u16 peer_id);
1015 PeerHelper getPeerNoEx(u16 peer_id);
1016 u16 lookupPeer(Address& sender);
1018 u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1019 UDPPeer* createServerPeer(Address& sender);
1020 bool deletePeer(u16 peer_id, bool timeout);
1022 void SetPeerID(u16 id){ m_peer_id = id; }
1024 void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1026 void PrintInfo(std::ostream &out);
1029 std::list<u16> getPeerIDs();
1031 UDPSocket m_udpSocket;
1032 MutexedQueue<ConnectionCommand> m_command_queue;
1034 void putEvent(ConnectionEvent &e);
1037 std::list<Peer*> getPeers();
1039 MutexedQueue<ConnectionEvent> m_event_queue;
1044 std::map<u16, Peer*> m_peers;
1045 JMutex m_peers_mutex;
1047 ConnectionSendThread m_sendThread;
1048 ConnectionReceiveThread m_receiveThread;
1050 JMutex m_info_mutex;
1052 // Backwards compatibility
1053 PeerHandler *m_bc_peerhandler;
1054 int m_bc_receive_timeout;
1056 bool m_shutting_down;