Modernize client code (#6250)
[oweals/minetest.git] / src / network / connection.h
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #ifndef CONNECTION_HEADER
21 #define CONNECTION_HEADER
22
23 #include "irrlichttypes_bloated.h"
24 #include "socket.h"
25 #include "exceptions.h"
26 #include "constants.h"
27 #include "network/networkpacket.h"
28 #include "util/pointer.h"
29 #include "util/container.h"
30 #include "util/thread.h"
31 #include "util/numeric.h"
32 #include <iostream>
33 #include <fstream>
34 #include <list>
35 #include <map>
36
37 class NetworkPacket;
38
39 namespace con
40 {
41
42 /*
43         Exceptions
44 */
45 class NotFoundException : public BaseException
46 {
47 public:
48         NotFoundException(const char *s):
49                 BaseException(s)
50         {}
51 };
52
53 class PeerNotFoundException : public BaseException
54 {
55 public:
56         PeerNotFoundException(const char *s):
57                 BaseException(s)
58         {}
59 };
60
61 class ConnectionException : public BaseException
62 {
63 public:
64         ConnectionException(const char *s):
65                 BaseException(s)
66         {}
67 };
68
69 class ConnectionBindFailed : public BaseException
70 {
71 public:
72         ConnectionBindFailed(const char *s):
73                 BaseException(s)
74         {}
75 };
76
77 class InvalidIncomingDataException : public BaseException
78 {
79 public:
80         InvalidIncomingDataException(const char *s):
81                 BaseException(s)
82         {}
83 };
84
85 class InvalidOutgoingDataException : public BaseException
86 {
87 public:
88         InvalidOutgoingDataException(const char *s):
89                 BaseException(s)
90         {}
91 };
92
93 class NoIncomingDataException : public BaseException
94 {
95 public:
96         NoIncomingDataException(const char *s):
97                 BaseException(s)
98         {}
99 };
100
101 class ProcessedSilentlyException : public BaseException
102 {
103 public:
104         ProcessedSilentlyException(const char *s):
105                 BaseException(s)
106         {}
107 };
108
109 class ProcessedQueued : public BaseException
110 {
111 public:
112         ProcessedQueued(const char *s):
113                 BaseException(s)
114         {}
115 };
116
117 class IncomingDataCorruption : public BaseException
118 {
119 public:
120         IncomingDataCorruption(const char *s):
121                 BaseException(s)
122         {}
123 };
124
125 typedef enum MTProtocols {
126         MTP_PRIMARY,
127         MTP_UDP,
128         MTP_MINETEST_RELIABLE_UDP
129 } MTProtocols;
130
131 #define SEQNUM_MAX 65535
132 inline bool seqnum_higher(u16 totest, u16 base)
133 {
134         if (totest > base)
135         {
136                 if ((totest - base) > (SEQNUM_MAX/2))
137                         return false;
138                 else
139                         return true;
140         }
141         else
142         {
143                 if ((base - totest) > (SEQNUM_MAX/2))
144                         return true;
145                 else
146                         return false;
147         }
148 }
149
150 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
151 {
152         u16 window_start = next;
153         u16 window_end   = ( next + window_size ) % (SEQNUM_MAX+1);
154
155         if (window_start < window_end)
156         {
157                 return ((seqnum >= window_start) && (seqnum < window_end));
158         }
159         else
160         {
161                 return ((seqnum < window_end) || (seqnum >= window_start));
162         }
163 }
164
165 struct BufferedPacket
166 {
167         BufferedPacket(u8 *a_data, u32 a_size):
168                 data(a_data, a_size)
169         {}
170         BufferedPacket(u32 a_size):
171                 data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
172                 resend_count(0)
173         {}
174         Buffer<u8> data; // Data of the packet, including headers
175         float time = 0.0f; // Seconds from buffering the packet or re-sending
176         float totaltime = 0.0f; // Seconds from buffering the packet
177         u64 absolute_send_time = -1;
178         Address address; // Sender or destination
179         unsigned int resend_count = 0;
180 };
181
182 // This adds the base headers to the data and makes a packet out of it
183 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
184                 u32 protocol_id, u16 sender_peer_id, u8 channel);
185 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
186                 u32 protocol_id, u16 sender_peer_id, u8 channel);
187
188 // Add the TYPE_ORIGINAL header to the data
189 SharedBuffer<u8> makeOriginalPacket(
190                 SharedBuffer<u8> data);
191
192 // Split data in chunks and add TYPE_SPLIT headers to them
193 std::list<SharedBuffer<u8> > makeSplitPacket(
194                 SharedBuffer<u8> data,
195                 u32 chunksize_max,
196                 u16 seqnum);
197
198 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
199 // Increments split_seqnum if a split packet is made
200 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
201                 SharedBuffer<u8> data,
202                 u32 chunksize_max,
203                 u16 &split_seqnum);
204
205 // Add the TYPE_RELIABLE header to the data
206 SharedBuffer<u8> makeReliablePacket(
207                 SharedBuffer<u8> data,
208                 u16 seqnum);
209
210 struct IncomingSplitPacket
211 {
212         IncomingSplitPacket() {}
213         // Key is chunk number, value is data without headers
214         std::map<u16, SharedBuffer<u8> > chunks;
215         u32 chunk_count;
216         float time = 0.0f; // Seconds from adding
217         bool reliable = false; // If true, isn't deleted on timeout
218
219         bool allReceived()
220         {
221                 return (chunks.size() == chunk_count);
222         }
223 };
224
225 /*
226 === NOTES ===
227
228 A packet is sent through a channel to a peer with a basic header:
229 TODO: Should we have a receiver_peer_id also?
230         Header (7 bytes):
231         [0] u32 protocol_id
232         [4] u16 sender_peer_id
233         [6] u8 channel
234 sender_peer_id:
235         Unique to each peer.
236         value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
237         value 1 (PEER_ID_SERVER) is reserved for server
238         these constants are defined in constants.h
239 channel:
240         The lower the number, the higher the priority is.
241         Only channels 0, 1 and 2 exist.
242 */
243 #define BASE_HEADER_SIZE 7
244 #define CHANNEL_COUNT 3
245 /*
246 Packet types:
247
248 CONTROL: This is a packet used by the protocol.
249 - When this is processed, nothing is handed to the user.
250         Header (2 byte):
251         [0] u8 type
252         [1] u8 controltype
253 controltype and data description:
254         CONTROLTYPE_ACK
255                 [2] u16 seqnum
256         CONTROLTYPE_SET_PEER_ID
257                 [2] u16 peer_id_new
258         CONTROLTYPE_PING
259         - There is no actual reply, but this can be sent in a reliable
260           packet to get a reply
261         CONTROLTYPE_DISCO
262 */
263 #define TYPE_CONTROL 0
264 #define CONTROLTYPE_ACK 0
265 #define CONTROLTYPE_SET_PEER_ID 1
266 #define CONTROLTYPE_PING 2
267 #define CONTROLTYPE_DISCO 3
268 #define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
269
270 /*
271 ORIGINAL: This is a plain packet with no control and no error
272 checking at all.
273 - When this is processed, it is directly handed to the user.
274         Header (1 byte):
275         [0] u8 type
276 */
277 #define TYPE_ORIGINAL 1
278 #define ORIGINAL_HEADER_SIZE 1
279 /*
280 SPLIT: These are sequences of packets forming one bigger piece of
281 data.
282 - When processed and all the packet_nums 0...packet_count-1 are
283   present (this should be buffered), the resulting data shall be
284   directly handed to the user.
285 - If the data fails to come up in a reasonable time, the buffer shall
286   be silently discarded.
287 - These can be sent as-is or atop of a RELIABLE packet stream.
288         Header (7 bytes):
289         [0] u8 type
290         [1] u16 seqnum
291         [3] u16 chunk_count
292         [5] u16 chunk_num
293 */
294 #define TYPE_SPLIT 2
295 /*
296 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
297 and they shall be delivered in the same order as sent. This is done
298 with a buffer in the receiving and transmitting end.
299 - When this is processed, the contents of each packet is recursively
300   processed as packets.
301         Header (3 bytes):
302         [0] u8 type
303         [1] u16 seqnum
304
305 */
306 #define TYPE_RELIABLE 3
307 #define RELIABLE_HEADER_SIZE 3
308 #define SEQNUM_INITIAL 65500
309
310 /*
311         A buffer which stores reliable packets and sorts them internally
312         for fast access to the smallest one.
313 */
314
315 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
316
317 class ReliablePacketBuffer
318 {
319 public:
320         ReliablePacketBuffer() {};
321
322         bool getFirstSeqnum(u16& result);
323
324         BufferedPacket popFirst();
325         BufferedPacket popSeqnum(u16 seqnum);
326         void insert(BufferedPacket &p,u16 next_expected);
327
328         void incrementTimeouts(float dtime);
329         std::list<BufferedPacket> getTimedOuts(float timeout,
330                         unsigned int max_packets);
331
332         void print();
333         bool empty();
334         bool containsPacket(u16 seqnum);
335         RPBSearchResult notFound();
336         u32 size();
337
338
339 private:
340         RPBSearchResult findPacket(u16 seqnum);
341
342         std::list<BufferedPacket> m_list;
343         u32 m_list_size = 0;
344
345         u16 m_oldest_non_answered_ack;
346
347         std::mutex m_list_mutex;
348 };
349
350 /*
351         A buffer for reconstructing split packets
352 */
353
354 class IncomingSplitBuffer
355 {
356 public:
357         ~IncomingSplitBuffer();
358         /*
359                 Returns a reference counted buffer of length != 0 when a full split
360                 packet is constructed. If not, returns one of length 0.
361         */
362         SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
363
364         void removeUnreliableTimedOuts(float dtime, float timeout);
365
366 private:
367         // Key is seqnum
368         std::map<u16, IncomingSplitPacket*> m_buf;
369
370         std::mutex m_map_mutex;
371 };
372
373 struct OutgoingPacket
374 {
375         u16 peer_id;
376         u8 channelnum;
377         SharedBuffer<u8> data;
378         bool reliable;
379         bool ack;
380
381         OutgoingPacket(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
382                         bool reliable_,bool ack_=false):
383                 peer_id(peer_id_),
384                 channelnum(channelnum_),
385                 data(data_),
386                 reliable(reliable_),
387                 ack(ack_)
388         {
389         }
390 };
391
392 enum ConnectionCommandType{
393         CONNCMD_NONE,
394         CONNCMD_SERVE,
395         CONNCMD_CONNECT,
396         CONNCMD_DISCONNECT,
397         CONNCMD_DISCONNECT_PEER,
398         CONNCMD_SEND,
399         CONNCMD_SEND_TO_ALL,
400         CONCMD_ACK,
401         CONCMD_CREATE_PEER,
402         CONCMD_DISABLE_LEGACY
403 };
404
405 struct ConnectionCommand
406 {
407         enum ConnectionCommandType type = CONNCMD_NONE;
408         Address address;
409         u16 peer_id = PEER_ID_INEXISTENT;
410         u8 channelnum;
411         Buffer<u8> data;
412         bool reliable = false;
413         bool raw = false;
414
415         ConnectionCommand() {}
416
417         void serve(Address address_)
418         {
419                 type = CONNCMD_SERVE;
420                 address = address_;
421         }
422         void connect(Address address_)
423         {
424                 type = CONNCMD_CONNECT;
425                 address = address_;
426         }
427         void disconnect()
428         {
429                 type = CONNCMD_DISCONNECT;
430         }
431         void disconnect_peer(u16 peer_id_)
432         {
433                 type = CONNCMD_DISCONNECT_PEER;
434                 peer_id = peer_id_;
435         }
436         void send(u16 peer_id_, u8 channelnum_,
437                         NetworkPacket* pkt, bool reliable_)
438         {
439                 type = CONNCMD_SEND;
440                 peer_id = peer_id_;
441                 channelnum = channelnum_;
442                 data = pkt->oldForgePacket();
443                 reliable = reliable_;
444         }
445
446         void ack(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
447         {
448                 type = CONCMD_ACK;
449                 peer_id = peer_id_;
450                 channelnum = channelnum_;
451                 data = data_;
452                 reliable = false;
453         }
454
455         void createPeer(u16 peer_id_, const SharedBuffer<u8> &data_)
456         {
457                 type = CONCMD_CREATE_PEER;
458                 peer_id = peer_id_;
459                 data = data_;
460                 channelnum = 0;
461                 reliable = true;
462                 raw = true;
463         }
464
465         void disableLegacy(u16 peer_id_, const SharedBuffer<u8> &data_)
466         {
467                 type = CONCMD_DISABLE_LEGACY;
468                 peer_id = peer_id_;
469                 data = data_;
470                 channelnum = 0;
471                 reliable = true;
472                 raw = true;
473         }
474 };
475
476 /* maximum window size to use, 0xFFFF is theoretical maximum  don't think about
477  * touching it, the less you're away from it the more likely data corruption
478  * will occur
479  */
480 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
481         /* starting value for window size */
482 #define MIN_RELIABLE_WINDOW_SIZE 0x40
483
484 class Channel
485 {
486
487 public:
488         u16 readNextIncomingSeqNum();
489         u16 incNextIncomingSeqNum();
490
491         u16 getOutgoingSequenceNumber(bool& successfull);
492         u16 readOutgoingSequenceNumber();
493         bool putBackSequenceNumber(u16);
494
495         u16 readNextSplitSeqNum();
496         void setNextSplitSeqNum(u16 seqnum);
497
498         // This is for buffering the incoming packets that are coming in
499         // the wrong order
500         ReliablePacketBuffer incoming_reliables;
501         // This is for buffering the sent packets so that the sender can
502         // re-send them if no ACK is received
503         ReliablePacketBuffer outgoing_reliables_sent;
504
505         //queued reliable packets
506         std::queue<BufferedPacket> queued_reliables;
507
508         //queue commands prior splitting to packets
509         std::deque<ConnectionCommand> queued_commands;
510
511         IncomingSplitBuffer incoming_splits;
512
513         Channel() {};
514         ~Channel() {};
515
516         void UpdatePacketLossCounter(unsigned int count);
517         void UpdatePacketTooLateCounter();
518         void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
519         void UpdateBytesLost(unsigned int bytes);
520         void UpdateBytesReceived(unsigned int bytes);
521
522         void UpdateTimers(float dtime, bool legacy_peer);
523
524         const float getCurrentDownloadRateKB()
525                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
526         const float getMaxDownloadRateKB()
527                 { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
528
529         const float getCurrentLossRateKB()
530                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
531         const float getMaxLossRateKB()
532                 { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
533
534         const float getCurrentIncomingRateKB()
535                 { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
536         const float getMaxIncomingRateKB()
537                 { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
538
539         const float getAvgDownloadRateKB()
540                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
541         const float getAvgLossRateKB()
542                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
543         const float getAvgIncomingRateKB()
544                 { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
545
546         const unsigned int getWindowSize() const { return window_size; };
547
548         void setWindowSize(unsigned int size) { window_size = size; };
549 private:
550         std::mutex m_internal_mutex;
551         int window_size = MIN_RELIABLE_WINDOW_SIZE;
552
553         u16 next_incoming_seqnum = SEQNUM_INITIAL;
554
555         u16 next_outgoing_seqnum = SEQNUM_INITIAL;
556         u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
557
558         unsigned int current_packet_loss = 0;
559         unsigned int current_packet_too_late = 0;
560         unsigned int current_packet_successfull = 0;
561         float packet_loss_counter = 0.0f;
562
563         unsigned int current_bytes_transfered = 0;
564         unsigned int current_bytes_received = 0;
565         unsigned int current_bytes_lost = 0;
566         float max_kbps = 0.0f;
567         float cur_kbps = 0.0f;
568         float avg_kbps = 0.0f;
569         float max_incoming_kbps = 0.0f;
570         float cur_incoming_kbps = 0.0f;
571         float avg_incoming_kbps = 0.0f;
572         float max_kbps_lost = 0.0f;
573         float cur_kbps_lost = 0.0f;
574         float avg_kbps_lost = 0.0f;
575         float bpm_counter = 0.0f;
576
577         unsigned int rate_samples = 0;
578 };
579
580 class Peer;
581
582 enum PeerChangeType
583 {
584         PEER_ADDED,
585         PEER_REMOVED
586 };
587 struct PeerChange
588 {
589         PeerChange(PeerChangeType t, u16 _peer_id, bool _timeout):
590                 type(t), peer_id(_peer_id), timeout(_timeout) {}
591         PeerChange() = delete;
592
593         PeerChangeType type;
594         u16 peer_id;
595         bool timeout;
596 };
597
598 class PeerHandler
599 {
600 public:
601
602         PeerHandler()
603         {
604         }
605         virtual ~PeerHandler()
606         {
607         }
608
609         /*
610                 This is called after the Peer has been inserted into the
611                 Connection's peer container.
612         */
613         virtual void peerAdded(Peer *peer) = 0;
614         /*
615                 This is called before the Peer has been removed from the
616                 Connection's peer container.
617         */
618         virtual void deletingPeer(Peer *peer, bool timeout) = 0;
619 };
620
621 class PeerHelper
622 {
623 public:
624         PeerHelper() {};
625         PeerHelper(Peer* peer);
626         ~PeerHelper();
627
628         PeerHelper&   operator=(Peer* peer);
629         Peer*         operator->() const;
630         bool          operator!();
631         Peer*         operator&() const;
632         bool          operator!=(void* ptr);
633
634 private:
635         Peer *m_peer = nullptr;
636 };
637
638 class Connection;
639
640 typedef enum {
641         MIN_RTT,
642         MAX_RTT,
643         AVG_RTT,
644         MIN_JITTER,
645         MAX_JITTER,
646         AVG_JITTER
647 } rtt_stat_type;
648
649 typedef enum {
650         CUR_DL_RATE,
651         AVG_DL_RATE,
652         CUR_INC_RATE,
653         AVG_INC_RATE,
654         CUR_LOSS_RATE,
655         AVG_LOSS_RATE,
656 } rate_stat_type;
657
658 class Peer {
659         public:
660                 friend class PeerHelper;
661
662                 Peer(Address address_,u16 id_,Connection* connection) :
663                         id(id_),
664                         m_connection(connection),
665                         address(address_),
666                         m_last_timeout_check(porting::getTimeMs())
667                 {
668                 };
669
670                 virtual ~Peer() {
671                         MutexAutoLock usage_lock(m_exclusive_access_mutex);
672                         FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
673                 };
674
675                 // Unique id of the peer
676                 u16 id;
677
678                 void Drop();
679
680                 virtual void PutReliableSendCommand(ConnectionCommand &c,
681                                                 unsigned int max_packet_size) {};
682
683                 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
684
685                 bool isPendingDeletion()
686                 { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
687
688                 void ResetTimeout()
689                         {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
690
691                 bool isTimedOut(float timeout);
692
693                 unsigned int m_increment_packets_remaining = 9;
694                 unsigned int m_increment_bytes_remaining = 0;
695
696                 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
697                 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
698                 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
699                                                                                                 BufferedPacket toadd,
700                                                                                                 bool reliable)
701                                 {
702                                         fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
703                                         return SharedBuffer<u8>(0);
704                                 };
705
706                 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
707
708                 virtual float getStat(rtt_stat_type type) const {
709                         switch (type) {
710                                 case MIN_RTT:
711                                         return m_rtt.min_rtt;
712                                 case MAX_RTT:
713                                         return m_rtt.max_rtt;
714                                 case AVG_RTT:
715                                         return m_rtt.avg_rtt;
716                                 case MIN_JITTER:
717                                         return m_rtt.jitter_min;
718                                 case MAX_JITTER:
719                                         return m_rtt.jitter_max;
720                                 case AVG_JITTER:
721                                         return m_rtt.jitter_avg;
722                         }
723                         return -1;
724                 }
725         protected:
726                 virtual void reportRTT(float rtt) {};
727
728                 void RTTStatistics(float rtt,
729                                                         const std::string &profiler_id = "",
730                                                         unsigned int num_samples = 1000);
731
732                 bool IncUseCount();
733                 void DecUseCount();
734
735                 std::mutex m_exclusive_access_mutex;
736
737                 bool m_pending_deletion = false;
738
739                 Connection* m_connection;
740
741                 // Address of the peer
742                 Address address;
743
744                 // Ping timer
745                 float m_ping_timer = 0.0f;
746         private:
747
748                 struct rttstats {
749                         float jitter_min = FLT_MAX;
750                         float jitter_max = 0.0f;
751                         float jitter_avg = -1.0f;
752                         float min_rtt = FLT_MAX;
753                         float max_rtt = 0.0f;
754                         float avg_rtt = -1.0f;
755
756                         rttstats() {};
757                 };
758
759                 rttstats m_rtt;
760                 float m_last_rtt = -1.0f;
761
762                 // current usage count
763                 unsigned int m_usage = 0;
764
765                 // Seconds from last receive
766                 float m_timeout_counter = 0.0f;
767
768                 u64 m_last_timeout_check;
769 };
770
771 class UDPPeer : public Peer
772 {
773 public:
774
775         friend class PeerHelper;
776         friend class ConnectionReceiveThread;
777         friend class ConnectionSendThread;
778         friend class Connection;
779
780         UDPPeer(u16 a_id, Address a_address, Connection* connection);
781         virtual ~UDPPeer() {};
782
783         void PutReliableSendCommand(ConnectionCommand &c,
784                                                         unsigned int max_packet_size);
785
786         bool getAddress(MTProtocols type, Address& toset);
787
788         void setNonLegacyPeer();
789
790         bool getLegacyPeer()
791         { return m_legacy_peer; }
792
793         u16 getNextSplitSequenceNumber(u8 channel);
794         void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
795
796         SharedBuffer<u8> addSpiltPacket(u8 channel,
797                                                                         BufferedPacket toadd,
798                                                                         bool reliable);
799
800
801 protected:
802         /*
803                 Calculates avg_rtt and resend_timeout.
804                 rtt=-1 only recalculates resend_timeout
805         */
806         void reportRTT(float rtt);
807
808         void RunCommandQueues(
809                                         unsigned int max_packet_size,
810                                         unsigned int maxcommands,
811                                         unsigned int maxtransfer);
812
813         float getResendTimeout()
814                 { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
815
816         void setResendTimeout(float timeout)
817                 { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
818         bool Ping(float dtime,SharedBuffer<u8>& data);
819
820         Channel channels[CHANNEL_COUNT];
821         bool m_pending_disconnect = false;
822 private:
823         // This is changed dynamically
824         float resend_timeout = 0.5;
825
826         bool processReliableSendCommand(
827                                         ConnectionCommand &c,
828                                         unsigned int max_packet_size);
829
830         bool m_legacy_peer = true;
831 };
832
833 /*
834         Connection
835 */
836
837 enum ConnectionEventType{
838         CONNEVENT_NONE,
839         CONNEVENT_DATA_RECEIVED,
840         CONNEVENT_PEER_ADDED,
841         CONNEVENT_PEER_REMOVED,
842         CONNEVENT_BIND_FAILED,
843 };
844
845 struct ConnectionEvent
846 {
847         enum ConnectionEventType type = CONNEVENT_NONE;
848         u16 peer_id = 0;
849         Buffer<u8> data;
850         bool timeout = false;
851         Address address;
852
853         ConnectionEvent() {}
854
855         std::string describe()
856         {
857                 switch(type) {
858                 case CONNEVENT_NONE:
859                         return "CONNEVENT_NONE";
860                 case CONNEVENT_DATA_RECEIVED:
861                         return "CONNEVENT_DATA_RECEIVED";
862                 case CONNEVENT_PEER_ADDED:
863                         return "CONNEVENT_PEER_ADDED";
864                 case CONNEVENT_PEER_REMOVED:
865                         return "CONNEVENT_PEER_REMOVED";
866                 case CONNEVENT_BIND_FAILED:
867                         return "CONNEVENT_BIND_FAILED";
868                 }
869                 return "Invalid ConnectionEvent";
870         }
871
872         void dataReceived(u16 peer_id_, const SharedBuffer<u8> &data_)
873         {
874                 type = CONNEVENT_DATA_RECEIVED;
875                 peer_id = peer_id_;
876                 data = data_;
877         }
878         void peerAdded(u16 peer_id_, Address address_)
879         {
880                 type = CONNEVENT_PEER_ADDED;
881                 peer_id = peer_id_;
882                 address = address_;
883         }
884         void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
885         {
886                 type = CONNEVENT_PEER_REMOVED;
887                 peer_id = peer_id_;
888                 timeout = timeout_;
889                 address = address_;
890         }
891         void bindFailed()
892         {
893                 type = CONNEVENT_BIND_FAILED;
894         }
895 };
896
897 class ConnectionSendThread : public Thread {
898
899 public:
900         friend class UDPPeer;
901
902         ConnectionSendThread(unsigned int max_packet_size, float timeout);
903
904         void *run();
905
906         void Trigger();
907
908         void setParent(Connection* parent) {
909                 assert(parent != NULL); // Pre-condition
910                 m_connection = parent;
911         }
912
913         void setPeerTimeout(float peer_timeout)
914                 { m_timeout = peer_timeout; }
915
916 private:
917         void runTimeouts    (float dtime);
918         void rawSend        (const BufferedPacket &packet);
919         bool rawSendAsPacket(u16 peer_id, u8 channelnum,
920                                                         SharedBuffer<u8> data, bool reliable);
921
922         void processReliableCommand (ConnectionCommand &c);
923         void processNonReliableCommand (ConnectionCommand &c);
924         void serve          (Address bind_address);
925         void connect        (Address address);
926         void disconnect     ();
927         void disconnect_peer(u16 peer_id);
928         void send           (u16 peer_id, u8 channelnum,
929                                                         SharedBuffer<u8> data);
930         void sendReliable   (ConnectionCommand &c);
931         void sendToAll      (u8 channelnum,
932                                                         SharedBuffer<u8> data);
933         void sendToAllReliable(ConnectionCommand &c);
934
935         void sendPackets    (float dtime);
936
937         void sendAsPacket   (u16 peer_id, u8 channelnum,
938                                                         SharedBuffer<u8> data,bool ack=false);
939
940         void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
941
942         bool packetsQueued();
943
944         Connection           *m_connection = nullptr;
945         unsigned int          m_max_packet_size;
946         float                 m_timeout;
947         std::queue<OutgoingPacket> m_outgoing_queue;
948         Semaphore             m_send_sleep_semaphore;
949
950         unsigned int          m_iteration_packets_avaialble;
951         unsigned int          m_max_commands_per_iteration = 1;
952         unsigned int          m_max_data_packets_per_iteration;
953         unsigned int          m_max_packets_requeued = 256;
954 };
955
956 class ConnectionReceiveThread : public Thread {
957 public:
958         ConnectionReceiveThread(unsigned int max_packet_size);
959
960         void *run();
961
962         void setParent(Connection *parent) {
963                 assert(parent); // Pre-condition
964                 m_connection = parent;
965         }
966
967 private:
968         void receive();
969
970         // Returns next data from a buffer if possible
971         // If found, returns true; if not, false.
972         // If found, sets peer_id and dst
973         bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
974
975         bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
976                                                         SharedBuffer<u8> &dst);
977
978         /*
979                 Processes a packet with the basic header stripped out.
980                 Parameters:
981                         packetdata: Data in packet (with no base headers)
982                         peer_id: peer id of the sender of the packet in question
983                         channelnum: channel on which the packet was sent
984                         reliable: true if recursing into a reliable packet
985         */
986         SharedBuffer<u8> processPacket(Channel *channel,
987                                                         SharedBuffer<u8> packetdata, u16 peer_id,
988                                                         u8 channelnum, bool reliable);
989
990
991         Connection *m_connection = nullptr;
992 };
993
994 class Connection
995 {
996 public:
997         friend class ConnectionSendThread;
998         friend class ConnectionReceiveThread;
999
1000         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
1001                         PeerHandler *peerhandler);
1002         ~Connection();
1003
1004         /* Interface */
1005         ConnectionEvent waitEvent(u32 timeout_ms);
1006         void putCommand(ConnectionCommand &c);
1007
1008         void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; }
1009         void Serve(Address bind_addr);
1010         void Connect(Address address);
1011         bool Connected();
1012         void Disconnect();
1013         void Receive(NetworkPacket* pkt);
1014         void Send(u16 peer_id, u8 channelnum, NetworkPacket* pkt, bool reliable);
1015         u16 GetPeerID() { return m_peer_id; }
1016         Address GetPeerAddress(u16 peer_id);
1017         float getPeerStat(u16 peer_id, rtt_stat_type type);
1018         float getLocalStat(rate_stat_type type);
1019         const u32 GetProtocolID() const { return m_protocol_id; };
1020         const std::string getDesc();
1021         void DisconnectPeer(u16 peer_id);
1022
1023 protected:
1024         PeerHelper getPeer(u16 peer_id);
1025         PeerHelper getPeerNoEx(u16 peer_id);
1026         u16   lookupPeer(Address& sender);
1027
1028         u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1029         UDPPeer*  createServerPeer(Address& sender);
1030         bool deletePeer(u16 peer_id, bool timeout);
1031
1032         void SetPeerID(u16 id) { m_peer_id = id; }
1033
1034         void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1035
1036         void PrintInfo(std::ostream &out);
1037         void PrintInfo();
1038
1039         std::list<u16> getPeerIDs()
1040         {
1041                 MutexAutoLock peerlock(m_peers_mutex);
1042                 return m_peer_ids;
1043         }
1044
1045         UDPSocket m_udpSocket;
1046         MutexedQueue<ConnectionCommand> m_command_queue;
1047
1048         void putEvent(ConnectionEvent &e);
1049
1050         void TriggerSend()
1051                 { m_sendThread.Trigger(); }
1052 private:
1053         std::list<Peer*> getPeers();
1054
1055         MutexedQueue<ConnectionEvent> m_event_queue;
1056
1057         u16 m_peer_id = 0;
1058         u32 m_protocol_id;
1059
1060         std::map<u16, Peer*> m_peers;
1061         std::list<u16> m_peer_ids;
1062         std::mutex m_peers_mutex;
1063
1064         ConnectionSendThread m_sendThread;
1065         ConnectionReceiveThread m_receiveThread;
1066
1067         std::mutex m_info_mutex;
1068
1069         // Backwards compatibility
1070         PeerHandler *m_bc_peerhandler;
1071         int m_bc_receive_timeout = 0;
1072
1073         bool m_shutting_down = false;
1074
1075         u16 m_next_remote_peer_id = 2;
1076 };
1077
1078 } // namespace
1079
1080 #endif