Fixed minetest reliable udp implementation (compatible to old clients)
[oweals/minetest.git] / src / connection.h
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #ifndef CONNECTION_HEADER
21 #define CONNECTION_HEADER
22
23 #include "irrlichttypes_bloated.h"
24 #include "socket.h"
25 #include "exceptions.h"
26 #include "constants.h"
27 #include "util/pointer.h"
28 #include "util/container.h"
29 #include "util/thread.h"
30 #include "util/numeric.h"
31 #include <iostream>
32 #include <fstream>
33 #include <list>
34 #include <map>
35
36 namespace con
37 {
38
39 /*
40         Exceptions
41 */
42 class NotFoundException : public BaseException
43 {
44 public:
45         NotFoundException(const char *s):
46                 BaseException(s)
47         {}
48 };
49
50 class PeerNotFoundException : public BaseException
51 {
52 public:
53         PeerNotFoundException(const char *s):
54                 BaseException(s)
55         {}
56 };
57
58 class ConnectionException : public BaseException
59 {
60 public:
61         ConnectionException(const char *s):
62                 BaseException(s)
63         {}
64 };
65
66 class ConnectionBindFailed : public BaseException
67 {
68 public:
69         ConnectionBindFailed(const char *s):
70                 BaseException(s)
71         {}
72 };
73
74 /*class ThrottlingException : public BaseException
75 {
76 public:
77         ThrottlingException(const char *s):
78                 BaseException(s)
79         {}
80 };*/
81
82 class InvalidIncomingDataException : public BaseException
83 {
84 public:
85         InvalidIncomingDataException(const char *s):
86                 BaseException(s)
87         {}
88 };
89
90 class InvalidOutgoingDataException : public BaseException
91 {
92 public:
93         InvalidOutgoingDataException(const char *s):
94                 BaseException(s)
95         {}
96 };
97
98 class NoIncomingDataException : public BaseException
99 {
100 public:
101         NoIncomingDataException(const char *s):
102                 BaseException(s)
103         {}
104 };
105
106 class ProcessedSilentlyException : public BaseException
107 {
108 public:
109         ProcessedSilentlyException(const char *s):
110                 BaseException(s)
111         {}
112 };
113
114 class ProcessedQueued : public BaseException
115 {
116 public:
117         ProcessedQueued(const char *s):
118                 BaseException(s)
119         {}
120 };
121
122 class IncomingDataCorruption : public BaseException
123 {
124 public:
125         IncomingDataCorruption(const char *s):
126                 BaseException(s)
127         {}
128 };
129
// Protocol selector passed to Peer::getAddress().
// Enumerators keep their implicit values 0, 1, 2.
enum MTProtocols {
	PRIMARY,
	UDP,
	MINETEST_RELIABLE_UDP
};
135
#define SEQNUM_MAX 65535

/*
	Wrap-around aware "is newer than" test for sequence numbers.

	Returns true if totest counts as higher than base. A numeric distance
	of more than half the sequence space (SEQNUM_MAX/2) is interpreted as
	a wrap-around and flips the plain comparison. Equal numbers give false.
*/
inline bool seqnum_higher(u16 totest, u16 base)
{
	const int half_range = SEQNUM_MAX / 2;

	// Numerically larger: higher unless it is so far ahead that base must
	// already have wrapped past it.
	if (totest > base)
		return (totest - base) <= half_range;

	// Numerically smaller or equal: higher only if totest has wrapped.
	return (base - totest) > half_range;
}
154
/*
	Tests whether seqnum lies inside the window of window_size consecutive
	sequence numbers beginning at next, i.e. in [next, next + window_size)
	with wrap-around after 65535.

	The check is done on the distance from the window start, computed
	modulo 2^16. This gives the same answer as comparing against explicit
	start/end bounds for every non-empty window, and additionally makes an
	empty window (window_size == 0) contain nothing; the bound-based form
	treated start == end as "wraps around the whole range" and accepted
	every sequence number.
*/
inline bool seqnum_in_window(u16 seqnum, u16 next, u16 window_size)
{
	// The subtraction is done in int; narrowing back to u16 reduces it
	// modulo 65536, which is exactly the wrap-around distance.
	const u16 offset = static_cast<u16>(seqnum - next);
	return offset < window_size;
}
169
170 struct BufferedPacket
171 {
172         BufferedPacket(u8 *a_data, u32 a_size):
173                 data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
174         {}
175         BufferedPacket(u32 a_size):
176                 data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
177         {}
178         SharedBuffer<u8> data; // Data of the packet, including headers
179         float time; // Seconds from buffering the packet or re-sending
180         float totaltime; // Seconds from buffering the packet
181         unsigned int absolute_send_time;
182         Address address; // Sender or destination
183 };
184
185 // This adds the base headers to the data and makes a packet out of it
186 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
187                 u32 protocol_id, u16 sender_peer_id, u8 channel);
188 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
189                 u32 protocol_id, u16 sender_peer_id, u8 channel);
190
191 // Add the TYPE_ORIGINAL header to the data
192 SharedBuffer<u8> makeOriginalPacket(
193                 SharedBuffer<u8> data);
194
195 // Split data in chunks and add TYPE_SPLIT headers to them
196 std::list<SharedBuffer<u8> > makeSplitPacket(
197                 SharedBuffer<u8> data,
198                 u32 chunksize_max,
199                 u16 seqnum);
200
201 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
202 // Increments split_seqnum if a split packet is made
203 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
204                 SharedBuffer<u8> data,
205                 u32 chunksize_max,
206                 u16 &split_seqnum);
207
208 // Add the TYPE_RELIABLE header to the data
209 SharedBuffer<u8> makeReliablePacket(
210                 SharedBuffer<u8> data,
211                 u16 seqnum);
212
213 struct IncomingSplitPacket
214 {
215         IncomingSplitPacket()
216         {
217                 time = 0.0;
218                 reliable = false;
219         }
220         // Key is chunk number, value is data without headers
221         std::map<u16, SharedBuffer<u8> > chunks;
222         u32 chunk_count;
223         float time; // Seconds from adding
224         bool reliable; // If true, isn't deleted on timeout
225
226         bool allReceived()
227         {
228                 return (chunks.size() == chunk_count);
229         }
230 };
231
232 /*
233 === NOTES ===
234
235 A packet is sent through a channel to a peer with a basic header:
236 TODO: Should we have a receiver_peer_id also?
237         Header (7 bytes):
238         [0] u32 protocol_id
239         [4] u16 sender_peer_id
240         [6] u8 channel
241 sender_peer_id:
242         Unique to each peer.
243         value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
244         value 1 (PEER_ID_SERVER) is reserved for server
245         these constants are defined in constants.h
246 channel:
247         The lower the number, the higher the priority is.
248         Only channels 0, 1 and 2 exist.
249 */
250 #define BASE_HEADER_SIZE 7
251 #define CHANNEL_COUNT 3
252 /*
253 Packet types:
254
255 CONTROL: This is a packet used by the protocol.
256 - When this is processed, nothing is handed to the user.
257         Header (2 bytes):
258         [0] u8 type
259         [1] u8 controltype
260 controltype and data description:
261         CONTROLTYPE_ACK
262                 [2] u16 seqnum
263         CONTROLTYPE_SET_PEER_ID
264                 [2] u16 peer_id_new
265         CONTROLTYPE_PING
266         - There is no actual reply, but this can be sent in a reliable
267           packet to get a reply
268         CONTROLTYPE_DISCO
269 */
270 #define TYPE_CONTROL 0
271 #define CONTROLTYPE_ACK 0
272 #define CONTROLTYPE_SET_PEER_ID 1
273 #define CONTROLTYPE_PING 2
274 #define CONTROLTYPE_DISCO 3
275 #define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
276
277 /*
278 ORIGINAL: This is a plain packet with no control and no error
279 checking at all.
280 - When this is processed, it is directly handed to the user.
281         Header (1 byte):
282         [0] u8 type
283 */
284 #define TYPE_ORIGINAL 1
285 #define ORIGINAL_HEADER_SIZE 1
286 /*
287 SPLIT: These are sequences of packets forming one bigger piece of
288 data.
289 - When processed and all the packet_nums 0...packet_count-1 are
290   present (this should be buffered), the resulting data shall be
291   directly handed to the user.
292 - If the data fails to come up in a reasonable time, the buffer shall
293   be silently discarded.
294 - These can be sent as-is or atop of a RELIABLE packet stream.
295         Header (7 bytes):
296         [0] u8 type
297         [1] u16 seqnum
298         [3] u16 chunk_count
299         [5] u16 chunk_num
300 */
301 #define TYPE_SPLIT 2
302 /*
303 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
304 and they shall be delivered in the same order as sent. This is done
305 with a buffer in the receiving and transmitting end.
306 - When this is processed, the contents of each packet is recursively
307   processed as packets.
308         Header (3 bytes):
309         [0] u8 type
310         [1] u16 seqnum
311
312 */
313 #define TYPE_RELIABLE 3
314 #define RELIABLE_HEADER_SIZE 3
315 #define SEQNUM_INITIAL 65500
316
317 /*
318         A buffer which stores reliable packets and sorts them internally
319         for fast access to the smallest one.
320 */
321
322 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
323
324 class ReliablePacketBuffer
325 {
326 public:
327         ReliablePacketBuffer();
328
329         bool getFirstSeqnum(u16& result);
330
331         BufferedPacket popFirst();
332         BufferedPacket popSeqnum(u16 seqnum);
333         void insert(BufferedPacket &p,u16 next_expected);
334
335         void incrementTimeouts(float dtime);
336         std::list<BufferedPacket> getTimedOuts(float timeout,
337                         unsigned int max_packets);
338
339         void print();
340         bool empty();
341         bool containsPacket(u16 seqnum);
342         RPBSearchResult notFound();
343         u32 size();
344
345
346 private:
347         RPBSearchResult findPacket(u16 seqnum);
348
349         std::list<BufferedPacket> m_list;
350         u16 m_list_size;
351
352         u16 m_oldest_non_answered_ack;
353
354         JMutex m_list_mutex;
355
356         unsigned int writeptr;
357 };
358
359 /*
360         A buffer for reconstructing split packets
361 */
362
363 class IncomingSplitBuffer
364 {
365 public:
366         ~IncomingSplitBuffer();
367         /*
368                 Returns a reference counted buffer of length != 0 when a full split
369                 packet is constructed. If not, returns one of length 0.
370         */
371         SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
372         
373         void removeUnreliableTimedOuts(float dtime, float timeout);
374         
375 private:
376         // Key is seqnum
377         std::map<u16, IncomingSplitPacket*> m_buf;
378
379         JMutex m_map_mutex;
380 };
381
382 struct OutgoingPacket
383 {
384         u16 peer_id;
385         u8 channelnum;
386         SharedBuffer<u8> data;
387         bool reliable;
388         bool ack;
389
390         OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
391                         bool reliable_,bool ack_=false):
392                 peer_id(peer_id_),
393                 channelnum(channelnum_),
394                 data(data_),
395                 reliable(reliable_),
396                 ack(ack_)
397         {
398         }
399 };
400
// Discriminator stored in ConnectionCommand::type.
// Values are implicit and start at 0. Note that the last three
// enumerators are spelled CONCMD_ (single N), unlike the others.
enum ConnectionCommandType
{
	// Value set by the ConnectionCommand default constructor
	CONNCMD_NONE,

	// Set by ConnectionCommand::serve/connect/disconnect/disconnect_peer
	CONNCMD_SERVE,
	CONNCMD_CONNECT,
	CONNCMD_DISCONNECT,
	CONNCMD_DISCONNECT_PEER,

	// Set by ConnectionCommand::send/sendToAll/deletePeer
	CONNCMD_SEND,
	CONNCMD_SEND_TO_ALL,
	CONNCMD_DELETE_PEER,

	// Set by ConnectionCommand::ack/createPeer/disableLegacy
	CONCMD_ACK,
	CONCMD_CREATE_PEER,
	CONCMD_DISABLE_LEGACY
};
414
415 struct ConnectionCommand
416 {
417         enum ConnectionCommandType type;
418         u16 port;
419         Address address;
420         u16 peer_id;
421         u8 channelnum;
422         Buffer<u8> data;
423         bool reliable;
424         bool raw;
425
426         ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
427
428         void serve(u16 port_)
429         {
430                 type = CONNCMD_SERVE;
431                 port = port_;
432         }
433         void connect(Address address_)
434         {
435                 type = CONNCMD_CONNECT;
436                 address = address_;
437         }
438         void disconnect()
439         {
440                 type = CONNCMD_DISCONNECT;
441         }
442         void disconnect_peer(u16 peer_id_)
443         {
444                 type = CONNCMD_DISCONNECT_PEER;
445                 peer_id = peer_id_;
446         }
447         void send(u16 peer_id_, u8 channelnum_,
448                         SharedBuffer<u8> data_, bool reliable_)
449         {
450                 type = CONNCMD_SEND;
451                 peer_id = peer_id_;
452                 channelnum = channelnum_;
453                 data = data_;
454                 reliable = reliable_;
455         }
456         void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
457         {
458                 type = CONNCMD_SEND_TO_ALL;
459                 channelnum = channelnum_;
460                 data = data_;
461                 reliable = reliable_;
462         }
463         void deletePeer(u16 peer_id_)
464         {
465                 type = CONNCMD_DELETE_PEER;
466                 peer_id = peer_id_;
467         }
468
469         void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
470         {
471                 type = CONCMD_ACK;
472                 peer_id = peer_id_;
473                 channelnum = channelnum_;
474                 data = data_;
475                 reliable = false;
476         }
477
478         void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
479         {
480                 type = CONCMD_CREATE_PEER;
481                 peer_id = peer_id_;
482                 data = data_;
483                 channelnum = 0;
484                 reliable = true;
485                 raw = true;
486         }
487
488         void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
489         {
490                 type = CONCMD_DISABLE_LEGACY;
491                 peer_id = peer_id_;
492                 data = data_;
493                 channelnum = 0;
494                 reliable = true;
495                 raw = true;
496         }
497 };
498
499 class Channel
500 {
501
502 public:
503         u16 readNextIncomingSeqNum();
504         u16 incNextIncomingSeqNum();
505
506         u16 getOutgoingSequenceNumber(bool& successfull);
507         u16 readOutgoingSequenceNumber();
508         bool putBackSequenceNumber(u16);
509
510         u16 readNextSplitSeqNum();
511         void setNextSplitSeqNum(u16 seqnum);
512         
513         // This is for buffering the incoming packets that are coming in
514         // the wrong order
515         ReliablePacketBuffer incoming_reliables;
516         // This is for buffering the sent packets so that the sender can
517         // re-send them if no ACK is received
518         ReliablePacketBuffer outgoing_reliables_sent;
519
520         //queued reliable packets
521         Queue<BufferedPacket> queued_reliables;
522
523         //queue commands prior splitting to packets
524         Queue<ConnectionCommand> queued_commands;
525
526         IncomingSplitBuffer incoming_splits;
527
528         Channel();
529         ~Channel();
530
531         void UpdatePacketLossCounter(unsigned int count);
532         void UpdatePacketTooLateCounter();
533         void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
534         void UpdateBytesLost(unsigned int bytes);
535
536         void UpdateTimers(float dtime);
537
538         const float getCurrentDownloadRateKB()
539                 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
540         const float getMaxDownloadRateKB()
541                 { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
542
543         const float getCurrentLossRateKB()
544                 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
545         const float getMaxLossRateKB()
546                 { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
547
548         const float getAvgDownloadRateKB()
549                 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
550         const float getAvgLossRateKB()
551                 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
552
553         const unsigned int getWindowSize() const { return window_size; };
554
555         void setWindowSize(unsigned int size) { window_size = size; };
556 private:
557         JMutex m_internal_mutex;
558         unsigned int window_size;
559
560         u16 next_incoming_seqnum;
561
562         u16 next_outgoing_seqnum;
563         u16 next_outgoing_split_seqnum;
564
565         unsigned int current_packet_loss;
566         unsigned int current_packet_too_late;
567         unsigned int current_packet_successfull;
568         float packet_loss_counter;
569
570         unsigned int current_bytes_transfered;
571         unsigned int current_bytes_lost;
572         float max_kbps;
573         float cur_kbps;
574         float avg_kbps;
575         float max_kbps_lost;
576         float cur_kbps_lost;
577         float avg_kbps_lost;
578         float bpm_counter;
579 };
580
class Peer;

/*
	Callback interface for code that wants to be told about peers being
	added to or removed from a Connection.
*/
class PeerHandler
{
public:
	PeerHandler() {}
	virtual ~PeerHandler() {}

	// Called after the Peer has been inserted into the Connection's
	// peer container.
	virtual void peerAdded(Peer *peer) = 0;

	// Called before the Peer has been removed from the Connection's
	// peer container.
	virtual void deletingPeer(Peer *peer, bool timeout) = 0;
};
604
605 class PeerHelper
606 {
607 public:
608         PeerHelper();
609         PeerHelper(Peer* peer);
610         ~PeerHelper();
611
612         PeerHelper&   operator=(Peer* peer);
613         Peer*         operator->() const;
614         bool          operator!();
615         Peer*         operator&() const;
616         bool          operator!=(void* ptr);
617
618 private:
619         Peer* m_peer;
620 };
621
class Connection;

// Selects which statistic Peer::getStat() returns.
// Enumerators keep their implicit values 0..5.
enum rtt_stat_type {
	// Round trip time
	MIN_RTT,
	MAX_RTT,
	AVG_RTT,
	// Jitter
	MIN_JITTER,
	MAX_JITTER,
	AVG_JITTER
};
632
633 class Peer {
634         public:
635                 friend class PeerHelper;
636
637                 Peer(Address address_,u16 id_,Connection* connection) :
638                         id(id_),
639                         m_increment_packets_remaining(9),
640                         m_increment_bytes_remaining(0),
641                         m_pending_deletion(false),
642                         m_connection(connection),
643                         address(address_),
644                         m_ping_timer(0.0),
645                         m_last_rtt(-1.0),
646                         m_usage(0),
647                         m_timeout_counter(0.0),
648                         m_last_timeout_check(porting::getTimeMs()),
649                         m_has_sent_with_id(false)
650                 {
651                         m_rtt.avg_rtt = -1.0;
652                         m_rtt.jitter_avg = -1.0;
653                         m_rtt.jitter_max = 0.0;
654                         m_rtt.max_rtt = 0.0;
655                         m_rtt.jitter_min = FLT_MAX;
656                         m_rtt.min_rtt = FLT_MAX;
657                 };
658
659                 virtual ~Peer() {
660                         JMutexAutoLock usage_lock(m_exclusive_access_mutex);
661                         assert(m_usage == 0);
662                 };
663
664                 // Unique id of the peer
665                 u16 id;
666
667                 void Drop();
668
669                 virtual void PutReliableSendCommand(ConnectionCommand &c,
670                                                 unsigned int max_packet_size) {};
671
672                 virtual bool isActive() { return false; };
673
674                 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
675
676                 void ResetTimeout()
677                         {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
678
679                 bool isTimedOut(float timeout);
680
681                 void setSentWithID()
682                 { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
683
684                 bool hasSentWithID()
685                 { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
686
687                 unsigned int m_increment_packets_remaining;
688                 unsigned int m_increment_bytes_remaining;
689
690                 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
691                 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
692                 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
693                                                                                                 BufferedPacket toadd,
694                                                                                                 bool reliable)
695                                 {
696                                         fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
697                                         return SharedBuffer<u8>(0);
698                                 };
699
700                 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
701
702                 virtual float getStat(rtt_stat_type type) const {
703                         switch (type) {
704                                 case MIN_RTT:
705                                         return m_rtt.min_rtt;
706                                 case MAX_RTT:
707                                         return m_rtt.max_rtt;
708                                 case AVG_RTT:
709                                         return m_rtt.avg_rtt;
710                                 case MIN_JITTER:
711                                         return m_rtt.jitter_min;
712                                 case MAX_JITTER:
713                                         return m_rtt.jitter_max;
714                                 case AVG_JITTER:
715                                         return m_rtt.jitter_avg;
716                         }
717                         return -1;
718                 }
719         protected:
720                 virtual void reportRTT(float rtt) {};
721
722                 void RTTStatistics(float rtt,
723                                                         std::string profiler_id="",
724                                                         unsigned int num_samples=1000);
725
726                 bool IncUseCount();
727                 void DecUseCount();
728
729                 JMutex m_exclusive_access_mutex;
730
731                 bool m_pending_deletion;
732
733                 Connection* m_connection;
734
735                 // Address of the peer
736                 Address address;
737
738                 // Ping timer
739                 float m_ping_timer;
740         private:
741
742                 struct rttstats {
743                         float jitter_min;
744                         float jitter_max;
745                         float jitter_avg;
746                         float min_rtt;
747                         float max_rtt;
748                         float avg_rtt;
749                 };
750
751                 rttstats m_rtt;
752                 float    m_last_rtt;
753
754                 // current usage count
755                 unsigned int m_usage;
756
757                 // Seconds from last receive
758                 float m_timeout_counter;
759
760                 u32 m_last_timeout_check;
761
762                 bool m_has_sent_with_id;
763 };
764
765 class UDPPeer : public Peer
766 {
767 public:
768
769         friend class PeerHelper;
770         friend class ConnectionReceiveThread;
771         friend class ConnectionSendThread;
772
773         UDPPeer(u16 a_id, Address a_address, Connection* connection);
774         virtual ~UDPPeer();
775
776         void PutReliableSendCommand(ConnectionCommand &c,
777                                                         unsigned int max_packet_size);
778
779         bool isActive()
780         { return ((hasSentWithID()) && (!m_pending_deletion)); };
781
782         bool getAddress(MTProtocols type, Address& toset);
783
784         void setNonLegacyPeer()
785         { m_legacy_peer = false; }
786
787         bool getLegacyPeer()
788         { return m_legacy_peer; }
789
790         u16 getNextSplitSequenceNumber(u8 channel);
791         void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
792
793         SharedBuffer<u8> addSpiltPacket(u8 channel,
794                                                                         BufferedPacket toadd,
795                                                                         bool reliable);
796 protected:
797         /*
798                 Calculates avg_rtt and resend_timeout.
799                 rtt=-1 only recalculates resend_timeout
800         */
801         void reportRTT(float rtt);
802
803         void RunCommandQueues(
804                                         unsigned int max_packet_size,
805                                         unsigned int maxcommands,
806                                         unsigned int maxtransfer);
807
808         float getResendTimeout()
809                 { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
810
811         void setResendTimeout(float timeout)
812                 { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
813         bool Ping(float dtime,SharedBuffer<u8>& data);
814
815         Channel channels[CHANNEL_COUNT];
816 private:
817         // This is changed dynamically
818         float resend_timeout;
819
820         bool processReliableSendCommand(
821                                         ConnectionCommand &c,
822                                         unsigned int max_packet_size);
823
824         bool m_legacy_peer;
825 };
826
827 /*
828         Connection
829 */
830
// Discriminator stored in ConnectionEvent::type.
// Values are implicit and start at 0.
enum ConnectionEventType
{
	// Value set by the ConnectionEvent default constructor
	CONNEVENT_NONE,
	// Set by ConnectionEvent::dataReceived()
	CONNEVENT_DATA_RECEIVED,
	// Set by ConnectionEvent::peerAdded()
	CONNEVENT_PEER_ADDED,
	// Set by ConnectionEvent::peerRemoved()
	CONNEVENT_PEER_REMOVED,
	// Set by ConnectionEvent::bindFailed()
	CONNEVENT_BIND_FAILED
};
838
839 struct ConnectionEvent
840 {
841         enum ConnectionEventType type;
842         u16 peer_id;
843         Buffer<u8> data;
844         bool timeout;
845         Address address;
846
847         ConnectionEvent(): type(CONNEVENT_NONE) {}
848
849         std::string describe()
850         {
851                 switch(type){
852                 case CONNEVENT_NONE:
853                         return "CONNEVENT_NONE";
854                 case CONNEVENT_DATA_RECEIVED:
855                         return "CONNEVENT_DATA_RECEIVED";
856                 case CONNEVENT_PEER_ADDED:
857                         return "CONNEVENT_PEER_ADDED";
858                 case CONNEVENT_PEER_REMOVED:
859                         return "CONNEVENT_PEER_REMOVED";
860                 case CONNEVENT_BIND_FAILED:
861                         return "CONNEVENT_BIND_FAILED";
862                 }
863                 return "Invalid ConnectionEvent";
864         }
865         
866         void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
867         {
868                 type = CONNEVENT_DATA_RECEIVED;
869                 peer_id = peer_id_;
870                 data = data_;
871         }
872         void peerAdded(u16 peer_id_, Address address_)
873         {
874                 type = CONNEVENT_PEER_ADDED;
875                 peer_id = peer_id_;
876                 address = address_;
877         }
878         void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
879         {
880                 type = CONNEVENT_PEER_REMOVED;
881                 peer_id = peer_id_;
882                 timeout = timeout_;
883                 address = address_;
884         }
885         void bindFailed()
886         {
887                 type = CONNEVENT_BIND_FAILED;
888         }
889 };
890
891 class ConnectionSendThread : public JThread {
892
893 public:
894         friend class UDPPeer;
895
896         ConnectionSendThread(Connection* parent,
897                                                         unsigned int max_packet_size, float timeout);
898
899         void * Thread       ();
900
901         void Trigger();
902
903         void setPeerTimeout(float peer_timeout)
904                 { m_timeout = peer_timeout; }
905
906 private:
907         void runTimeouts    (float dtime);
908         void rawSend        (const BufferedPacket &packet);
909         bool rawSendAsPacket(u16 peer_id, u8 channelnum,
910                                                         SharedBuffer<u8> data, bool reliable);
911
912         void processReliableCommand (ConnectionCommand &c);
913         void processNonReliableCommand (ConnectionCommand &c);
914         void serve          (u16 port);
915         void connect        (Address address);
916         void disconnect     ();
917         void disconnect_peer(u16 peer_id);
918         void send           (u16 peer_id, u8 channelnum,
919                                                         SharedBuffer<u8> data);
920         void sendReliable   (ConnectionCommand &c);
921         void sendToAll      (u8 channelnum,
922                                                         SharedBuffer<u8> data);
923         void sendToAllReliable(ConnectionCommand &c);
924
925         void sendPackets    (float dtime);
926
927         void sendAsPacket   (u16 peer_id, u8 channelnum,
928                                                         SharedBuffer<u8> data,bool ack=false);
929
930         void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
931
932         bool packetsQueued();
933
934         Connection*           m_connection;
935         unsigned int          m_max_packet_size;
936         float                 m_timeout;
937         Queue<OutgoingPacket> m_outgoing_queue;
938         JSemaphore            m_send_sleep_semaphore;
939
940         unsigned int          m_iteration_packets_avaialble;
941         unsigned int          m_max_commands_per_iteration;
942         unsigned int          m_max_data_packets_per_iteration;
943         unsigned int          m_max_packets_requeued;
944 };
945
// Thread object for the receiving side of a Connection (derives from JThread).
// NOTE(review): all member functions are defined outside this header, so the
// notes below are based on the declarations only.
946 class ConnectionReceiveThread : public JThread {
947 public:
            // parent:          the Connection this thread belongs to
            //                  (presumably kept in m_connection -- TODO confirm).
            // max_packet_size: packet size limit (presumably kept in
            //                  m_max_packet_size; unit not visible here).
948         ConnectionReceiveThread(Connection* parent,
949                                                         unsigned int max_packet_size);
950
            // Thread body. NOTE(review): presumably the entry point that JThread
            // invokes on the new thread -- confirm against the JThread interface.
951         void * Thread       ();
952
953 private:
            // NOTE(review): presumably performs one receive step; verify in the
            // implementation file.
954         void receive        ();
955
956         // Returns next data from a buffer if possible
957         // If found, returns true; if not, false.
958         // If found, sets peer_id and dst
959         bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
960
            // Per-channel counterpart of getFromBuffers(): takes the channel to
            // inspect and has the same peer_id/dst reference parameters.
            // NOTE(review): assumed to follow the same "true if found" contract --
            // TODO confirm.
961         bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
962                                                         SharedBuffer<u8> &dst);
963
964         /*
965                 Processes a packet with the basic header stripped out.
966                 Parameters:
                        channel: channel object the packet belongs to
                                (presumably the one identified by channelnum)
967                         packetdata: Data in packet (with no base headers)
968                         peer_id: peer id of the sender of the packet in question
969                         channelnum: channel on which the packet was sent
970                         reliable: true if recursing into a reliable packet
                Returns a SharedBuffer<u8>; its exact contents are defined by the
                implementation and are not visible from this header.
971         */
972         SharedBuffer<u8> processPacket(Channel *channel,
973                                                         SharedBuffer<u8> packetdata, u16 peer_id,
974                                                         u8 channelnum, bool reliable);
975
976
            // Non-owning as far as this header shows: no destructor is declared here.
977         Connection*           m_connection;
978         unsigned int          m_max_packet_size;
979 };
980
981 class Connection
982 {
983 public:
984         friend class ConnectionSendThread;
985         friend class ConnectionReceiveThread;
986
987         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
988         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
989                         PeerHandler *peerhandler);
990         ~Connection();
991
992         /* Interface */
993         ConnectionEvent getEvent();
994         ConnectionEvent waitEvent(u32 timeout_ms);
995         void putCommand(ConnectionCommand &c);
996         
997         void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
998         void Serve(unsigned short port);
999         void Connect(Address address);
1000         bool Connected();
1001         void Disconnect();
1002         u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
1003         void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
1004         void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
1005         void RunTimeouts(float dtime); // dummy
1006         u16 GetPeerID(){ return m_peer_id; }
1007         Address GetPeerAddress(u16 peer_id);
1008         float GetPeerAvgRTT(u16 peer_id);
1009         void DeletePeer(u16 peer_id);
1010         const u32 GetProtocolID() const { return m_protocol_id; };
1011         const std::string getDesc();
1012
1013 protected:
1014         PeerHelper getPeer(u16 peer_id);
1015         PeerHelper getPeerNoEx(u16 peer_id);
1016         u16   lookupPeer(Address& sender);
1017
1018         u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1019         UDPPeer*  createServerPeer(Address& sender);
1020         bool deletePeer(u16 peer_id, bool timeout);
1021
1022         void SetPeerID(u16 id){ m_peer_id = id; }
1023
1024         void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1025
1026         void PrintInfo(std::ostream &out);
1027         void PrintInfo();
1028
1029         std::list<u16> getPeerIDs();
1030
1031         UDPSocket m_udpSocket;
1032         MutexedQueue<ConnectionCommand> m_command_queue;
1033
1034         void putEvent(ConnectionEvent &e);
1035
1036 private:
1037         std::list<Peer*> getPeers();
1038
1039         MutexedQueue<ConnectionEvent> m_event_queue;
1040
1041         u16 m_peer_id;
1042         u32 m_protocol_id;
1043         
1044         std::map<u16, Peer*> m_peers;
1045         JMutex m_peers_mutex;
1046
1047         ConnectionSendThread m_sendThread;
1048         ConnectionReceiveThread m_receiveThread;
1049
1050         JMutex m_info_mutex;
1051
1052         // Backwards compatibility
1053         PeerHandler *m_bc_peerhandler;
1054         int m_bc_receive_timeout;
1055
1056         bool m_shutting_down;
1057 };
1058
1059 } // namespace
1060
1061 #endif
1062