/*
-Minetest-c55
-Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
+Minetest
+Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; either version 2 of the License, or
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
+GNU Lesser General Public License for more details.
-You should have received a copy of the GNU General Public License along
+You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#ifndef CONNECTION_HEADER
#define CONNECTION_HEADER
-#include <iostream>
-#include <fstream>
-#include "debug.h"
-#include "common_irrlicht.h"
+#include "irrlichttypes_bloated.h"
#include "socket.h"
-#include "utility.h"
#include "exceptions.h"
#include "constants.h"
+#include "util/pointer.h"
+#include "util/container.h"
+#include "util/thread.h"
+#include "util/numeric.h"
+#include <iostream>
+#include <fstream>
+#include <list>
+#include <map>
namespace con
{
{}
};
-/*class ThrottlingException : public BaseException
+class ConnectionBindFailed : public BaseException
{
public:
- ThrottlingException(const char *s):
+ ConnectionBindFailed(const char *s):
BaseException(s)
{}
-};*/
+};
class InvalidIncomingDataException : public BaseException
{
{}
};
-class GotSplitPacketException
+class ProcessedQueued : public BaseException
{
- SharedBuffer<u8> m_data;
public:
- GotSplitPacketException(SharedBuffer<u8> data):
- m_data(data)
+ ProcessedQueued(const char *s):
+ BaseException(s)
{}
- SharedBuffer<u8> getData()
- {
- return m_data;
- }
};
-inline u16 readPeerId(u8 *packetdata)
+class IncomingDataCorruption : public BaseException
{
- return readU16(&packetdata[4]);
-}
-inline u8 readChannel(u8 *packetdata)
+public:
+ IncomingDataCorruption(const char *s):
+ BaseException(s)
+ {}
+};
+
+typedef enum MTProtocols {
+ MTP_PRIMARY,
+ MTP_UDP,
+ MTP_MINETEST_RELIABLE_UDP
+} MTProtocols;
+
+#define SEQNUM_MAX 65535
+inline bool seqnum_higher(u16 totest, u16 base)
{
- return readU8(&packetdata[6]);
+ if (totest > base)
+ {
+ if((totest - base) > (SEQNUM_MAX/2))
+ return false;
+ else
+ return true;
+ }
+ else
+ {
+ if((base - totest) > (SEQNUM_MAX/2))
+ return true;
+ else
+ return false;
+ }
}
-#define SEQNUM_MAX 65535
-inline bool seqnum_higher(u16 higher, u16 lower)
+inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
{
- if(lower > higher && lower - higher > SEQNUM_MAX/2){
- return true;
+ u16 window_start = next;
+ u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
+
+ if (window_start < window_end)
+ {
+ return ((seqnum >= window_start) && (seqnum < window_end));
+ }
+ else
+ {
+ return ((seqnum < window_end) || (seqnum >= window_start));
}
- return (higher > lower);
}
struct BufferedPacket
{
BufferedPacket(u8 *a_data, u32 a_size):
- data(a_data, a_size), time(0.0), totaltime(0.0)
+ data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
+ resend_count(0)
{}
BufferedPacket(u32 a_size):
- data(a_size), time(0.0), totaltime(0.0)
+ data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
+ resend_count(0)
{}
SharedBuffer<u8> data; // Data of the packet, including headers
float time; // Seconds from buffering the packet or re-sending
float totaltime; // Seconds from buffering the packet
+ unsigned int absolute_send_time;
Address address; // Sender or destination
+ unsigned int resend_count;
};
// This adds the base headers to the data and makes a packet out of it
SharedBuffer<u8> data);
// Split data in chunks and add TYPE_SPLIT headers to them
-core::list<SharedBuffer<u8> > makeSplitPacket(
+std::list<SharedBuffer<u8> > makeSplitPacket(
SharedBuffer<u8> data,
u32 chunksize_max,
u16 seqnum);
// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
// Increments split_seqnum if a split packet is made
-core::list<SharedBuffer<u8> > makeAutoSplitPacket(
+std::list<SharedBuffer<u8> > makeAutoSplitPacket(
SharedBuffer<u8> data,
u32 chunksize_max,
u16 &split_seqnum);
reliable = false;
}
// Key is chunk number, value is data without headers
- core::map<u16, SharedBuffer<u8> > chunks;
+ std::map<u16, SharedBuffer<u8> > chunks;
u32 chunk_count;
float time; // Seconds from adding
bool reliable; // If true, isn't deleted on timeout
[6] u8 channel
sender_peer_id:
Unique to each peer.
- value 0 is reserved for making new connections
- value 1 is reserved for server
+ value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
+ value 1 (PEER_ID_SERVER) is reserved for server
+ these constants are defined in constants.h
channel:
The lower the number, the higher the priority is.
Only channels 0, 1 and 2 exist.
*/
#define BASE_HEADER_SIZE 7
-#define PEER_ID_NEW 0
-#define PEER_ID_SERVER 1
#define CHANNEL_COUNT 3
/*
Packet types:
CONTROLTYPE_SET_PEER_ID
[2] u16 peer_id_new
CONTROLTYPE_PING
- - This can be sent in a reliable packet to get a reply
+ - There is no actual reply, but this can be sent in a reliable
+ packet to get a reply
+ CONTROLTYPE_DISCO
*/
#define TYPE_CONTROL 0
#define CONTROLTYPE_ACK 0
#define CONTROLTYPE_SET_PEER_ID 1
#define CONTROLTYPE_PING 2
+#define CONTROLTYPE_DISCO 3
+#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
+
/*
ORIGINAL: This is a plain packet with no control and no error
checking at all.
*/
#define TYPE_RELIABLE 3
#define RELIABLE_HEADER_SIZE 3
-//#define SEQNUM_INITIAL 0x10
#define SEQNUM_INITIAL 65500
/*
for fast access to the smallest one.
*/
-typedef core::list<BufferedPacket>::Iterator RPBSearchResult;
+typedef std::list<BufferedPacket>::iterator RPBSearchResult;
class ReliablePacketBuffer
{
public:
-
- void print();
- bool empty();
- u32 size();
- RPBSearchResult findPacket(u16 seqnum);
- RPBSearchResult notFound();
- u16 getFirstSeqnum();
+ ReliablePacketBuffer();
+
+ bool getFirstSeqnum(u16& result);
+
BufferedPacket popFirst();
BufferedPacket popSeqnum(u16 seqnum);
- void insert(BufferedPacket &p);
+ void insert(BufferedPacket &p,u16 next_expected);
+
void incrementTimeouts(float dtime);
- void resetTimedOuts(float timeout);
- bool anyTotaltimeReached(float timeout);
- core::list<BufferedPacket> getTimedOuts(float timeout);
+ std::list<BufferedPacket> getTimedOuts(float timeout,
+ unsigned int max_packets);
+
+ void print();
+ bool empty();
+ bool containsPacket(u16 seqnum);
+ RPBSearchResult notFound();
+ u32 size();
+
private:
- core::list<BufferedPacket> m_list;
+ RPBSearchResult findPacket(u16 seqnum);
+
+ std::list<BufferedPacket> m_list;
+ u32 m_list_size;
+
+ u16 m_oldest_non_answered_ack;
+
+ JMutex m_list_mutex;
};
/*
public:
~IncomingSplitBuffer();
/*
- This will throw a GotSplitPacketException when a full
- split packet is constructed.
+ Returns a reference counted buffer of length != 0 when a full split
+ packet is constructed. If not, returns one of length 0.
*/
- void insert(BufferedPacket &p, bool reliable);
+ SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
void removeUnreliableTimedOuts(float dtime, float timeout);
private:
// Key is seqnum
- core::map<u16, IncomingSplitPacket*> m_buf;
+ std::map<u16, IncomingSplitPacket*> m_buf;
+
+ JMutex m_map_mutex;
};
-class Connection;
+struct OutgoingPacket
+{
+ u16 peer_id;
+ u8 channelnum;
+ SharedBuffer<u8> data;
+ bool reliable;
+ bool ack;
+
+ OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
+ bool reliable_,bool ack_=false):
+ peer_id(peer_id_),
+ channelnum(channelnum_),
+ data(data_),
+ reliable(reliable_),
+ ack(ack_)
+ {
+ }
+};
-struct Channel
+enum ConnectionCommandType{
+ CONNCMD_NONE,
+ CONNCMD_SERVE,
+ CONNCMD_CONNECT,
+ CONNCMD_DISCONNECT,
+ CONNCMD_DISCONNECT_PEER,
+ CONNCMD_SEND,
+ CONNCMD_SEND_TO_ALL,
+ CONCMD_ACK,
+ CONCMD_CREATE_PEER,
+ CONCMD_DISABLE_LEGACY
+};
+
+struct ConnectionCommand
{
- Channel();
- ~Channel();
- /*
- Processes a packet with the basic header stripped out.
- Parameters:
- packetdata: Data in packet (with no base headers)
- con: The connection to which the channel is associated
- (used for sending back stuff (ACKs))
- peer_id: peer id of the sender of the packet in question
- channelnum: channel on which the packet was sent
- reliable: true if recursing into a reliable packet
- */
- SharedBuffer<u8> ProcessPacket(
- SharedBuffer<u8> packetdata,
- Connection *con,
- u16 peer_id,
- u8 channelnum,
- bool reliable=false);
-
- // Returns next data from a buffer if possible
- // throws a NoIncomingDataException if no data is available
- // If found, sets peer_id
- SharedBuffer<u8> CheckIncomingBuffers(Connection *con,
- u16 &peer_id);
+ enum ConnectionCommandType type;
+ Address address;
+ u16 peer_id;
+ u8 channelnum;
+ Buffer<u8> data;
+ bool reliable;
+ bool raw;
- u16 next_outgoing_seqnum;
- u16 next_incoming_seqnum;
- u16 next_outgoing_split_seqnum;
+ ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
+
+ void serve(Address address_)
+ {
+ type = CONNCMD_SERVE;
+ address = address_;
+ }
+ void connect(Address address_)
+ {
+ type = CONNCMD_CONNECT;
+ address = address_;
+ }
+ void disconnect()
+ {
+ type = CONNCMD_DISCONNECT;
+ }
+ void disconnect_peer(u16 peer_id_)
+ {
+ type = CONNCMD_DISCONNECT_PEER;
+ peer_id = peer_id_;
+ }
+ void send(u16 peer_id_, u8 channelnum_,
+ SharedBuffer<u8> data_, bool reliable_)
+ {
+ type = CONNCMD_SEND;
+ peer_id = peer_id_;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = reliable_;
+ }
+ void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
+ {
+ type = CONNCMD_SEND_TO_ALL;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = reliable_;
+ }
+
+ void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
+ {
+ type = CONCMD_ACK;
+ peer_id = peer_id_;
+ channelnum = channelnum_;
+ data = data_;
+ reliable = false;
+ }
+
+ void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
+ {
+ type = CONCMD_CREATE_PEER;
+ peer_id = peer_id_;
+ data = data_;
+ channelnum = 0;
+ reliable = true;
+ raw = true;
+ }
+
+ void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
+ {
+ type = CONCMD_DISABLE_LEGACY;
+ peer_id = peer_id_;
+ data = data_;
+ channelnum = 0;
+ reliable = true;
+ raw = true;
+ }
+};
+
+class Channel
+{
+
+public:
+ u16 readNextIncomingSeqNum();
+ u16 incNextIncomingSeqNum();
+
+ u16 getOutgoingSequenceNumber(bool& successfull);
+ u16 readOutgoingSequenceNumber();
+ bool putBackSequenceNumber(u16);
+
+ u16 readNextSplitSeqNum();
+ void setNextSplitSeqNum(u16 seqnum);
// This is for buffering the incoming packets that are coming in
// the wrong order
ReliablePacketBuffer incoming_reliables;
// This is for buffering the sent packets so that the sender can
// re-send them if no ACK is received
- ReliablePacketBuffer outgoing_reliables;
+ ReliablePacketBuffer outgoing_reliables_sent;
+
+ //queued reliable packets
+ Queue<BufferedPacket> queued_reliables;
+
+ //queue commands prior splitting to packets
+ Queue<ConnectionCommand> queued_commands;
IncomingSplitBuffer incoming_splits;
+
+ Channel();
+ ~Channel();
+
+ void UpdatePacketLossCounter(unsigned int count);
+ void UpdatePacketTooLateCounter();
+ void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
+ void UpdateBytesLost(unsigned int bytes);
+ void UpdateBytesReceived(unsigned int bytes);
+
+ void UpdateTimers(float dtime, bool legacy_peer);
+
+ const float getCurrentDownloadRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
+ const float getMaxDownloadRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
+
+ const float getCurrentLossRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
+ const float getMaxLossRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
+
+ const float getCurrentIncomingRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
+ const float getMaxIncomingRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
+
+ const float getAvgDownloadRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
+ const float getAvgLossRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
+ const float getAvgIncomingRateKB()
+ { JMutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
+
+ const unsigned int getWindowSize() const { return window_size; };
+
+ void setWindowSize(unsigned int size) { window_size = size; };
+private:
+ JMutex m_internal_mutex;
+ int window_size;
+
+ u16 next_incoming_seqnum;
+
+ u16 next_outgoing_seqnum;
+ u16 next_outgoing_split_seqnum;
+
+ unsigned int current_packet_loss;
+ unsigned int current_packet_too_late;
+ unsigned int current_packet_successfull;
+ float packet_loss_counter;
+
+ unsigned int current_bytes_transfered;
+ unsigned int current_bytes_received;
+ unsigned int current_bytes_lost;
+ float max_kbps;
+ float cur_kbps;
+ float avg_kbps;
+ float max_incoming_kbps;
+ float cur_incoming_kbps;
+ float avg_incoming_kbps;
+ float max_kbps_lost;
+ float cur_kbps_lost;
+ float avg_kbps_lost;
+ float bpm_counter;
+
+ unsigned int rate_samples;
};
class Peer;
+enum PeerChangeType
+{
+ PEER_ADDED,
+ PEER_REMOVED
+};
+struct PeerChange
+{
+ PeerChangeType type;
+ u16 peer_id;
+ bool timeout;
+};
+
class PeerHandler
{
public:
+
PeerHandler()
{
}
virtual ~PeerHandler()
{
}
-
+
/*
This is called after the Peer has been inserted into the
Connection's peer container.
virtual void deletingPeer(Peer *peer, bool timeout) = 0;
};
-class Peer
+class PeerHelper
{
public:
+ PeerHelper();
+ PeerHelper(Peer* peer);
+ ~PeerHelper();
- Peer(u16 a_id, Address a_address);
- virtual ~Peer();
-
+ PeerHelper& operator=(Peer* peer);
+ Peer* operator->() const;
+ bool operator!();
+ Peer* operator&() const;
+ bool operator!=(void* ptr);
+
+private:
+ Peer* m_peer;
+};
+
+class Connection;
+
+typedef enum {
+ MIN_RTT,
+ MAX_RTT,
+ AVG_RTT,
+ MIN_JITTER,
+ MAX_JITTER,
+ AVG_JITTER
+} rtt_stat_type;
+
+typedef enum {
+ CUR_DL_RATE,
+ AVG_DL_RATE,
+ CUR_INC_RATE,
+ AVG_INC_RATE,
+ CUR_LOSS_RATE,
+ AVG_LOSS_RATE,
+} rate_stat_type;
+
+class Peer {
+ public:
+ friend class PeerHelper;
+
+ Peer(Address address_,u16 id_,Connection* connection) :
+ id(id_),
+ m_increment_packets_remaining(9),
+ m_increment_bytes_remaining(0),
+ m_pending_deletion(false),
+ m_connection(connection),
+ address(address_),
+ m_ping_timer(0.0),
+ m_last_rtt(-1.0),
+ m_usage(0),
+ m_timeout_counter(0.0),
+ m_last_timeout_check(porting::getTimeMs()),
+ m_has_sent_with_id(false)
+ {
+ m_rtt.avg_rtt = -1.0;
+ m_rtt.jitter_avg = -1.0;
+ m_rtt.jitter_max = 0.0;
+ m_rtt.max_rtt = 0.0;
+ m_rtt.jitter_min = FLT_MAX;
+ m_rtt.min_rtt = FLT_MAX;
+ };
+
+ virtual ~Peer() {
+ JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+ assert(m_usage == 0);
+ };
+
+ // Unique id of the peer
+ u16 id;
+
+ void Drop();
+
+ virtual void PutReliableSendCommand(ConnectionCommand &c,
+ unsigned int max_packet_size) {};
+
+ virtual bool isActive() { return false; };
+
+ virtual bool getAddress(MTProtocols type, Address& toset) = 0;
+
+ void ResetTimeout()
+ {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
+
+ bool isTimedOut(float timeout);
+
+ void setSentWithID()
+ { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
+
+ bool hasSentWithID()
+ { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
+
+ unsigned int m_increment_packets_remaining;
+ unsigned int m_increment_bytes_remaining;
+
+ virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
+ virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
+ virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
+ BufferedPacket toadd,
+ bool reliable)
+ {
+ fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
+ return SharedBuffer<u8>(0);
+ };
+
+ virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
+
+ virtual float getStat(rtt_stat_type type) const {
+ switch (type) {
+ case MIN_RTT:
+ return m_rtt.min_rtt;
+ case MAX_RTT:
+ return m_rtt.max_rtt;
+ case AVG_RTT:
+ return m_rtt.avg_rtt;
+ case MIN_JITTER:
+ return m_rtt.jitter_min;
+ case MAX_JITTER:
+ return m_rtt.jitter_max;
+ case AVG_JITTER:
+ return m_rtt.jitter_avg;
+ }
+ return -1;
+ }
+ protected:
+ virtual void reportRTT(float rtt) {};
+
+ void RTTStatistics(float rtt,
+ std::string profiler_id="",
+ unsigned int num_samples=1000);
+
+ bool IncUseCount();
+ void DecUseCount();
+
+ JMutex m_exclusive_access_mutex;
+
+ bool m_pending_deletion;
+
+ Connection* m_connection;
+
+ // Address of the peer
+ Address address;
+
+ // Ping timer
+ float m_ping_timer;
+ private:
+
+ struct rttstats {
+ float jitter_min;
+ float jitter_max;
+ float jitter_avg;
+ float min_rtt;
+ float max_rtt;
+ float avg_rtt;
+ };
+
+ rttstats m_rtt;
+ float m_last_rtt;
+
+ // current usage count
+ unsigned int m_usage;
+
+ // Seconds from last receive
+ float m_timeout_counter;
+
+ u32 m_last_timeout_check;
+
+ bool m_has_sent_with_id;
+};
+
+class UDPPeer : public Peer
+{
+public:
+
+ friend class PeerHelper;
+ friend class ConnectionReceiveThread;
+ friend class ConnectionSendThread;
+ friend class Connection;
+
+ UDPPeer(u16 a_id, Address a_address, Connection* connection);
+ virtual ~UDPPeer() {};
+
+ void PutReliableSendCommand(ConnectionCommand &c,
+ unsigned int max_packet_size);
+
+ bool isActive()
+ { return ((hasSentWithID()) && (!m_pending_deletion)); };
+
+ bool getAddress(MTProtocols type, Address& toset);
+
+ void setNonLegacyPeer();
+
+ bool getLegacyPeer()
+ { return m_legacy_peer; }
+
+ u16 getNextSplitSequenceNumber(u8 channel);
+ void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
+
+ SharedBuffer<u8> addSpiltPacket(u8 channel,
+ BufferedPacket toadd,
+ bool reliable);
+
+
+protected:
/*
Calculates avg_rtt and resend_timeout.
-
rtt=-1 only recalculates resend_timeout
*/
void reportRTT(float rtt);
- Channel channels[CHANNEL_COUNT];
+ void RunCommandQueues(
+ unsigned int max_packet_size,
+ unsigned int maxcommands,
+ unsigned int maxtransfer);
- // Address of the peer
- Address address;
- // Unique id of the peer
- u16 id;
- // Seconds from last receive
- float timeout_counter;
- // Ping timer
- float ping_timer;
+ float getResendTimeout()
+ { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
+
+ void setResendTimeout(float timeout)
+ { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
+ bool Ping(float dtime,SharedBuffer<u8>& data);
+
+ Channel channels[CHANNEL_COUNT];
+ bool m_pending_disconnect;
+private:
// This is changed dynamically
float resend_timeout;
- // Updated when an ACK is received
- float avg_rtt;
- // This is set to true when the peer has actually sent something
- // with the id we have given to it
- bool has_sent_with_id;
+
+ bool processReliableSendCommand(
+ ConnectionCommand &c,
+ unsigned int max_packet_size);
+
+ bool m_legacy_peer;
+};
+
+/*
+ Connection
+*/
+
+enum ConnectionEventType{
+ CONNEVENT_NONE,
+ CONNEVENT_DATA_RECEIVED,
+ CONNEVENT_PEER_ADDED,
+ CONNEVENT_PEER_REMOVED,
+ CONNEVENT_BIND_FAILED,
+};
+
+struct ConnectionEvent
+{
+ enum ConnectionEventType type;
+ u16 peer_id;
+ Buffer<u8> data;
+ bool timeout;
+ Address address;
+
+ ConnectionEvent(): type(CONNEVENT_NONE) {}
+
+ std::string describe()
+ {
+ switch(type){
+ case CONNEVENT_NONE:
+ return "CONNEVENT_NONE";
+ case CONNEVENT_DATA_RECEIVED:
+ return "CONNEVENT_DATA_RECEIVED";
+ case CONNEVENT_PEER_ADDED:
+ return "CONNEVENT_PEER_ADDED";
+ case CONNEVENT_PEER_REMOVED:
+ return "CONNEVENT_PEER_REMOVED";
+ case CONNEVENT_BIND_FAILED:
+ return "CONNEVENT_BIND_FAILED";
+ }
+ return "Invalid ConnectionEvent";
+ }
+ void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
+ {
+ type = CONNEVENT_DATA_RECEIVED;
+ peer_id = peer_id_;
+ data = data_;
+ }
+ void peerAdded(u16 peer_id_, Address address_)
+ {
+ type = CONNEVENT_PEER_ADDED;
+ peer_id = peer_id_;
+ address = address_;
+ }
+ void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
+ {
+ type = CONNEVENT_PEER_REMOVED;
+ peer_id = peer_id_;
+ timeout = timeout_;
+ address = address_;
+ }
+ void bindFailed()
+ {
+ type = CONNEVENT_BIND_FAILED;
+ }
+};
+
+class ConnectionSendThread : public JThread {
+
+public:
+ friend class UDPPeer;
+
+ ConnectionSendThread(Connection* parent,
+ unsigned int max_packet_size, float timeout);
+
+ void * Thread ();
+
+ void Trigger();
+
+ void setPeerTimeout(float peer_timeout)
+ { m_timeout = peer_timeout; }
+
+private:
+ void runTimeouts (float dtime);
+ void rawSend (const BufferedPacket &packet);
+ bool rawSendAsPacket(u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data, bool reliable);
+
+ void processReliableCommand (ConnectionCommand &c);
+ void processNonReliableCommand (ConnectionCommand &c);
+ void serve (Address bind_address);
+ void connect (Address address);
+ void disconnect ();
+ void disconnect_peer(u16 peer_id);
+ void send (u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data);
+ void sendReliable (ConnectionCommand &c);
+ void sendToAll (u8 channelnum,
+ SharedBuffer<u8> data);
+ void sendToAllReliable(ConnectionCommand &c);
+
+ void sendPackets (float dtime);
+
+ void sendAsPacket (u16 peer_id, u8 channelnum,
+ SharedBuffer<u8> data,bool ack=false);
+
+ void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
+
+ bool packetsQueued();
+
+ Connection* m_connection;
+ unsigned int m_max_packet_size;
+ float m_timeout;
+ Queue<OutgoingPacket> m_outgoing_queue;
+ JSemaphore m_send_sleep_semaphore;
+
+ unsigned int m_iteration_packets_avaialble;
+ unsigned int m_max_commands_per_iteration;
+ unsigned int m_max_data_packets_per_iteration;
+ unsigned int m_max_packets_requeued;
+};
+
+class ConnectionReceiveThread : public JThread {
+public:
+ ConnectionReceiveThread(Connection* parent,
+ unsigned int max_packet_size);
+
+ void * Thread ();
+
private:
+ void receive ();
+
+ // Returns next data from a buffer if possible
+ // If found, returns true; if not, false.
+ // If found, sets peer_id and dst
+ bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
+
+ bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
+ SharedBuffer<u8> &dst);
+
+ /*
+ Processes a packet with the basic header stripped out.
+ Parameters:
+ packetdata: Data in packet (with no base headers)
+ peer_id: peer id of the sender of the packet in question
+ channelnum: channel on which the packet was sent
+ reliable: true if recursing into a reliable packet
+ */
+ SharedBuffer<u8> processPacket(Channel *channel,
+ SharedBuffer<u8> packetdata, u16 peer_id,
+ u8 channelnum, bool reliable);
+
+
+ Connection* m_connection;
};
class Connection
{
public:
- Connection(
- u32 protocol_id,
- u32 max_packet_size,
- float timeout,
- PeerHandler *peerhandler
- );
- ~Connection();
- void setTimeoutMs(int timeout){ m_socket.setTimeoutMs(timeout); }
- // Start being a server
- void Serve(unsigned short port);
- // Connect to a server
- void Connect(Address address);
- bool Connected();
+ friend class ConnectionSendThread;
+ friend class ConnectionReceiveThread;
- // Sets peer_id
- SharedBuffer<u8> GetFromBuffers(u16 &peer_id);
+ Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
+ Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
+ PeerHandler *peerhandler);
+ ~Connection();
- // The peer_id of sender is stored in peer_id
- // Return value: I guess this always throws an exception or
- // actually gets data
- u32 Receive(u16 &peer_id, u8 *data, u32 datasize);
+ /* Interface */
+ ConnectionEvent getEvent();
+ ConnectionEvent waitEvent(u32 timeout_ms);
+ void putCommand(ConnectionCommand &c);
- // These will automatically package the data as an original or split
+ void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
+ void Serve(Address bind_addr);
+ void Connect(Address address);
+ bool Connected();
+ void Disconnect();
+ u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
- // Send data as a packet; it will be wrapped in base header and
- // optionally to a reliable packet.
- void SendAsPacket(u16 peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool reliable);
- // Sends a raw packet
- void RawSend(const BufferedPacket &packet);
-
- void RunTimeouts(float dtime);
- // Can throw a PeerNotFoundException
- Peer* GetPeer(u16 peer_id);
- // returns NULL if failed
- Peer* GetPeerNoEx(u16 peer_id);
- core::list<Peer*> GetPeers();
+ u16 GetPeerID(){ return m_peer_id; }
+ Address GetPeerAddress(u16 peer_id);
+ float getPeerStat(u16 peer_id, rtt_stat_type type);
+ float getLocalStat(rate_stat_type type);
+ const u32 GetProtocolID() const { return m_protocol_id; };
+ const std::string getDesc();
+ void DisconnectPeer(u16 peer_id);
+
+protected:
+ PeerHelper getPeer(u16 peer_id);
+ PeerHelper getPeerNoEx(u16 peer_id);
+ u16 lookupPeer(Address& sender);
+
+ u16 createPeer(Address& sender, MTProtocols protocol, int fd);
+ UDPPeer* createServerPeer(Address& sender);
+ bool deletePeer(u16 peer_id, bool timeout);
void SetPeerID(u16 id){ m_peer_id = id; }
- u16 GetPeerID(){ return m_peer_id; }
- u32 GetProtocolID(){ return m_protocol_id; }
- // For debug printing
+ void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
+
void PrintInfo(std::ostream &out);
void PrintInfo();
- u16 m_indentation;
+ std::list<u16> getPeerIDs();
+
+ UDPSocket m_udpSocket;
+ MutexedQueue<ConnectionCommand> m_command_queue;
+
+ void putEvent(ConnectionEvent &e);
+
+ void TriggerSend()
+ { m_sendThread.Trigger(); }
private:
- u32 m_protocol_id;
- float m_timeout;
- PeerHandler *m_peerhandler;
- core::map<u16, Peer*> m_peers;
+ std::list<Peer*> getPeers();
+
+ MutexedQueue<ConnectionEvent> m_event_queue;
+
u16 m_peer_id;
- //bool m_waiting_new_peer_id;
- u32 m_max_packet_size;
- UDPSocket m_socket;
+ u32 m_protocol_id;
+
+ std::map<u16, Peer*> m_peers;
+ JMutex m_peers_mutex;
+
+ ConnectionSendThread m_sendThread;
+ ConnectionReceiveThread m_receiveThread;
+
+ JMutex m_info_mutex;
+
+ // Backwards compatibility
+ PeerHandler *m_bc_peerhandler;
+ int m_bc_receive_timeout;
+
+ bool m_shutting_down;
+
+ u16 m_next_remote_peer_id;
};
} // namespace