#include <list>
#include <map>
+class NetworkPacket;
+
namespace con
{
data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
resend_count(0)
{}
- SharedBuffer<u8> data; // Data of the packet, including headers
+ Buffer<u8> data; // Data of the packet, including headers
float time; // Seconds from buffering the packet or re-sending
float totaltime; // Seconds from buffering the packet
- unsigned int absolute_send_time;
+ u64 absolute_send_time;
Address address; // Sender or destination
unsigned int resend_count;
};
u16 m_oldest_non_answered_ack;
- JMutex m_list_mutex;
+ Mutex m_list_mutex;
};
/*
// Key is seqnum
std::map<u16, IncomingSplitPacket*> m_buf;
- JMutex m_map_mutex;
+ Mutex m_map_mutex;
};
struct OutgoingPacket
bool reliable;
bool ack;
- OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
+ OutgoingPacket(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
bool reliable_,bool ack_=false):
peer_id(peer_id_),
channelnum(channelnum_),
peer_id = peer_id_;
}
void send(u16 peer_id_, u8 channelnum_,
- SharedBuffer<u8> data_, bool reliable_)
+ NetworkPacket* pkt, bool reliable_)
{
type = CONNCMD_SEND;
peer_id = peer_id_;
channelnum = channelnum_;
- data = data_;
- reliable = reliable_;
- }
- void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
- {
- type = CONNCMD_SEND_TO_ALL;
- channelnum = channelnum_;
- data = data_;
+ data = pkt->oldForgePacket();
reliable = reliable_;
}
- void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
+ void ack(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
{
type = CONCMD_ACK;
peer_id = peer_id_;
reliable = false;
}
- void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
+ void createPeer(u16 peer_id_, const SharedBuffer<u8> &data_)
{
type = CONCMD_CREATE_PEER;
peer_id = peer_id_;
raw = true;
}
- void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
+ void disableLegacy(u16 peer_id_, const SharedBuffer<u8> &data_)
{
type = CONCMD_DISABLE_LEGACY;
peer_id = peer_id_;
std::queue<BufferedPacket> queued_reliables;
//queue commands prior splitting to packets
- std::queue<ConnectionCommand> queued_commands;
+ std::deque<ConnectionCommand> queued_commands;
IncomingSplitBuffer incoming_splits;
void UpdateTimers(float dtime, bool legacy_peer);
const float getCurrentDownloadRateKB()
- { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
+ { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
const float getMaxDownloadRateKB()
- { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
+ { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
const float getCurrentLossRateKB()
- { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
+ { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
const float getMaxLossRateKB()
- { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
+ { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
const float getCurrentIncomingRateKB()
- { JMutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
+ { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
const float getMaxIncomingRateKB()
- { JMutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
+ { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
const float getAvgDownloadRateKB()
- { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
+ { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
const float getAvgLossRateKB()
- { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
+ { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
const float getAvgIncomingRateKB()
- { JMutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
+ { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
const unsigned int getWindowSize() const { return window_size; };
void setWindowSize(unsigned int size) { window_size = size; };
private:
- JMutex m_internal_mutex;
+ Mutex m_internal_mutex;
int window_size;
u16 next_incoming_seqnum;
m_last_rtt(-1.0),
m_usage(0),
m_timeout_counter(0.0),
- m_last_timeout_check(porting::getTimeMs()),
- m_has_sent_with_id(false)
+ m_last_timeout_check(porting::getTimeMs())
{
m_rtt.avg_rtt = -1.0;
m_rtt.jitter_avg = -1.0;
};
virtual ~Peer() {
- JMutexAutoLock usage_lock(m_exclusive_access_mutex);
- assert(m_usage == 0);
+ MutexAutoLock usage_lock(m_exclusive_access_mutex);
+ FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
};
// Unique id of the peer
virtual void PutReliableSendCommand(ConnectionCommand &c,
unsigned int max_packet_size) {};
- virtual bool isActive() { return false; };
-
virtual bool getAddress(MTProtocols type, Address& toset) = 0;
+ bool isPendingDeletion()
+ { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
+
void ResetTimeout()
- {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
+ {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
bool isTimedOut(float timeout);
- void setSentWithID()
- { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
-
- bool hasSentWithID()
- { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
-
unsigned int m_increment_packets_remaining;
unsigned int m_increment_bytes_remaining;
virtual void reportRTT(float rtt) {};
void RTTStatistics(float rtt,
- std::string profiler_id="",
- unsigned int num_samples=1000);
+ const std::string &profiler_id = "",
+ unsigned int num_samples = 1000);
bool IncUseCount();
void DecUseCount();
- JMutex m_exclusive_access_mutex;
+ Mutex m_exclusive_access_mutex;
bool m_pending_deletion;
float m_timeout_counter;
u32 m_last_timeout_check;
-
- bool m_has_sent_with_id;
};
class UDPPeer : public Peer
void PutReliableSendCommand(ConnectionCommand &c,
unsigned int max_packet_size);
- bool isActive()
- { return ((hasSentWithID()) && (!m_pending_deletion)); };
-
bool getAddress(MTProtocols type, Address& toset);
void setNonLegacyPeer();
unsigned int maxtransfer);
float getResendTimeout()
- { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
+ { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
void setResendTimeout(float timeout)
- { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
+ { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
bool Ping(float dtime,SharedBuffer<u8>& data);
Channel channels[CHANNEL_COUNT];
bool timeout;
Address address;
- ConnectionEvent(): type(CONNEVENT_NONE) {}
+ ConnectionEvent(): type(CONNEVENT_NONE), peer_id(0),
+ timeout(false) {}
std::string describe()
{
return "Invalid ConnectionEvent";
}
- void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
+ void dataReceived(u16 peer_id_, const SharedBuffer<u8> &data_)
{
type = CONNEVENT_DATA_RECEIVED;
peer_id = peer_id_;
}
};
-class ConnectionSendThread : public JThread {
+class ConnectionSendThread : public Thread {
public:
friend class UDPPeer;
ConnectionSendThread(unsigned int max_packet_size, float timeout);
- void * Thread ();
+ void *run();
void Trigger();
void setParent(Connection* parent) {
- assert(parent != NULL);
+ assert(parent != NULL); // Pre-condition
m_connection = parent;
}
unsigned int m_max_packet_size;
float m_timeout;
std::queue<OutgoingPacket> m_outgoing_queue;
- JSemaphore m_send_sleep_semaphore;
+ Semaphore m_send_sleep_semaphore;
unsigned int m_iteration_packets_avaialble;
unsigned int m_max_commands_per_iteration;
unsigned int m_max_packets_requeued;
};
-class ConnectionReceiveThread : public JThread {
+class ConnectionReceiveThread : public Thread {
public:
ConnectionReceiveThread(unsigned int max_packet_size);
- void * Thread ();
+ void *run();
- void setParent(Connection* parent) {
- assert(parent != NULL);
+ void setParent(Connection *parent) {
+ assert(parent); // Pre-condition
m_connection = parent;
}
private:
- void receive ();
+ void receive();
// Returns next data from a buffer if possible
// If found, returns true; if not, false.
// If found, sets peer_id and dst
- bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
+ bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
SharedBuffer<u8> &dst);
friend class ConnectionSendThread;
friend class ConnectionReceiveThread;
- Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
PeerHandler *peerhandler);
~Connection();
/* Interface */
- ConnectionEvent getEvent();
ConnectionEvent waitEvent(u32 timeout_ms);
void putCommand(ConnectionCommand &c);
void Connect(Address address);
bool Connected();
void Disconnect();
- u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
+ void Receive(NetworkPacket* pkt);
void Send(u16 peer_id, u8 channelnum, NetworkPacket* pkt, bool reliable);
u16 GetPeerID() { return m_peer_id; }
Address GetPeerAddress(u16 peer_id);
void PrintInfo(std::ostream &out);
void PrintInfo();
- std::list<u16> getPeerIDs() { return m_peer_ids; }
+ std::list<u16> getPeerIDs()
+ {
+ MutexAutoLock peerlock(m_peers_mutex);
+ return m_peer_ids;
+ }
UDPSocket m_udpSocket;
MutexedQueue<ConnectionCommand> m_command_queue;
std::map<u16, Peer*> m_peers;
std::list<u16> m_peer_ids;
- JMutex m_peers_mutex;
+ Mutex m_peers_mutex;
ConnectionSendThread m_sendThread;
ConnectionReceiveThread m_receiveThread;
- JMutex m_info_mutex;
+ Mutex m_info_mutex;
// Backwards compatibility
PeerHandler *m_bc_peerhandler;