Network cleanup (#6302)
[oweals/minetest.git] / src / network / connection.h
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #pragma once
21
22 #include "irrlichttypes_bloated.h"
23 #include "peerhandler.h"
24 #include "socket.h"
25 #include "constants.h"
26 #include "util/pointer.h"
27 #include "util/container.h"
28 #include "util/thread.h"
29 #include "util/numeric.h"
30 #include <iostream>
31 #include <fstream>
32 #include <list>
33 #include <map>
34
35 class NetworkPacket;
36
37 namespace con
38 {
39
// Protocol selector. Within this header it is taken by Peer::getAddress()
// and Connection::createPeer().
typedef enum MTProtocols {
	MTP_PRIMARY,
	MTP_UDP,
	MTP_MINETEST_RELIABLE_UDP
} MTProtocols;
45
46 #define SEQNUM_MAX 65535
47 inline bool seqnum_higher(u16 totest, u16 base)
48 {
49         if (totest > base)
50         {
51                 if ((totest - base) > (SEQNUM_MAX/2))
52                         return false;
53
54                 return true;
55         }
56
57         if ((base - totest) > (SEQNUM_MAX/2))
58                 return true;
59
60         return false;
61 }
62
63 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
64 {
65         u16 window_start = next;
66         u16 window_end   = ( next + window_size ) % (SEQNUM_MAX+1);
67
68         if (window_start < window_end) {
69                 return ((seqnum >= window_start) && (seqnum < window_end));
70         }
71
72
73         return ((seqnum < window_end) || (seqnum >= window_start));
74 }
75
76 struct BufferedPacket
77 {
78         BufferedPacket(u8 *a_data, u32 a_size):
79                 data(a_data, a_size)
80         {}
81         BufferedPacket(u32 a_size):
82                 data(a_size)
83         {}
84         Buffer<u8> data; // Data of the packet, including headers
85         float time = 0.0f; // Seconds from buffering the packet or re-sending
86         float totaltime = 0.0f; // Seconds from buffering the packet
87         u64 absolute_send_time = -1;
88         Address address; // Sender or destination
89         unsigned int resend_count = 0;
90 };
91
// This adds the base headers to the data and makes a packet out of it.
// Two overloads: raw pointer plus size, or a SharedBuffer.
BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
		u32 protocol_id, u16 sender_peer_id, u8 channel);
BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
		u32 protocol_id, u16 sender_peer_id, u8 channel);

// Add the TYPE_ORIGINAL header to the data
SharedBuffer<u8> makeOriginalPacket(
		SharedBuffer<u8> data);

// Split data in chunks and add TYPE_SPLIT headers to them.
// seqnum is the split sequence number shared by the produced chunks
// (NOTE(review): inferred from the TYPE_SPLIT header layout - confirm).
std::list<SharedBuffer<u8> > makeSplitPacket(
		SharedBuffer<u8> data,
		u32 chunksize_max,
		u16 seqnum);

// Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
// Increments split_seqnum if a split packet is made
std::list<SharedBuffer<u8> > makeAutoSplitPacket(
		SharedBuffer<u8> data,
		u32 chunksize_max,
		u16 &split_seqnum);

// Add the TYPE_RELIABLE header to the data
SharedBuffer<u8> makeReliablePacket(
		const SharedBuffer<u8> &data,
		u16 seqnum);
119
120 struct IncomingSplitPacket
121 {
122         IncomingSplitPacket() = default;
123
124         // Key is chunk number, value is data without headers
125         std::map<u16, SharedBuffer<u8> > chunks;
126         u32 chunk_count;
127         float time = 0.0f; // Seconds from adding
128         bool reliable = false; // If true, isn't deleted on timeout
129
130         bool allReceived()
131         {
132                 return (chunks.size() == chunk_count);
133         }
134 };
135
/*
=== NOTES ===

A packet is sent through a channel to a peer with a basic header:
TODO: Should we have a receiver_peer_id also?
	Header (7 bytes):
	[0] u32 protocol_id
	[4] u16 sender_peer_id
	[6] u8 channel
sender_peer_id:
	Unique to each peer.
	value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
	value 1 (PEER_ID_SERVER) is reserved for server
	these constants are defined in constants.h
channel:
	The lower the number, the higher the priority is.
	Only channels 0, 1 and 2 exist.
*/
#define BASE_HEADER_SIZE 7
#define CHANNEL_COUNT 3
/*
Packet types:

CONTROL: This is a packet used by the protocol.
- When this is processed, nothing is handed to the user.
	Header (2 bytes):
	[0] u8 type
	[1] u8 controltype
controltype and data description:
	CONTROLTYPE_ACK
		[2] u16 seqnum
	CONTROLTYPE_SET_PEER_ID
		[2] u16 peer_id_new
	CONTROLTYPE_PING
	- There is no actual reply, but this can be sent in a reliable
	  packet to get a reply
	CONTROLTYPE_DISCO
	CONTROLTYPE_ENABLE_BIG_SEND_WINDOW
	- NOTE(review): payload and semantics are not described in this header;
	  document them once confirmed against the implementation
*/
#define TYPE_CONTROL 0
#define CONTROLTYPE_ACK 0
#define CONTROLTYPE_SET_PEER_ID 1
#define CONTROLTYPE_PING 2
#define CONTROLTYPE_DISCO 3
#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4

/*
ORIGINAL: This is a plain packet with no control and no error
checking at all.
- When this is processed, it is directly handed to the user.
	Header (1 byte):
	[0] u8 type
*/
#define TYPE_ORIGINAL 1
#define ORIGINAL_HEADER_SIZE 1
/*
SPLIT: These are sequences of packets forming one bigger piece of
data.
- When processed and all the packet_nums 0...packet_count-1 are
  present (this should be buffered), the resulting data shall be
  directly handed to the user.
- If the data fails to come up in a reasonable time, the buffer shall
  be silently discarded.
- These can be sent as-is or atop of a RELIABLE packet stream.
	Header (7 bytes):
	[0] u8 type
	[1] u16 seqnum
	[3] u16 chunk_count
	[5] u16 chunk_num
*/
#define TYPE_SPLIT 2
/*
RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
and they shall be delivered in the same order as sent. This is done
with a buffer in the receiving and transmitting end.
- When this is processed, the contents of each packet is recursively
  processed as packets.
	Header (3 bytes):
	[0] u8 type
	[1] u16 seqnum

*/
#define TYPE_RELIABLE 3
#define RELIABLE_HEADER_SIZE 3
// Start value of the sequence counters in Channel. It sits 35 below
// SEQNUM_MAX (NOTE(review): presumably so that wrap-around is exercised
// soon after a connection starts - confirm).
#define SEQNUM_INITIAL 65500
220
221 /*
222         A buffer which stores reliable packets and sorts them internally
223         for fast access to the smallest one.
224 */
225
226 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
227
228 class ReliablePacketBuffer
229 {
230 public:
231         ReliablePacketBuffer() = default;
232
233         bool getFirstSeqnum(u16& result);
234
235         BufferedPacket popFirst();
236         BufferedPacket popSeqnum(u16 seqnum);
237         void insert(BufferedPacket &p,u16 next_expected);
238
239         void incrementTimeouts(float dtime);
240         std::list<BufferedPacket> getTimedOuts(float timeout,
241                         unsigned int max_packets);
242
243         void print();
244         bool empty();
245         bool containsPacket(u16 seqnum);
246         RPBSearchResult notFound();
247         u32 size();
248
249
250 private:
251         RPBSearchResult findPacket(u16 seqnum);
252
253         std::list<BufferedPacket> m_list;
254         u32 m_list_size = 0;
255
256         u16 m_oldest_non_answered_ack;
257
258         std::mutex m_list_mutex;
259 };
260
/*
	A buffer for reconstructing split packets
*/

class IncomingSplitBuffer
{
public:
	~IncomingSplitBuffer();
	/*
		Returns a reference counted buffer of length != 0 when a full split
		packet is constructed. If not, returns one of length 0.
	*/
	SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);

	// NOTE(review): judging by the name and IncomingSplitPacket::reliable,
	// this drops non-reliable entries older than timeout - confirm in the .cpp
	void removeUnreliableTimedOuts(float dtime, float timeout);

private:
	// Key is seqnum
	// Values are raw pointers. NOTE(review): presumably owned by this
	// buffer and released in the destructor - confirm in the .cpp
	std::map<u16, IncomingSplitPacket*> m_buf;

	// NOTE(review): presumably guards m_buf - confirm in the .cpp
	std::mutex m_map_mutex;
};
283
284 struct OutgoingPacket
285 {
286         u16 peer_id;
287         u8 channelnum;
288         SharedBuffer<u8> data;
289         bool reliable;
290         bool ack;
291
292         OutgoingPacket(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
293                         bool reliable_,bool ack_=false):
294                 peer_id(peer_id_),
295                 channelnum(channelnum_),
296                 data(data_),
297                 reliable(reliable_),
298                 ack(ack_)
299         {
300         }
301 };
302
// Kinds of ConnectionCommand. The value is stored in
// ConnectionCommand::type by that struct's setter methods.
// Note: the last three enumerators are spelled with the prefix CONCMD_
// (single N) while the others use CONNCMD_.
enum ConnectionCommandType{
	CONNCMD_NONE,
	CONNCMD_SERVE,
	CONNCMD_CONNECT,
	CONNCMD_DISCONNECT,
	CONNCMD_DISCONNECT_PEER,
	CONNCMD_SEND,
	CONNCMD_SEND_TO_ALL,
	CONCMD_ACK,
	CONCMD_CREATE_PEER,
	CONCMD_DISABLE_LEGACY
};
315
316 struct ConnectionCommand
317 {
318         enum ConnectionCommandType type = CONNCMD_NONE;
319         Address address;
320         u16 peer_id = PEER_ID_INEXISTENT;
321         u8 channelnum;
322         Buffer<u8> data;
323         bool reliable = false;
324         bool raw = false;
325
326         ConnectionCommand() = default;
327
328         void serve(Address address_)
329         {
330                 type = CONNCMD_SERVE;
331                 address = address_;
332         }
333         void connect(Address address_)
334         {
335                 type = CONNCMD_CONNECT;
336                 address = address_;
337         }
338         void disconnect()
339         {
340                 type = CONNCMD_DISCONNECT;
341         }
342         void disconnect_peer(u16 peer_id_)
343         {
344                 type = CONNCMD_DISCONNECT_PEER;
345                 peer_id = peer_id_;
346         }
347
348         void send(u16 peer_id_, u8 channelnum_, NetworkPacket* pkt, bool reliable_);
349
350         void ack(u16 peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
351         {
352                 type = CONCMD_ACK;
353                 peer_id = peer_id_;
354                 channelnum = channelnum_;
355                 data = data_;
356                 reliable = false;
357         }
358
359         void createPeer(u16 peer_id_, const SharedBuffer<u8> &data_)
360         {
361                 type = CONCMD_CREATE_PEER;
362                 peer_id = peer_id_;
363                 data = data_;
364                 channelnum = 0;
365                 reliable = true;
366                 raw = true;
367         }
368
369         void disableLegacy(u16 peer_id_, const SharedBuffer<u8> &data_)
370         {
371                 type = CONCMD_DISABLE_LEGACY;
372                 peer_id = peer_id_;
373                 data = data_;
374                 channelnum = 0;
375                 reliable = true;
376                 raw = true;
377         }
378 };
379
/*
 * Maximum window size to use. 0xFFFF is the theoretical maximum; don't even
 * think about touching this: the closer the value gets to 0xFFFF, the more
 * likely data corruption will occur.
 */
#define MAX_RELIABLE_WINDOW_SIZE 0x8000
/* Starting value for the window size */
#define MIN_RELIABLE_WINDOW_SIZE 0x40
387
388 class Channel
389 {
390
391 public:
392         u16 readNextIncomingSeqNum();
393         u16 incNextIncomingSeqNum();
394
395         u16 getOutgoingSequenceNumber(bool& successfull);
396         u16 readOutgoingSequenceNumber();
397         bool putBackSequenceNumber(u16);
398
399         u16 readNextSplitSeqNum();
400         void setNextSplitSeqNum(u16 seqnum);
401
402         // This is for buffering the incoming packets that are coming in
403         // the wrong order
404         ReliablePacketBuffer incoming_reliables;
405         // This is for buffering the sent packets so that the sender can
406         // re-send them if no ACK is received
407         ReliablePacketBuffer outgoing_reliables_sent;
408
409         //queued reliable packets
410         std::queue<BufferedPacket> queued_reliables;
411
412         //queue commands prior splitting to packets
413         std::deque<ConnectionCommand> queued_commands;
414
415         IncomingSplitBuffer incoming_splits;
416
417         Channel() = default;
418         ~Channel() = default;
419
420         void UpdatePacketLossCounter(unsigned int count);
421         void UpdatePacketTooLateCounter();
422         void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
423         void UpdateBytesLost(unsigned int bytes);
424         void UpdateBytesReceived(unsigned int bytes);
425
426         void UpdateTimers(float dtime, bool legacy_peer);
427
428         const float getCurrentDownloadRateKB()
429                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
430         const float getMaxDownloadRateKB()
431                 { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
432
433         const float getCurrentLossRateKB()
434                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
435         const float getMaxLossRateKB()
436                 { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
437
438         const float getCurrentIncomingRateKB()
439                 { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
440         const float getMaxIncomingRateKB()
441                 { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
442
443         const float getAvgDownloadRateKB()
444                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
445         const float getAvgLossRateKB()
446                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
447         const float getAvgIncomingRateKB()
448                 { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
449
450         const unsigned int getWindowSize() const { return window_size; };
451
452         void setWindowSize(unsigned int size) { window_size = size; };
453 private:
454         std::mutex m_internal_mutex;
455         int window_size = MIN_RELIABLE_WINDOW_SIZE;
456
457         u16 next_incoming_seqnum = SEQNUM_INITIAL;
458
459         u16 next_outgoing_seqnum = SEQNUM_INITIAL;
460         u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
461
462         unsigned int current_packet_loss = 0;
463         unsigned int current_packet_too_late = 0;
464         unsigned int current_packet_successfull = 0;
465         float packet_loss_counter = 0.0f;
466
467         unsigned int current_bytes_transfered = 0;
468         unsigned int current_bytes_received = 0;
469         unsigned int current_bytes_lost = 0;
470         float max_kbps = 0.0f;
471         float cur_kbps = 0.0f;
472         float avg_kbps = 0.0f;
473         float max_incoming_kbps = 0.0f;
474         float cur_incoming_kbps = 0.0f;
475         float avg_incoming_kbps = 0.0f;
476         float max_kbps_lost = 0.0f;
477         float cur_kbps_lost = 0.0f;
478         float avg_kbps_lost = 0.0f;
479         float bpm_counter = 0.0f;
480
481         unsigned int rate_samples = 0;
482 };
483
484 class Peer;
485
// Small wrapper around a Peer pointer. Peer declares this class a friend.
// NOTE(review): presumably it pairs Peer::IncUseCount()/DecUseCount() to
// keep the peer referenced while the helper lives - confirm in the .cpp.
class PeerHelper
{
public:
	PeerHelper() = default;
	PeerHelper(Peer* peer);
	~PeerHelper();

	PeerHelper&   operator=(Peer* peer);
	Peer*         operator->() const;
	bool          operator!();
	// Note: unary & is overloaded and yields a Peer*, not the address of
	// the helper object itself
	Peer*         operator&() const;
	bool          operator!=(void* ptr);

private:
	Peer *m_peer = nullptr;
};
502
503 class Connection;
504
// Selects which rate statistic Connection::getLocalStat() returns
typedef enum {
	CUR_DL_RATE,
	AVG_DL_RATE,
	CUR_INC_RATE,
	AVG_INC_RATE,
	CUR_LOSS_RATE,
	AVG_LOSS_RATE,
} rate_stat_type;
513
514 class Peer {
515         public:
516                 friend class PeerHelper;
517
518                 Peer(Address address_,u16 id_,Connection* connection) :
519                         id(id_),
520                         m_connection(connection),
521                         address(address_),
522                         m_last_timeout_check(porting::getTimeMs())
523                 {
524                 };
525
526                 virtual ~Peer() {
527                         MutexAutoLock usage_lock(m_exclusive_access_mutex);
528                         FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
529                 };
530
531                 // Unique id of the peer
532                 u16 id;
533
534                 void Drop();
535
536                 virtual void PutReliableSendCommand(ConnectionCommand &c,
537                                                 unsigned int max_packet_size) {};
538
539                 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
540
541                 bool isPendingDeletion()
542                 { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
543
544                 void ResetTimeout()
545                         {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
546
547                 bool isTimedOut(float timeout);
548
549                 unsigned int m_increment_packets_remaining = 9;
550                 unsigned int m_increment_bytes_remaining = 0;
551
552                 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
553                 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
554                 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
555                                                                                                 BufferedPacket toadd,
556                                                                                                 bool reliable)
557                                 {
558                                         fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
559                                         return SharedBuffer<u8>(0);
560                                 };
561
562                 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
563
564                 virtual float getStat(rtt_stat_type type) const {
565                         switch (type) {
566                                 case MIN_RTT:
567                                         return m_rtt.min_rtt;
568                                 case MAX_RTT:
569                                         return m_rtt.max_rtt;
570                                 case AVG_RTT:
571                                         return m_rtt.avg_rtt;
572                                 case MIN_JITTER:
573                                         return m_rtt.jitter_min;
574                                 case MAX_JITTER:
575                                         return m_rtt.jitter_max;
576                                 case AVG_JITTER:
577                                         return m_rtt.jitter_avg;
578                         }
579                         return -1;
580                 }
581         protected:
582                 virtual void reportRTT(float rtt) {};
583
584                 void RTTStatistics(float rtt,
585                                                         const std::string &profiler_id = "",
586                                                         unsigned int num_samples = 1000);
587
588                 bool IncUseCount();
589                 void DecUseCount();
590
591                 std::mutex m_exclusive_access_mutex;
592
593                 bool m_pending_deletion = false;
594
595                 Connection* m_connection;
596
597                 // Address of the peer
598                 Address address;
599
600                 // Ping timer
601                 float m_ping_timer = 0.0f;
602         private:
603
604                 struct rttstats {
605                         float jitter_min = FLT_MAX;
606                         float jitter_max = 0.0f;
607                         float jitter_avg = -1.0f;
608                         float min_rtt = FLT_MAX;
609                         float max_rtt = 0.0f;
610                         float avg_rtt = -1.0f;
611
612                         rttstats() = default;
613                 };
614
615                 rttstats m_rtt;
616                 float m_last_rtt = -1.0f;
617
618                 // current usage count
619                 unsigned int m_usage = 0;
620
621                 // Seconds from last receive
622                 float m_timeout_counter = 0.0f;
623
624                 u64 m_last_timeout_check;
625 };
626
627 class UDPPeer : public Peer
628 {
629 public:
630
631         friend class PeerHelper;
632         friend class ConnectionReceiveThread;
633         friend class ConnectionSendThread;
634         friend class Connection;
635
636         UDPPeer(u16 a_id, Address a_address, Connection* connection);
637         virtual ~UDPPeer() = default;
638
639         void PutReliableSendCommand(ConnectionCommand &c,
640                                                         unsigned int max_packet_size);
641
642         bool getAddress(MTProtocols type, Address& toset);
643
644         void setNonLegacyPeer();
645
646         bool getLegacyPeer()
647         { return m_legacy_peer; }
648
649         u16 getNextSplitSequenceNumber(u8 channel);
650         void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
651
652         SharedBuffer<u8> addSpiltPacket(u8 channel,
653                                                                         BufferedPacket toadd,
654                                                                         bool reliable);
655
656
657 protected:
658         /*
659                 Calculates avg_rtt and resend_timeout.
660                 rtt=-1 only recalculates resend_timeout
661         */
662         void reportRTT(float rtt);
663
664         void RunCommandQueues(
665                                         unsigned int max_packet_size,
666                                         unsigned int maxcommands,
667                                         unsigned int maxtransfer);
668
669         float getResendTimeout()
670                 { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
671
672         void setResendTimeout(float timeout)
673                 { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
674         bool Ping(float dtime,SharedBuffer<u8>& data);
675
676         Channel channels[CHANNEL_COUNT];
677         bool m_pending_disconnect = false;
678 private:
679         // This is changed dynamically
680         float resend_timeout = 0.5;
681
682         bool processReliableSendCommand(
683                                         ConnectionCommand &c,
684                                         unsigned int max_packet_size);
685
686         bool m_legacy_peer = true;
687 };
688
689 /*
690         Connection
691 */
692
// Kinds of ConnectionEvent. The value is stored in ConnectionEvent::type
// and turned into text by ConnectionEvent::describe().
enum ConnectionEventType{
	CONNEVENT_NONE,
	CONNEVENT_DATA_RECEIVED,
	CONNEVENT_PEER_ADDED,
	CONNEVENT_PEER_REMOVED,
	CONNEVENT_BIND_FAILED,
};
700
701 struct ConnectionEvent
702 {
703         enum ConnectionEventType type = CONNEVENT_NONE;
704         u16 peer_id = 0;
705         Buffer<u8> data;
706         bool timeout = false;
707         Address address;
708
709         ConnectionEvent() = default;
710
711         std::string describe()
712         {
713                 switch(type) {
714                 case CONNEVENT_NONE:
715                         return "CONNEVENT_NONE";
716                 case CONNEVENT_DATA_RECEIVED:
717                         return "CONNEVENT_DATA_RECEIVED";
718                 case CONNEVENT_PEER_ADDED:
719                         return "CONNEVENT_PEER_ADDED";
720                 case CONNEVENT_PEER_REMOVED:
721                         return "CONNEVENT_PEER_REMOVED";
722                 case CONNEVENT_BIND_FAILED:
723                         return "CONNEVENT_BIND_FAILED";
724                 }
725                 return "Invalid ConnectionEvent";
726         }
727
728         void dataReceived(u16 peer_id_, const SharedBuffer<u8> &data_)
729         {
730                 type = CONNEVENT_DATA_RECEIVED;
731                 peer_id = peer_id_;
732                 data = data_;
733         }
734         void peerAdded(u16 peer_id_, Address address_)
735         {
736                 type = CONNEVENT_PEER_ADDED;
737                 peer_id = peer_id_;
738                 address = address_;
739         }
740         void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
741         {
742                 type = CONNEVENT_PEER_REMOVED;
743                 peer_id = peer_id_;
744                 timeout = timeout_;
745                 address = address_;
746         }
747         void bindFailed()
748         {
749                 type = CONNEVENT_BIND_FAILED;
750         }
751 };
752
753 class ConnectionSendThread : public Thread {
754
755 public:
756         friend class UDPPeer;
757
758         ConnectionSendThread(unsigned int max_packet_size, float timeout);
759
760         void *run();
761
762         void Trigger();
763
764         void setParent(Connection* parent) {
765                 assert(parent != NULL); // Pre-condition
766                 m_connection = parent;
767         }
768
769         void setPeerTimeout(float peer_timeout)
770                 { m_timeout = peer_timeout; }
771
772 private:
773         void runTimeouts    (float dtime);
774         void rawSend        (const BufferedPacket &packet);
775         bool rawSendAsPacket(u16 peer_id, u8 channelnum,
776                                                         SharedBuffer<u8> data, bool reliable);
777
778         void processReliableCommand (ConnectionCommand &c);
779         void processNonReliableCommand (ConnectionCommand &c);
780         void serve          (Address bind_address);
781         void connect        (Address address);
782         void disconnect     ();
783         void disconnect_peer(u16 peer_id);
784         void send           (u16 peer_id, u8 channelnum,
785                                                         SharedBuffer<u8> data);
786         void sendReliable   (ConnectionCommand &c);
787         void sendToAll      (u8 channelnum,
788                                                         SharedBuffer<u8> data);
789         void sendToAllReliable(ConnectionCommand &c);
790
791         void sendPackets    (float dtime);
792
793         void sendAsPacket   (u16 peer_id, u8 channelnum,
794                                                         SharedBuffer<u8> data,bool ack=false);
795
796         void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
797
798         bool packetsQueued();
799
800         Connection           *m_connection = nullptr;
801         unsigned int          m_max_packet_size;
802         float                 m_timeout;
803         std::queue<OutgoingPacket> m_outgoing_queue;
804         Semaphore             m_send_sleep_semaphore;
805
806         unsigned int          m_iteration_packets_avaialble;
807         unsigned int          m_max_commands_per_iteration = 1;
808         unsigned int          m_max_data_packets_per_iteration;
809         unsigned int          m_max_packets_requeued = 256;
810 };
811
812 class ConnectionReceiveThread : public Thread {
813 public:
814         ConnectionReceiveThread(unsigned int max_packet_size);
815
816         void *run();
817
818         void setParent(Connection *parent) {
819                 assert(parent); // Pre-condition
820                 m_connection = parent;
821         }
822
823 private:
824         void receive();
825
826         // Returns next data from a buffer if possible
827         // If found, returns true; if not, false.
828         // If found, sets peer_id and dst
829         bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
830
831         bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
832                                                         SharedBuffer<u8> &dst);
833
834         /*
835                 Processes a packet with the basic header stripped out.
836                 Parameters:
837                         packetdata: Data in packet (with no base headers)
838                         peer_id: peer id of the sender of the packet in question
839                         channelnum: channel on which the packet was sent
840                         reliable: true if recursing into a reliable packet
841         */
842         SharedBuffer<u8> processPacket(Channel *channel,
843                                                         SharedBuffer<u8> packetdata, u16 peer_id,
844                                                         u8 channelnum, bool reliable);
845
846
847         Connection *m_connection = nullptr;
848 };
849
850 class PeerHandler;
851
852 class Connection
853 {
854 public:
855         friend class ConnectionSendThread;
856         friend class ConnectionReceiveThread;
857
858         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
859                         PeerHandler *peerhandler);
860         ~Connection();
861
862         /* Interface */
863         ConnectionEvent waitEvent(u32 timeout_ms);
864         void putCommand(ConnectionCommand &c);
865
866         void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; }
867         void Serve(Address bind_addr);
868         void Connect(Address address);
869         bool Connected();
870         void Disconnect();
871         void Receive(NetworkPacket* pkt);
872         void Send(u16 peer_id, u8 channelnum, NetworkPacket* pkt, bool reliable);
873         u16 GetPeerID() { return m_peer_id; }
874         Address GetPeerAddress(u16 peer_id);
875         float getPeerStat(u16 peer_id, rtt_stat_type type);
876         float getLocalStat(rate_stat_type type);
877         const u32 GetProtocolID() const { return m_protocol_id; };
878         const std::string getDesc();
879         void DisconnectPeer(u16 peer_id);
880
881 protected:
882         PeerHelper getPeer(u16 peer_id);
883         PeerHelper getPeerNoEx(u16 peer_id);
884         u16   lookupPeer(Address& sender);
885
886         u16 createPeer(Address& sender, MTProtocols protocol, int fd);
887         UDPPeer*  createServerPeer(Address& sender);
888         bool deletePeer(u16 peer_id, bool timeout);
889
890         void SetPeerID(u16 id) { m_peer_id = id; }
891
892         void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
893
894         void PrintInfo(std::ostream &out);
895         void PrintInfo();
896
897         std::list<u16> getPeerIDs()
898         {
899                 MutexAutoLock peerlock(m_peers_mutex);
900                 return m_peer_ids;
901         }
902
903         UDPSocket m_udpSocket;
904         MutexedQueue<ConnectionCommand> m_command_queue;
905
906         void putEvent(ConnectionEvent &e);
907
908         void TriggerSend()
909                 { m_sendThread.Trigger(); }
910 private:
911         std::list<Peer*> getPeers();
912
913         MutexedQueue<ConnectionEvent> m_event_queue;
914
915         u16 m_peer_id = 0;
916         u32 m_protocol_id;
917
918         std::map<u16, Peer*> m_peers;
919         std::list<u16> m_peer_ids;
920         std::mutex m_peers_mutex;
921
922         ConnectionSendThread m_sendThread;
923         ConnectionReceiveThread m_receiveThread;
924
925         std::mutex m_info_mutex;
926
927         // Backwards compatibility
928         PeerHandler *m_bc_peerhandler;
929         int m_bc_receive_timeout = 0;
930
931         bool m_shutting_down = false;
932
933         u16 m_next_remote_peer_id = 2;
934 };
935
936 } // namespace