X-Git-Url: https://git.librecmc.org/?a=blobdiff_plain;f=src%2Fconnection.h;h=516702cb8e19daa5639b0db0749bfcc711df0117;hb=58e6d25e033c76dc91aaac18fdeda92ac23fe0e1;hp=6eb2f2824642cc86661fd4c788e5c090d515e366;hpb=7cfb71385d00d1cbbbfd9c76f6c01adafa9e648a;p=oweals%2Fminetest.git diff --git a/src/connection.h b/src/connection.h index 6eb2f2824..516702cb8 100644 --- a/src/connection.h +++ b/src/connection.h @@ -1,18 +1,18 @@ /* -Minetest-c55 -Copyright (C) 2010 celeron55, Perttu Ahola +Minetest +Copyright (C) 2013 celeron55, Perttu Ahola This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; either version 2 of the License, or +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. +GNU Lesser General Public License for more details. -You should have received a copy of the GNU General Public License along +You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ @@ -20,14 +20,18 @@ with this program; if not, write to the Free Software Foundation, Inc., #ifndef CONNECTION_HEADER #define CONNECTION_HEADER -#include -#include -#include "debug.h" -#include "common_irrlicht.h" +#include "irrlichttypes_bloated.h" #include "socket.h" -#include "utility.h" #include "exceptions.h" #include "constants.h" +#include "util/pointer.h" +#include "util/container.h" +#include "util/thread.h" +#include "util/numeric.h" +#include +#include +#include +#include namespace con { @@ -59,13 +63,13 @@ public: {} }; -/*class ThrottlingException : public BaseException +class ConnectionBindFailed : public BaseException { public: - ThrottlingException(const char *s): + ConnectionBindFailed(const char *s): BaseException(s) {} -};*/ +}; class InvalidIncomingDataException : public BaseException { @@ -99,36 +103,78 @@ public: {} }; -inline u16 readPeerId(u8 *packetdata) +class ProcessedQueued : public BaseException { - return readU16(&packetdata[4]); -} -inline u8 readChannel(u8 *packetdata) +public: + ProcessedQueued(const char *s): + BaseException(s) + {} +}; + +class IncomingDataCorruption : public BaseException { - return readU8(&packetdata[6]); -} +public: + IncomingDataCorruption(const char *s): + BaseException(s) + {} +}; + +typedef enum MTProtocols { + MTP_PRIMARY, + MTP_UDP, + MTP_MINETEST_RELIABLE_UDP +} MTProtocols; #define SEQNUM_MAX 65535 -inline bool seqnum_higher(u16 higher, u16 lower) +inline bool seqnum_higher(u16 totest, u16 base) { - if(lower > higher && lower - higher > SEQNUM_MAX/2){ - return true; + if (totest > base) + { + if((totest - base) > (SEQNUM_MAX/2)) + return false; + else + return true; + } + else + { + if((base - totest) > (SEQNUM_MAX/2)) + return true; + else + return false; + } +} + +inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size) +{ + u16 window_start = next; + u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1); + + if (window_start < window_end) + { + return ((seqnum >= window_start) && (seqnum < window_end)); + } + else + { + return ((seqnum < window_end) || (seqnum >= window_start)); } - return (higher > lower); } struct BufferedPacket { BufferedPacket(u8 *a_data, u32 a_size): - data(a_data, a_size), time(0.0), totaltime(0.0) + data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1), + resend_count(0) {} BufferedPacket(u32 a_size): - data(a_size), time(0.0), totaltime(0.0) + data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1), + resend_count(0) {} SharedBuffer data; // Data of the packet, including headers float time; // Seconds from buffering the packet or re-sending float totaltime; // Seconds from buffering the packet + unsigned int absolute_send_time; Address address; // Sender or destination + unsigned int resend_count; }; // This adds the base headers to the data and makes a packet out of it @@ -142,14 +188,14 @@ SharedBuffer makeOriginalPacket( SharedBuffer data); // Split data in chunks and add TYPE_SPLIT headers to them -core::list > makeSplitPacket( +std::list > makeSplitPacket( SharedBuffer data, u32 chunksize_max, u16 seqnum); // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet // Increments split_seqnum if a split packet is made -core::list > makeAutoSplitPacket( +std::list > makeAutoSplitPacket( SharedBuffer data, u32 chunksize_max, u16 &split_seqnum); @@ -167,7 +213,7 @@ struct IncomingSplitPacket reliable = false; } // Key is chunk number, value is data without headers - core::map > chunks; + std::map > chunks; u32 chunk_count; float time; // Seconds from adding bool reliable; // If true, isn't deleted on timeout @@ -189,15 +235,14 @@ TODO: Should we have a receiver_peer_id also? [6] u8 channel sender_peer_id: Unique to each peer. - value 0 is reserved for making new connections - value 1 is reserved for server + value 0 (PEER_ID_INEXISTENT) is reserved for making new connections + value 1 (PEER_ID_SERVER) is reserved for server + these constants are defined in constants.h channel: The lower the number, the higher the priority is. Only channels 0, 1 and 2 exist. */ #define BASE_HEADER_SIZE 7 -#define PEER_ID_INEXISTENT 0 -#define PEER_ID_SERVER 1 #define CHANNEL_COUNT 3 /* Packet types: @@ -222,6 +267,8 @@ controltype and data description: #define CONTROLTYPE_SET_PEER_ID 1 #define CONTROLTYPE_PING 2 #define CONTROLTYPE_DISCO 3 +#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4 + /* ORIGINAL: This is a plain packet with no control and no error checking at all. @@ -260,7 +307,6 @@ with a buffer in the receiving and transmitting end. */ #define TYPE_RELIABLE 3 #define RELIABLE_HEADER_SIZE 3 -//#define SEQNUM_INITIAL 0x10 #define SEQNUM_INITIAL 65500 /* @@ -268,28 +314,39 @@ with a buffer in the receiving and transmitting end. for fast access to the smallest one. */ -typedef core::list::Iterator RPBSearchResult; +typedef std::list::iterator RPBSearchResult; class ReliablePacketBuffer { public: - - void print(); - bool empty(); - u32 size(); - RPBSearchResult findPacket(u16 seqnum); - RPBSearchResult notFound(); - u16 getFirstSeqnum(); + ReliablePacketBuffer(); + + bool getFirstSeqnum(u16& result); + BufferedPacket popFirst(); BufferedPacket popSeqnum(u16 seqnum); - void insert(BufferedPacket &p); + void insert(BufferedPacket &p,u16 next_expected); + void incrementTimeouts(float dtime); - void resetTimedOuts(float timeout); - bool anyTotaltimeReached(float timeout); - core::list getTimedOuts(float timeout); + std::list getTimedOuts(float timeout, + unsigned int max_packets); + + void print(); + bool empty(); + bool containsPacket(u16 seqnum); + RPBSearchResult notFound(); + u32 size(); + private: - core::list m_list; + RPBSearchResult findPacket(u16 seqnum); + + std::list m_list; + u32 m_list_size; + + u16 m_oldest_non_answered_ack; + + JMutex m_list_mutex; }; /* @@ -310,64 +367,242 @@ public: private: // Key is seqnum - core::map m_buf; + std::map m_buf; + + JMutex m_map_mutex; }; -class Connection; +struct OutgoingPacket +{ + u16 peer_id; + u8 channelnum; + SharedBuffer data; + bool reliable; + bool ack; -struct Channel + OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer data_, + bool reliable_,bool ack_=false): + peer_id(peer_id_), + channelnum(channelnum_), + data(data_), + reliable(reliable_), + ack(ack_) + { + } +}; + +enum ConnectionCommandType{ + CONNCMD_NONE, + CONNCMD_SERVE, + CONNCMD_CONNECT, + CONNCMD_DISCONNECT, + CONNCMD_DISCONNECT_PEER, + CONNCMD_SEND, + CONNCMD_SEND_TO_ALL, + CONCMD_ACK, + CONCMD_CREATE_PEER, + CONCMD_DISABLE_LEGACY +}; + +struct ConnectionCommand { - Channel(); - ~Channel(); - /* - Processes a packet with the basic header stripped out. - Parameters: - packetdata: Data in packet (with no base headers) - con: The connection to which the channel is associated - (used for sending back stuff (ACKs)) - peer_id: peer id of the sender of the packet in question - channelnum: channel on which the packet was sent - reliable: true if recursing into a reliable packet - */ - SharedBuffer ProcessPacket( - SharedBuffer packetdata, - Connection *con, - u16 peer_id, - u8 channelnum, - bool reliable=false); - - // Returns next data from a buffer if possible - // throws a NoIncomingDataException if no data is available - // If found, sets peer_id - SharedBuffer CheckIncomingBuffers(Connection *con, - u16 &peer_id); + enum ConnectionCommandType type; + Address address; + u16 peer_id; + u8 channelnum; + Buffer data; + bool reliable; + bool raw; - u16 next_outgoing_seqnum; - u16 next_incoming_seqnum; - u16 next_outgoing_split_seqnum; + ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {} + + void serve(Address address_) + { + type = CONNCMD_SERVE; + address = address_; + } + void connect(Address address_) + { + type = CONNCMD_CONNECT; + address = address_; + } + void disconnect() + { + type = CONNCMD_DISCONNECT; + } + void disconnect_peer(u16 peer_id_) + { + type = CONNCMD_DISCONNECT_PEER; + peer_id = peer_id_; + } + void send(u16 peer_id_, u8 channelnum_, + SharedBuffer data_, bool reliable_) + { + type = CONNCMD_SEND; + peer_id = peer_id_; + channelnum = channelnum_; + data = data_; + reliable = reliable_; + } + void sendToAll(u8 channelnum_, SharedBuffer data_, bool reliable_) + { + type = CONNCMD_SEND_TO_ALL; + channelnum = channelnum_; + data = data_; + reliable = reliable_; + } + + void ack(u16 peer_id_, u8 channelnum_, SharedBuffer data_) + { + type = CONCMD_ACK; + peer_id = peer_id_; + channelnum = channelnum_; + data = data_; + reliable = false; + } + + void createPeer(u16 peer_id_, SharedBuffer data_) + { + type = CONCMD_CREATE_PEER; + peer_id = peer_id_; + data = data_; + channelnum = 0; + reliable = true; + raw = true; + } + + void disableLegacy(u16 peer_id_, SharedBuffer data_) + { + type = CONCMD_DISABLE_LEGACY; + peer_id = peer_id_; + data = data_; + channelnum = 0; + reliable = true; + raw = true; + } +}; + +class Channel +{ + +public: + u16 readNextIncomingSeqNum(); + u16 incNextIncomingSeqNum(); + + u16 getOutgoingSequenceNumber(bool& successfull); + u16 readOutgoingSequenceNumber(); + bool putBackSequenceNumber(u16); + + u16 readNextSplitSeqNum(); + void setNextSplitSeqNum(u16 seqnum); // This is for buffering the incoming packets that are coming in // the wrong order ReliablePacketBuffer incoming_reliables; // This is for buffering the sent packets so that the sender can // re-send them if no ACK is received - ReliablePacketBuffer outgoing_reliables; + ReliablePacketBuffer outgoing_reliables_sent; + + //queued reliable packets + Queue queued_reliables; + + //queue commands prior splitting to packets + Queue queued_commands; IncomingSplitBuffer incoming_splits; + + Channel(); + ~Channel(); + + void UpdatePacketLossCounter(unsigned int count); + void UpdatePacketTooLateCounter(); + void UpdateBytesSent(unsigned int bytes,unsigned int packages=1); + void UpdateBytesLost(unsigned int bytes); + void UpdateBytesReceived(unsigned int bytes); + + void UpdateTimers(float dtime, bool legacy_peer); + + const float getCurrentDownloadRateKB() + { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; }; + const float getMaxDownloadRateKB() + { JMutexAutoLock lock(m_internal_mutex); return max_kbps; }; + + const float getCurrentLossRateKB() + { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; }; + const float getMaxLossRateKB() + { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; }; + + const float getCurrentIncomingRateKB() + { JMutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; }; + const float getMaxIncomingRateKB() + { JMutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; }; + + const float getAvgDownloadRateKB() + { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; }; + const float getAvgLossRateKB() + { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; }; + const float getAvgIncomingRateKB() + { JMutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; }; + + const unsigned int getWindowSize() const { return window_size; }; + + void setWindowSize(unsigned int size) { window_size = size; }; +private: + JMutex m_internal_mutex; + int window_size; + + u16 next_incoming_seqnum; + + u16 next_outgoing_seqnum; + u16 next_outgoing_split_seqnum; + + unsigned int current_packet_loss; + unsigned int current_packet_too_late; + unsigned int current_packet_successfull; + float packet_loss_counter; + + unsigned int current_bytes_transfered; + unsigned int current_bytes_received; + unsigned int current_bytes_lost; + float max_kbps; + float cur_kbps; + float avg_kbps; + float max_incoming_kbps; + float cur_incoming_kbps; + float avg_incoming_kbps; + float max_kbps_lost; + float cur_kbps_lost; + float avg_kbps_lost; + float bpm_counter; + + unsigned int rate_samples; }; class Peer; +enum PeerChangeType +{ + PEER_ADDED, + PEER_REMOVED +}; +struct PeerChange +{ + PeerChangeType type; + u16 peer_id; + bool timeout; +}; + class PeerHandler { public: + PeerHandler() { } virtual ~PeerHandler() { } - + /* This is called after the Peer has been inserted into the Connection's peer container. @@ -380,110 +615,473 @@ public: virtual void deletingPeer(Peer *peer, bool timeout) = 0; }; -class Peer +class PeerHelper { public: + PeerHelper(); + PeerHelper(Peer* peer); + ~PeerHelper(); - Peer(u16 a_id, Address a_address); - virtual ~Peer(); - + PeerHelper& operator=(Peer* peer); + Peer* operator->() const; + bool operator!(); + Peer* operator&() const; + bool operator!=(void* ptr); + +private: + Peer* m_peer; +}; + +class Connection; + +typedef enum { + MIN_RTT, + MAX_RTT, + AVG_RTT, + MIN_JITTER, + MAX_JITTER, + AVG_JITTER +} rtt_stat_type; + +typedef enum { + CUR_DL_RATE, + AVG_DL_RATE, + CUR_INC_RATE, + AVG_INC_RATE, + CUR_LOSS_RATE, + AVG_LOSS_RATE, +} rate_stat_type; + +class Peer { + public: + friend class PeerHelper; + + Peer(Address address_,u16 id_,Connection* connection) : + id(id_), + m_increment_packets_remaining(9), + m_increment_bytes_remaining(0), + m_pending_deletion(false), + m_connection(connection), + address(address_), + m_ping_timer(0.0), + m_last_rtt(-1.0), + m_usage(0), + m_timeout_counter(0.0), + m_last_timeout_check(porting::getTimeMs()), + m_has_sent_with_id(false) + { + m_rtt.avg_rtt = -1.0; + m_rtt.jitter_avg = -1.0; + m_rtt.jitter_max = 0.0; + m_rtt.max_rtt = 0.0; + m_rtt.jitter_min = FLT_MAX; + m_rtt.min_rtt = FLT_MAX; + }; + + virtual ~Peer() { + JMutexAutoLock usage_lock(m_exclusive_access_mutex); + assert(m_usage == 0); + }; + + // Unique id of the peer + u16 id; + + void Drop(); + + virtual void PutReliableSendCommand(ConnectionCommand &c, + unsigned int max_packet_size) {}; + + virtual bool isActive() { return false; }; + + virtual bool getAddress(MTProtocols type, Address& toset) = 0; + + void ResetTimeout() + {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; }; + + bool isTimedOut(float timeout); + + void setSentWithID() + { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; }; + + bool hasSentWithID() + { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; }; + + unsigned int m_increment_packets_remaining; + unsigned int m_increment_bytes_remaining; + + virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; }; + virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {}; + virtual SharedBuffer addSpiltPacket(u8 channel, + BufferedPacket toadd, + bool reliable) + { + fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n"); + return SharedBuffer(0); + }; + + virtual bool Ping(float dtime, SharedBuffer& data) { return false; }; + + virtual float getStat(rtt_stat_type type) const { + switch (type) { + case MIN_RTT: + return m_rtt.min_rtt; + case MAX_RTT: + return m_rtt.max_rtt; + case AVG_RTT: + return m_rtt.avg_rtt; + case MIN_JITTER: + return m_rtt.jitter_min; + case MAX_JITTER: + return m_rtt.jitter_max; + case AVG_JITTER: + return m_rtt.jitter_avg; + } + return -1; + } + protected: + virtual void reportRTT(float rtt) {}; + + void RTTStatistics(float rtt, + std::string profiler_id="", + unsigned int num_samples=1000); + + bool IncUseCount(); + void DecUseCount(); + + JMutex m_exclusive_access_mutex; + + bool m_pending_deletion; + + Connection* m_connection; + + // Address of the peer + Address address; + + // Ping timer + float m_ping_timer; + private: + + struct rttstats { + float jitter_min; + float jitter_max; + float jitter_avg; + float min_rtt; + float max_rtt; + float avg_rtt; + }; + + rttstats m_rtt; + float m_last_rtt; + + // current usage count + unsigned int m_usage; + + // Seconds from last receive + float m_timeout_counter; + + u32 m_last_timeout_check; + + bool m_has_sent_with_id; +}; + +class UDPPeer : public Peer +{ +public: + + friend class PeerHelper; + friend class ConnectionReceiveThread; + friend class ConnectionSendThread; + friend class Connection; + + UDPPeer(u16 a_id, Address a_address, Connection* connection); + virtual ~UDPPeer() {}; + + void PutReliableSendCommand(ConnectionCommand &c, + unsigned int max_packet_size); + + bool isActive() + { return ((hasSentWithID()) && (!m_pending_deletion)); }; + + bool getAddress(MTProtocols type, Address& toset); + + void setNonLegacyPeer(); + + bool getLegacyPeer() + { return m_legacy_peer; } + + u16 getNextSplitSequenceNumber(u8 channel); + void setNextSplitSequenceNumber(u8 channel, u16 seqnum); + + SharedBuffer addSpiltPacket(u8 channel, + BufferedPacket toadd, + bool reliable); + + +protected: /* Calculates avg_rtt and resend_timeout. - rtt=-1 only recalculates resend_timeout */ void reportRTT(float rtt); - Channel channels[CHANNEL_COUNT]; + void RunCommandQueues( + unsigned int max_packet_size, + unsigned int maxcommands, + unsigned int maxtransfer); - // Address of the peer - Address address; - // Unique id of the peer - u16 id; - // Seconds from last receive - float timeout_counter; - // Ping timer - float ping_timer; + float getResendTimeout() + { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; } + + void setResendTimeout(float timeout) + { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; } + bool Ping(float dtime,SharedBuffer& data); + + Channel channels[CHANNEL_COUNT]; + bool m_pending_disconnect; +private: // This is changed dynamically float resend_timeout; - // Updated when an ACK is received - float avg_rtt; - // This is set to true when the peer has actually sent something - // with the id we have given to it - bool has_sent_with_id; + + bool processReliableSendCommand( + ConnectionCommand &c, + unsigned int max_packet_size); + + bool m_legacy_peer; +}; + +/* + Connection +*/ + +enum ConnectionEventType{ + CONNEVENT_NONE, + CONNEVENT_DATA_RECEIVED, + CONNEVENT_PEER_ADDED, + CONNEVENT_PEER_REMOVED, + CONNEVENT_BIND_FAILED, +}; + +struct ConnectionEvent +{ + enum ConnectionEventType type; + u16 peer_id; + Buffer data; + bool timeout; + Address address; + + ConnectionEvent(): type(CONNEVENT_NONE) {} + + std::string describe() + { + switch(type){ + case CONNEVENT_NONE: + return "CONNEVENT_NONE"; + case CONNEVENT_DATA_RECEIVED: + return "CONNEVENT_DATA_RECEIVED"; + case CONNEVENT_PEER_ADDED: + return "CONNEVENT_PEER_ADDED"; + case CONNEVENT_PEER_REMOVED: + return "CONNEVENT_PEER_REMOVED"; + case CONNEVENT_BIND_FAILED: + return "CONNEVENT_BIND_FAILED"; + } + return "Invalid ConnectionEvent"; + } + void dataReceived(u16 peer_id_, SharedBuffer data_) + { + type = CONNEVENT_DATA_RECEIVED; + peer_id = peer_id_; + data = data_; + } + void peerAdded(u16 peer_id_, Address address_) + { + type = CONNEVENT_PEER_ADDED; + peer_id = peer_id_; + address = address_; + } + void peerRemoved(u16 peer_id_, bool timeout_, Address address_) + { + type = CONNEVENT_PEER_REMOVED; + peer_id = peer_id_; + timeout = timeout_; + address = address_; + } + void bindFailed() + { + type = CONNEVENT_BIND_FAILED; + } +}; + +class ConnectionSendThread : public JThread { + +public: + friend class UDPPeer; + + ConnectionSendThread(Connection* parent, + unsigned int max_packet_size, float timeout); + + void * Thread (); + + void Trigger(); + + void setPeerTimeout(float peer_timeout) + { m_timeout = peer_timeout; } + private: + void runTimeouts (float dtime); + void rawSend (const BufferedPacket &packet); + bool rawSendAsPacket(u16 peer_id, u8 channelnum, + SharedBuffer data, bool reliable); + + void processReliableCommand (ConnectionCommand &c); + void processNonReliableCommand (ConnectionCommand &c); + void serve (Address bind_address); + void connect (Address address); + void disconnect (); + void disconnect_peer(u16 peer_id); + void send (u16 peer_id, u8 channelnum, + SharedBuffer data); + void sendReliable (ConnectionCommand &c); + void sendToAll (u8 channelnum, + SharedBuffer data); + void sendToAllReliable(ConnectionCommand &c); + + void sendPackets (float dtime); + + void sendAsPacket (u16 peer_id, u8 channelnum, + SharedBuffer data,bool ack=false); + + void sendAsPacketReliable(BufferedPacket& p, Channel* channel); + + bool packetsQueued(); + + Connection* m_connection; + unsigned int m_max_packet_size; + float m_timeout; + Queue m_outgoing_queue; + JSemaphore m_send_sleep_semaphore; + + unsigned int m_iteration_packets_avaialble; + unsigned int m_max_commands_per_iteration; + unsigned int m_max_data_packets_per_iteration; + unsigned int m_max_packets_requeued; +}; + +class ConnectionReceiveThread : public JThread { +public: + ConnectionReceiveThread(Connection* parent, + unsigned int max_packet_size); + + void * Thread (); + +private: + void receive (); + + // Returns next data from a buffer if possible + // If found, returns true; if not, false. + // If found, sets peer_id and dst + bool getFromBuffers (u16 &peer_id, SharedBuffer &dst); + + bool checkIncomingBuffers(Channel *channel, u16 &peer_id, + SharedBuffer &dst); + + /* + Processes a packet with the basic header stripped out. + Parameters: + packetdata: Data in packet (with no base headers) + peer_id: peer id of the sender of the packet in question + channelnum: channel on which the packet was sent + reliable: true if recursing into a reliable packet + */ + SharedBuffer processPacket(Channel *channel, + SharedBuffer packetdata, u16 peer_id, + u8 channelnum, bool reliable); + + + Connection* m_connection; }; class Connection { public: - Connection( - u32 protocol_id, - u32 max_packet_size, - float timeout, - PeerHandler *peerhandler - ); + friend class ConnectionSendThread; + friend class ConnectionReceiveThread; + + Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6); + Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6, + PeerHandler *peerhandler); ~Connection(); - void setTimeoutMs(int timeout){ m_socket.setTimeoutMs(timeout); } - // Start being a server - void Serve(unsigned short port); - // Connect to a server + + /* Interface */ + ConnectionEvent getEvent(); + ConnectionEvent waitEvent(u32 timeout_ms); + void putCommand(ConnectionCommand &c); + + void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; } + void Serve(Address bind_addr); void Connect(Address address); bool Connected(); - void Disconnect(); - - // Sets peer_id - SharedBuffer GetFromBuffers(u16 &peer_id); - - // The peer_id of sender is stored in peer_id - // Return value: I guess this always throws an exception or - // actually gets data - // May call PeerHandler methods - u32 Receive(u16 &peer_id, u8 *data, u32 datasize); - - // These will automatically package the data as an original or split + u32 Receive(u16 &peer_id, SharedBuffer &data); void SendToAll(u8 channelnum, SharedBuffer data, bool reliable); void Send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable); - // Send data as a packet; it will be wrapped in base header and - // optionally to a reliable packet. - void SendAsPacket(u16 peer_id, u8 channelnum, - SharedBuffer data, bool reliable); - // Sends a raw packet - void RawSend(const BufferedPacket &packet); - - // May call PeerHandler methods - void RunTimeouts(float dtime); - - // Can throw a PeerNotFoundException - Peer* GetPeer(u16 peer_id); - // returns NULL if failed - Peer* GetPeerNoEx(u16 peer_id); - core::list GetPeers(); - - // Calls PeerHandler::deletingPeer - // Returns false if peer was not found + u16 GetPeerID(){ return m_peer_id; } + Address GetPeerAddress(u16 peer_id); + float getPeerStat(u16 peer_id, rtt_stat_type type); + float getLocalStat(rate_stat_type type); + const u32 GetProtocolID() const { return m_protocol_id; }; + const std::string getDesc(); + void DisconnectPeer(u16 peer_id); + +protected: + PeerHelper getPeer(u16 peer_id); + PeerHelper getPeerNoEx(u16 peer_id); + u16 lookupPeer(Address& sender); + + u16 createPeer(Address& sender, MTProtocols protocol, int fd); + UDPPeer* createServerPeer(Address& sender); bool deletePeer(u16 peer_id, bool timeout); void SetPeerID(u16 id){ m_peer_id = id; } - u16 GetPeerID(){ return m_peer_id; } - u32 GetProtocolID(){ return m_protocol_id; } - // For debug printing + void sendAck(u16 peer_id, u8 channelnum, u16 seqnum); + void PrintInfo(std::ostream &out); void PrintInfo(); - u16 m_indentation; + std::list getPeerIDs(); + + UDPSocket m_udpSocket; + MutexedQueue m_command_queue; + + void putEvent(ConnectionEvent &e); + + void TriggerSend() + { m_sendThread.Trigger(); } private: - u32 m_protocol_id; - float m_timeout; - PeerHandler *m_peerhandler; - core::map m_peers; + std::list getPeers(); + + MutexedQueue m_event_queue; + u16 m_peer_id; - //bool m_waiting_new_peer_id; - u32 m_max_packet_size; - UDPSocket m_socket; + u32 m_protocol_id; + + std::map m_peers; + JMutex m_peers_mutex; + + ConnectionSendThread m_sendThread; + ConnectionReceiveThread m_receiveThread; + + JMutex m_info_mutex; + + // Backwards compatibility + PeerHandler *m_bc_peerhandler; + int m_bc_receive_timeout; + + bool m_shutting_down; + + u16 m_next_remote_peer_id; }; } // namespace