Code modernization: subfolders (#6283)
[oweals/minetest.git] / src / network / connection.h
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #pragma once
21
22 #include "irrlichttypes_bloated.h"
23 #include "socket.h"
24 #include "exceptions.h"
25 #include "constants.h"
26 #include "util/pointer.h"
27 #include "util/container.h"
28 #include "util/thread.h"
29 #include "util/numeric.h"
30 #include <iostream>
31 #include <fstream>
32 #include <list>
33 #include <map>
34
35 class NetworkPacket;
36
37 namespace con
38 {
39
40 /*
41         Exceptions
42 */
43 class NotFoundException : public BaseException
44 {
45 public:
46         NotFoundException(const char *s):
47                 BaseException(s)
48         {}
49 };
50
51 class PeerNotFoundException : public BaseException
52 {
53 public:
54         PeerNotFoundException(const char *s):
55                 BaseException(s)
56         {}
57 };
58
59 class ConnectionException : public BaseException
60 {
61 public:
62         ConnectionException(const char *s):
63                 BaseException(s)
64         {}
65 };
66
67 class ConnectionBindFailed : public BaseException
68 {
69 public:
70         ConnectionBindFailed(const char *s):
71                 BaseException(s)
72         {}
73 };
74
75 class InvalidIncomingDataException : public BaseException
76 {
77 public:
78         InvalidIncomingDataException(const char *s):
79                 BaseException(s)
80         {}
81 };
82
83 class InvalidOutgoingDataException : public BaseException
84 {
85 public:
86         InvalidOutgoingDataException(const char *s):
87                 BaseException(s)
88         {}
89 };
90
91 class NoIncomingDataException : public BaseException
92 {
93 public:
94         NoIncomingDataException(const char *s):
95                 BaseException(s)
96         {}
97 };
98
99 class ProcessedSilentlyException : public BaseException
100 {
101 public:
102         ProcessedSilentlyException(const char *s):
103                 BaseException(s)
104         {}
105 };
106
107 class ProcessedQueued : public BaseException
108 {
109 public:
110         ProcessedQueued(const char *s):
111                 BaseException(s)
112         {}
113 };
114
115 class IncomingDataCorruption : public BaseException
116 {
117 public:
118         IncomingDataCorruption(const char *s):
119                 BaseException(s)
120         {}
121 };
122
123 typedef enum MTProtocols {
124         MTP_PRIMARY,
125         MTP_UDP,
126         MTP_MINETEST_RELIABLE_UDP
127 } MTProtocols;
128
129 #define SEQNUM_MAX 65535
130 inline bool seqnum_higher(u16 totest, u16 base)
131 {
132         if (totest > base)
133         {
134                 if ((totest - base) > (SEQNUM_MAX/2))
135                         return false;
136
137                 return true;
138         }
139
140         if ((base - totest) > (SEQNUM_MAX/2))
141                 return true;
142
143         return false;
144 }
145
146 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
147 {
148         u16 window_start = next;
149         u16 window_end   = ( next + window_size ) % (SEQNUM_MAX+1);
150
151         if (window_start < window_end) {
152                 return ((seqnum >= window_start) && (seqnum < window_end));
153         }
154
155
156         return ((seqnum < window_end) || (seqnum >= window_start));
157 }
158
159 struct BufferedPacket
160 {
161         BufferedPacket(u8 *a_data, u32 a_size):
162                 data(a_data, a_size)
163         {}
164         BufferedPacket(u32 a_size):
165                 data(a_size)
166         {}
167         Buffer<u8> data; // Data of the packet, including headers
168         float time = 0.0f; // Seconds from buffering the packet or re-sending
169         float totaltime = 0.0f; // Seconds from buffering the packet
170         u64 absolute_send_time = -1;
171         Address address; // Sender or destination
172         unsigned int resend_count = 0;
173 };
174
175 // This adds the base headers to the data and makes a packet out of it
176 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
177                 u32 protocol_id, u16 sender_peer_id, u8 channel);
178 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
179                 u32 protocol_id, u16 sender_peer_id, u8 channel);
180
181 // Add the TYPE_ORIGINAL header to the data
182 SharedBuffer<u8> makeOriginalPacket(
183                 SharedBuffer<u8> data);
184
185 // Split data in chunks and add TYPE_SPLIT headers to them
186 std::list<SharedBuffer<u8> > makeSplitPacket(
187                 SharedBuffer<u8> data,
188                 u32 chunksize_max,
189                 u16 seqnum);
190
191 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
192 // Increments split_seqnum if a split packet is made
193 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
194                 SharedBuffer<u8> data,
195                 u32 chunksize_max,
196                 u16 &split_seqnum);
197
198 // Add the TYPE_RELIABLE header to the data
199 SharedBuffer<u8> makeReliablePacket(
200                 const SharedBuffer<u8> &data,
201                 u16 seqnum);
202
203 struct IncomingSplitPacket
204 {
205         IncomingSplitPacket() = default;
206
207         // Key is chunk number, value is data without headers
208         std::map<u16, SharedBuffer<u8> > chunks;
209         u32 chunk_count;
210         float time = 0.0f; // Seconds from adding
211         bool reliable = false; // If true, isn't deleted on timeout
212
213         bool allReceived()
214         {
215                 return (chunks.size() == chunk_count);
216         }
217 };
218
219 /*
220 === NOTES ===
221
222 A packet is sent through a channel to a peer with a basic header:
223 TODO: Should we have a receiver_peer_id also?
224         Header (7 bytes):
225         [0] u32 protocol_id
226         [4] u16 sender_peer_id
227         [6] u8 channel
228 sender_peer_id:
229         Unique to each peer.
230         value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
231         value 1 (PEER_ID_SERVER) is reserved for server
232         these constants are defined in constants.h
233 channel:
234         The lower the number, the higher the priority is.
235         Only channels 0, 1 and 2 exist.
236 */
237 #define BASE_HEADER_SIZE 7
238 #define CHANNEL_COUNT 3
239 /*
240 Packet types:
241
242 CONTROL: This is a packet used by the protocol.
243 - When this is processed, nothing is handed to the user.
244         Header (2 byte):
245         [0] u8 type
246         [1] u8 controltype
247 controltype and data description:
248         CONTROLTYPE_ACK
249                 [2] u16 seqnum
250         CONTROLTYPE_SET_PEER_ID
251                 [2] u16 peer_id_new
252         CONTROLTYPE_PING
253         - There is no actual reply, but this can be sent in a reliable
254           packet to get a reply
255         CONTROLTYPE_DISCO
256 */
257 #define TYPE_CONTROL 0
258 #define CONTROLTYPE_ACK 0
259 #define CONTROLTYPE_SET_PEER_ID 1
260 #define CONTROLTYPE_PING 2
261 #define CONTROLTYPE_DISCO 3
262 #define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
263
264 /*
265 ORIGINAL: This is a plain packet with no control and no error
266 checking at all.
267 - When this is processed, it is directly handed to the user.
268         Header (1 byte):
269         [0] u8 type
270 */
271 #define TYPE_ORIGINAL 1
272 #define ORIGINAL_HEADER_SIZE 1
273 /*
274 SPLIT: These are sequences of packets forming one bigger piece of
275 data.
276 - When processed and all the packet_nums 0...packet_count-1 are
277   present (this should be buffered), the resulting data shall be
278   directly handed to the user.
279 - If the data fails to come up in a reasonable time, the buffer shall
280   be silently discarded.
281 - These can be sent as-is or atop of a RELIABLE packet stream.
282         Header (7 bytes):
283         [0] u8 type
284         [1] u16 seqnum
285         [3] u16 chunk_count
286         [5] u16 chunk_num
287 */
288 #define TYPE_SPLIT 2
289 /*
290 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
291 and they shall be delivered in the same order as sent. This is done
292 with a buffer in the receiving and transmitting end.
293 - When this is processed, the contents of each packet is recursively
294   processed as packets.
295         Header (3 bytes):
296         [0] u8 type
297         [1] u16 seqnum
298
299 */
300 #define TYPE_RELIABLE 3
301 #define RELIABLE_HEADER_SIZE 3
302 #define SEQNUM_INITIAL 65500
303
304 /*
305         A buffer which stores reliable packets and sorts them internally
306         for fast access to the smallest one.
307 */
308
309 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
310
311 class ReliablePacketBuffer
312 {
313 public:
314         ReliablePacketBuffer() = default;
315
316         bool getFirstSeqnum(u16& result);
317
318         BufferedPacket popFirst();
319         BufferedPacket popSeqnum(u16 seqnum);
320         void insert(BufferedPacket &p,u16 next_expected);
321
322         void incrementTimeouts(float dtime);
323         std::list<BufferedPacket> getTimedOuts(float timeout,
324                         unsigned int max_packets);
325
326         void print();
327         bool empty();
328         bool containsPacket(u16 seqnum);
329         RPBSearchResult notFound();
330         u32 size();
331
332
333 private:
334         RPBSearchResult findPacket(u16 seqnum);
335
336         std::list<BufferedPacket> m_list;
337         u32 m_list_size = 0;
338
339         u16 m_oldest_non_answered_ack;
340
341         std::mutex m_list_mutex;
342 };
343
344 /*
345         A buffer for reconstructing split packets
346 */
347
348 class IncomingSplitBuffer
349 {
350 public:
351         ~IncomingSplitBuffer();
352         /*
353                 Returns a reference counted buffer of length != 0 when a full split
354                 packet is constructed. If not, returns one of length 0.
355         */
356         SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
357
358         void removeUnreliableTimedOuts(float dtime, float timeout);
359
360 private:
361         // Key is seqnum
362         std::map<u16, IncomingSplitPacket*> m_buf;
363
364         std::mutex m_map_mutex;
365 };
366
367 struct OutgoingPacket
368 {
369         u16 peer_id;
370         u8 channelnum;
371         SharedBuffer<u8> data;
372         bool reliable;
373         bool ack;
374
375         OutgoingPacket(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
376                         bool reliable_,bool ack_=false):
377                 peer_id(peer_id_),
378                 channelnum(channelnum_),
379                 data(data_),
380                 reliable(reliable_),
381                 ack(ack_)
382         {
383         }
384 };
385
386 enum ConnectionCommandType{
387         CONNCMD_NONE,
388         CONNCMD_SERVE,
389         CONNCMD_CONNECT,
390         CONNCMD_DISCONNECT,
391         CONNCMD_DISCONNECT_PEER,
392         CONNCMD_SEND,
393         CONNCMD_SEND_TO_ALL,
394         CONCMD_ACK,
395         CONCMD_CREATE_PEER,
396         CONCMD_DISABLE_LEGACY
397 };
398
399 struct ConnectionCommand
400 {
401         enum ConnectionCommandType type = CONNCMD_NONE;
402         Address address;
403         u16 peer_id = PEER_ID_INEXISTENT;
404         u8 channelnum;
405         Buffer<u8> data;
406         bool reliable = false;
407         bool raw = false;
408
409         ConnectionCommand() = default;
410
411         void serve(Address address_)
412         {
413                 type = CONNCMD_SERVE;
414                 address = address_;
415         }
416         void connect(Address address_)
417         {
418                 type = CONNCMD_CONNECT;
419                 address = address_;
420         }
421         void disconnect()
422         {
423                 type = CONNCMD_DISCONNECT;
424         }
425         void disconnect_peer(u16 peer_id_)
426         {
427                 type = CONNCMD_DISCONNECT_PEER;
428                 peer_id = peer_id_;
429         }
430
431         void send(u16 peer_id_, u8 channelnum_, NetworkPacket* pkt, bool reliable_);
432
433         void ack(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
434         {
435                 type = CONCMD_ACK;
436                 peer_id = peer_id_;
437                 channelnum = channelnum_;
438                 data = data_;
439                 reliable = false;
440         }
441
442         void createPeer(u16 peer_id_, const SharedBuffer<u8> &data_)
443         {
444                 type = CONCMD_CREATE_PEER;
445                 peer_id = peer_id_;
446                 data = data_;
447                 channelnum = 0;
448                 reliable = true;
449                 raw = true;
450         }
451
452         void disableLegacy(u16 peer_id_, const SharedBuffer<u8> &data_)
453         {
454                 type = CONCMD_DISABLE_LEGACY;
455                 peer_id = peer_id_;
456                 data = data_;
457                 channelnum = 0;
458                 reliable = true;
459                 raw = true;
460         }
461 };
462
463 /* maximum window size to use, 0xFFFF is theoretical maximum  don't think about
464  * touching it, the less you're away from it the more likely data corruption
465  * will occur
466  */
467 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
468         /* starting value for window size */
469 #define MIN_RELIABLE_WINDOW_SIZE 0x40
470
471 class Channel
472 {
473
474 public:
475         u16 readNextIncomingSeqNum();
476         u16 incNextIncomingSeqNum();
477
478         u16 getOutgoingSequenceNumber(bool& successfull);
479         u16 readOutgoingSequenceNumber();
480         bool putBackSequenceNumber(u16);
481
482         u16 readNextSplitSeqNum();
483         void setNextSplitSeqNum(u16 seqnum);
484
485         // This is for buffering the incoming packets that are coming in
486         // the wrong order
487         ReliablePacketBuffer incoming_reliables;
488         // This is for buffering the sent packets so that the sender can
489         // re-send them if no ACK is received
490         ReliablePacketBuffer outgoing_reliables_sent;
491
492         //queued reliable packets
493         std::queue<BufferedPacket> queued_reliables;
494
495         //queue commands prior splitting to packets
496         std::deque<ConnectionCommand> queued_commands;
497
498         IncomingSplitBuffer incoming_splits;
499
500         Channel() = default;
501         ~Channel() = default;
502
503         void UpdatePacketLossCounter(unsigned int count);
504         void UpdatePacketTooLateCounter();
505         void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
506         void UpdateBytesLost(unsigned int bytes);
507         void UpdateBytesReceived(unsigned int bytes);
508
509         void UpdateTimers(float dtime, bool legacy_peer);
510
511         const float getCurrentDownloadRateKB()
512                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
513         const float getMaxDownloadRateKB()
514                 { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
515
516         const float getCurrentLossRateKB()
517                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
518         const float getMaxLossRateKB()
519                 { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
520
521         const float getCurrentIncomingRateKB()
522                 { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
523         const float getMaxIncomingRateKB()
524                 { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
525
526         const float getAvgDownloadRateKB()
527                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
528         const float getAvgLossRateKB()
529                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
530         const float getAvgIncomingRateKB()
531                 { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
532
533         const unsigned int getWindowSize() const { return window_size; };
534
535         void setWindowSize(unsigned int size) { window_size = size; };
536 private:
537         std::mutex m_internal_mutex;
538         int window_size = MIN_RELIABLE_WINDOW_SIZE;
539
540         u16 next_incoming_seqnum = SEQNUM_INITIAL;
541
542         u16 next_outgoing_seqnum = SEQNUM_INITIAL;
543         u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
544
545         unsigned int current_packet_loss = 0;
546         unsigned int current_packet_too_late = 0;
547         unsigned int current_packet_successfull = 0;
548         float packet_loss_counter = 0.0f;
549
550         unsigned int current_bytes_transfered = 0;
551         unsigned int current_bytes_received = 0;
552         unsigned int current_bytes_lost = 0;
553         float max_kbps = 0.0f;
554         float cur_kbps = 0.0f;
555         float avg_kbps = 0.0f;
556         float max_incoming_kbps = 0.0f;
557         float cur_incoming_kbps = 0.0f;
558         float avg_incoming_kbps = 0.0f;
559         float max_kbps_lost = 0.0f;
560         float cur_kbps_lost = 0.0f;
561         float avg_kbps_lost = 0.0f;
562         float bpm_counter = 0.0f;
563
564         unsigned int rate_samples = 0;
565 };
566
567 class Peer;
568
569 enum PeerChangeType
570 {
571         PEER_ADDED,
572         PEER_REMOVED
573 };
574 struct PeerChange
575 {
576         PeerChange(PeerChangeType t, u16 _peer_id, bool _timeout):
577                 type(t), peer_id(_peer_id), timeout(_timeout) {}
578         PeerChange() = delete;
579
580         PeerChangeType type;
581         u16 peer_id;
582         bool timeout;
583 };
584
585 class PeerHandler
586 {
587 public:
588
589         PeerHandler() = default;
590         virtual ~PeerHandler() = default;
591
592         /*
593                 This is called after the Peer has been inserted into the
594                 Connection's peer container.
595         */
596         virtual void peerAdded(Peer *peer) = 0;
597         /*
598                 This is called before the Peer has been removed from the
599                 Connection's peer container.
600         */
601         virtual void deletingPeer(Peer *peer, bool timeout) = 0;
602 };
603
604 class PeerHelper
605 {
606 public:
607         PeerHelper() = default;;
608         PeerHelper(Peer* peer);
609         ~PeerHelper();
610
611         PeerHelper&   operator=(Peer* peer);
612         Peer*         operator->() const;
613         bool          operator!();
614         Peer*         operator&() const;
615         bool          operator!=(void* ptr);
616
617 private:
618         Peer *m_peer = nullptr;
619 };
620
621 class Connection;
622
623 typedef enum {
624         MIN_RTT,
625         MAX_RTT,
626         AVG_RTT,
627         MIN_JITTER,
628         MAX_JITTER,
629         AVG_JITTER
630 } rtt_stat_type;
631
632 typedef enum {
633         CUR_DL_RATE,
634         AVG_DL_RATE,
635         CUR_INC_RATE,
636         AVG_INC_RATE,
637         CUR_LOSS_RATE,
638         AVG_LOSS_RATE,
639 } rate_stat_type;
640
641 class Peer {
642         public:
643                 friend class PeerHelper;
644
645                 Peer(Address address_,u16 id_,Connection* connection) :
646                         id(id_),
647                         m_connection(connection),
648                         address(address_),
649                         m_last_timeout_check(porting::getTimeMs())
650                 {
651                 };
652
653                 virtual ~Peer() {
654                         MutexAutoLock usage_lock(m_exclusive_access_mutex);
655                         FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
656                 };
657
658                 // Unique id of the peer
659                 u16 id;
660
661                 void Drop();
662
663                 virtual void PutReliableSendCommand(ConnectionCommand &c,
664                                                 unsigned int max_packet_size) {};
665
666                 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
667
668                 bool isPendingDeletion()
669                 { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
670
671                 void ResetTimeout()
672                         {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
673
674                 bool isTimedOut(float timeout);
675
676                 unsigned int m_increment_packets_remaining = 9;
677                 unsigned int m_increment_bytes_remaining = 0;
678
679                 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
680                 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
681                 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
682                                                                                                 BufferedPacket toadd,
683                                                                                                 bool reliable)
684                                 {
685                                         fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
686                                         return SharedBuffer<u8>(0);
687                                 };
688
689                 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
690
691                 virtual float getStat(rtt_stat_type type) const {
692                         switch (type) {
693                                 case MIN_RTT:
694                                         return m_rtt.min_rtt;
695                                 case MAX_RTT:
696                                         return m_rtt.max_rtt;
697                                 case AVG_RTT:
698                                         return m_rtt.avg_rtt;
699                                 case MIN_JITTER:
700                                         return m_rtt.jitter_min;
701                                 case MAX_JITTER:
702                                         return m_rtt.jitter_max;
703                                 case AVG_JITTER:
704                                         return m_rtt.jitter_avg;
705                         }
706                         return -1;
707                 }
708         protected:
709                 virtual void reportRTT(float rtt) {};
710
711                 void RTTStatistics(float rtt,
712                                                         const std::string &profiler_id = "",
713                                                         unsigned int num_samples = 1000);
714
715                 bool IncUseCount();
716                 void DecUseCount();
717
718                 std::mutex m_exclusive_access_mutex;
719
720                 bool m_pending_deletion = false;
721
722                 Connection* m_connection;
723
724                 // Address of the peer
725                 Address address;
726
727                 // Ping timer
728                 float m_ping_timer = 0.0f;
729         private:
730
731                 struct rttstats {
732                         float jitter_min = FLT_MAX;
733                         float jitter_max = 0.0f;
734                         float jitter_avg = -1.0f;
735                         float min_rtt = FLT_MAX;
736                         float max_rtt = 0.0f;
737                         float avg_rtt = -1.0f;
738
739                         rttstats() = default;
740                 };
741
742                 rttstats m_rtt;
743                 float m_last_rtt = -1.0f;
744
745                 // current usage count
746                 unsigned int m_usage = 0;
747
748                 // Seconds from last receive
749                 float m_timeout_counter = 0.0f;
750
751                 u64 m_last_timeout_check;
752 };
753
754 class UDPPeer : public Peer
755 {
756 public:
757
758         friend class PeerHelper;
759         friend class ConnectionReceiveThread;
760         friend class ConnectionSendThread;
761         friend class Connection;
762
763         UDPPeer(u16 a_id, Address a_address, Connection* connection);
764         virtual ~UDPPeer() = default;
765
766         void PutReliableSendCommand(ConnectionCommand &c,
767                                                         unsigned int max_packet_size);
768
769         bool getAddress(MTProtocols type, Address& toset);
770
771         void setNonLegacyPeer();
772
773         bool getLegacyPeer()
774         { return m_legacy_peer; }
775
776         u16 getNextSplitSequenceNumber(u8 channel);
777         void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
778
779         SharedBuffer<u8> addSpiltPacket(u8 channel,
780                                                                         BufferedPacket toadd,
781                                                                         bool reliable);
782
783
784 protected:
785         /*
786                 Calculates avg_rtt and resend_timeout.
787                 rtt=-1 only recalculates resend_timeout
788         */
789         void reportRTT(float rtt);
790
791         void RunCommandQueues(
792                                         unsigned int max_packet_size,
793                                         unsigned int maxcommands,
794                                         unsigned int maxtransfer);
795
796         float getResendTimeout()
797                 { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
798
799         void setResendTimeout(float timeout)
800                 { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
801         bool Ping(float dtime,SharedBuffer<u8>& data);
802
803         Channel channels[CHANNEL_COUNT];
804         bool m_pending_disconnect = false;
805 private:
806         // This is changed dynamically
807         float resend_timeout = 0.5;
808
809         bool processReliableSendCommand(
810                                         ConnectionCommand &c,
811                                         unsigned int max_packet_size);
812
813         bool m_legacy_peer = true;
814 };
815
816 /*
817         Connection
818 */
819
820 enum ConnectionEventType{
821         CONNEVENT_NONE,
822         CONNEVENT_DATA_RECEIVED,
823         CONNEVENT_PEER_ADDED,
824         CONNEVENT_PEER_REMOVED,
825         CONNEVENT_BIND_FAILED,
826 };
827
828 struct ConnectionEvent
829 {
830         enum ConnectionEventType type = CONNEVENT_NONE;
831         u16 peer_id = 0;
832         Buffer<u8> data;
833         bool timeout = false;
834         Address address;
835
836         ConnectionEvent() = default;
837
838         std::string describe()
839         {
840                 switch(type) {
841                 case CONNEVENT_NONE:
842                         return "CONNEVENT_NONE";
843                 case CONNEVENT_DATA_RECEIVED:
844                         return "CONNEVENT_DATA_RECEIVED";
845                 case CONNEVENT_PEER_ADDED:
846                         return "CONNEVENT_PEER_ADDED";
847                 case CONNEVENT_PEER_REMOVED:
848                         return "CONNEVENT_PEER_REMOVED";
849                 case CONNEVENT_BIND_FAILED:
850                         return "CONNEVENT_BIND_FAILED";
851                 }
852                 return "Invalid ConnectionEvent";
853         }
854
855         void dataReceived(u16 peer_id_, const SharedBuffer<u8> &data_)
856         {
857                 type = CONNEVENT_DATA_RECEIVED;
858                 peer_id = peer_id_;
859                 data = data_;
860         }
861         void peerAdded(u16 peer_id_, Address address_)
862         {
863                 type = CONNEVENT_PEER_ADDED;
864                 peer_id = peer_id_;
865                 address = address_;
866         }
867         void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
868         {
869                 type = CONNEVENT_PEER_REMOVED;
870                 peer_id = peer_id_;
871                 timeout = timeout_;
872                 address = address_;
873         }
874         void bindFailed()
875         {
876                 type = CONNEVENT_BIND_FAILED;
877         }
878 };
879
880 class ConnectionSendThread : public Thread {
881
882 public:
883         friend class UDPPeer;
884
885         ConnectionSendThread(unsigned int max_packet_size, float timeout);
886
887         void *run();
888
889         void Trigger();
890
891         void setParent(Connection* parent) {
892                 assert(parent != NULL); // Pre-condition
893                 m_connection = parent;
894         }
895
896         void setPeerTimeout(float peer_timeout)
897                 { m_timeout = peer_timeout; }
898
899 private:
900         void runTimeouts    (float dtime);
901         void rawSend        (const BufferedPacket &packet);
902         bool rawSendAsPacket(u16 peer_id, u8 channelnum,
903                                                         SharedBuffer<u8> data, bool reliable);
904
905         void processReliableCommand (ConnectionCommand &c);
906         void processNonReliableCommand (ConnectionCommand &c);
907         void serve          (Address bind_address);
908         void connect        (Address address);
909         void disconnect     ();
910         void disconnect_peer(u16 peer_id);
911         void send           (u16 peer_id, u8 channelnum,
912                                                         SharedBuffer<u8> data);
913         void sendReliable   (ConnectionCommand &c);
914         void sendToAll      (u8 channelnum,
915                                                         SharedBuffer<u8> data);
916         void sendToAllReliable(ConnectionCommand &c);
917
918         void sendPackets    (float dtime);
919
920         void sendAsPacket   (u16 peer_id, u8 channelnum,
921                                                         SharedBuffer<u8> data,bool ack=false);
922
923         void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
924
925         bool packetsQueued();
926
927         Connection           *m_connection = nullptr;
928         unsigned int          m_max_packet_size;
929         float                 m_timeout;
930         std::queue<OutgoingPacket> m_outgoing_queue;
931         Semaphore             m_send_sleep_semaphore;
932
933         unsigned int          m_iteration_packets_avaialble;
934         unsigned int          m_max_commands_per_iteration = 1;
935         unsigned int          m_max_data_packets_per_iteration;
936         unsigned int          m_max_packets_requeued = 256;
937 };
938
939 class ConnectionReceiveThread : public Thread {
940 public:
941         ConnectionReceiveThread(unsigned int max_packet_size);
942
943         void *run();
944
945         void setParent(Connection *parent) {
946                 assert(parent); // Pre-condition
947                 m_connection = parent;
948         }
949
950 private:
951         void receive();
952
953         // Returns next data from a buffer if possible
954         // If found, returns true; if not, false.
955         // If found, sets peer_id and dst
956         bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
957
958         bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
959                                                         SharedBuffer<u8> &dst);
960
961         /*
962                 Processes a packet with the basic header stripped out.
963                 Parameters:
964                         packetdata: Data in packet (with no base headers)
965                         peer_id: peer id of the sender of the packet in question
966                         channelnum: channel on which the packet was sent
967                         reliable: true if recursing into a reliable packet
968         */
969         SharedBuffer<u8> processPacket(Channel *channel,
970                                                         SharedBuffer<u8> packetdata, u16 peer_id,
971                                                         u8 channelnum, bool reliable);
972
973
974         Connection *m_connection = nullptr;
975 };
976
977 class Connection
978 {
979 public:
980         friend class ConnectionSendThread;
981         friend class ConnectionReceiveThread;
982
983         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
984                         PeerHandler *peerhandler);
985         ~Connection();
986
987         /* Interface */
988         ConnectionEvent waitEvent(u32 timeout_ms);
989         void putCommand(ConnectionCommand &c);
990
991         void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; }
992         void Serve(Address bind_addr);
993         void Connect(Address address);
994         bool Connected();
995         void Disconnect();
996         void Receive(NetworkPacket* pkt);
997         void Send(u16 peer_id, u8 channelnum, NetworkPacket* pkt, bool reliable);
998         u16 GetPeerID() { return m_peer_id; }
999         Address GetPeerAddress(u16 peer_id);
1000         float getPeerStat(u16 peer_id, rtt_stat_type type);
1001         float getLocalStat(rate_stat_type type);
1002         const u32 GetProtocolID() const { return m_protocol_id; };
1003         const std::string getDesc();
1004         void DisconnectPeer(u16 peer_id);
1005
1006 protected:
1007         PeerHelper getPeer(u16 peer_id);
1008         PeerHelper getPeerNoEx(u16 peer_id);
1009         u16   lookupPeer(Address& sender);
1010
1011         u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1012         UDPPeer*  createServerPeer(Address& sender);
1013         bool deletePeer(u16 peer_id, bool timeout);
1014
1015         void SetPeerID(u16 id) { m_peer_id = id; }
1016
1017         void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1018
1019         void PrintInfo(std::ostream &out);
1020         void PrintInfo();
1021
1022         std::list<u16> getPeerIDs()
1023         {
1024                 MutexAutoLock peerlock(m_peers_mutex);
1025                 return m_peer_ids;
1026         }
1027
1028         UDPSocket m_udpSocket;
1029         MutexedQueue<ConnectionCommand> m_command_queue;
1030
1031         void putEvent(ConnectionEvent &e);
1032
1033         void TriggerSend()
1034                 { m_sendThread.Trigger(); }
1035 private:
1036         std::list<Peer*> getPeers();
1037
1038         MutexedQueue<ConnectionEvent> m_event_queue;
1039
1040         u16 m_peer_id = 0;
1041         u32 m_protocol_id;
1042
1043         std::map<u16, Peer*> m_peers;
1044         std::list<u16> m_peer_ids;
1045         std::mutex m_peers_mutex;
1046
1047         ConnectionSendThread m_sendThread;
1048         ConnectionReceiveThread m_receiveThread;
1049
1050         std::mutex m_info_mutex;
1051
1052         // Backwards compatibility
1053         PeerHandler *m_bc_peerhandler;
1054         int m_bc_receive_timeout = 0;
1055
1056         bool m_shutting_down = false;
1057
1058         u16 m_next_remote_peer_id = 2;
1059 };
1060
1061 } // namespace