3dc87b489eaf4995844703aa5a2efb4eeecd8222
[oweals/minetest.git] / src / network / connection.h
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #ifndef CONNECTION_HEADER
21 #define CONNECTION_HEADER
22
23 #include "irrlichttypes_bloated.h"
24 #include "socket.h"
25 #include "exceptions.h"
26 #include "constants.h"
27 #include "util/pointer.h"
28 #include "util/container.h"
29 #include "util/thread.h"
30 #include "util/numeric.h"
31 #include <iostream>
32 #include <fstream>
33 #include <list>
34 #include <map>
35
36 class NetworkPacket;
37
38 namespace con
39 {
40
41 /*
42         Exceptions
43 */
44 class NotFoundException : public BaseException
45 {
46 public:
47         NotFoundException(const char *s):
48                 BaseException(s)
49         {}
50 };
51
52 class PeerNotFoundException : public BaseException
53 {
54 public:
55         PeerNotFoundException(const char *s):
56                 BaseException(s)
57         {}
58 };
59
60 class ConnectionException : public BaseException
61 {
62 public:
63         ConnectionException(const char *s):
64                 BaseException(s)
65         {}
66 };
67
68 class ConnectionBindFailed : public BaseException
69 {
70 public:
71         ConnectionBindFailed(const char *s):
72                 BaseException(s)
73         {}
74 };
75
76 class InvalidIncomingDataException : public BaseException
77 {
78 public:
79         InvalidIncomingDataException(const char *s):
80                 BaseException(s)
81         {}
82 };
83
84 class InvalidOutgoingDataException : public BaseException
85 {
86 public:
87         InvalidOutgoingDataException(const char *s):
88                 BaseException(s)
89         {}
90 };
91
92 class NoIncomingDataException : public BaseException
93 {
94 public:
95         NoIncomingDataException(const char *s):
96                 BaseException(s)
97         {}
98 };
99
100 class ProcessedSilentlyException : public BaseException
101 {
102 public:
103         ProcessedSilentlyException(const char *s):
104                 BaseException(s)
105         {}
106 };
107
108 class ProcessedQueued : public BaseException
109 {
110 public:
111         ProcessedQueued(const char *s):
112                 BaseException(s)
113         {}
114 };
115
116 class IncomingDataCorruption : public BaseException
117 {
118 public:
119         IncomingDataCorruption(const char *s):
120                 BaseException(s)
121         {}
122 };
123
// Protocol selector passed to Peer::getAddress().
// Enumerators keep their implicit values 0, 1 and 2.
enum MTProtocols {
        MTP_PRIMARY,
        MTP_UDP,
        MTP_MINETEST_RELIABLE_UDP
};
129
130 #define SEQNUM_MAX 65535
131 inline bool seqnum_higher(u16 totest, u16 base)
132 {
133         if (totest > base)
134         {
135                 if ((totest - base) > (SEQNUM_MAX/2))
136                         return false;
137                 else
138                         return true;
139         }
140         else
141         {
142                 if ((base - totest) > (SEQNUM_MAX/2))
143                         return true;
144                 else
145                         return false;
146         }
147 }
148
149 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
150 {
151         u16 window_start = next;
152         u16 window_end   = ( next + window_size ) % (SEQNUM_MAX+1);
153
154         if (window_start < window_end)
155         {
156                 return ((seqnum >= window_start) && (seqnum < window_end));
157         }
158         else
159         {
160                 return ((seqnum < window_end) || (seqnum >= window_start));
161         }
162 }
163
164 struct BufferedPacket
165 {
166         BufferedPacket(u8 *a_data, u32 a_size):
167                 data(a_data, a_size)
168         {}
169         BufferedPacket(u32 a_size):
170                 data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
171                 resend_count(0)
172         {}
173         Buffer<u8> data; // Data of the packet, including headers
174         float time = 0.0f; // Seconds from buffering the packet or re-sending
175         float totaltime = 0.0f; // Seconds from buffering the packet
176         u64 absolute_send_time = -1;
177         Address address; // Sender or destination
178         unsigned int resend_count = 0;
179 };
180
181 // This adds the base headers to the data and makes a packet out of it
182 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
183                 u32 protocol_id, u16 sender_peer_id, u8 channel);
184 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
185                 u32 protocol_id, u16 sender_peer_id, u8 channel);
186
187 // Add the TYPE_ORIGINAL header to the data
188 SharedBuffer<u8> makeOriginalPacket(
189                 SharedBuffer<u8> data);
190
191 // Split data in chunks and add TYPE_SPLIT headers to them
192 std::list<SharedBuffer<u8> > makeSplitPacket(
193                 SharedBuffer<u8> data,
194                 u32 chunksize_max,
195                 u16 seqnum);
196
197 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
198 // Increments split_seqnum if a split packet is made
199 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
200                 SharedBuffer<u8> data,
201                 u32 chunksize_max,
202                 u16 &split_seqnum);
203
204 // Add the TYPE_RELIABLE header to the data
205 SharedBuffer<u8> makeReliablePacket(
206                 SharedBuffer<u8> data,
207                 u16 seqnum);
208
209 struct IncomingSplitPacket
210 {
211         IncomingSplitPacket() {}
212         // Key is chunk number, value is data without headers
213         std::map<u16, SharedBuffer<u8> > chunks;
214         u32 chunk_count;
215         float time = 0.0f; // Seconds from adding
216         bool reliable = false; // If true, isn't deleted on timeout
217
218         bool allReceived()
219         {
220                 return (chunks.size() == chunk_count);
221         }
222 };
223
224 /*
225 === NOTES ===
226
227 A packet is sent through a channel to a peer with a basic header:
228 TODO: Should we have a receiver_peer_id also?
229         Header (7 bytes):
230         [0] u32 protocol_id
231         [4] u16 sender_peer_id
232         [6] u8 channel
233 sender_peer_id:
234         Unique to each peer.
235         value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
236         value 1 (PEER_ID_SERVER) is reserved for server
237         these constants are defined in constants.h
238 channel:
239         The lower the number, the higher the priority is.
240         Only channels 0, 1 and 2 exist.
241 */
// Size in bytes of the base header: u32 protocol_id, u16 sender_peer_id, u8 channel
#define BASE_HEADER_SIZE 7
// Number of channels; only channels 0, 1 and 2 exist
#define CHANNEL_COUNT 3
244 /*
245 Packet types:
246
247 CONTROL: This is a packet used by the protocol.
248 - When this is processed, nothing is handed to the user.
        Header (2 bytes):
250         [0] u8 type
251         [1] u8 controltype
252 controltype and data description:
253         CONTROLTYPE_ACK
254                 [2] u16 seqnum
255         CONTROLTYPE_SET_PEER_ID
256                 [2] u16 peer_id_new
257         CONTROLTYPE_PING
258         - There is no actual reply, but this can be sent in a reliable
259           packet to get a reply
260         CONTROLTYPE_DISCO
261 */
// Packet type value for CONTROL packets
#define TYPE_CONTROL 0
// Values of the controltype byte that follows the type byte
#define CONTROLTYPE_ACK 0
#define CONTROLTYPE_SET_PEER_ID 1
#define CONTROLTYPE_PING 2
#define CONTROLTYPE_DISCO 3
#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
268
269 /*
270 ORIGINAL: This is a plain packet with no control and no error
271 checking at all.
272 - When this is processed, it is directly handed to the user.
273         Header (1 byte):
274         [0] u8 type
275 */
// Packet type value for ORIGINAL packets
#define TYPE_ORIGINAL 1
// Size in bytes of the ORIGINAL header (the type byte only)
#define ORIGINAL_HEADER_SIZE 1
278 /*
279 SPLIT: These are sequences of packets forming one bigger piece of
280 data.
- When processed and all the chunk_nums 0...chunk_count-1 are
  present (this should be buffered), the resulting data shall be
  directly handed to the user.
284 - If the data fails to come up in a reasonable time, the buffer shall
285   be silently discarded.
286 - These can be sent as-is or atop of a RELIABLE packet stream.
287         Header (7 bytes):
288         [0] u8 type
289         [1] u16 seqnum
290         [3] u16 chunk_count
291         [5] u16 chunk_num
292 */
// Packet type value for SPLIT packets
#define TYPE_SPLIT 2
294 /*
295 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
296 and they shall be delivered in the same order as sent. This is done
297 with a buffer in the receiving and transmitting end.
298 - When this is processed, the contents of each packet is recursively
299   processed as packets.
300         Header (3 bytes):
301         [0] u8 type
302         [1] u16 seqnum
303
304 */
// Packet type value for RELIABLE packets
#define TYPE_RELIABLE 3
// Size in bytes of the RELIABLE header: u8 type followed by u16 seqnum
#define RELIABLE_HEADER_SIZE 3
// Value used to initialize the per-channel sequence number counters
#define SEQNUM_INITIAL 65500
308
309 /*
310         A buffer which stores reliable packets and sorts them internally
311         for fast access to the smallest one.
312 */
313
314 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
315
316 class ReliablePacketBuffer
317 {
318 public:
319         ReliablePacketBuffer() {};
320
321         bool getFirstSeqnum(u16& result);
322
323         BufferedPacket popFirst();
324         BufferedPacket popSeqnum(u16 seqnum);
325         void insert(BufferedPacket &p,u16 next_expected);
326
327         void incrementTimeouts(float dtime);
328         std::list<BufferedPacket> getTimedOuts(float timeout,
329                         unsigned int max_packets);
330
331         void print();
332         bool empty();
333         bool containsPacket(u16 seqnum);
334         RPBSearchResult notFound();
335         u32 size();
336
337
338 private:
339         RPBSearchResult findPacket(u16 seqnum);
340
341         std::list<BufferedPacket> m_list;
342         u32 m_list_size = 0;
343
344         u16 m_oldest_non_answered_ack;
345
346         std::mutex m_list_mutex;
347 };
348
349 /*
350         A buffer for reconstructing split packets
351 */
352
353 class IncomingSplitBuffer
354 {
355 public:
356         ~IncomingSplitBuffer();
357         /*
358                 Returns a reference counted buffer of length != 0 when a full split
359                 packet is constructed. If not, returns one of length 0.
360         */
361         SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
362
363         void removeUnreliableTimedOuts(float dtime, float timeout);
364
365 private:
366         // Key is seqnum
367         std::map<u16, IncomingSplitPacket*> m_buf;
368
369         std::mutex m_map_mutex;
370 };
371
372 struct OutgoingPacket
373 {
374         u16 peer_id;
375         u8 channelnum;
376         SharedBuffer<u8> data;
377         bool reliable;
378         bool ack;
379
380         OutgoingPacket(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
381                         bool reliable_,bool ack_=false):
382                 peer_id(peer_id_),
383                 channelnum(channelnum_),
384                 data(data_),
385                 reliable(reliable_),
386                 ack(ack_)
387         {
388         }
389 };
390
// Kinds of ConnectionCommand. Enumerators keep their implicit values 0..9.
enum ConnectionCommandType
{
        CONNCMD_NONE,
        CONNCMD_SERVE,
        CONNCMD_CONNECT,
        CONNCMD_DISCONNECT,
        CONNCMD_DISCONNECT_PEER,
        CONNCMD_SEND,
        CONNCMD_SEND_TO_ALL,
        // The following three use the CONCMD_ prefix (single N)
        CONCMD_ACK,
        CONCMD_CREATE_PEER,
        CONCMD_DISABLE_LEGACY
};
403
404 struct ConnectionCommand
405 {
406         enum ConnectionCommandType type = CONNCMD_NONE;
407         Address address;
408         u16 peer_id = PEER_ID_INEXISTENT;
409         u8 channelnum;
410         Buffer<u8> data;
411         bool reliable = false;
412         bool raw = false;
413
414         ConnectionCommand() {}
415
416         void serve(Address address_)
417         {
418                 type = CONNCMD_SERVE;
419                 address = address_;
420         }
421         void connect(Address address_)
422         {
423                 type = CONNCMD_CONNECT;
424                 address = address_;
425         }
426         void disconnect()
427         {
428                 type = CONNCMD_DISCONNECT;
429         }
430         void disconnect_peer(u16 peer_id_)
431         {
432                 type = CONNCMD_DISCONNECT_PEER;
433                 peer_id = peer_id_;
434         }
435
436         void send(u16 peer_id_, u8 channelnum_, NetworkPacket* pkt, bool reliable_);
437
438         void ack(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
439         {
440                 type = CONCMD_ACK;
441                 peer_id = peer_id_;
442                 channelnum = channelnum_;
443                 data = data_;
444                 reliable = false;
445         }
446
447         void createPeer(u16 peer_id_, const SharedBuffer<u8> &data_)
448         {
449                 type = CONCMD_CREATE_PEER;
450                 peer_id = peer_id_;
451                 data = data_;
452                 channelnum = 0;
453                 reliable = true;
454                 raw = true;
455         }
456
457         void disableLegacy(u16 peer_id_, const SharedBuffer<u8> &data_)
458         {
459                 type = CONCMD_DISABLE_LEGACY;
460                 peer_id = peer_id_;
461                 data = data_;
462                 channelnum = 0;
463                 reliable = true;
464                 raw = true;
465         }
466 };
467
/*
 * Maximum window size to use. 0xFFFF is the theoretical maximum; do not move
 * this value towards it: the closer it gets to 0xFFFF, the more likely data
 * corruption becomes.
 */
#define MAX_RELIABLE_WINDOW_SIZE 0x8000
/* Starting value for the window size */
#define MIN_RELIABLE_WINDOW_SIZE 0x40
475
476 class Channel
477 {
478
479 public:
480         u16 readNextIncomingSeqNum();
481         u16 incNextIncomingSeqNum();
482
483         u16 getOutgoingSequenceNumber(bool& successfull);
484         u16 readOutgoingSequenceNumber();
485         bool putBackSequenceNumber(u16);
486
487         u16 readNextSplitSeqNum();
488         void setNextSplitSeqNum(u16 seqnum);
489
490         // This is for buffering the incoming packets that are coming in
491         // the wrong order
492         ReliablePacketBuffer incoming_reliables;
493         // This is for buffering the sent packets so that the sender can
494         // re-send them if no ACK is received
495         ReliablePacketBuffer outgoing_reliables_sent;
496
497         //queued reliable packets
498         std::queue<BufferedPacket> queued_reliables;
499
500         //queue commands prior splitting to packets
501         std::deque<ConnectionCommand> queued_commands;
502
503         IncomingSplitBuffer incoming_splits;
504
505         Channel() {};
506         ~Channel() {};
507
508         void UpdatePacketLossCounter(unsigned int count);
509         void UpdatePacketTooLateCounter();
510         void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
511         void UpdateBytesLost(unsigned int bytes);
512         void UpdateBytesReceived(unsigned int bytes);
513
514         void UpdateTimers(float dtime, bool legacy_peer);
515
516         const float getCurrentDownloadRateKB()
517                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
518         const float getMaxDownloadRateKB()
519                 { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
520
521         const float getCurrentLossRateKB()
522                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
523         const float getMaxLossRateKB()
524                 { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
525
526         const float getCurrentIncomingRateKB()
527                 { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
528         const float getMaxIncomingRateKB()
529                 { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
530
531         const float getAvgDownloadRateKB()
532                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
533         const float getAvgLossRateKB()
534                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
535         const float getAvgIncomingRateKB()
536                 { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
537
538         const unsigned int getWindowSize() const { return window_size; };
539
540         void setWindowSize(unsigned int size) { window_size = size; };
541 private:
542         std::mutex m_internal_mutex;
543         int window_size = MIN_RELIABLE_WINDOW_SIZE;
544
545         u16 next_incoming_seqnum = SEQNUM_INITIAL;
546
547         u16 next_outgoing_seqnum = SEQNUM_INITIAL;
548         u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
549
550         unsigned int current_packet_loss = 0;
551         unsigned int current_packet_too_late = 0;
552         unsigned int current_packet_successfull = 0;
553         float packet_loss_counter = 0.0f;
554
555         unsigned int current_bytes_transfered = 0;
556         unsigned int current_bytes_received = 0;
557         unsigned int current_bytes_lost = 0;
558         float max_kbps = 0.0f;
559         float cur_kbps = 0.0f;
560         float avg_kbps = 0.0f;
561         float max_incoming_kbps = 0.0f;
562         float cur_incoming_kbps = 0.0f;
563         float avg_incoming_kbps = 0.0f;
564         float max_kbps_lost = 0.0f;
565         float cur_kbps_lost = 0.0f;
566         float avg_kbps_lost = 0.0f;
567         float bpm_counter = 0.0f;
568
569         unsigned int rate_samples = 0;
570 };
571
572 class Peer;
573
// Kind of change recorded in a PeerChange (implicit values 0 and 1)
enum PeerChangeType {
        PEER_ADDED,
        PEER_REMOVED
};
579 struct PeerChange
580 {
581         PeerChange(PeerChangeType t, u16 _peer_id, bool _timeout):
582                 type(t), peer_id(_peer_id), timeout(_timeout) {}
583         PeerChange() = delete;
584
585         PeerChangeType type;
586         u16 peer_id;
587         bool timeout;
588 };
589
590 class PeerHandler
591 {
592 public:
593
594         PeerHandler()
595         {
596         }
597         virtual ~PeerHandler()
598         {
599         }
600
601         /*
602                 This is called after the Peer has been inserted into the
603                 Connection's peer container.
604         */
605         virtual void peerAdded(Peer *peer) = 0;
606         /*
607                 This is called before the Peer has been removed from the
608                 Connection's peer container.
609         */
610         virtual void deletingPeer(Peer *peer, bool timeout) = 0;
611 };
612
613 class PeerHelper
614 {
615 public:
616         PeerHelper() {};
617         PeerHelper(Peer* peer);
618         ~PeerHelper();
619
620         PeerHelper&   operator=(Peer* peer);
621         Peer*         operator->() const;
622         bool          operator!();
623         Peer*         operator&() const;
624         bool          operator!=(void* ptr);
625
626 private:
627         Peer *m_peer = nullptr;
628 };
629
630 class Connection;
631
// Selector for Peer::getStat(); enumerators keep their implicit values 0..5
enum rtt_stat_type {
        MIN_RTT,
        MAX_RTT,
        AVG_RTT,
        MIN_JITTER,
        MAX_JITTER,
        AVG_JITTER
};
640
// Rate statistic selector; enumerators keep their implicit values 0..5
enum rate_stat_type {
        CUR_DL_RATE,
        AVG_DL_RATE,
        CUR_INC_RATE,
        AVG_INC_RATE,
        CUR_LOSS_RATE,
        AVG_LOSS_RATE,
};
649
650 class Peer {
651         public:
652                 friend class PeerHelper;
653
654                 Peer(Address address_,u16 id_,Connection* connection) :
655                         id(id_),
656                         m_connection(connection),
657                         address(address_),
658                         m_last_timeout_check(porting::getTimeMs())
659                 {
660                 };
661
662                 virtual ~Peer() {
663                         MutexAutoLock usage_lock(m_exclusive_access_mutex);
664                         FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
665                 };
666
667                 // Unique id of the peer
668                 u16 id;
669
670                 void Drop();
671
672                 virtual void PutReliableSendCommand(ConnectionCommand &c,
673                                                 unsigned int max_packet_size) {};
674
675                 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
676
677                 bool isPendingDeletion()
678                 { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
679
680                 void ResetTimeout()
681                         {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
682
683                 bool isTimedOut(float timeout);
684
685                 unsigned int m_increment_packets_remaining = 9;
686                 unsigned int m_increment_bytes_remaining = 0;
687
688                 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
689                 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
690                 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
691                                                                                                 BufferedPacket toadd,
692                                                                                                 bool reliable)
693                                 {
694                                         fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
695                                         return SharedBuffer<u8>(0);
696                                 };
697
698                 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
699
700                 virtual float getStat(rtt_stat_type type) const {
701                         switch (type) {
702                                 case MIN_RTT:
703                                         return m_rtt.min_rtt;
704                                 case MAX_RTT:
705                                         return m_rtt.max_rtt;
706                                 case AVG_RTT:
707                                         return m_rtt.avg_rtt;
708                                 case MIN_JITTER:
709                                         return m_rtt.jitter_min;
710                                 case MAX_JITTER:
711                                         return m_rtt.jitter_max;
712                                 case AVG_JITTER:
713                                         return m_rtt.jitter_avg;
714                         }
715                         return -1;
716                 }
717         protected:
718                 virtual void reportRTT(float rtt) {};
719
720                 void RTTStatistics(float rtt,
721                                                         const std::string &profiler_id = "",
722                                                         unsigned int num_samples = 1000);
723
724                 bool IncUseCount();
725                 void DecUseCount();
726
727                 std::mutex m_exclusive_access_mutex;
728
729                 bool m_pending_deletion = false;
730
731                 Connection* m_connection;
732
733                 // Address of the peer
734                 Address address;
735
736                 // Ping timer
737                 float m_ping_timer = 0.0f;
738         private:
739
740                 struct rttstats {
741                         float jitter_min = FLT_MAX;
742                         float jitter_max = 0.0f;
743                         float jitter_avg = -1.0f;
744                         float min_rtt = FLT_MAX;
745                         float max_rtt = 0.0f;
746                         float avg_rtt = -1.0f;
747
748                         rttstats() {};
749                 };
750
751                 rttstats m_rtt;
752                 float m_last_rtt = -1.0f;
753
754                 // current usage count
755                 unsigned int m_usage = 0;
756
757                 // Seconds from last receive
758                 float m_timeout_counter = 0.0f;
759
760                 u64 m_last_timeout_check;
761 };
762
763 class UDPPeer : public Peer
764 {
765 public:
766
767         friend class PeerHelper;
768         friend class ConnectionReceiveThread;
769         friend class ConnectionSendThread;
770         friend class Connection;
771
772         UDPPeer(u16 a_id, Address a_address, Connection* connection);
773         virtual ~UDPPeer() {};
774
775         void PutReliableSendCommand(ConnectionCommand &c,
776                                                         unsigned int max_packet_size);
777
778         bool getAddress(MTProtocols type, Address& toset);
779
780         void setNonLegacyPeer();
781
782         bool getLegacyPeer()
783         { return m_legacy_peer; }
784
785         u16 getNextSplitSequenceNumber(u8 channel);
786         void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
787
788         SharedBuffer<u8> addSpiltPacket(u8 channel,
789                                                                         BufferedPacket toadd,
790                                                                         bool reliable);
791
792
793 protected:
794         /*
795                 Calculates avg_rtt and resend_timeout.
796                 rtt=-1 only recalculates resend_timeout
797         */
798         void reportRTT(float rtt);
799
800         void RunCommandQueues(
801                                         unsigned int max_packet_size,
802                                         unsigned int maxcommands,
803                                         unsigned int maxtransfer);
804
805         float getResendTimeout()
806                 { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
807
808         void setResendTimeout(float timeout)
809                 { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
810         bool Ping(float dtime,SharedBuffer<u8>& data);
811
812         Channel channels[CHANNEL_COUNT];
813         bool m_pending_disconnect = false;
814 private:
815         // This is changed dynamically
816         float resend_timeout = 0.5;
817
818         bool processReliableSendCommand(
819                                         ConnectionCommand &c,
820                                         unsigned int max_packet_size);
821
822         bool m_legacy_peer = true;
823 };
824
825 /*
826         Connection
827 */
828
// Kinds of ConnectionEvent; enumerators keep their implicit values 0..4
enum ConnectionEventType
{
        CONNEVENT_NONE,
        CONNEVENT_DATA_RECEIVED,
        CONNEVENT_PEER_ADDED,
        CONNEVENT_PEER_REMOVED,
        CONNEVENT_BIND_FAILED,
};
836
837 struct ConnectionEvent
838 {
839         enum ConnectionEventType type = CONNEVENT_NONE;
840         u16 peer_id = 0;
841         Buffer<u8> data;
842         bool timeout = false;
843         Address address;
844
845         ConnectionEvent() {}
846
847         std::string describe()
848         {
849                 switch(type) {
850                 case CONNEVENT_NONE:
851                         return "CONNEVENT_NONE";
852                 case CONNEVENT_DATA_RECEIVED:
853                         return "CONNEVENT_DATA_RECEIVED";
854                 case CONNEVENT_PEER_ADDED:
855                         return "CONNEVENT_PEER_ADDED";
856                 case CONNEVENT_PEER_REMOVED:
857                         return "CONNEVENT_PEER_REMOVED";
858                 case CONNEVENT_BIND_FAILED:
859                         return "CONNEVENT_BIND_FAILED";
860                 }
861                 return "Invalid ConnectionEvent";
862         }
863
864         void dataReceived(u16 peer_id_, const SharedBuffer<u8> &data_)
865         {
866                 type = CONNEVENT_DATA_RECEIVED;
867                 peer_id = peer_id_;
868                 data = data_;
869         }
870         void peerAdded(u16 peer_id_, Address address_)
871         {
872                 type = CONNEVENT_PEER_ADDED;
873                 peer_id = peer_id_;
874                 address = address_;
875         }
876         void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
877         {
878                 type = CONNEVENT_PEER_REMOVED;
879                 peer_id = peer_id_;
880                 timeout = timeout_;
881                 address = address_;
882         }
883         void bindFailed()
884         {
885                 type = CONNEVENT_BIND_FAILED;
886         }
887 };
888
889 class ConnectionSendThread : public Thread {
890
891 public:
892         friend class UDPPeer;
893
894         ConnectionSendThread(unsigned int max_packet_size, float timeout);
895
896         void *run();
897
898         void Trigger();
899
900         void setParent(Connection* parent) {
901                 assert(parent != NULL); // Pre-condition
902                 m_connection = parent;
903         }
904
905         void setPeerTimeout(float peer_timeout)
906                 { m_timeout = peer_timeout; }
907
908 private:
909         void runTimeouts    (float dtime);
910         void rawSend        (const BufferedPacket &packet);
911         bool rawSendAsPacket(u16 peer_id, u8 channelnum,
912                                                         SharedBuffer<u8> data, bool reliable);
913
914         void processReliableCommand (ConnectionCommand &c);
915         void processNonReliableCommand (ConnectionCommand &c);
916         void serve          (Address bind_address);
917         void connect        (Address address);
918         void disconnect     ();
919         void disconnect_peer(u16 peer_id);
920         void send           (u16 peer_id, u8 channelnum,
921                                                         SharedBuffer<u8> data);
922         void sendReliable   (ConnectionCommand &c);
923         void sendToAll      (u8 channelnum,
924                                                         SharedBuffer<u8> data);
925         void sendToAllReliable(ConnectionCommand &c);
926
927         void sendPackets    (float dtime);
928
929         void sendAsPacket   (u16 peer_id, u8 channelnum,
930                                                         SharedBuffer<u8> data,bool ack=false);
931
932         void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
933
934         bool packetsQueued();
935
936         Connection           *m_connection = nullptr;
937         unsigned int          m_max_packet_size;
938         float                 m_timeout;
939         std::queue<OutgoingPacket> m_outgoing_queue;
940         Semaphore             m_send_sleep_semaphore;
941
942         unsigned int          m_iteration_packets_avaialble;
943         unsigned int          m_max_commands_per_iteration = 1;
944         unsigned int          m_max_data_packets_per_iteration;
945         unsigned int          m_max_packets_requeued = 256;
946 };
947
/*
        Receive-side worker of a Connection, derived from Thread.
        The object holds a non-owning pointer to its Connection which is
        nullptr until setParent() is called.
*/
948 class ConnectionReceiveThread : public Thread {
949 public:
950         ConnectionReceiveThread(unsigned int max_packet_size);
951
            // NOTE(review): presumably the Thread entry point. If Thread::run() is
            // virtual, consider marking this declaration `override` -- confirm
            // against the Thread class first.
952         void *run();
953
            // Attaches this thread to the Connection it works for.
            // parent must not be null; this is checked with assert().
954         void setParent(Connection *parent) {
955                 assert(parent); // Pre-condition
956                 m_connection = parent;
957         }
958
959 private:
960         void receive();
961
962         // Returns next data from a buffer if possible
963         // If found, returns true; if not, false.
964         // If found, sets peer_id and dst
965         bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
966
            // Takes the same peer_id/dst reference parameters as getFromBuffers(),
            // plus a specific Channel.
            // NOTE(review): presumably the per-channel helper used by
            // getFromBuffers() -- confirm in the implementation.
967         bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
968                                                         SharedBuffer<u8> &dst);
969
970         /*
971                 Processes a packet with the basic header stripped out.
972                 Parameters:
                            channel: NOTE(review): not described originally;
                                    presumably the Channel object that
                                    corresponds to channelnum -- confirm
973                         packetdata: Data in packet (with no base headers)
974                         peer_id: peer id of the sender of the packet in question
975                         channelnum: channel on which the packet was sent
976                         reliable: true if recursing into a reliable packet
                    Returns a SharedBuffer<u8>.
                    NOTE(review): presumably the resulting payload -- confirm
                    against the implementation.
977         */
978         SharedBuffer<u8> processPacket(Channel *channel,
979                                                         SharedBuffer<u8> packetdata, u16 peer_id,
980                                                         u8 channelnum, bool reliable);
981
982
            // Non-owning back pointer; assigned by setParent(), nullptr before that.
983         Connection *m_connection = nullptr;
984 };
985
986 class Connection
987 {
988 public:
989         friend class ConnectionSendThread;
990         friend class ConnectionReceiveThread;
991
992         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
993                         PeerHandler *peerhandler);
994         ~Connection();
995
996         /* Interface */
997         ConnectionEvent waitEvent(u32 timeout_ms);
998         void putCommand(ConnectionCommand &c);
999
1000         void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; }
1001         void Serve(Address bind_addr);
1002         void Connect(Address address);
1003         bool Connected();
1004         void Disconnect();
1005         void Receive(NetworkPacket* pkt);
1006         void Send(u16 peer_id, u8 channelnum, NetworkPacket* pkt, bool reliable);
1007         u16 GetPeerID() { return m_peer_id; }
1008         Address GetPeerAddress(u16 peer_id);
1009         float getPeerStat(u16 peer_id, rtt_stat_type type);
1010         float getLocalStat(rate_stat_type type);
1011         const u32 GetProtocolID() const { return m_protocol_id; };
1012         const std::string getDesc();
1013         void DisconnectPeer(u16 peer_id);
1014
1015 protected:
1016         PeerHelper getPeer(u16 peer_id);
1017         PeerHelper getPeerNoEx(u16 peer_id);
1018         u16   lookupPeer(Address& sender);
1019
1020         u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1021         UDPPeer*  createServerPeer(Address& sender);
1022         bool deletePeer(u16 peer_id, bool timeout);
1023
1024         void SetPeerID(u16 id) { m_peer_id = id; }
1025
1026         void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1027
1028         void PrintInfo(std::ostream &out);
1029         void PrintInfo();
1030
1031         std::list<u16> getPeerIDs()
1032         {
1033                 MutexAutoLock peerlock(m_peers_mutex);
1034                 return m_peer_ids;
1035         }
1036
1037         UDPSocket m_udpSocket;
1038         MutexedQueue<ConnectionCommand> m_command_queue;
1039
1040         void putEvent(ConnectionEvent &e);
1041
1042         void TriggerSend()
1043                 { m_sendThread.Trigger(); }
1044 private:
1045         std::list<Peer*> getPeers();
1046
1047         MutexedQueue<ConnectionEvent> m_event_queue;
1048
1049         u16 m_peer_id = 0;
1050         u32 m_protocol_id;
1051
1052         std::map<u16, Peer*> m_peers;
1053         std::list<u16> m_peer_ids;
1054         std::mutex m_peers_mutex;
1055
1056         ConnectionSendThread m_sendThread;
1057         ConnectionReceiveThread m_receiveThread;
1058
1059         std::mutex m_info_mutex;
1060
1061         // Backwards compatibility
1062         PeerHandler *m_bc_peerhandler;
1063         int m_bc_receive_timeout = 0;
1064
1065         bool m_shutting_down = false;
1066
1067         u16 m_next_remote_peer_id = 2;
1068 };
1069
1070 } // namespace con
1071
1072 #endif