c9474032d1836a1be63b8d144a50f0e774d91e3f
[oweals/minetest.git] / src / connection.h
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #ifndef CONNECTION_HEADER
21 #define CONNECTION_HEADER
22
23 #include "irrlichttypes_bloated.h"
24 #include "socket.h"
25 #include "exceptions.h"
26 #include "constants.h"
27 #include "util/pointer.h"
28 #include "util/container.h"
29 #include "util/thread.h"
30 #include "util/numeric.h"
31 #include <iostream>
32 #include <fstream>
33 #include <list>
34 #include <map>
35
36 namespace con
37 {
38
39 /*
40         Exceptions
41 */
42 class NotFoundException : public BaseException
43 {
44 public:
45         NotFoundException(const char *s):
46                 BaseException(s)
47         {}
48 };
49
50 class PeerNotFoundException : public BaseException
51 {
52 public:
53         PeerNotFoundException(const char *s):
54                 BaseException(s)
55         {}
56 };
57
58 class ConnectionException : public BaseException
59 {
60 public:
61         ConnectionException(const char *s):
62                 BaseException(s)
63         {}
64 };
65
66 class ConnectionBindFailed : public BaseException
67 {
68 public:
69         ConnectionBindFailed(const char *s):
70                 BaseException(s)
71         {}
72 };
73
74 class InvalidIncomingDataException : public BaseException
75 {
76 public:
77         InvalidIncomingDataException(const char *s):
78                 BaseException(s)
79         {}
80 };
81
82 class InvalidOutgoingDataException : public BaseException
83 {
84 public:
85         InvalidOutgoingDataException(const char *s):
86                 BaseException(s)
87         {}
88 };
89
90 class NoIncomingDataException : public BaseException
91 {
92 public:
93         NoIncomingDataException(const char *s):
94                 BaseException(s)
95         {}
96 };
97
98 class ProcessedSilentlyException : public BaseException
99 {
100 public:
101         ProcessedSilentlyException(const char *s):
102                 BaseException(s)
103         {}
104 };
105
106 class ProcessedQueued : public BaseException
107 {
108 public:
109         ProcessedQueued(const char *s):
110                 BaseException(s)
111         {}
112 };
113
114 class IncomingDataCorruption : public BaseException
115 {
116 public:
117         IncomingDataCorruption(const char *s):
118                 BaseException(s)
119         {}
120 };
121
122 typedef enum MTProtocols {
123         PRIMARY,
124         UDP,
125         MINETEST_RELIABLE_UDP
126 } MTProtocols;
127
128 #define SEQNUM_MAX 65535
129 inline bool seqnum_higher(u16 totest, u16 base)
130 {
131         if (totest > base)
132         {
133                 if((totest - base) > (SEQNUM_MAX/2))
134                         return false;
135                 else
136                         return true;
137         }
138         else
139         {
140                 if((base - totest) > (SEQNUM_MAX/2))
141                         return true;
142                 else
143                         return false;
144         }
145 }
146
147 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
148 {
149         u16 window_start = next;
150         u16 window_end   = ( next + window_size ) % (SEQNUM_MAX+1);
151
152         if (window_start < window_end)
153         {
154                 return ((seqnum >= window_start) && (seqnum < window_end));
155         }
156         else
157         {
158                 return ((seqnum < window_end) || (seqnum >= window_start));
159         }
160 }
161
162 struct BufferedPacket
163 {
164         BufferedPacket(u8 *a_data, u32 a_size):
165                 data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
166         {}
167         BufferedPacket(u32 a_size):
168                 data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
169         {}
170         SharedBuffer<u8> data; // Data of the packet, including headers
171         float time; // Seconds from buffering the packet or re-sending
172         float totaltime; // Seconds from buffering the packet
173         unsigned int absolute_send_time;
174         Address address; // Sender or destination
175 };
176
177 // This adds the base headers to the data and makes a packet out of it
178 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
179                 u32 protocol_id, u16 sender_peer_id, u8 channel);
180 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
181                 u32 protocol_id, u16 sender_peer_id, u8 channel);
182
183 // Add the TYPE_ORIGINAL header to the data
184 SharedBuffer<u8> makeOriginalPacket(
185                 SharedBuffer<u8> data);
186
187 // Split data in chunks and add TYPE_SPLIT headers to them
188 std::list<SharedBuffer<u8> > makeSplitPacket(
189                 SharedBuffer<u8> data,
190                 u32 chunksize_max,
191                 u16 seqnum);
192
193 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
194 // Increments split_seqnum if a split packet is made
195 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
196                 SharedBuffer<u8> data,
197                 u32 chunksize_max,
198                 u16 &split_seqnum);
199
200 // Add the TYPE_RELIABLE header to the data
201 SharedBuffer<u8> makeReliablePacket(
202                 SharedBuffer<u8> data,
203                 u16 seqnum);
204
205 struct IncomingSplitPacket
206 {
207         IncomingSplitPacket()
208         {
209                 time = 0.0;
210                 reliable = false;
211         }
212         // Key is chunk number, value is data without headers
213         std::map<u16, SharedBuffer<u8> > chunks;
214         u32 chunk_count;
215         float time; // Seconds from adding
216         bool reliable; // If true, isn't deleted on timeout
217
218         bool allReceived()
219         {
220                 return (chunks.size() == chunk_count);
221         }
222 };
223
224 /*
225 === NOTES ===
226
227 A packet is sent through a channel to a peer with a basic header:
228 TODO: Should we have a receiver_peer_id also?
229         Header (7 bytes):
230         [0] u32 protocol_id
231         [4] u16 sender_peer_id
232         [6] u8 channel
233 sender_peer_id:
234         Unique to each peer.
235         value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
236         value 1 (PEER_ID_SERVER) is reserved for server
237         these constants are defined in constants.h
238 channel:
239         The lower the number, the higher the priority is.
240         Only channels 0, 1 and 2 exist.
241 */
242 #define BASE_HEADER_SIZE 7
243 #define CHANNEL_COUNT 3
244 /*
245 Packet types:
246
247 CONTROL: This is a packet used by the protocol.
248 - When this is processed, nothing is handed to the user.
249         Header (2 byte):
250         [0] u8 type
251         [1] u8 controltype
252 controltype and data description:
253         CONTROLTYPE_ACK
254                 [2] u16 seqnum
255         CONTROLTYPE_SET_PEER_ID
256                 [2] u16 peer_id_new
257         CONTROLTYPE_PING
258         - There is no actual reply, but this can be sent in a reliable
259           packet to get a reply
260         CONTROLTYPE_DISCO
261 */
262 #define TYPE_CONTROL 0
263 #define CONTROLTYPE_ACK 0
264 #define CONTROLTYPE_SET_PEER_ID 1
265 #define CONTROLTYPE_PING 2
266 #define CONTROLTYPE_DISCO 3
267 #define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
268
269 /*
270 ORIGINAL: This is a plain packet with no control and no error
271 checking at all.
272 - When this is processed, it is directly handed to the user.
273         Header (1 byte):
274         [0] u8 type
275 */
276 #define TYPE_ORIGINAL 1
277 #define ORIGINAL_HEADER_SIZE 1
278 /*
279 SPLIT: These are sequences of packets forming one bigger piece of
280 data.
281 - When processed and all the packet_nums 0...packet_count-1 are
282   present (this should be buffered), the resulting data shall be
283   directly handed to the user.
284 - If the data fails to come up in a reasonable time, the buffer shall
285   be silently discarded.
286 - These can be sent as-is or atop of a RELIABLE packet stream.
287         Header (7 bytes):
288         [0] u8 type
289         [1] u16 seqnum
290         [3] u16 chunk_count
291         [5] u16 chunk_num
292 */
293 #define TYPE_SPLIT 2
294 /*
295 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
296 and they shall be delivered in the same order as sent. This is done
297 with a buffer in the receiving and transmitting end.
298 - When this is processed, the contents of each packet is recursively
299   processed as packets.
300         Header (3 bytes):
301         [0] u8 type
302         [1] u16 seqnum
303
304 */
305 #define TYPE_RELIABLE 3
306 #define RELIABLE_HEADER_SIZE 3
307 #define SEQNUM_INITIAL 65500
308
309 /*
310         A buffer which stores reliable packets and sorts them internally
311         for fast access to the smallest one.
312 */
313
314 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
315
316 class ReliablePacketBuffer
317 {
318 public:
319         ReliablePacketBuffer();
320
321         bool getFirstSeqnum(u16& result);
322
323         BufferedPacket popFirst();
324         BufferedPacket popSeqnum(u16 seqnum);
325         void insert(BufferedPacket &p,u16 next_expected);
326
327         void incrementTimeouts(float dtime);
328         std::list<BufferedPacket> getTimedOuts(float timeout,
329                         unsigned int max_packets);
330
331         void print();
332         bool empty();
333         bool containsPacket(u16 seqnum);
334         RPBSearchResult notFound();
335         u32 size();
336
337
338 private:
339         RPBSearchResult findPacket(u16 seqnum);
340
341         std::list<BufferedPacket> m_list;
342         u16 m_list_size;
343
344         u16 m_oldest_non_answered_ack;
345
346         JMutex m_list_mutex;
347
348         unsigned int writeptr;
349 };
350
351 /*
352         A buffer for reconstructing split packets
353 */
354
355 class IncomingSplitBuffer
356 {
357 public:
358         ~IncomingSplitBuffer();
359         /*
360                 Returns a reference counted buffer of length != 0 when a full split
361                 packet is constructed. If not, returns one of length 0.
362         */
363         SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
364         
365         void removeUnreliableTimedOuts(float dtime, float timeout);
366         
367 private:
368         // Key is seqnum
369         std::map<u16, IncomingSplitPacket*> m_buf;
370
371         JMutex m_map_mutex;
372 };
373
374 struct OutgoingPacket
375 {
376         u16 peer_id;
377         u8 channelnum;
378         SharedBuffer<u8> data;
379         bool reliable;
380         bool ack;
381
382         OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
383                         bool reliable_,bool ack_=false):
384                 peer_id(peer_id_),
385                 channelnum(channelnum_),
386                 data(data_),
387                 reliable(reliable_),
388                 ack(ack_)
389         {
390         }
391 };
392
393 enum ConnectionCommandType{
394         CONNCMD_NONE,
395         CONNCMD_SERVE,
396         CONNCMD_CONNECT,
397         CONNCMD_DISCONNECT,
398         CONNCMD_DISCONNECT_PEER,
399         CONNCMD_SEND,
400         CONNCMD_SEND_TO_ALL,
401         CONCMD_ACK,
402         CONCMD_CREATE_PEER,
403         CONCMD_DISABLE_LEGACY
404 };
405
406 struct ConnectionCommand
407 {
408         enum ConnectionCommandType type;
409         u16 port;
410         Address address;
411         u16 peer_id;
412         u8 channelnum;
413         Buffer<u8> data;
414         bool reliable;
415         bool raw;
416
417         ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
418
419         void serve(u16 port_)
420         {
421                 type = CONNCMD_SERVE;
422                 port = port_;
423         }
424         void connect(Address address_)
425         {
426                 type = CONNCMD_CONNECT;
427                 address = address_;
428         }
429         void disconnect()
430         {
431                 type = CONNCMD_DISCONNECT;
432         }
433         void disconnect_peer(u16 peer_id_)
434         {
435                 type = CONNCMD_DISCONNECT_PEER;
436                 peer_id = peer_id_;
437         }
438         void send(u16 peer_id_, u8 channelnum_,
439                         SharedBuffer<u8> data_, bool reliable_)
440         {
441                 type = CONNCMD_SEND;
442                 peer_id = peer_id_;
443                 channelnum = channelnum_;
444                 data = data_;
445                 reliable = reliable_;
446         }
447         void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
448         {
449                 type = CONNCMD_SEND_TO_ALL;
450                 channelnum = channelnum_;
451                 data = data_;
452                 reliable = reliable_;
453         }
454
455         void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
456         {
457                 type = CONCMD_ACK;
458                 peer_id = peer_id_;
459                 channelnum = channelnum_;
460                 data = data_;
461                 reliable = false;
462         }
463
464         void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
465         {
466                 type = CONCMD_CREATE_PEER;
467                 peer_id = peer_id_;
468                 data = data_;
469                 channelnum = 0;
470                 reliable = true;
471                 raw = true;
472         }
473
474         void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
475         {
476                 type = CONCMD_DISABLE_LEGACY;
477                 peer_id = peer_id_;
478                 data = data_;
479                 channelnum = 0;
480                 reliable = true;
481                 raw = true;
482         }
483 };
484
485 class Channel
486 {
487
488 public:
489         u16 readNextIncomingSeqNum();
490         u16 incNextIncomingSeqNum();
491
492         u16 getOutgoingSequenceNumber(bool& successfull);
493         u16 readOutgoingSequenceNumber();
494         bool putBackSequenceNumber(u16);
495
496         u16 readNextSplitSeqNum();
497         void setNextSplitSeqNum(u16 seqnum);
498         
499         // This is for buffering the incoming packets that are coming in
500         // the wrong order
501         ReliablePacketBuffer incoming_reliables;
502         // This is for buffering the sent packets so that the sender can
503         // re-send them if no ACK is received
504         ReliablePacketBuffer outgoing_reliables_sent;
505
506         //queued reliable packets
507         Queue<BufferedPacket> queued_reliables;
508
509         //queue commands prior splitting to packets
510         Queue<ConnectionCommand> queued_commands;
511
512         IncomingSplitBuffer incoming_splits;
513
514         Channel();
515         ~Channel();
516
517         void UpdatePacketLossCounter(unsigned int count);
518         void UpdatePacketTooLateCounter();
519         void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
520         void UpdateBytesLost(unsigned int bytes);
521
522         void UpdateTimers(float dtime);
523
524         const float getCurrentDownloadRateKB()
525                 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
526         const float getMaxDownloadRateKB()
527                 { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
528
529         const float getCurrentLossRateKB()
530                 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
531         const float getMaxLossRateKB()
532                 { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
533
534         const float getAvgDownloadRateKB()
535                 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
536         const float getAvgLossRateKB()
537                 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
538
539         const unsigned int getWindowSize() const { return window_size; };
540
541         void setWindowSize(unsigned int size) { window_size = size; };
542 private:
543         JMutex m_internal_mutex;
544         unsigned int window_size;
545
546         u16 next_incoming_seqnum;
547
548         u16 next_outgoing_seqnum;
549         u16 next_outgoing_split_seqnum;
550
551         unsigned int current_packet_loss;
552         unsigned int current_packet_too_late;
553         unsigned int current_packet_successfull;
554         float packet_loss_counter;
555
556         unsigned int current_bytes_transfered;
557         unsigned int current_bytes_lost;
558         float max_kbps;
559         float cur_kbps;
560         float avg_kbps;
561         float max_kbps_lost;
562         float cur_kbps_lost;
563         float avg_kbps_lost;
564         float bpm_counter;
565 };
566
567 class Peer;
568
569 enum PeerChangeType
570 {
571         PEER_ADDED,
572         PEER_REMOVED
573 };
574 struct PeerChange
575 {
576         PeerChangeType type;
577         u16 peer_id;
578         bool timeout;
579 };
580
581 class PeerHandler
582 {
583 public:
584
585         PeerHandler()
586         {
587         }
588         virtual ~PeerHandler()
589         {
590         }
591
592         /*
593                 This is called after the Peer has been inserted into the
594                 Connection's peer container.
595         */
596         virtual void peerAdded(Peer *peer) = 0;
597         /*
598                 This is called before the Peer has been removed from the
599                 Connection's peer container.
600         */
601         virtual void deletingPeer(Peer *peer, bool timeout) = 0;
602 };
603
604 class PeerHelper
605 {
606 public:
607         PeerHelper();
608         PeerHelper(Peer* peer);
609         ~PeerHelper();
610
611         PeerHelper&   operator=(Peer* peer);
612         Peer*         operator->() const;
613         bool          operator!();
614         Peer*         operator&() const;
615         bool          operator!=(void* ptr);
616
617 private:
618         Peer* m_peer;
619 };
620
621 class Connection;
622
623 typedef enum rtt_stat_type {
624         MIN_RTT,
625         MAX_RTT,
626         AVG_RTT,
627         MIN_JITTER,
628         MAX_JITTER,
629         AVG_JITTER
630 } rtt_stat_type;
631
632 class Peer {
633         public:
634                 friend class PeerHelper;
635
636                 Peer(Address address_,u16 id_,Connection* connection) :
637                         id(id_),
638                         m_increment_packets_remaining(9),
639                         m_increment_bytes_remaining(0),
640                         m_pending_deletion(false),
641                         m_connection(connection),
642                         address(address_),
643                         m_ping_timer(0.0),
644                         m_last_rtt(-1.0),
645                         m_usage(0),
646                         m_timeout_counter(0.0),
647                         m_last_timeout_check(porting::getTimeMs()),
648                         m_has_sent_with_id(false)
649                 {
650                         m_rtt.avg_rtt = -1.0;
651                         m_rtt.jitter_avg = -1.0;
652                         m_rtt.jitter_max = 0.0;
653                         m_rtt.max_rtt = 0.0;
654                         m_rtt.jitter_min = FLT_MAX;
655                         m_rtt.min_rtt = FLT_MAX;
656                 };
657
658                 virtual ~Peer() {
659                         JMutexAutoLock usage_lock(m_exclusive_access_mutex);
660                         assert(m_usage == 0);
661                 };
662
663                 // Unique id of the peer
664                 u16 id;
665
666                 void Drop();
667
668                 virtual void PutReliableSendCommand(ConnectionCommand &c,
669                                                 unsigned int max_packet_size) {};
670
671                 virtual bool isActive() { return false; };
672
673                 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
674
675                 void ResetTimeout()
676                         {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
677
678                 bool isTimedOut(float timeout);
679
680                 void setSentWithID()
681                 { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
682
683                 bool hasSentWithID()
684                 { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
685
686                 unsigned int m_increment_packets_remaining;
687                 unsigned int m_increment_bytes_remaining;
688
689                 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
690                 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
691                 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
692                                                                                                 BufferedPacket toadd,
693                                                                                                 bool reliable)
694                                 {
695                                         fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
696                                         return SharedBuffer<u8>(0);
697                                 };
698
699                 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
700
701                 virtual float getStat(rtt_stat_type type) const {
702                         switch (type) {
703                                 case MIN_RTT:
704                                         return m_rtt.min_rtt;
705                                 case MAX_RTT:
706                                         return m_rtt.max_rtt;
707                                 case AVG_RTT:
708                                         return m_rtt.avg_rtt;
709                                 case MIN_JITTER:
710                                         return m_rtt.jitter_min;
711                                 case MAX_JITTER:
712                                         return m_rtt.jitter_max;
713                                 case AVG_JITTER:
714                                         return m_rtt.jitter_avg;
715                         }
716                         return -1;
717                 }
718         protected:
719                 virtual void reportRTT(float rtt) {};
720
721                 void RTTStatistics(float rtt,
722                                                         std::string profiler_id="",
723                                                         unsigned int num_samples=1000);
724
725                 bool IncUseCount();
726                 void DecUseCount();
727
728                 JMutex m_exclusive_access_mutex;
729
730                 bool m_pending_deletion;
731
732                 Connection* m_connection;
733
734                 // Address of the peer
735                 Address address;
736
737                 // Ping timer
738                 float m_ping_timer;
739         private:
740
741                 struct rttstats {
742                         float jitter_min;
743                         float jitter_max;
744                         float jitter_avg;
745                         float min_rtt;
746                         float max_rtt;
747                         float avg_rtt;
748                 };
749
750                 rttstats m_rtt;
751                 float    m_last_rtt;
752
753                 // current usage count
754                 unsigned int m_usage;
755
756                 // Seconds from last receive
757                 float m_timeout_counter;
758
759                 u32 m_last_timeout_check;
760
761                 bool m_has_sent_with_id;
762 };
763
764 class UDPPeer : public Peer
765 {
766 public:
767
768         friend class PeerHelper;
769         friend class ConnectionReceiveThread;
770         friend class ConnectionSendThread;
771
772         UDPPeer(u16 a_id, Address a_address, Connection* connection);
773         virtual ~UDPPeer() {};
774
775         void PutReliableSendCommand(ConnectionCommand &c,
776                                                         unsigned int max_packet_size);
777
778         bool isActive()
779         { return ((hasSentWithID()) && (!m_pending_deletion)); };
780
781         bool getAddress(MTProtocols type, Address& toset);
782
783         void setNonLegacyPeer();
784
785         bool getLegacyPeer()
786         { return m_legacy_peer; }
787
788         u16 getNextSplitSequenceNumber(u8 channel);
789         void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
790
791         SharedBuffer<u8> addSpiltPacket(u8 channel,
792                                                                         BufferedPacket toadd,
793                                                                         bool reliable);
794
795
796 protected:
797         /*
798                 Calculates avg_rtt and resend_timeout.
799                 rtt=-1 only recalculates resend_timeout
800         */
801         void reportRTT(float rtt);
802
803         void RunCommandQueues(
804                                         unsigned int max_packet_size,
805                                         unsigned int maxcommands,
806                                         unsigned int maxtransfer);
807
808         float getResendTimeout()
809                 { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
810
811         void setResendTimeout(float timeout)
812                 { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
813         bool Ping(float dtime,SharedBuffer<u8>& data);
814
815         Channel channels[CHANNEL_COUNT];
816         bool m_pending_disconnect;
817 private:
818         // This is changed dynamically
819         float resend_timeout;
820
821         bool processReliableSendCommand(
822                                         ConnectionCommand &c,
823                                         unsigned int max_packet_size);
824
825         bool m_legacy_peer;
826 };
827
828 /*
829         Connection
830 */
831
832 enum ConnectionEventType{
833         CONNEVENT_NONE,
834         CONNEVENT_DATA_RECEIVED,
835         CONNEVENT_PEER_ADDED,
836         CONNEVENT_PEER_REMOVED,
837         CONNEVENT_BIND_FAILED,
838 };
839
840 struct ConnectionEvent
841 {
842         enum ConnectionEventType type;
843         u16 peer_id;
844         Buffer<u8> data;
845         bool timeout;
846         Address address;
847
848         ConnectionEvent(): type(CONNEVENT_NONE) {}
849
850         std::string describe()
851         {
852                 switch(type){
853                 case CONNEVENT_NONE:
854                         return "CONNEVENT_NONE";
855                 case CONNEVENT_DATA_RECEIVED:
856                         return "CONNEVENT_DATA_RECEIVED";
857                 case CONNEVENT_PEER_ADDED:
858                         return "CONNEVENT_PEER_ADDED";
859                 case CONNEVENT_PEER_REMOVED:
860                         return "CONNEVENT_PEER_REMOVED";
861                 case CONNEVENT_BIND_FAILED:
862                         return "CONNEVENT_BIND_FAILED";
863                 }
864                 return "Invalid ConnectionEvent";
865         }
866         
867         void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
868         {
869                 type = CONNEVENT_DATA_RECEIVED;
870                 peer_id = peer_id_;
871                 data = data_;
872         }
873         void peerAdded(u16 peer_id_, Address address_)
874         {
875                 type = CONNEVENT_PEER_ADDED;
876                 peer_id = peer_id_;
877                 address = address_;
878         }
879         void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
880         {
881                 type = CONNEVENT_PEER_REMOVED;
882                 peer_id = peer_id_;
883                 timeout = timeout_;
884                 address = address_;
885         }
886         void bindFailed()
887         {
888                 type = CONNEVENT_BIND_FAILED;
889         }
890 };
891
892 class ConnectionSendThread : public JThread {
893
894 public:
895         friend class UDPPeer;
896
897         ConnectionSendThread(Connection* parent,
898                                                         unsigned int max_packet_size, float timeout);
899
900         void * Thread       ();
901
902         void Trigger();
903
904         void setPeerTimeout(float peer_timeout)
905                 { m_timeout = peer_timeout; }
906
907 private:
908         void runTimeouts    (float dtime);
909         void rawSend        (const BufferedPacket &packet);
910         bool rawSendAsPacket(u16 peer_id, u8 channelnum,
911                                                         SharedBuffer<u8> data, bool reliable);
912
913         void processReliableCommand (ConnectionCommand &c);
914         void processNonReliableCommand (ConnectionCommand &c);
915         void serve          (u16 port);
916         void connect        (Address address);
917         void disconnect     ();
918         void disconnect_peer(u16 peer_id);
919         void send           (u16 peer_id, u8 channelnum,
920                                                         SharedBuffer<u8> data);
921         void sendReliable   (ConnectionCommand &c);
922         void sendToAll      (u8 channelnum,
923                                                         SharedBuffer<u8> data);
924         void sendToAllReliable(ConnectionCommand &c);
925
926         void sendPackets    (float dtime);
927
928         void sendAsPacket   (u16 peer_id, u8 channelnum,
929                                                         SharedBuffer<u8> data,bool ack=false);
930
931         void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
932
933         bool packetsQueued();
934
935         Connection*           m_connection;
936         unsigned int          m_max_packet_size;
937         float                 m_timeout;
938         Queue<OutgoingPacket> m_outgoing_queue;
939         JSemaphore            m_send_sleep_semaphore;
940
941         unsigned int          m_iteration_packets_avaialble;
942         unsigned int          m_max_commands_per_iteration;
943         unsigned int          m_max_data_packets_per_iteration;
944         unsigned int          m_max_packets_requeued;
945 };
946
947 class ConnectionReceiveThread : public JThread {
948 public:
949         ConnectionReceiveThread(Connection* parent,
950                                                         unsigned int max_packet_size);
951
952         void * Thread       ();
953
954 private:
955         void receive        ();
956
957         // Returns next data from a buffer if possible
958         // If found, returns true; if not, false.
959         // If found, sets peer_id and dst
960         bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
961
962         bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
963                                                         SharedBuffer<u8> &dst);
964
965         /*
966                 Processes a packet with the basic header stripped out.
967                 Parameters:
968                         packetdata: Data in packet (with no base headers)
969                         peer_id: peer id of the sender of the packet in question
970                         channelnum: channel on which the packet was sent
971                         reliable: true if recursing into a reliable packet
972         */
973         SharedBuffer<u8> processPacket(Channel *channel,
974                                                         SharedBuffer<u8> packetdata, u16 peer_id,
975                                                         u8 channelnum, bool reliable);
976
977
978         Connection*           m_connection;
979         unsigned int          m_max_packet_size;
980 };
981
982 class Connection
983 {
984 public:
985         friend class ConnectionSendThread;
986         friend class ConnectionReceiveThread;
987
988         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
989         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
990                         PeerHandler *peerhandler);
991         ~Connection();
992
993         /* Interface */
994         ConnectionEvent getEvent();
995         ConnectionEvent waitEvent(u32 timeout_ms);
996         void putCommand(ConnectionCommand &c);
997         
998         void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
999         void Serve(unsigned short port);
1000         void Connect(Address address);
1001         bool Connected();
1002         void Disconnect();
1003         u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
1004         void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
1005         void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
1006         u16 GetPeerID(){ return m_peer_id; }
1007         Address GetPeerAddress(u16 peer_id);
1008         float GetPeerAvgRTT(u16 peer_id);
1009         const u32 GetProtocolID() const { return m_protocol_id; };
1010         const std::string getDesc();
1011         void DisconnectPeer(u16 peer_id);
1012
1013 protected:
1014         PeerHelper getPeer(u16 peer_id);
1015         PeerHelper getPeerNoEx(u16 peer_id);
1016         u16   lookupPeer(Address& sender);
1017
1018         u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1019         UDPPeer*  createServerPeer(Address& sender);
1020         bool deletePeer(u16 peer_id, bool timeout);
1021
1022         void SetPeerID(u16 id){ m_peer_id = id; }
1023
1024         void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1025
1026         void PrintInfo(std::ostream &out);
1027         void PrintInfo();
1028
1029         std::list<u16> getPeerIDs();
1030
1031         UDPSocket m_udpSocket;
1032         MutexedQueue<ConnectionCommand> m_command_queue;
1033
1034         void putEvent(ConnectionEvent &e);
1035
1036         void TriggerSend()
1037                 { m_sendThread.Trigger(); }
1038 private:
1039         std::list<Peer*> getPeers();
1040
1041         MutexedQueue<ConnectionEvent> m_event_queue;
1042
1043         u16 m_peer_id;
1044         u32 m_protocol_id;
1045         
1046         std::map<u16, Peer*> m_peers;
1047         JMutex m_peers_mutex;
1048
1049         ConnectionSendThread m_sendThread;
1050         ConnectionReceiveThread m_receiveThread;
1051
1052         JMutex m_info_mutex;
1053
1054         // Backwards compatibility
1055         PeerHandler *m_bc_peerhandler;
1056         int m_bc_receive_timeout;
1057
1058         bool m_shutting_down;
1059
1060         u16 m_next_remote_peer_id;
1061 };
1062
1063 } // namespace
1064
1065 #endif
1066